Browse code

Merge 82d006ebe8727d520ef34c29639d0ef06249bed7 into e74cb2371ab879874a8981818139024f1c2beb9e

Seven Du authored on 22/06/2022 23:45:49 • GitHub committed on 22/06/2022 23:45:49
Showing 6 changed files
... ...
@@ -27,6 +27,7 @@
27 27
 
28 28
 #include <nats/nats.h>
29 29
 #include <uv.h>
30
+#include "../../core/str.h"
30 31
 
31 32
 #define NATS_DEFAULT_URL "nats://localhost:4222"
32 33
 #define NATS_MAX_SERVERS 10
... ...
@@ -63,6 +64,8 @@ typedef struct _init_nats_server
63 64
 typedef struct _nats_on_message
64 65
 {
65 66
 	int rt;
67
+	char *_evname;
68
+	str evname;
66 69
 } nats_on_message, *nats_on_message_ptr;
67 70
 
68 71
 struct nats_consumer_worker
... ...
@@ -173,6 +173,37 @@ modparam("nats", "subject_queue_group", "Kamailio-World:2020")
173 173
 modparam("nats", "subject_queue_group", "Kamailio-World:2021") // this will create two processes for the Kamailio-World subject
174 174
 modparam("nats", "subject_queue_group", "MyQueue1:2021")
175 175
 modparam("nats", "subject_queue_group", "MyQueue2:2021")
176
+...
177
+				</programlisting>
178
+			</example>
179
+		</section>
180
+		<section>
181
+			<title>
182
+				<varname>event_callback</varname>
183
+				(str)
184
+			</title>
185
+			<para>
186
+				Name of the KEMI function to be executed instead of the event route.
187
+			</para>
188
+			<para>
189
+				<emphasis>Default value is not set.</emphasis>
190
+			</para>
191
+			<example>
192
+				<title>
193
+					Set
194
+					<varname>event_callback</varname>
195
+					parameter
196
+				</title>
197
+				<programlisting format="linespecific">
198
+...
199
+modparam("nats", "event_callback", "ksr_nats_event")
200
+
201
+-- event callback function implemented in Lua
202
+function ksr_nats_event(evname)
203
+	KSR.info("===== nats module received event: " .. evname ..
204
+		", data:" .. KSR.pv.gete('$natsData') .. "\n");
205
+	return 1;
206
+end
176 207
 ...
177 208
 				</programlisting>
178 209
 			</example>
... ...
@@ -254,4 +285,4 @@ event_route[nats:MyQueue1]
254 285
 	</section>
255 286
 
256 287
 
257
-</chapter>
258 288
\ No newline at end of file
289
+</chapter>
... ...
@@ -24,6 +24,8 @@
24 24
 
25 25
 #include "defs.h"
26 26
 #include "nats_mod.h"
27
+#include "nats_pub.h"
28
+#include "../../core/kemi.h"
27 29
 
28 30
 MODULE_VERSION
29 31
 
... ...
@@ -38,6 +40,7 @@ char *eventData = NULL;
38 40
 
39 41
 int *nats_pub_worker_pipes_fds = NULL;
40 42
 int *nats_pub_worker_pipes = NULL;
43
+static str nats_event_callback = STR_NULL;
41 44
 
42 45
 static nats_evroutes_t _nats_rts;
43 46
 
... ...
@@ -50,7 +53,10 @@ static param_export_t params[] = {
50 53
 		{"nats_url", PARAM_STRING | USE_FUNC_PARAM, (void *)_init_nats_server_url_add},
51 54
 		{"num_publish_workers", INT_PARAM, &nats_pub_workers_num},
52 55
 		{"subject_queue_group", PARAM_STRING | USE_FUNC_PARAM,
53
-				(void *)_init_nats_sub_add}};
56
+				(void *)_init_nats_sub_add},
57
+		{"event_callback", PARAM_STR,   &nats_event_callback},
58
+		{0, 0, 0}
59
+};
54 60
 
55 61
 static cmd_export_t cmds[] = {{"nats_publish", (cmd_function)w_nats_publish_f,
56 62
 									  2, fixup_publish_get_value,
... ...
@@ -73,16 +79,9 @@ static void onMsg(
73 79
 		natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
74 80
 {
75 81
 	nats_on_message_ptr on_message = (nats_on_message_ptr)closure;
76
-	char *s = (char *)natsMsg_GetSubject(msg);
77 82
 	char *data = (char *)natsMsg_GetData(msg);
78
-	if(on_message->rt < 0 || event_rt.rlist[on_message->rt] == NULL) {
79
-		LM_INFO("event-route [nats:%s] does not exist\n", s);
80
-		goto end;
81
-	}
82 83
 	eventData = data;
83
-	nats_run_cfg_route(on_message->rt);
84
-
85
-end:
84
+	nats_run_cfg_route(on_message->rt, &on_message->evname);
86 85
 	eventData = NULL;
87 86
 	natsMsg_Destroy(msg);
88 87
 }
... ...
@@ -90,22 +89,25 @@ end:
90 89
 static void connectedCB(natsConnection *nc, void *closure)
91 90
 {
92 91
 	char url[NATS_URL_MAX_SIZE];
92
+	str evname = str_init("nats:connected");
93 93
 	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
94
-	nats_run_cfg_route(_nats_rts.connected);
94
+	nats_run_cfg_route(_nats_rts.connected, &evname);
95 95
 }
96 96
 
97 97
 static void disconnectedCb(natsConnection *nc, void *closure)
98 98
 {
99 99
 	char url[NATS_URL_MAX_SIZE];
100
+	str evname = str_init("nats:disconnected");
100 101
 	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
101
-	nats_run_cfg_route(_nats_rts.disconnected);
102
+	nats_run_cfg_route(_nats_rts.disconnected, &evname);
102 103
 }
103 104
 
104 105
 static void reconnectedCb(natsConnection *nc, void *closure)
105 106
 {
106 107
 	char url[NATS_URL_MAX_SIZE];
108
+	str evname = str_init("nats:connected");
107 109
 	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
108
-	nats_run_cfg_route(_nats_rts.connected);
110
+	nats_run_cfg_route(_nats_rts.connected, &evname);
109 111
 }
110 112
 
111 113
 static void closedCB(natsConnection *nc, void *closure)
... ...
@@ -247,9 +249,13 @@ int init_worker(
247 249
 	if(rt < 0 || event_rt.rlist[rt] == NULL) {
248 250
 		LM_INFO("route [%s] does not exist\n", routename);
249 251
 		worker->on_message->rt = -1;
250
-		return 0;
252
+	} else {
253
+		worker->on_message->rt = rt;
251 254
 	}
252
-	worker->on_message->rt = rt;
255
+	worker->on_message->_evname = malloc(buffsize);
256
+	strcpy(worker->on_message->_evname, routename);
257
+	worker->on_message->evname.s = worker->on_message->_evname;
258
+	worker->on_message->evname.len = strlen(worker->on_message->_evname);
253 259
 	worker->nc = nc;
254 260
 	return 0;
255 261
 }
... ...
@@ -565,6 +571,9 @@ int nats_destroy_workers()
565 571
 				}
566 572
 			}
567 573
 			if(worker->on_message != NULL) {
574
+				if (worker->on_message->_evname) {
575
+					free(worker->on_message->_evname);
576
+				}
568 577
 				shm_free(worker->on_message);
569 578
 			}
570 579
 			shm_free(worker);
... ...
@@ -657,15 +666,18 @@ int _init_nats_sub_add(modparam_t type, void *val)
657 666
 /**
658 667
  * Invoke a event route block
659 668
  */
660
-int nats_run_cfg_route(int rt)
669
+int nats_run_cfg_route(int rt, str *evname)
661 670
 {
662 671
 	struct run_act_ctx ctx;
672
+	sr_kemi_eng_t *keng = NULL;
663 673
 	sip_msg_t *fmsg;
664 674
 	sip_msg_t tmsg;
665 675
 
676
+	keng = sr_kemi_eng_get();
677
+
666 678
 	// check for valid route pointer
667
-	if(rt < 0) {
668
-		return 0;
679
+	if(rt < 0 || !event_rt.rlist[rt]) {
680
+		if (keng == NULL) return 0;
669 681
 	}
670 682
 
671 683
 	fmsg = faked_msg_next();
... ...
@@ -673,6 +685,13 @@ int nats_run_cfg_route(int rt)
673 685
 	fmsg = &tmsg;
674 686
 	set_route_type(EVENT_ROUTE);
675 687
 	init_run_actions_ctx(&ctx);
688
+	if (rt < 0 && keng) {
689
+		if (sr_kemi_route(keng, fmsg, EVENT_ROUTE,
690
+			&nats_event_callback, evname) < 0) {
691
+			LM_ERR("error running event route kemi callback\n");
692
+		}
693
+		return 0;
694
+	}
676 695
 	run_top_route(event_rt.rlist[rt], fmsg, 0);
677 696
 	return 0;
678 697
 }
... ...
@@ -791,3 +810,35 @@ int nats_pv_get_event_payload(
791 810
 	return eventData == NULL ? pv_get_null(msg, param, res)
792 811
 							 : pv_get_strzval(msg, param, res, eventData);
793 812
 }
813
+
814
+/**
815
+ *
816
+ */
817
+int ki_nats_publish(sip_msg_t *msg, str *subject, str *payload)
818
+{
819
+	return w_nats_publish(msg, *subject, *payload);
820
+}
821
+
822
+/**
823
+ *
824
+ */
825
+/* clang-format off */
826
+static sr_kemi_t sr_kemi_nats_exports[] = {
827
+	{ str_init("nats"), str_init("publish"),
828
+		SR_KEMIP_INT, ki_nats_publish,
829
+		{ SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE,
830
+			SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
831
+	},
832
+
833
+	{ {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
834
+};
835
+/* clang-format on */
836
+
837
+/**
838
+ *
839
+ */
840
+int mod_register(char *path, int *dlflags, void *p1, void *p2)
841
+{
842
+	sr_kemi_modules_add(sr_kemi_nats_exports);
843
+	return 0;
844
+}
... ...
@@ -40,7 +40,7 @@ extern int fixup_publish_get_value(void **param, int param_no);
40 40
 extern int fixup_publish_get_value_free(void **param, int param_no);
41 41
 extern void _nats_pub_worker_cb(uv_poll_t *handle, int status, int events);
42 42
 
43
-int nats_run_cfg_route(int rt);
43
+int nats_run_cfg_route(int rt, str *evname);
44 44
 void nats_init_environment();
45 45
 
46 46
 int _init_nats_server_url_add(modparam_t type, void *val);
... ...
@@ -90,6 +90,11 @@ int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload)
90 90
 		return -1;
91 91
 	}
92 92
 
93
+	return w_nats_publish(msg, subj_s, payload_s);
94
+}
95
+
96
+int w_nats_publish(sip_msg_t *msg, str subj_s, str payload_s)
97
+{
93 98
 	// round-robin pub workers
94 99
 	pub_worker++;
95 100
 	if(pub_worker >= nats_pub_workers_num) {
... ...
@@ -38,6 +38,7 @@ typedef struct _nats_pub_delivery
38 38
 nats_pub_delivery_ptr _nats_pub_delivery_new(str subject, str payload);
39 39
 void nats_pub_free_delivery_ptr(nats_pub_delivery_ptr ptr);
40 40
 int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload);
41
+int w_nats_publish(sip_msg_t *msg, str subj_s, str payload_s);
41 42
 int fixup_publish_get_value(void **param, int param_no);
42 43
 int fixup_publish_get_value_free(void **param, int param_no);
43 44