Browse code

Merge 78b1fa4c8d764f6774bd949c176c3394e018e8cf into adb4d30a985b8f1425934f5bf22295b7ad62c249

Stefan Mititelu authored on 16/11/2021 08:12:49 • GitHub committed on 16/11/2021 08:12:49
Showing 3 changed files
... ...
@@ -151,6 +151,43 @@ modparam("nats", "subject_queue_group", "MyQueue2:2021")
151 151
 			</example>
152 152
 		</section>
153 153
 	</section>
154
+
155
+	<section>
156
+		<title>Functions</title>
157
+		<section id="nats.f.nats_publish">
158
+			<title>
159
+				<function moreinfo="none">nats_publish(payload[, url, subject])</function>
160
+			</title>
161
+			<para>
162
+				Publishes the payload text to all configured nats urls, to all configured subject queues.
163
+			</para>
164
+			<para>
165
+				If "url" optional parameter is specified, will publish just to that url. Note that the url must be configured in the first place.
166
+			</para>
167
+			<para>
168
+				If "subject" optional parameter is specified, will publish just to that subject. Note that the subject must be configured in the first place.
169
+			</para>
170
+			<para>
171
+				Mixing of optional parameters are allowed, as in the example below.
172
+			</para>
173
+			<example>
174
+				<title><function>nats_publish</function> usage</title>
175
+				<programlisting format="linespecific">
176
+...
177
+$var(my_info)="$ci=" + $ci + " $fU=" + $fU;
178
+
179
+nats_publish($var(my_info));					# publish to all configured urls, all configured subjects
180
+nats_publish($var(my_info), "", "");				# same as the above ^
181
+
182
+nats_publish($var(my_info), "nats://127.0.0.3:4222", "foo");	# publish to "nats://127.0.0.3:4222" url, "foo" subject
183
+nats_publish($var(my_info), "", "foo");				# publish to all configured urls, "foo" subject
184
+nats_publish($var(my_info), "nats://127.0.0.3:4222", "");	# publish to "nats://127.0.0.3:4222" url, all configured subjects
185
+...
186
+				</programlisting>
187
+			</example>
188
+		</section>
189
+	</section>
190
+
154 191
 	<section>
155 192
 		<title>Pseudo Variables</title>
156 193
 		<itemizedlist>
... ...
@@ -204,4 +241,4 @@ event_route[nats:MyQueue1]
204 241
 	</section>
205 242
 
206 243
 
207
-</chapter>
208 244
\ No newline at end of file
245
+</chapter>
... ...
@@ -32,6 +32,10 @@ nats_consumer_worker_t *nats_workers = NULL;
32 32
 int _nats_proc_count;
33 33
 char *eventData = NULL;
34 34
 
35
+static int nats_publish_1_f(struct sip_msg * msg, char *payload);
36
+static int nats_publish_2_f(struct sip_msg * msg, char *payload, char *url);
37
+static int nats_publish_3_f(struct sip_msg * msg, char *payload, char *url, char *subj);
38
+
35 39
 static pv_export_t nats_mod_pvs[] = {
36 40
 		{{"natsData", (sizeof("natsData") - 1)}, PVT_OTHER,
37 41
 				nats_pv_get_event_payload, 0, 0, 0, 0, 0},
... ...
@@ -42,9 +46,15 @@ static param_export_t params[] = {{"nats_url", PARAM_STRING | USE_FUNC_PARAM,
42 46
 		{"subject_queue_group", PARAM_STRING | USE_FUNC_PARAM,
43 47
 				(void *)_init_nats_sub_add}};
44 48
 
49
+static cmd_export_t functions [] = {
50
+        {"nats_publish", (cmd_function)nats_publish_1_f, 1, 0, 0, ANY_ROUTE},
51
+        {"nats_publish", (cmd_function)nats_publish_2_f, 2, 0, 0, ANY_ROUTE},
52
+        {"nats_publish", (cmd_function)nats_publish_3_f, 3, 0, 0, ANY_ROUTE},
53
+};
54
+
45 55
 struct module_exports exports = {
46 56
 		"nats", DEFAULT_DLFLAGS, /* dlopen flags */
47
-		0,						 /* Exported functions */
57
+		functions,					 /* Exported functions */
48 58
 		params,					 /* Exported parameters */
49 59
 		0,						 /* exported MI functions */
50 60
 		nats_mod_pvs,			 /* exported pseudo-variables */
... ...
@@ -301,12 +311,6 @@ static int mod_child_init(int rank)
301 311
 			n = n->next;
302 312
 			i++;
303 313
 		}
304
-		if(nats_cleanup_init_sub() < 0) {
305
-			LM_INFO("could not cleanup init data\n");
306
-		}
307
-		if(nats_cleanup_init_servers() < 0) {
308
-			LM_INFO("could not cleanup init server data\n");
309
-		}
310 314
 		return 0;
311 315
 	}
312 316
 
... ...
@@ -358,9 +362,19 @@ int nats_cleanup_init_servers()
358 362
 		if(s0->url != NULL) {
359 363
 			shm_free(s0->url);
360 364
 		}
365
+
366
+		if(s0->conn != NULL) {
367
+			// Destroy all our objects to avoid report of memory leak
368
+			natsConnection_Destroy(s0->conn);
369
+		}
370
+
361 371
 		shm_free(s0);
362 372
 		s0 = s1;
363 373
 	}
374
+
375
+	// To silence reports of memory still in used with valgrind
376
+	nats_Close();
377
+
364 378
 	_init_nats_srv = NULL;
365 379
 	return 0;
366 380
 }
... ...
@@ -416,6 +430,14 @@ static void mod_destroy(void)
416 430
 	if(nats_destroy_workers() < 0) {
417 431
 		LM_ERR("could not cleanup workers\n");
418 432
 	}
433
+
434
+	if(nats_cleanup_init_sub() < 0) {
435
+		LM_INFO("could not cleanup init data\n");
436
+	}
437
+
438
+	if(nats_cleanup_init_servers() < 0) {
439
+		LM_INFO("could not cleanup init server data\n");
440
+	}
419 441
 }
420 442
 
421 443
 int _init_nats_server_url_add(modparam_t type, void *val)
... ...
@@ -511,6 +533,79 @@ init_nats_server_ptr _init_nats_server_list_new(char *url)
511 533
 	return p;
512 534
 }
513 535
 
536
+void _init_nats_server_conn (init_nats_server_ptr p)
537
+{
538
+	natsOptions *opts  = NULL;
539
+	natsStatus s = NATS_OK;
540
+	const char *servers[1];
541
+	bool closed = false;
542
+
543
+	// nats create options
544
+	if ((s = natsOptions_Create(&opts)) != NATS_OK) {
545
+		LM_ERR("could not create nats options for %s [%s]\n", p->url, natsStatus_GetText(s));
546
+		return ;
547
+	}
548
+
549
+	// use these defaults
550
+	natsOptions_SetAllowReconnect(opts, true);
551
+	natsOptions_SetSecure(opts, false);
552
+	natsOptions_SetMaxReconnect(opts, 10000);
553
+	natsOptions_SetReconnectWait(opts, 2 * 1000); // 2s
554
+	natsOptions_SetPingInterval(opts, 2 * 60 * 1000); // 2m
555
+	natsOptions_SetMaxPingsOut(opts, 2);
556
+	natsOptions_SetIOBufSize(opts, 32 * 1024); // 32 KB
557
+	natsOptions_SetMaxPendingMsgs(opts, 65536);
558
+	natsOptions_SetTimeout(opts, 2 * 1000); // 2s
559
+	natsOptions_SetReconnectBufSize(opts, 8 * 1024 * 1024); // 8 MB;
560
+	natsOptions_SetReconnectJitter(opts, 100, 1000); // 100ms, 1s;
561
+
562
+	// nats set servers and options
563
+	servers[0] = p->url;
564
+	if ((s = natsOptions_SetServers(opts, servers, 1)) != NATS_OK)
565
+	{
566
+		LM_ERR("could not set nats server %s [%s]\n",
567
+			p->url, natsStatus_GetText(s));
568
+		return ;
569
+	}
570
+
571
+	// nats set publisher callbacks
572
+	s = natsOptions_SetDisconnectedCB(opts, disconnectedCb, NULL);
573
+	if(s != NATS_OK) {
574
+		LM_ERR("could not set disconnect callback for %s [%s]\n",
575
+				p->url, natsStatus_GetText(s));
576
+	}
577
+
578
+	s = natsOptions_SetReconnectedCB(opts, reconnectedCb, NULL);
579
+	if(s != NATS_OK) {
580
+		LM_ERR("could not set reconnect callback for %s [%s]\n",
581
+				p->url, natsStatus_GetText(s));
582
+	}
583
+
584
+	s = natsOptions_SetRetryOnFailedConnect(
585
+			opts, true, connectedCB, NULL);
586
+	if(s != NATS_OK) {
587
+		LM_ERR("could not set retry on failed callback for %s [%s]\n",
588
+				p->url, natsStatus_GetText(s));
589
+	}
590
+
591
+	s = natsOptions_SetClosedCB(opts, closedCB, (void *)&closed);
592
+	if(s != NATS_OK) {
593
+		LM_ERR("could not set closed callback for %s [%s]\n",
594
+				p->url, natsStatus_GetText(s));
595
+	}
596
+
597
+	// nats connect to the server
598
+	if ((s = natsConnection_Connect(&p->conn, opts)) != NATS_OK)
599
+	{
600
+		LM_ERR("could not connect to nats server %s [%s]\n",
601
+				p->url, natsStatus_GetText(s));
602
+	}
603
+
604
+	natsOptions_Destroy(opts);
605
+	return ;
606
+}
607
+
608
+
514 609
 int init_nats_server_url_add(char *url)
515 610
 {
516 611
 	init_nats_server_ptr n;
... ...
@@ -519,6 +614,7 @@ int init_nats_server_url_add(char *url)
519 614
 		n = n->next;
520 615
 	}
521 616
 	n = _init_nats_server_list_new(url);
617
+	_init_nats_server_conn(n);
522 618
 	n->next = _init_nats_srv;
523 619
 	_init_nats_srv = n;
524 620
 	return 0;
... ...
@@ -592,3 +688,60 @@ int nats_pv_get_event_payload(
592 688
 	return eventData == NULL ? pv_get_null(msg, param, res)
593 689
 							 : pv_get_strzval(msg, param, res, eventData);
594 690
 }
691
+
692
+static int nats_publish_1_f(struct sip_msg *msg, char *payload)
693
+{
694
+	return nats_publish_3_f(msg, payload, "", "");
695
+}
696
+
697
+static int nats_publish_2_f(struct sip_msg *msg, char *payload, char *url)
698
+{
699
+	return nats_publish_3_f(msg, payload, url, "");
700
+}
701
+
702
+static int nats_publish_3_f(struct sip_msg *msg, char *payload, char *url, char *subj)
703
+{
704
+	natsStatus status = NATS_OK;
705
+	init_nats_server_ptr n;
706
+	init_nats_sub_ptr s;
707
+
708
+	int dataLen = 0;
709
+	dataLen = (int) strlen(payload);
710
+
711
+	n = _init_nats_srv;
712
+	while (n) {
713
+		// publish to specific url only
714
+		if (strcmp(url, "") != 0) {
715
+			if (strcmp(n->url, url) != 0) {
716
+				n = n->next;
717
+				continue;
718
+			}
719
+		}
720
+		
721
+		// publish to specific subject queue only
722
+		if (strcmp(subj, "") != 0) {
723
+			if ((status = natsConnection_Publish(n->conn, subj, (const void*) payload, dataLen)) != NATS_OK)
724
+			{
725
+				LM_ERR("could not publish to nats server %s [%s]\n", n->url, natsStatus_GetText(status));
726
+			}
727
+
728
+			n = n->next;
729
+			continue;
730
+		}
731
+
732
+		// publish to all configured subject queues
733
+		s = _init_nats_sc;
734
+		while (s) {
735
+			if ((status = natsConnection_Publish(n->conn, s->sub, (const void*) payload, dataLen)) != NATS_OK)
736
+			{
737
+				LM_ERR("could not publish to nats server %s [%s]\n", n->url, natsStatus_GetText(status));
738
+			}
739
+
740
+			s = s->next;
741
+		}
742
+
743
+		n = n->next;
744
+	}
745
+
746
+	return 1;
747
+}
... ...
@@ -53,6 +53,7 @@ typedef struct _init_nats_sub
53 53
 typedef struct _init_nats_server
54 54
 {
55 55
 	char *url;
56
+	natsConnection *conn;
56 57
 	struct _init_nats_server *next;
57 58
 } init_nats_server, *init_nats_server_ptr;
58 59