Browse code

nats: move nats connection handling into a struct in order to extend features

Emmanuel Schmidbauer authored on 19/11/2021 12:24:29
Showing 1 changed files
... ...
@@ -29,6 +29,7 @@ MODULE_VERSION
29 29
 init_nats_sub_ptr _init_nats_sc = NULL;
30 30
 init_nats_server_ptr _init_nats_srv = NULL;
31 31
 nats_consumer_worker_t *nats_workers = NULL;
32
+nats_connection_ptr _nats_connection = NULL;
32 33
 int _nats_proc_count;
33 34
 char *eventData = NULL;
34 35
 
... ...
@@ -103,60 +104,10 @@ static void closedCB(natsConnection *nc, void *closure)
103 104
 }
104 105
 
105 106
 void nats_consumer_worker_proc(
106
-		nats_consumer_worker_t *worker, const char *servers[])
107
+		nats_consumer_worker_t *worker, nats_connection_ptr c)
107 108
 {
108
-	natsStatus s;
109
-	bool closed = false;
110
-
111
-	LM_INFO("nats worker connecting to subject [%s] queue group [%s]\n",
112
-			worker->subject, worker->queue_group);
109
+	natsStatus s = NATS_OK;
113 110
 
114
-	s = natsOptions_Create(&worker->opts);
115
-	if(s != NATS_OK) {
116
-		LM_ERR("could not create nats options [%s]\n", natsStatus_GetText(s));
117
-		return;
118
-	}
119
-	// use these defaults
120
-	natsOptions_SetAllowReconnect(worker->opts, true);
121
-	natsOptions_SetSecure(worker->opts, false);
122
-	natsOptions_SetMaxReconnect(worker->opts, 10000);
123
-	natsOptions_SetReconnectWait(worker->opts, 2 * 1000);	  // 2s
124
-	natsOptions_SetPingInterval(worker->opts, 2 * 60 * 1000); // 2m
125
-	natsOptions_SetMaxPingsOut(worker->opts, 2);
126
-	natsOptions_SetIOBufSize(worker->opts, 32 * 1024); // 32 KB
127
-	natsOptions_SetMaxPendingMsgs(worker->opts, 65536);
128
-	natsOptions_SetTimeout(worker->opts, 2 * 1000);					// 2s
129
-	natsOptions_SetReconnectBufSize(worker->opts, 8 * 1024 * 1024); // 8 MB;
130
-	natsOptions_SetReconnectJitter(worker->opts, 100, 1000); // 100ms, 1s;
131
-	s = natsOptions_SetServers(worker->opts, servers, 1);
132
-	if(s != NATS_OK) {
133
-		LM_ERR("could not set nats server [%s]\n", natsStatus_GetText(s));
134
-	}
135
-	s = natsOptions_SetDisconnectedCB(worker->opts, disconnectedCb, NULL);
136
-	if(s != NATS_OK) {
137
-		LM_ERR("could not set disconnect callback [%s]\n",
138
-				natsStatus_GetText(s));
139
-	}
140
-	s = natsOptions_SetReconnectedCB(worker->opts, reconnectedCb, NULL);
141
-	if(s != NATS_OK) {
142
-		LM_ERR("could not set reconnect callback [%s]\n",
143
-				natsStatus_GetText(s));
144
-	}
145
-	s = natsOptions_SetRetryOnFailedConnect(
146
-			worker->opts, true, connectedCB, NULL);
147
-	if(s != NATS_OK) {
148
-		LM_ERR("could not set retry on failed callback [%s]\n",
149
-				natsStatus_GetText(s));
150
-	}
151
-	s = natsOptions_SetClosedCB(worker->opts, closedCB, (void *)&closed);
152
-	if(s != NATS_OK) {
153
-		LM_ERR("could not set closed callback [%s]\n", natsStatus_GetText(s));
154
-	}
155
-
156
-	s = natsConnection_Connect(&worker->conn, worker->opts);
157
-	if(s != NATS_OK) {
158
-		LM_ERR("could not connect [%s]\n", natsStatus_GetText(s));
159
-	}
160 111
 	// create a loop
161 112
 	natsLibuv_Init();
162 113
 	worker->uvLoop = uv_default_loop();
... ...
@@ -165,8 +116,15 @@ void nats_consumer_worker_proc(
165 116
 	} else {
166 117
 		s = NATS_ERR;
167 118
 	}
119
+	if(s != NATS_OK) {
120
+		LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s));
121
+	}
122
+	if((s = natsConnection_Connect(&worker->conn, c->opts)) != NATS_OK) {
123
+		LM_ERR("could not connect to nats servers [%s]\n",
124
+				natsStatus_GetText(s));
125
+	}
168 126
 
169
-	s = natsOptions_SetEventLoop(worker->opts, (void *)worker->uvLoop,
127
+	s = natsOptions_SetEventLoop(c->opts, (void *)worker->uvLoop,
170 128
 			natsLibuv_Attach, natsLibuv_Read, natsLibuv_Write,
171 129
 			natsLibuv_Detach);
172 130
 	if(s != NATS_OK) {
... ...
@@ -206,6 +164,11 @@ static int mod_init(void)
206 164
 		return -1;
207 165
 	}
208 166
 	nats_init_environment();
167
+	_nats_connection = _init_nats_connection();
168
+	if(nats_init_connection(_nats_connection) < 0) {
169
+		LM_ERR("failed to init nat connections\n");
170
+		return -1;
171
+	}
209 172
 	register_procs(_nats_proc_count);
210 173
 	nats_workers =
211 174
 			shm_malloc(_nats_proc_count * sizeof(nats_consumer_worker_t));
... ...
@@ -223,10 +186,6 @@ int init_worker(
223 186
 	int buffsize = strlen(subject) + 6;
224 187
 	char routename[buffsize];
225 188
 	int rt;
226
-	int len;
227
-	char *sc;
228
-	int num_servers = 0;
229
-	init_nats_server_ptr s0;
230 189
 
231 190
 	memset(worker, 0, sizeof(*worker));
232 191
 	worker->subject = shm_malloc(strlen(subject) + 1);
... ...
@@ -235,31 +194,10 @@ int init_worker(
235 194
 	worker->queue_group = shm_malloc(strlen(queue_group) + 1);
236 195
 	strcpy(worker->queue_group, queue_group);
237 196
 	worker->queue_group[strlen(queue_group)] = '\0';
238
-	memset(worker->init_nats_servers, 0, sizeof(worker->init_nats_servers));
239 197
 	worker->on_message =
240 198
 			(nats_on_message_ptr)shm_malloc(sizeof(nats_on_message));
241 199
 	memset(worker->on_message, 0, sizeof(nats_on_message));
242 200
 
243
-	s0 = _init_nats_srv;
244
-	while(s0) {
245
-		if(s0->url != NULL && num_servers < NATS_MAX_SERVERS) {
246
-			len = strlen(s0->url);
247
-			sc = shm_malloc(len + 1);
248
-			if(!sc) {
249
-				LM_ERR("no shm memory left\n");
250
-				return -1;
251
-			}
252
-			strcpy(sc, s0->url);
253
-			sc[len] = '\0';
254
-			worker->init_nats_servers[num_servers++] = sc;
255
-		}
256
-		s0 = s0->next;
257
-	}
258
-	if(num_servers == 0) {
259
-		worker->init_nats_servers[0] = NATS_DEFAULT_URL;
260
-		LM_INFO("using default server [%s]\n", NATS_DEFAULT_URL);
261
-	}
262
-
263 201
 	snprintf(routename, buffsize, "nats:%s", subject);
264 202
 	routename[buffsize] = '\0';
265 203
 
... ...
@@ -273,10 +211,10 @@ int init_worker(
273 211
 	return 0;
274 212
 }
275 213
 
276
-void worker_loop(int id)
214
+void worker_loop(int id, nats_connection_ptr c)
277 215
 {
278 216
 	nats_consumer_worker_t *worker = &nats_workers[id];
279
-	nats_consumer_worker_proc(worker, (const char **)worker->init_nats_servers);
217
+	nats_consumer_worker_proc(worker, c);
280 218
 	for(;;) {
281 219
 		sleep(1000);
282 220
 	}
... ...
@@ -301,12 +239,6 @@ static int mod_child_init(int rank)
301 239
 			n = n->next;
302 240
 			i++;
303 241
 		}
304
-		if(nats_cleanup_init_sub() < 0) {
305
-			LM_INFO("could not cleanup init data\n");
306
-		}
307
-		if(nats_cleanup_init_servers() < 0) {
308
-			LM_INFO("could not cleanup init server data\n");
309
-		}
310 242
 		return 0;
311 243
 	}
312 244
 
... ...
@@ -317,7 +249,7 @@ static int mod_child_init(int rank)
317 249
 				LM_ERR("failed to fork worker process %d\n", i);
318 250
 				return -1;
319 251
 			} else if(newpid == 0) {
320
-				worker_loop(i);
252
+				worker_loop(i, _nats_connection);
321 253
 			} else {
322 254
 				nats_workers[i].pid = newpid;
323 255
 			}
... ...
@@ -348,6 +280,97 @@ int nats_cleanup_init_sub()
348 280
 	return 0;
349 281
 }
350 282
 
283
+int nats_init_connection(nats_connection_ptr c)
284
+{
285
+	natsStatus s = NATS_OK;
286
+	bool closed = false;
287
+	int len;
288
+	char *sc;
289
+	int num_servers = 0;
290
+	init_nats_server_ptr s0;
291
+
292
+	s0 = _init_nats_srv;
293
+	while(s0) {
294
+		if(s0->url != NULL && num_servers < NATS_MAX_SERVERS) {
295
+			len = strlen(s0->url);
296
+			sc = shm_malloc(len + 1);
297
+			if(!sc) {
298
+				LM_ERR("no shm memory left\n");
299
+				return -1;
300
+			}
301
+			strcpy(sc, s0->url);
302
+			sc[len] = '\0';
303
+			c->servers[num_servers++] = sc;
304
+			LM_INFO("adding server [%s] [%d]\n", sc, num_servers);
305
+		}
306
+		s0 = s0->next;
307
+	}
308
+	if(num_servers == 0) {
309
+		len = strlen(NATS_DEFAULT_URL);
310
+		sc = shm_malloc(len + 1);
311
+		if(!sc) {
312
+			LM_ERR("no shm memory left\n");
313
+			return -1;
314
+		}
315
+		strcpy(sc, NATS_DEFAULT_URL);
316
+		sc[len] = '\0';
317
+		c->servers[0] = sc;
318
+		LM_INFO("using default server [%s]\n", sc);
319
+	}
320
+
321
+	// nats create options
322
+	if((s = natsOptions_Create(&c->opts)) != NATS_OK) {
323
+		LM_ERR("could not create nats options [%s]\n", natsStatus_GetText(s));
324
+		return -1;
325
+	}
326
+
327
+	// use these defaults
328
+	natsOptions_SetAllowReconnect(c->opts, true);
329
+	natsOptions_SetSecure(c->opts, false);
330
+	natsOptions_SetMaxReconnect(c->opts, 10000);
331
+	natsOptions_SetReconnectWait(c->opts, 2 * 1000);	 // 2s
332
+	natsOptions_SetPingInterval(c->opts, 2 * 60 * 1000); // 2m
333
+	natsOptions_SetMaxPingsOut(c->opts, 2);
334
+	natsOptions_SetIOBufSize(c->opts, 32 * 1024); // 32 KB
335
+	natsOptions_SetMaxPendingMsgs(c->opts, 65536);
336
+	natsOptions_SetTimeout(c->opts, 2 * 1000);				   // 2s
337
+	natsOptions_SetReconnectBufSize(c->opts, 8 * 1024 * 1024); // 8 MB;
338
+	natsOptions_SetReconnectJitter(c->opts, 100, 1000);		   // 100ms, 1s;
339
+
340
+	// nats set servers and options
341
+	if((s = natsOptions_SetServers(
342
+				c->opts, (const char **)c->servers, num_servers))
343
+			!= NATS_OK) {
344
+		LM_ERR("could not set nats server[%s]\n", natsStatus_GetText(s));
345
+		return -1;
346
+	}
347
+
348
+	// nats set callbacks
349
+	s = natsOptions_SetDisconnectedCB(c->opts, disconnectedCb, NULL);
350
+	if(s != NATS_OK) {
351
+		LM_ERR("could not set disconnect callback [%s]\n",
352
+				natsStatus_GetText(s));
353
+	}
354
+
355
+	s = natsOptions_SetReconnectedCB(c->opts, reconnectedCb, NULL);
356
+	if(s != NATS_OK) {
357
+		LM_ERR("could not set reconnect callback [%s]\n",
358
+				natsStatus_GetText(s));
359
+	}
360
+
361
+	s = natsOptions_SetRetryOnFailedConnect(c->opts, true, connectedCB, NULL);
362
+	if(s != NATS_OK) {
363
+		LM_ERR("could not set retry on failed callback [%s]\n",
364
+				natsStatus_GetText(s));
365
+	}
366
+
367
+	s = natsOptions_SetClosedCB(c->opts, closedCB, (void *)&closed);
368
+	if(s != NATS_OK) {
369
+		LM_ERR("could not set closed callback [%s]\n", natsStatus_GetText(s));
370
+	}
371
+	return 0;
372
+}
373
+
351 374
 int nats_cleanup_init_servers()
352 375
 {
353 376
 	init_nats_server_ptr s0;
... ...
@@ -358,17 +381,35 @@ int nats_cleanup_init_servers()
358 381
 		if(s0->url != NULL) {
359 382
 			shm_free(s0->url);
360 383
 		}
384
+
361 385
 		shm_free(s0);
362 386
 		s0 = s1;
363 387
 	}
388
+
389
+	// To silence valgrind reports of memory still in use
390
+	nats_Close();
391
+
364 392
 	_init_nats_srv = NULL;
365 393
 	return 0;
366 394
 }
367 395
 
396
+int nats_cleanup_connection(nats_connection_ptr c)
397
+{
398
+	if(c->opts != NULL) {
399
+		natsOptions_Destroy(c->opts);
400
+	}
401
+	for(int s = 0; s < NATS_MAX_SERVERS; s++) {
402
+		if(c->servers[s]) {
403
+			shm_free(c->servers[s]);
404
+		}
405
+	}
406
+	shm_free(c);
407
+	return 0;
408
+}
409
+
368 410
 int nats_destroy_workers()
369 411
 {
370 412
 	int i;
371
-	int s;
372 413
 	nats_consumer_worker_t *worker;
373 414
 	for(i = 0; i < _nats_proc_count; i++) {
374 415
 		worker = &nats_workers[i];
... ...
@@ -381,13 +422,9 @@ int nats_destroy_workers()
381 422
 				natsConnection_Close(worker->conn);
382 423
 				natsConnection_Destroy(worker->conn);
383 424
 			}
384
-			if(worker->opts != NULL) {
385
-				natsOptions_Destroy(worker->opts);
386
-			}
387 425
 			if(worker->uvLoop != NULL) {
388 426
 				uv_loop_close(worker->uvLoop);
389 427
 			}
390
-			nats_Close();
391 428
 			if(worker->subject != NULL) {
392 429
 				shm_free(worker->subject);
393 430
 			}
... ...
@@ -397,11 +434,6 @@ int nats_destroy_workers()
397 434
 			if(worker->on_message != NULL) {
398 435
 				shm_free(worker->on_message);
399 436
 			}
400
-			for(s = 0; s < NATS_MAX_SERVERS; s++) {
401
-				if(worker->init_nats_servers[s]) {
402
-					shm_free(worker->init_nats_servers[s]);
403
-				}
404
-			}
405 437
 			shm_free(worker);
406 438
 		}
407 439
 	}
... ...
@@ -416,6 +448,15 @@ static void mod_destroy(void)
416 448
 	if(nats_destroy_workers() < 0) {
417 449
 		LM_ERR("could not cleanup workers\n");
418 450
 	}
451
+	if(nats_cleanup_init_sub() < 0) {
452
+		LM_INFO("could not cleanup init data\n");
453
+	}
454
+	if(nats_cleanup_connection(_nats_connection) < 0) {
455
+		LM_INFO("could not cleanup connection\n");
456
+	}
457
+	if(nats_cleanup_init_servers() < 0) {
458
+		LM_INFO("could not cleanup init server data\n");
459
+	}
419 460
 }
420 461
 
421 462
 int _init_nats_server_url_add(modparam_t type, void *val)
... ...
@@ -511,6 +552,7 @@ init_nats_server_ptr _init_nats_server_list_new(char *url)
511 552
 	return p;
512 553
 }
513 554
 
555
+
514 556
 int init_nats_server_url_add(char *url)
515 557
 {
516 558
 	init_nats_server_ptr n;
... ...
@@ -524,6 +566,14 @@ int init_nats_server_url_add(char *url)
524 566
 	return 0;
525 567
 }
526 568
 
569
+nats_connection_ptr _init_nats_connection()
570
+{
571
+	nats_connection_ptr p =
572
+			(nats_connection_ptr)shm_malloc(sizeof(nats_connection));
573
+	memset(p, 0, sizeof(nats_connection));
574
+	return p;
575
+}
576
+
527 577
 init_nats_sub_ptr _init_nats_sub_new(char *sub, char *queue_group)
528 578
 {
529 579
 	init_nats_sub_ptr p = (init_nats_sub_ptr)shm_malloc(sizeof(init_nats_sub));
Browse code

nats: make sure pkg & shm memory is allocated; check pointers before freeing memory

Emmanuel Schmidbauer authored on 28/06/2021 11:30:40
Showing 1 changed files
... ...
@@ -245,6 +245,10 @@ int init_worker(
245 245
 		if(s0->url != NULL && num_servers < NATS_MAX_SERVERS) {
246 246
 			len = strlen(s0->url);
247 247
 			sc = shm_malloc(len + 1);
248
+			if(!sc) {
249
+				LM_ERR("no shm memory left\n");
250
+				return -1;
251
+			}
248 252
 			strcpy(sc, s0->url);
249 253
 			sc[len] = '\0';
250 254
 			worker->init_nats_servers[num_servers++] = sc;
... ...
@@ -368,30 +372,38 @@ int nats_destroy_workers()
368 372
 	nats_consumer_worker_t *worker;
369 373
 	for(i = 0; i < _nats_proc_count; i++) {
370 374
 		worker = &nats_workers[i];
371
-		natsSubscription_Unsubscribe(worker->subscription);
372
-		natsSubscription_Destroy(worker->subscription);
373
-		natsConnection_Close(worker->conn);
374
-		natsConnection_Destroy(worker->conn);
375
-		natsOptions_Destroy(worker->opts);
376
-		if(worker->uvLoop != NULL) {
377
-			uv_loop_close(worker->uvLoop);
378
-		}
379
-		nats_Close();
380
-		if(worker->subject != NULL) {
381
-			shm_free(worker->subject);
382
-		}
383
-		if(worker->queue_group != NULL) {
384
-			shm_free(worker->queue_group);
385
-		}
386
-		if(worker->on_message != NULL) {
387
-			shm_free(worker->on_message);
388
-		}
389
-		for(s = 0; s < NATS_MAX_SERVERS; s++) {
390
-			if(worker->init_nats_servers[s]) {
391
-				shm_free(worker->init_nats_servers[s]);
375
+		if(worker != NULL) {
376
+			if(worker->subscription != NULL) {
377
+				natsSubscription_Unsubscribe(worker->subscription);
378
+				natsSubscription_Destroy(worker->subscription);
379
+			}
380
+			if(worker->conn != NULL) {
381
+				natsConnection_Close(worker->conn);
382
+				natsConnection_Destroy(worker->conn);
383
+			}
384
+			if(worker->opts != NULL) {
385
+				natsOptions_Destroy(worker->opts);
386
+			}
387
+			if(worker->uvLoop != NULL) {
388
+				uv_loop_close(worker->uvLoop);
389
+			}
390
+			nats_Close();
391
+			if(worker->subject != NULL) {
392
+				shm_free(worker->subject);
392 393
 			}
394
+			if(worker->queue_group != NULL) {
395
+				shm_free(worker->queue_group);
396
+			}
397
+			if(worker->on_message != NULL) {
398
+				shm_free(worker->on_message);
399
+			}
400
+			for(s = 0; s < NATS_MAX_SERVERS; s++) {
401
+				if(worker->init_nats_servers[s]) {
402
+					shm_free(worker->init_nats_servers[s]);
403
+				}
404
+			}
405
+			shm_free(worker);
393 406
 		}
394
-		shm_free(worker);
395 407
 	}
396 408
 	return 0;
397 409
 }
... ...
@@ -420,6 +432,10 @@ int _init_nats_server_url_add(modparam_t type, void *val)
420 432
 		return -1;
421 433
 	}
422 434
 	value = pkg_malloc(len + 1);
435
+	if(!value) {
436
+		LM_ERR("no pkg memory left\n");
437
+		return -1;
438
+	}
423 439
 	strcpy(value, url);
424 440
 	value[len] = '\0';
425 441
 	if(init_nats_server_url_add(url) < 0) {
... ...
@@ -434,6 +450,11 @@ int _init_nats_sub_add(modparam_t type, void *val)
434 450
 	char *sub = (char *)val;
435 451
 	int len = strlen(sub);
436 452
 	char *s = pkg_malloc(len + 1);
453
+	if(!s) {
454
+		LM_ERR("no pkg memory left\n");
455
+		return -1;
456
+	}
457
+
437 458
 	strcpy(s, sub);
438 459
 	s[len] = '\0';
439 460
 	if(init_nats_sub_add(s) < 0) {
... ...
@@ -529,6 +550,10 @@ int init_nats_sub_add(char *sc)
529 550
 
530 551
 	len = strlen(sc);
531 552
 	s = pkg_malloc(len + 1);
553
+	if(!s) {
554
+		LM_ERR("no pkg memory left\n");
555
+		return -1;
556
+	}
532 557
 	strcpy(s, sc);
533 558
 	s[len] = '\0';
534 559
 
Browse code

nats: new nats message consumer module

Emmanuel Schmidbauer authored on 27/06/2021 11:58:58
Showing 1 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,569 @@
1
+/*
2
+ * NATS module interface
3
+ *
4
+ * Copyright (C) 2021 Voxcom Inc
5
+ *
6
+ * This file is part of Kamailio, a free SIP server.
7
+ *
8
+ * Kamailio is free software; you can redistribute it and/or modify
9
+ * it under the terms of the GNU General Public License as published by
10
+ * the Free Software Foundation; either version 2 of the License, or
11
+ * (at your option) any later version
12
+ *
13
+ * Kamailio is distributed in the hope that it will be useful,
14
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
+ * GNU General Public License for more details.
17
+ *
18
+ * You should have received a copy of the GNU General Public License
19
+ * along with this program; if not, write to the Free Software
20
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21
+ *
22
+ *
23
+ */
24
+
25
+#include "nats_mod.h"
26
+
27
+MODULE_VERSION
28
+
29
+init_nats_sub_ptr _init_nats_sc = NULL;
30
+init_nats_server_ptr _init_nats_srv = NULL;
31
+nats_consumer_worker_t *nats_workers = NULL;
32
+int _nats_proc_count;
33
+char *eventData = NULL;
34
+
35
+static pv_export_t nats_mod_pvs[] = {
36
+		{{"natsData", (sizeof("natsData") - 1)}, PVT_OTHER,
37
+				nats_pv_get_event_payload, 0, 0, 0, 0, 0},
38
+		{{0, 0}, 0, 0, 0, 0, 0, 0, 0}};
39
+
40
+static param_export_t params[] = {{"nats_url", PARAM_STRING | USE_FUNC_PARAM,
41
+										  (void *)_init_nats_server_url_add},
42
+		{"subject_queue_group", PARAM_STRING | USE_FUNC_PARAM,
43
+				(void *)_init_nats_sub_add}};
44
+
45
+struct module_exports exports = {
46
+		"nats", DEFAULT_DLFLAGS, /* dlopen flags */
47
+		0,						 /* Exported functions */
48
+		params,					 /* Exported parameters */
49
+		0,						 /* exported MI functions */
50
+		nats_mod_pvs,			 /* exported pseudo-variables */
51
+		0,						 /* response function*/
52
+		mod_init,				 /* module initialization function */
53
+		mod_child_init,			 /* per-child init function */
54
+		mod_destroy				 /* destroy function */
55
+};
56
+
57
+static void onMsg(
58
+		natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
59
+{
60
+	nats_on_message_ptr on_message = (nats_on_message_ptr)closure;
61
+	char *s = (char *)natsMsg_GetSubject(msg);
62
+	char *data = (char *)natsMsg_GetData(msg);
63
+	if(on_message->rt < 0 || event_rt.rlist[on_message->rt] == NULL) {
64
+		LM_INFO("event-route [nats:%s] does not exist\n", s);
65
+		goto end;
66
+	}
67
+	eventData = data;
68
+	nats_run_cfg_route(on_message->rt);
69
+
70
+end:
71
+	eventData = NULL;
72
+	natsMsg_Destroy(msg);
73
+}
74
+
75
+static void connectedCB(natsConnection *nc, void *closure)
76
+{
77
+	char url[NATS_URL_MAX_SIZE];
78
+	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
79
+	nats_run_cfg_route(_nats_rts.connected);
80
+}
81
+
82
+static void disconnectedCb(natsConnection *nc, void *closure)
83
+{
84
+	char url[NATS_URL_MAX_SIZE];
85
+	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
86
+	nats_run_cfg_route(_nats_rts.disconnected);
87
+}
88
+
89
+static void reconnectedCb(natsConnection *nc, void *closure)
90
+{
91
+	char url[NATS_URL_MAX_SIZE];
92
+	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
93
+	nats_run_cfg_route(_nats_rts.connected);
94
+}
95
+
96
+static void closedCB(natsConnection *nc, void *closure)
97
+{
98
+	bool *closed = (bool *)closure;
99
+	const char *err = NULL;
100
+	natsConnection_GetLastError(nc, &err);
101
+	LM_INFO("connect failed: %s\n", err);
102
+	*closed = true;
103
+}
104
+
105
+void nats_consumer_worker_proc(
106
+		nats_consumer_worker_t *worker, const char *servers[])
107
+{
108
+	natsStatus s;
109
+	bool closed = false;
110
+
111
+	LM_INFO("nats worker connecting to subject [%s] queue group [%s]\n",
112
+			worker->subject, worker->queue_group);
113
+
114
+	s = natsOptions_Create(&worker->opts);
115
+	if(s != NATS_OK) {
116
+		LM_ERR("could not create nats options [%s]\n", natsStatus_GetText(s));
117
+		return;
118
+	}
119
+	// use these defaults
120
+	natsOptions_SetAllowReconnect(worker->opts, true);
121
+	natsOptions_SetSecure(worker->opts, false);
122
+	natsOptions_SetMaxReconnect(worker->opts, 10000);
123
+	natsOptions_SetReconnectWait(worker->opts, 2 * 1000);	  // 2s
124
+	natsOptions_SetPingInterval(worker->opts, 2 * 60 * 1000); // 2m
125
+	natsOptions_SetMaxPingsOut(worker->opts, 2);
126
+	natsOptions_SetIOBufSize(worker->opts, 32 * 1024); // 32 KB
127
+	natsOptions_SetMaxPendingMsgs(worker->opts, 65536);
128
+	natsOptions_SetTimeout(worker->opts, 2 * 1000);					// 2s
129
+	natsOptions_SetReconnectBufSize(worker->opts, 8 * 1024 * 1024); // 8 MB;
130
+	natsOptions_SetReconnectJitter(worker->opts, 100, 1000); // 100ms, 1s;
131
+	s = natsOptions_SetServers(worker->opts, servers, 1);
132
+	if(s != NATS_OK) {
133
+		LM_ERR("could not set nats server [%s]\n", natsStatus_GetText(s));
134
+	}
135
+	s = natsOptions_SetDisconnectedCB(worker->opts, disconnectedCb, NULL);
136
+	if(s != NATS_OK) {
137
+		LM_ERR("could not set disconnect callback [%s]\n",
138
+				natsStatus_GetText(s));
139
+	}
140
+	s = natsOptions_SetReconnectedCB(worker->opts, reconnectedCb, NULL);
141
+	if(s != NATS_OK) {
142
+		LM_ERR("could not set reconnect callback [%s]\n",
143
+				natsStatus_GetText(s));
144
+	}
145
+	s = natsOptions_SetRetryOnFailedConnect(
146
+			worker->opts, true, connectedCB, NULL);
147
+	if(s != NATS_OK) {
148
+		LM_ERR("could not set retry on failed callback [%s]\n",
149
+				natsStatus_GetText(s));
150
+	}
151
+	s = natsOptions_SetClosedCB(worker->opts, closedCB, (void *)&closed);
152
+	if(s != NATS_OK) {
153
+		LM_ERR("could not set closed callback [%s]\n", natsStatus_GetText(s));
154
+	}
155
+
156
+	s = natsConnection_Connect(&worker->conn, worker->opts);
157
+	if(s != NATS_OK) {
158
+		LM_ERR("could not connect [%s]\n", natsStatus_GetText(s));
159
+	}
160
+	// create a loop
161
+	natsLibuv_Init();
162
+	worker->uvLoop = uv_default_loop();
163
+	if(worker->uvLoop != NULL) {
164
+		natsLibuv_SetThreadLocalLoop(worker->uvLoop);
165
+	} else {
166
+		s = NATS_ERR;
167
+	}
168
+
169
+	s = natsOptions_SetEventLoop(worker->opts, (void *)worker->uvLoop,
170
+			natsLibuv_Attach, natsLibuv_Read, natsLibuv_Write,
171
+			natsLibuv_Detach);
172
+	if(s != NATS_OK) {
173
+		LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s));
174
+	}
175
+
176
+	if(s) {
177
+		LM_ERR("error setting options [%s]\n", natsStatus_GetText(s));
178
+	}
179
+
180
+	s = natsConnection_QueueSubscribe(&worker->subscription, worker->conn,
181
+			worker->subject, worker->queue_group, onMsg, worker->on_message);
182
+	if(s != NATS_OK) {
183
+		LM_ERR("could not subscribe [%s]\n", natsStatus_GetText(s));
184
+	}
185
+
186
+	s = natsSubscription_SetPendingLimits(worker->subscription, -1, -1);
187
+	if(s != NATS_OK) {
188
+		LM_ERR("could not set pending limits [%s]\n", natsStatus_GetText(s));
189
+	}
190
+
191
+	// Run the event loop.
192
+	// This call will return when the connection is closed (either after
193
+	// receiving all messages, or disconnected and unable to reconnect).
194
+	if(s == NATS_OK) {
195
+		uv_run(worker->uvLoop, UV_RUN_DEFAULT);
196
+	}
197
+	if(s != NATS_OK) {
198
+		LM_ERR("nats error [%s]\n", natsStatus_GetText(s));
199
+	}
200
+}
201
+
202
+static int mod_init(void)
203
+{
204
+	if(faked_msg_init() < 0) {
205
+		LM_ERR("failed to init faked sip message\n");
206
+		return -1;
207
+	}
208
+	nats_init_environment();
209
+	register_procs(_nats_proc_count);
210
+	nats_workers =
211
+			shm_malloc(_nats_proc_count * sizeof(nats_consumer_worker_t));
212
+	if(nats_workers == NULL) {
213
+		LM_ERR("error in shm_malloc\n");
214
+		return -1;
215
+	}
216
+	memset(nats_workers, 0, _nats_proc_count * sizeof(nats_consumer_worker_t));
217
+	return 0;
218
+}
219
+
220
+int init_worker(
221
+		nats_consumer_worker_t *worker, char *subject, char *queue_group)
222
+{
223
+	int buffsize = strlen(subject) + 6;
224
+	char routename[buffsize];
225
+	int rt;
226
+	int len;
227
+	char *sc;
228
+	int num_servers = 0;
229
+	init_nats_server_ptr s0;
230
+
231
+	memset(worker, 0, sizeof(*worker));
232
+	worker->subject = shm_malloc(strlen(subject) + 1);
233
+	strcpy(worker->subject, subject);
234
+	worker->subject[strlen(subject)] = '\0';
235
+	worker->queue_group = shm_malloc(strlen(queue_group) + 1);
236
+	strcpy(worker->queue_group, queue_group);
237
+	worker->queue_group[strlen(queue_group)] = '\0';
238
+	memset(worker->init_nats_servers, 0, sizeof(worker->init_nats_servers));
239
+	worker->on_message =
240
+			(nats_on_message_ptr)shm_malloc(sizeof(nats_on_message));
241
+	memset(worker->on_message, 0, sizeof(nats_on_message));
242
+
243
+	s0 = _init_nats_srv;
244
+	while(s0) {
245
+		if(s0->url != NULL && num_servers < NATS_MAX_SERVERS) {
246
+			len = strlen(s0->url);
247
+			sc = shm_malloc(len + 1);
248
+			strcpy(sc, s0->url);
249
+			sc[len] = '\0';
250
+			worker->init_nats_servers[num_servers++] = sc;
251
+		}
252
+		s0 = s0->next;
253
+	}
254
+	if(num_servers == 0) {
255
+		worker->init_nats_servers[0] = NATS_DEFAULT_URL;
256
+		LM_INFO("using default server [%s]\n", NATS_DEFAULT_URL);
257
+	}
258
+
259
+	snprintf(routename, buffsize, "nats:%s", subject);
260
+	routename[buffsize] = '\0';
261
+
262
+	rt = route_get(&event_rt, routename);
263
+	if(rt < 0 || event_rt.rlist[rt] == NULL) {
264
+		LM_INFO("route [%s] does not exist\n", routename);
265
+		worker->on_message->rt = -1;
266
+		return 0;
267
+	}
268
+	worker->on_message->rt = rt;
269
+	return 0;
270
+}
271
+
272
+void worker_loop(int id)
273
+{
274
+	nats_consumer_worker_t *worker = &nats_workers[id];
275
+	nats_consumer_worker_proc(worker, (const char **)worker->init_nats_servers);
276
+	for(;;) {
277
+		sleep(1000);
278
+	}
279
+}
280
+
281
+/**
282
+ * @brief Initialize async module children
283
+ */
284
+static int mod_child_init(int rank)
285
+{
286
+	init_nats_sub_ptr n;
287
+	int i = 0;
288
+	int newpid;
289
+
290
+	if(rank == PROC_INIT) {
291
+		n = _init_nats_sc;
292
+		while(n) {
293
+			if(init_worker(&nats_workers[i], n->sub, n->queue_group) < 0) {
294
+				LM_ERR("failed to init struct for worker[%d]\n", i);
295
+				return -1;
296
+			}
297
+			n = n->next;
298
+			i++;
299
+		}
300
+		if(nats_cleanup_init_sub() < 0) {
301
+			LM_INFO("could not cleanup init data\n");
302
+		}
303
+		if(nats_cleanup_init_servers() < 0) {
304
+			LM_INFO("could not cleanup init server data\n");
305
+		}
306
+		return 0;
307
+	}
308
+
309
+	if(rank == PROC_MAIN) {
310
+		for(i = 0; i < _nats_proc_count; i++) {
311
+			newpid = fork_process(PROC_RPC, "NATS WORKER", 1);
312
+			if(newpid < 0) {
313
+				LM_ERR("failed to fork worker process %d\n", i);
314
+				return -1;
315
+			} else if(newpid == 0) {
316
+				worker_loop(i);
317
+			} else {
318
+				nats_workers[i].pid = newpid;
319
+			}
320
+		}
321
+		return 0;
322
+	}
323
+
324
+	return 0;
325
+}
326
+
327
+int nats_cleanup_init_sub()
328
+{
329
+	init_nats_sub_ptr n0;
330
+	init_nats_sub_ptr n1;
331
+	n0 = _init_nats_sc;
332
+	while(n0) {
333
+		n1 = n0->next;
334
+		if(n0->sub != NULL) {
335
+			shm_free(n0->sub);
336
+		}
337
+		if(n0->queue_group != NULL) {
338
+			shm_free(n0->queue_group);
339
+		}
340
+		shm_free(n0);
341
+		n0 = n1;
342
+	}
343
+	_init_nats_sc = NULL;
344
+	return 0;
345
+}
346
+
347
+int nats_cleanup_init_servers()
348
+{
349
+	init_nats_server_ptr s0;
350
+	init_nats_server_ptr s1;
351
+	s0 = _init_nats_srv;
352
+	while(s0) {
353
+		s1 = s0->next;
354
+		if(s0->url != NULL) {
355
+			shm_free(s0->url);
356
+		}
357
+		shm_free(s0);
358
+		s0 = s1;
359
+	}
360
+	_init_nats_srv = NULL;
361
+	return 0;
362
+}
363
+
364
+int nats_destroy_workers()
365
+{
366
+	int i;
367
+	int s;
368
+	nats_consumer_worker_t *worker;
369
+	for(i = 0; i < _nats_proc_count; i++) {
370
+		worker = &nats_workers[i];
371
+		natsSubscription_Unsubscribe(worker->subscription);
372
+		natsSubscription_Destroy(worker->subscription);
373
+		natsConnection_Close(worker->conn);
374
+		natsConnection_Destroy(worker->conn);
375
+		natsOptions_Destroy(worker->opts);
376
+		if(worker->uvLoop != NULL) {
377
+			uv_loop_close(worker->uvLoop);
378
+		}
379
+		nats_Close();
380
+		if(worker->subject != NULL) {
381
+			shm_free(worker->subject);
382
+		}
383
+		if(worker->queue_group != NULL) {
384
+			shm_free(worker->queue_group);
385
+		}
386
+		if(worker->on_message != NULL) {
387
+			shm_free(worker->on_message);
388
+		}
389
+		for(s = 0; s < NATS_MAX_SERVERS; s++) {
390
+			if(worker->init_nats_servers[s]) {
391
+				shm_free(worker->init_nats_servers[s]);
392
+			}
393
+		}
394
+		shm_free(worker);
395
+	}
396
+	return 0;
397
+}
398
+
399
/**
 * Module destroy hook: releases the worker resources and logs an error if
 * that step reports failure.
 */
static void mod_destroy(void)
{
	int rc = nats_destroy_workers();

	if(rc < 0) {
		LM_ERR("could not cleanup workers\n");
	}
}
408
+
409
+int _init_nats_server_url_add(modparam_t type, void *val)
410
+{
411
+	char *url = (char *)val;
412
+	int len = strlen(url);
413
+	char *value;
414
+	if(len > NATS_URL_MAX_SIZE) {
415
+		LM_ERR("connection url exceeds max size %d\n", NATS_URL_MAX_SIZE);
416
+		return -1;
417
+	}
418
+	if(strncmp(url, "nats://", 7)) {
419
+		LM_ERR("invalid nats url [%s]\n", url);
420
+		return -1;
421
+	}
422
+	value = pkg_malloc(len + 1);
423
+	strcpy(value, url);
424
+	value[len] = '\0';
425
+	if(init_nats_server_url_add(url) < 0) {
426