src/modules/nats/nats_mod.c
eef6fb7a
 /*
  * NATS module interface
  *
  * Copyright (C) 2016 Weave Communications
  *
  * This file is part of Kamailio, a free SIP server.
  *
  * Kamailio is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation; either version 2 of the License, or
  * (at your option) any later version
  *
  * Kamailio is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
  * You should have received a copy of the GNU General Public License
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  *
  *
  */
 
 #include "nats_mod.h"
 
 MODULE_VERSION
 
 init_nats_sub_ptr _init_nats_sc = NULL;
 init_nats_server_ptr _init_nats_srv = NULL;
 nats_consumer_worker_t *nats_workers = NULL;
 int _nats_proc_count;
 char *eventData = NULL;
 
 static pv_export_t nats_mod_pvs[] = {
 		{{"natsData", (sizeof("natsData") - 1)}, PVT_OTHER,
 				nats_pv_get_event_payload, 0, 0, 0, 0, 0},
 		{{0, 0}, 0, 0, 0, 0, 0, 0, 0}};
 
 static param_export_t params[] = {{"nats_url", PARAM_STRING | USE_FUNC_PARAM,
 										  (void *)_init_nats_server_url_add},
 		{"subject_queue_group", PARAM_STRING | USE_FUNC_PARAM,
 				(void *)_init_nats_sub_add}};
 
 struct module_exports exports = {
 		"nats", DEFAULT_DLFLAGS, /* dlopen flags */
 		0,						 /* Exported functions */
 		params,					 /* Exported parameters */
 		0,						 /* exported MI functions */
 		nats_mod_pvs,			 /* exported pseudo-variables */
 		0,						 /* response function*/
 		mod_init,				 /* module initialization function */
 		mod_child_init,			 /* per-child init function */
 		mod_destroy				 /* destroy function */
 };
 
 static void onMsg(
 		natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
 {
 	nats_on_message_ptr on_message = (nats_on_message_ptr)closure;
 	char *s = (char *)natsMsg_GetSubject(msg);
 	char *data = (char *)natsMsg_GetData(msg);
 	if(on_message->rt < 0 || event_rt.rlist[on_message->rt] == NULL) {
 		LM_INFO("event-route [nats:%s] does not exist\n", s);
 		goto end;
 	}
 	eventData = data;
 	nats_run_cfg_route(on_message->rt);
 
 end:
 	eventData = NULL;
 	natsMsg_Destroy(msg);
 }
 
 static void connectedCB(natsConnection *nc, void *closure)
 {
 	char url[NATS_URL_MAX_SIZE];
 	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
 	nats_run_cfg_route(_nats_rts.connected);
 }
 
 static void disconnectedCb(natsConnection *nc, void *closure)
 {
 	char url[NATS_URL_MAX_SIZE];
 	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
 	nats_run_cfg_route(_nats_rts.disconnected);
 }
 
 static void reconnectedCb(natsConnection *nc, void *closure)
 {
 	char url[NATS_URL_MAX_SIZE];
 	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
 	nats_run_cfg_route(_nats_rts.connected);
 }
 
 static void closedCB(natsConnection *nc, void *closure)
 {
 	bool *closed = (bool *)closure;
 	const char *err = NULL;
 	natsConnection_GetLastError(nc, &err);
 	LM_INFO("connect failed: %s\n", err);
 	*closed = true;
 }
 
 void nats_consumer_worker_proc(
 		nats_consumer_worker_t *worker, const char *servers[])
 {
 	natsStatus s;
 	bool closed = false;
 
 	LM_INFO("nats worker connecting to subject [%s] queue group [%s]\n",
 			worker->subject, worker->queue_group);
 
 	s = natsOptions_Create(&worker->opts);
 	if(s != NATS_OK) {
 		LM_ERR("could not create nats options [%s]\n", natsStatus_GetText(s));
 		return;
 	}
 	// use these defaults
 	natsOptions_SetAllowReconnect(worker->opts, true);
 	natsOptions_SetSecure(worker->opts, false);
 	natsOptions_SetMaxReconnect(worker->opts, 10000);
 	natsOptions_SetReconnectWait(worker->opts, 2 * 1000);	  // 2s
 	natsOptions_SetPingInterval(worker->opts, 2 * 60 * 1000); // 2m
 	natsOptions_SetMaxPingsOut(worker->opts, 2);
 	natsOptions_SetIOBufSize(worker->opts, 32 * 1024); // 32 KB
 	natsOptions_SetMaxPendingMsgs(worker->opts, 65536);
 	natsOptions_SetTimeout(worker->opts, 2 * 1000);					// 2s
 	natsOptions_SetReconnectBufSize(worker->opts, 8 * 1024 * 1024); // 8 MB;
 	natsOptions_SetReconnectJitter(worker->opts, 100, 1000); // 100ms, 1s;
 	s = natsOptions_SetServers(worker->opts, servers, 1);
 	if(s != NATS_OK) {
 		LM_ERR("could not set nats server [%s]\n", natsStatus_GetText(s));
 	}
 	s = natsOptions_SetDisconnectedCB(worker->opts, disconnectedCb, NULL);
 	if(s != NATS_OK) {
 		LM_ERR("could not set disconnect callback [%s]\n",
 				natsStatus_GetText(s));
 	}
 	s = natsOptions_SetReconnectedCB(worker->opts, reconnectedCb, NULL);
 	if(s != NATS_OK) {
 		LM_ERR("could not set reconnect callback [%s]\n",
 				natsStatus_GetText(s));
 	}
 	s = natsOptions_SetRetryOnFailedConnect(
 			worker->opts, true, connectedCB, NULL);
 	if(s != NATS_OK) {
 		LM_ERR("could not set retry on failed callback [%s]\n",
 				natsStatus_GetText(s));
 	}
 	s = natsOptions_SetClosedCB(worker->opts, closedCB, (void *)&closed);
 	if(s != NATS_OK) {
 		LM_ERR("could not set closed callback [%s]\n", natsStatus_GetText(s));
 	}
 
 	s = natsConnection_Connect(&worker->conn, worker->opts);
 	if(s != NATS_OK) {
 		LM_ERR("could not connect [%s]\n", natsStatus_GetText(s));
 	}
 	// create a loop
 	natsLibuv_Init();
 	worker->uvLoop = uv_default_loop();
 	if(worker->uvLoop != NULL) {
 		natsLibuv_SetThreadLocalLoop(worker->uvLoop);
 	} else {
 		s = NATS_ERR;
 	}
 
 	s = natsOptions_SetEventLoop(worker->opts, (void *)worker->uvLoop,
 			natsLibuv_Attach, natsLibuv_Read, natsLibuv_Write,
 			natsLibuv_Detach);
 	if(s != NATS_OK) {
 		LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s));
 	}
 
 	if(s) {
 		LM_ERR("error setting options [%s]\n", natsStatus_GetText(s));
 	}
 
 	s = natsConnection_QueueSubscribe(&worker->subscription, worker->conn,
 			worker->subject, worker->queue_group, onMsg, worker->on_message);
 	if(s != NATS_OK) {
 		LM_ERR("could not subscribe [%s]\n", natsStatus_GetText(s));
 	}
 
 	s = natsSubscription_SetPendingLimits(worker->subscription, -1, -1);
 	if(s != NATS_OK) {
 		LM_ERR("could not set pending limits [%s]\n", natsStatus_GetText(s));
 	}
 
 	// Run the event loop.
 	// This call will return when the connection is closed (either after
 	// receiving all messages, or disconnected and unable to reconnect).
 	if(s == NATS_OK) {
 		uv_run(worker->uvLoop, UV_RUN_DEFAULT);
 	}
 	if(s != NATS_OK) {
 		LM_ERR("nats error [%s]\n", natsStatus_GetText(s));
 	}
 }
 
 static int mod_init(void)
 {
 	if(faked_msg_init() < 0) {
 		LM_ERR("failed to init faked sip message\n");
 		return -1;
 	}
 	nats_init_environment();
 	register_procs(_nats_proc_count);
 	nats_workers =
 			shm_malloc(_nats_proc_count * sizeof(nats_consumer_worker_t));
 	if(nats_workers == NULL) {
 		LM_ERR("error in shm_malloc\n");
 		return -1;
 	}
 	memset(nats_workers, 0, _nats_proc_count * sizeof(nats_consumer_worker_t));
 	return 0;
 }
 
 int init_worker(
 		nats_consumer_worker_t *worker, char *subject, char *queue_group)
 {
 	int buffsize = strlen(subject) + 6;
 	char routename[buffsize];
 	int rt;
 	int len;
 	char *sc;
 	int num_servers = 0;
 	init_nats_server_ptr s0;
 
 	memset(worker, 0, sizeof(*worker));
 	worker->subject = shm_malloc(strlen(subject) + 1);
 	strcpy(worker->subject, subject);
 	worker->subject[strlen(subject)] = '\0';
 	worker->queue_group = shm_malloc(strlen(queue_group) + 1);
 	strcpy(worker->queue_group, queue_group);
 	worker->queue_group[strlen(queue_group)] = '\0';
 	memset(worker->init_nats_servers, 0, sizeof(worker->init_nats_servers));
 	worker->on_message =
 			(nats_on_message_ptr)shm_malloc(sizeof(nats_on_message));
 	memset(worker->on_message, 0, sizeof(nats_on_message));
 
 	s0 = _init_nats_srv;
 	while(s0) {
 		if(s0->url != NULL && num_servers < NATS_MAX_SERVERS) {
 			len = strlen(s0->url);
 			sc = shm_malloc(len + 1);
 			strcpy(sc, s0->url);
 			sc[len] = '\0';
 			worker->init_nats_servers[num_servers++] = sc;
 		}
 		s0 = s0->next;
 	}
 	if(num_servers == 0) {
 		worker->init_nats_servers[0] = NATS_DEFAULT_URL;
 		LM_INFO("using default server [%s]\n", NATS_DEFAULT_URL);
 	}
 
 	snprintf(routename, buffsize, "nats:%s", subject);
 	routename[buffsize] = '\0';
 
 	rt = route_get(&event_rt, routename);
 	if(rt < 0 || event_rt.rlist[rt] == NULL) {
 		LM_INFO("route [%s] does not exist\n", routename);
 		worker->on_message->rt = -1;
 		return 0;
 	}
 	worker->on_message->rt = rt;
 	return 0;
 }
 
 void worker_loop(int id)
 {
 	nats_consumer_worker_t *worker = &nats_workers[id];
 	nats_consumer_worker_proc(worker, (const char **)worker->init_nats_servers);
 	for(;;) {
 		sleep(1000);
 	}
 }
 
 /**
  * @brief Initialize async module children
  */
 static int mod_child_init(int rank)
 {
 	init_nats_sub_ptr n;
 	int i = 0;
 	int newpid;
 
 	if(rank == PROC_INIT) {
 		n = _init_nats_sc;
 		while(n) {
 			if(init_worker(&nats_workers[i], n->sub, n->queue_group) < 0) {
 				LM_ERR("failed to init struct for worker[%d]\n", i);
 				return -1;
 			}
 			n = n->next;
 			i++;
 		}
 		if(nats_cleanup_init_sub() < 0) {
 			LM_INFO("could not cleanup init data\n");
 		}
 		if(nats_cleanup_init_servers() < 0) {
 			LM_INFO("could not cleanup init server data\n");
 		}
 		return 0;
 	}
 
 	if(rank == PROC_MAIN) {
 		for(i = 0; i < _nats_proc_count; i++) {
 			newpid = fork_process(PROC_RPC, "NATS WORKER", 1);
 			if(newpid < 0) {
 				LM_ERR("failed to fork worker process %d\n", i);
 				return -1;
 			} else if(newpid == 0) {
 				worker_loop(i);
 			} else {
 				nats_workers[i].pid = newpid;
 			}
 		}
 		return 0;
 	}
 
 	return 0;
 }
 
 int nats_cleanup_init_sub()
 {
 	init_nats_sub_ptr n0;
 	init_nats_sub_ptr n1;
 	n0 = _init_nats_sc;
 	while(n0) {
 		n1 = n0->next;
 		if(n0->sub != NULL) {
 			shm_free(n0->sub);
 		}
 		if(n0->queue_group != NULL) {
 			shm_free(n0->queue_group);
 		}
 		shm_free(n0);
 		n0 = n1;
 	}
 	_init_nats_sc = NULL;
 	return 0;
 }
 
 int nats_cleanup_init_servers()
 {
 	init_nats_server_ptr s0;
 	init_nats_server_ptr s1;
 	s0 = _init_nats_srv;
 	while(s0) {
 		s1 = s0->next;
 		if(s0->url != NULL) {
 			shm_free(s0->url);
 		}
 		shm_free(s0);
 		s0 = s1;
 	}
 	_init_nats_srv = NULL;
 	return 0;
 }
 
 int nats_destroy_workers()
 {
 	int i;
 	int s;
 	nats_consumer_worker_t *worker;
 	for(i = 0; i < _nats_proc_count; i++) {
 		worker = &nats_workers[i];
 		natsSubscription_Unsubscribe(worker->subscription);
 		natsSubscription_Destroy(worker->subscription);
 		natsConnection_Close(worker->conn);
 		natsConnection_Destroy(worker->conn);
 		natsOptions_Destroy(worker->opts);
 		if(worker->uvLoop != NULL) {
 			uv_loop_close(worker->uvLoop);
 		}
 		nats_Close();
 		if(worker->subject != NULL) {
 			shm_free(worker->subject);
 		}
 		if(worker->queue_group != NULL) {
 			shm_free(worker->queue_group);
 		}
 		if(worker->on_message != NULL) {
 			shm_free(worker->on_message);
 		}
 		for(s = 0; s < NATS_MAX_SERVERS; s++) {
 			if(worker->init_nats_servers[s]) {
 				shm_free(worker->init_nats_servers[s]);
 			}
 		}
 		shm_free(worker);
 	}
 	return 0;
 }
 
 /**
  * destroy module function
  */
 static void mod_destroy(void)
 {
 	if(nats_destroy_workers() < 0) {
 		LM_ERR("could not cleanup workers\n");
 	}
 }
 
 int _init_nats_server_url_add(modparam_t type, void *val)
 {
 	char *url = (char *)val;
 	int len = strlen(url);
 	char *value;
 	if(len > NATS_URL_MAX_SIZE) {
 		LM_ERR("connection url exceeds max size %d\n", NATS_URL_MAX_SIZE);
 		return -1;
 	}
 	if(strncmp(url, "nats://", 7)) {
 		LM_ERR("invalid nats url [%s]\n", url);
 		return -1;
 	}
 	value = pkg_malloc(len + 1);
 	strcpy(value, url);
 	value[len] = '\0';
 	if(init_nats_server_url_add(url) < 0) {
 		LM_ERR("could not add server\n");
 	}
 	pkg_free(value);
 	return 0;
 }
 
 int _init_nats_sub_add(modparam_t type, void *val)
 {
 	char *sub = (char *)val;
 	int len = strlen(sub);
 	char *s = pkg_malloc(len + 1);
 	strcpy(s, sub);
 	s[len] = '\0';
 	if(init_nats_sub_add(s) < 0) {
 		LM_ERR("could not add init data\n");
 	}
 	pkg_free(s);
 	return 0;
 }
 
 /**
  * Invoke a event route block
  */
 int nats_run_cfg_route(int rt)
 {
 	struct run_act_ctx ctx;
 	sip_msg_t *fmsg;
 	sip_msg_t tmsg;
 
 	// check for valid route pointer
 	if(rt < 0) {
 		return 0;
 	}
 
 	fmsg = faked_msg_next();
 	memcpy(&tmsg, fmsg, sizeof(sip_msg_t));
 	fmsg = &tmsg;
 	set_route_type(EVENT_ROUTE);
 	init_run_actions_ctx(&ctx);
 	run_top_route(event_rt.rlist[rt], fmsg, 0);
 	return 0;
 }
 
 void nats_init_environment()
 {
 	memset(&_nats_rts, 0, sizeof(nats_evroutes_t));
 	_nats_rts.connected = route_lookup(&event_rt, "nats:connected");
 	if(_nats_rts.connected < 0 || event_rt.rlist[_nats_rts.connected] == NULL)
 		_nats_rts.connected = -1;
 
 	_nats_rts.disconnected = route_lookup(&event_rt, "nats:disconnected");
 	if(_nats_rts.disconnected < 0
 			|| event_rt.rlist[_nats_rts.disconnected] == NULL)
 		_nats_rts.disconnected = -1;
 }
 
 init_nats_server_ptr _init_nats_server_list_new(char *url)
 {
 	init_nats_server_ptr p =
 			(init_nats_server_ptr)shm_malloc(sizeof(init_nats_server));
 	memset(p, 0, sizeof(init_nats_server));
 	p->url = shm_malloc(strlen(url) + 1);
 	strcpy(p->url, url);
 	p->url[strlen(url)] = '\0';
 	return p;
 }
 
 int init_nats_server_url_add(char *url)
 {
 	init_nats_server_ptr n;
 	n = _init_nats_srv;
 	while(n != NULL) {
 		n = n->next;
 	}
 	n = _init_nats_server_list_new(url);
 	n->next = _init_nats_srv;
 	_init_nats_srv = n;
 	return 0;
 }
 
 init_nats_sub_ptr _init_nats_sub_new(char *sub, char *queue_group)
 {
 	init_nats_sub_ptr p = (init_nats_sub_ptr)shm_malloc(sizeof(init_nats_sub));
 	memset(p, 0, sizeof(init_nats_sub));
 	p->sub = shm_malloc(strlen(sub) + 1);
 	strcpy(p->sub, sub);
 	p->sub[strlen(sub)] = '\0';
 	p->queue_group = shm_malloc(strlen(queue_group) + 1);
 	strcpy(p->queue_group, queue_group);
 	p->queue_group[strlen(queue_group)] = '\0';
 	return p;
 }
 
 int init_nats_sub_add(char *sc)
 {
 	int len;
 	char *s;
 	char *c;
 	init_nats_sub_ptr n;
 
 	if(sc == NULL) {
 		return -1;
 	}
 
 	len = strlen(sc);
 	s = pkg_malloc(len + 1);
 	strcpy(s, sc);
 	s[len] = '\0';
 
 	if((c = strchr(s, ':')) != 0) {
 		*c = 0;
 		for(c = c + 1; !*c; c++)
 			;
 	}
 	if(s == NULL) {
 		goto error;
 		return -1;
 	}
 	if(c == NULL) {
 		goto error;
 		return -1;
 	}
 
 	n = _init_nats_sc;
 	while(n != NULL) {
 		n = n->next;
 	}
 	n = _init_nats_sub_new(s, c);
 	n->next = _init_nats_sc;
 	_init_nats_sc = n;
 	_nats_proc_count++;
 
 
 error:
 	pkg_free(s);
 	return 0;
 }
 
 int nats_pv_get_event_payload(
 		struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
 {
 	return eventData == NULL ? pv_get_null(msg, param, res)
 							 : pv_get_strzval(msg, param, res, eventData);
 }