Browse code

Merge ce21bd8a0cd4c74227bc43bef2bdd8f4a8006171 into 6ce9c3c897055ff9942634c493caf79780ccf71b

Emmanuel Schmidbauer authored on 05/01/2022 09:33:26 • GitHub committed on 05/01/2022 09:33:26
Showing 6 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,92 @@
1
+/*
2
+ * NATS module interface
3
+ *
4
+ * Copyright (C) 2021 Voxcom Inc
5
+ *
6
+ * This file is part of Kamailio, a free SIP server.
7
+ *
8
+ * Kamailio is free software; you can redistribute it and/or modify
9
+ * it under the terms of the GNU General Public License as published by
10
+ * the Free Software Foundation; either version 2 of the License, or
11
+ * (at your option) any later version
12
+ *
13
+ * Kamailio is distributed in the hope that it will be useful,
14
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
+ * GNU General Public License for more details.
17
+ *
18
+ * You should have received a copy of the GNU General Public License
19
+ * along with this program; if not, write to the Free Software
20
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21
+ *
22
+ *
23
+ */
24
+
25
+#ifndef __NATS_DEFS_H_
26
+#define __NATS_DEFS_H_
27
+
28
+#include <nats/nats.h>
29
+#include <uv.h>
30
+
31
+#define NATS_DEFAULT_URL "nats://localhost:4222"
32
+#define NATS_MAX_SERVERS 10
33
+#define NATS_URL_MAX_SIZE 256
34
+#define DEFAULT_NUM_PUB_WORKERS 2
35
+
36
+typedef struct _nats_connection
37
+{
38
+	natsConnection *conn;
39
+	natsOptions *opts;
40
+	char *servers[NATS_MAX_SERVERS];
41
+} nats_connection, *nats_connection_ptr;
42
+
43
+
44
+typedef struct _nats_evroutes
45
+{
46
+	int connected;
47
+	int disconnected;
48
+} nats_evroutes_t;
49
+static nats_evroutes_t _nats_rts;
50
+
51
+typedef struct _init_nats_sub
52
+{
53
+	char *sub;
54
+	char *queue_group;
55
+	struct _init_nats_sub *next;
56
+} init_nats_sub, *init_nats_sub_ptr;
57
+
58
+typedef struct _init_nats_server
59
+{
60
+	char *url;
61
+	struct _init_nats_server *next;
62
+} init_nats_server, *init_nats_server_ptr;
63
+
64
+typedef struct _nats_on_message
65
+{
66
+	int rt;
67
+} nats_on_message, *nats_on_message_ptr;
68
+
69
+struct nats_consumer_worker
70
+{
71
+	char *subject;
72
+	char *queue_group;
73
+	int pid;
74
+	natsSubscription *subscription;
75
+	uv_loop_t *uvLoop;
76
+	nats_connection_ptr nc;
77
+	nats_on_message_ptr on_message;
78
+};
79
+typedef struct nats_consumer_worker nats_consumer_worker_t;
80
+
81
+struct nats_pub_worker
82
+{
83
+	int pid;
84
+	int fd;
85
+	uv_loop_t *uvLoop;
86
+	uv_pipe_t pipe;
87
+	uv_poll_t poll;
88
+	nats_connection_ptr nc;
89
+};
90
+typedef struct nats_pub_worker nats_pub_worker_t;
91
+
92
+#endif
... ...
@@ -117,6 +117,33 @@ modparam("nats", "nats_url", "nats://127.0.0.1:4222")
117 117
 modparam("nats", "nats_url", "nats://user1:pass1127.0.1.2:4222") // with auth
118 118
 modparam("nats", "nats_url", "nats://127.1.2.3:4222")
119 119
 ...
120
+</programlisting>
121
+			</example>
122
+		</section>
123
+		<section>
124
+			<title>
125
+				<varname>num_publish_workers</varname>
126
+				(int)
127
+			</title>
128
+			<para>
129
+				The number of worker threads for publishing messages.
130
+			</para>
131
+			<para>
132
+				Usage: nats related.
133
+			</para>
134
+			<para>
135
+				<emphasis>Default value is <quote>2</quote>.</emphasis>
136
+			</para>
137
+			<example>
138
+				<title>
139
+					Set
140
+					<varname>num_publish_workers</varname>
141
+					parameter
142
+				</title>
143
+				<programlisting format="linespecific">
144
+...
145
+modparam("nats", "num_publish_workers", 4)
146
+...
120 147
 </programlisting>
121 148
 			</example>
122 149
 		</section>
... ...
@@ -146,6 +173,29 @@ modparam("nats", "subject_queue_group", "Kamailio-World:2020")
146 173
 modparam("nats", "subject_queue_group", "Kamailio-World:2021") // this will create two processes for the Kamailio-World subject
147 174
 modparam("nats", "subject_queue_group", "MyQueue1:2021")
148 175
 modparam("nats", "subject_queue_group", "MyQueue2:2021")
176
+...
177
+				</programlisting>
178
+			</example>
179
+		</section>
180
+	</section>
181
+	<section>
182
+		<title>Functions</title>
183
+		<section id="nats.f.nats_publish">
184
+			<title>
185
+				<function moreinfo="none">nats_publish(subject, payload)</function>
186
+			</title>
187
+			<para>
188
+				Publishes the payload to subject.
189
+			</para>
190
+			<example>
191
+				<title>
192
+					<function>nats_publish</function>
193
+					usage
194
+				</title>
195
+				<programlisting format="linespecific">
196
+...
197
+$var(my_info) = "$ci=" + $ci + " $fU=" + $fU;
198
+nats_publish("mysubject", "$var(my_info)"); # publish $var(my_info) to "mysubject"
149 199
 ...
150 200
 				</programlisting>
151 201
 			</example>
... ...
@@ -22,6 +22,7 @@
22 22
  *
23 23
  */
24 24
 
25
+#include "defs.h"
25 26
 #include "nats_mod.h"
26 27
 
27 28
 MODULE_VERSION
... ...
@@ -29,23 +30,34 @@ MODULE_VERSION
29 30
 init_nats_sub_ptr _init_nats_sc = NULL;
30 31
 init_nats_server_ptr _init_nats_srv = NULL;
31 32
 nats_consumer_worker_t *nats_workers = NULL;
32
-nats_connection_ptr _nats_connection = NULL;
33
+nats_pub_worker_t *nats_pub_workers = NULL;
34
+int nats_pub_workers_num = DEFAULT_NUM_PUB_WORKERS;
35
+
33 36
 int _nats_proc_count;
34 37
 char *eventData = NULL;
35 38
 
39
+int *nats_pub_worker_pipes_fds = NULL;
40
+int *nats_pub_worker_pipes = NULL;
41
+
36 42
 static pv_export_t nats_mod_pvs[] = {
37 43
 		{{"natsData", (sizeof("natsData") - 1)}, PVT_OTHER,
38 44
 				nats_pv_get_event_payload, 0, 0, 0, 0, 0},
39 45
 		{{0, 0}, 0, 0, 0, 0, 0, 0, 0}};
40 46
 
41
-static param_export_t params[] = {{"nats_url", PARAM_STRING | USE_FUNC_PARAM,
42
-										  (void *)_init_nats_server_url_add},
47
+static param_export_t params[] = {
48
+		{"nats_url", PARAM_STRING | USE_FUNC_PARAM, (void *)_init_nats_server_url_add},
49
+		{"num_publish_workers", INT_PARAM, &nats_pub_workers_num},
43 50
 		{"subject_queue_group", PARAM_STRING | USE_FUNC_PARAM,
44 51
 				(void *)_init_nats_sub_add}};
45 52
 
53
+static cmd_export_t cmds[] = {{"nats_publish", (cmd_function)w_nats_publish_f,
54
+									  2, fixup_publish_get_value,
55
+									  fixup_publish_get_value_free, ANY_ROUTE},
56
+		{0, 0, 0, 0, 0, 0}};
57
+
46 58
 struct module_exports exports = {
47 59
 		"nats", DEFAULT_DLFLAGS, /* dlopen flags */
48
-		0,						 /* Exported functions */
60
+		cmds,					 /* Exported functions */
49 61
 		params,					 /* Exported parameters */
50 62
 		0,						 /* exported MI functions */
51 63
 		nats_mod_pvs,			 /* exported pseudo-variables */
... ...
@@ -104,12 +116,13 @@ static void closedCB(natsConnection *nc, void *closure)
104 116
 }
105 117
 
106 118
 void nats_consumer_worker_proc(
107
-		nats_consumer_worker_t *worker, nats_connection_ptr c)
119
+		nats_consumer_worker_t *worker)
108 120
 {
109 121
 	natsStatus s = NATS_OK;
110 122
 
111 123
 	// create a loop
112 124
 	natsLibuv_Init();
125
+
113 126
 	worker->uvLoop = uv_default_loop();
114 127
 	if(worker->uvLoop != NULL) {
115 128
 		natsLibuv_SetThreadLocalLoop(worker->uvLoop);
... ...
@@ -119,23 +132,20 @@ void nats_consumer_worker_proc(
119 132
 	if(s != NATS_OK) {
120 133
 		LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s));
121 134
 	}
122
-	if((s = natsConnection_Connect(&worker->conn, c->opts)) != NATS_OK) {
135
+	if((s = natsConnection_Connect(&worker->nc->conn, worker->nc->opts))
136
+			!= NATS_OK) {
123 137
 		LM_ERR("could not connect to nats servers [%s]\n",
124 138
 				natsStatus_GetText(s));
125 139
 	}
126 140
 
127
-	s = natsOptions_SetEventLoop(c->opts, (void *)worker->uvLoop,
141
+	s = natsOptions_SetEventLoop(worker->nc->opts, (void *)worker->uvLoop,
128 142
 			natsLibuv_Attach, natsLibuv_Read, natsLibuv_Write,
129 143
 			natsLibuv_Detach);
130 144
 	if(s != NATS_OK) {
131 145
 		LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s));
132 146
 	}
133 147
 
134
-	if(s) {
135
-		LM_ERR("error setting options [%s]\n", natsStatus_GetText(s));
136
-	}
137
-
138
-	s = natsConnection_QueueSubscribe(&worker->subscription, worker->conn,
148
+	s = natsConnection_QueueSubscribe(&worker->subscription, worker->nc->conn,
139 149
 			worker->subject, worker->queue_group, onMsg, worker->on_message);
140 150
 	if(s != NATS_OK) {
141 151
 		LM_ERR("could not subscribe [%s]\n", natsStatus_GetText(s));
... ...
@@ -159,17 +169,30 @@ void nats_consumer_worker_proc(
159 169
 
160 170
 static int mod_init(void)
161 171
 {
172
+	int i = 0;
173
+	int total_procs = _nats_proc_count + nats_pub_workers_num;
162 174
 	if(faked_msg_init() < 0) {
163 175
 		LM_ERR("failed to init faked sip message\n");
164 176
 		return -1;
165 177
 	}
166
-	nats_init_environment();
167
-	_nats_connection = _init_nats_connection();
168
-	if(nats_init_connection(_nats_connection) < 0) {
169
-		LM_ERR("failed to init nat connections\n");
170
-		return -1;
178
+	register_procs(total_procs);
179
+
180
+	nats_pub_worker_pipes_fds =
181
+			(int *)shm_malloc(sizeof(int) * (nats_pub_workers_num)*2);
182
+	nats_pub_worker_pipes =
183
+			(int *)shm_malloc(sizeof(int) * nats_pub_workers_num);
184
+	for(i = 0; i < nats_pub_workers_num; i++) {
185
+		nats_pub_worker_pipes_fds[i * 2] =
186
+				nats_pub_worker_pipes_fds[i * 2 + 1] = -1;
187
+		if(pipe(&nats_pub_worker_pipes_fds[i * 2]) < 0) {
188
+			LM_ERR("worker pipe(%d) failed\n", i);
189
+			return -1;
190
+		}
191
+	}
192
+	for(i = 0; i < nats_pub_workers_num; i++) {
193
+		nats_pub_worker_pipes[i] = nats_pub_worker_pipes_fds[i * 2 + 1];
171 194
 	}
172
-	register_procs(_nats_proc_count);
195
+
173 196
 	nats_workers =
174 197
 			shm_malloc(_nats_proc_count * sizeof(nats_consumer_worker_t));
175 198
 	if(nats_workers == NULL) {
... ...
@@ -177,6 +200,15 @@ static int mod_init(void)
177 200
 		return -1;
178 201
 	}
179 202
 	memset(nats_workers, 0, _nats_proc_count * sizeof(nats_consumer_worker_t));
203
+
204
+	nats_pub_workers =
205
+			shm_malloc(nats_pub_workers_num * sizeof(nats_pub_worker_t));
206
+	if(nats_pub_workers == NULL) {
207
+		LM_ERR("error in shm_malloc\n");
208
+		return -1;
209
+	}
210
+	memset(nats_pub_workers, 0,
211
+			nats_pub_workers_num * sizeof(nats_pub_worker_t));
180 212
 	return 0;
181 213
 }
182 214
 
... ...
@@ -186,6 +218,14 @@ int init_worker(
186 218
 	int buffsize = strlen(subject) + 6;
187 219
 	char routename[buffsize];
188 220
 	int rt;
221
+	nats_connection_ptr nc = NULL;
222
+
223
+	nats_init_environment();
224
+	nc = _init_nats_connection();
225
+	if(nats_init_connection(nc) < 0) {
226
+		LM_ERR("failed to init nat connections\n");
227
+		return -1;
228
+	}
189 229
 
190 230
 	memset(worker, 0, sizeof(*worker));
191 231
 	worker->subject = shm_malloc(strlen(subject) + 1);
... ...
@@ -208,18 +248,77 @@ int init_worker(
208 248
 		return 0;
209 249
 	}
210 250
 	worker->on_message->rt = rt;
251
+	worker->nc = nc;
211 252
 	return 0;
212 253
 }
213 254
 
214
-void worker_loop(int id, nats_connection_ptr c)
255
+int init_pub_worker(
256
+		nats_pub_worker_t *worker)
257
+{
258
+	nats_connection_ptr nc = NULL;
259
+	nc = _init_nats_connection();
260
+	if(nats_init_connection(nc) < 0) {
261
+		LM_ERR("failed to init nat connections\n");
262
+		return -1;
263
+	}
264
+	memset(worker, 0, sizeof(*worker));
265
+	worker->nc = nc;
266
+	return 0;
267
+}
268
+
269
+void worker_loop(int id)
215 270
 {
216 271
 	nats_consumer_worker_t *worker = &nats_workers[id];
217
-	nats_consumer_worker_proc(worker, c);
272
+	nats_consumer_worker_proc(worker);
218 273
 	for(;;) {
219 274
 		sleep(1000);
220 275
 	}
221 276
 }
222 277
 
278
+int _nats_pub_worker_proc(
279
+		nats_pub_worker_t *worker, int fd)
280
+{
281
+	natsStatus s = NATS_OK;
282
+
283
+	natsLibuv_Init();
284
+	worker->fd = fd;
285
+	worker->uvLoop = uv_default_loop();
286
+	if(worker->uvLoop != NULL) {
287
+		natsLibuv_SetThreadLocalLoop(worker->uvLoop);
288
+	} else {
289
+		s = NATS_ERR;
290
+	}
291
+	if(s != NATS_OK) {
292
+		LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s));
293
+	}
294
+
295
+	if((s = natsConnection_Connect(&worker->nc->conn, worker->nc->opts))
296
+			!= NATS_OK) {
297
+		LM_ERR("could not connect to nats servers [%s]\n",
298
+				natsStatus_GetText(s));
299
+	}
300
+	s = natsOptions_SetEventLoop(worker->nc->opts, (void *)worker->uvLoop,
301
+			natsLibuv_Attach, natsLibuv_Read, natsLibuv_Write,
302
+			natsLibuv_Detach);
303
+	if(s != NATS_OK) {
304
+		LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s));
305
+	}
306
+
307
+	uv_pipe_init(worker->uvLoop, &worker->pipe, 0);
308
+	uv_pipe_open(&worker->pipe, worker->fd);
309
+	if(uv_poll_init(worker->uvLoop, &worker->poll, worker->fd) < 0) {
310
+		LM_ERR("uv_poll_init failed\n");
311
+		return 0;
312
+	}
313
+	uv_handle_set_data((uv_handle_t *)&worker->poll, (nats_pub_worker_t *)worker);
314
+	if(uv_poll_start(&worker->poll, UV_READABLE | UV_DISCONNECT, _nats_pub_worker_cb)
315
+			< 0) {
316
+		LM_ERR("uv_poll_start failed\n");
317
+		return 0;
318
+	}
319
+	return uv_run(worker->uvLoop, UV_RUN_DEFAULT);
320
+}
321
+
223 322
 /**
224 323
  * @brief Initialize async module children
225 324
  */
... ...
@@ -233,28 +332,51 @@ static int mod_child_init(int rank)
233 332
 		n = _init_nats_sc;
234 333
 		while(n) {
235 334
 			if(init_worker(&nats_workers[i], n->sub, n->queue_group) < 0) {
236
-				LM_ERR("failed to init struct for worker[%d]\n", i);
335
+				LM_ERR("failed to init struct for worker [%d]\n", i);
237 336
 				return -1;
238 337
 			}
239 338
 			n = n->next;
240 339
 			i++;
241 340
 		}
341
+
342
+		for(i = 0; i < nats_pub_workers_num; i++) {
343
+			if(init_pub_worker(&nats_pub_workers[i]) < 0) {
344
+				LM_ERR("failed to init struct for pub worker[%d]\n", i);
345
+				return -1;
346
+			}
347
+		}
348
+
242 349
 		return 0;
243 350
 	}
244 351
 
245 352
 	if(rank == PROC_MAIN) {
246 353
 		for(i = 0; i < _nats_proc_count; i++) {
247
-			newpid = fork_process(PROC_RPC, "NATS WORKER", 1);
354
+			newpid = fork_process(PROC_RPC, "NATS Subscriber", 1);
248 355
 			if(newpid < 0) {
249 356
 				LM_ERR("failed to fork worker process %d\n", i);
250 357
 				return -1;
251 358
 			} else if(newpid == 0) {
252
-				worker_loop(i, _nats_connection);
359
+				worker_loop(i);
253 360
 			} else {
254 361
 				nats_workers[i].pid = newpid;
255 362
 			}
256 363
 		}
257
-		return 0;
364
+
365
+		for(i = 0; i < nats_pub_workers_num; i++) {
366
+			newpid = fork_process(PROC_NOCHLDINIT, "NATS Publisher", 1);
367
+			if(newpid < 0) {
368
+				LM_ERR("failed to fork worker process %d\n", i);
369
+				return -1;
370
+			} else if(newpid == 0) {
371
+				if(cfg_child_init())
372
+					return -1;
373
+				close(nats_pub_worker_pipes_fds[i * 2 + 1]);
374
+				cfg_update();
375
+				return (_nats_pub_worker_proc(&nats_pub_workers[i], nats_pub_worker_pipes_fds[i * 2]));
376
+			} else {
377
+				nats_pub_workers[i].pid = newpid;
378
+			}
379
+		}
258 380
 	}
259 381
 
260 382
 	return 0;
... ...
@@ -395,6 +517,10 @@ int nats_cleanup_init_servers()
395 517
 
396 518
 int nats_cleanup_connection(nats_connection_ptr c)
397 519
 {
520
+	if(c->conn != NULL) {
521
+		natsConnection_Close(c->conn);
522
+		natsConnection_Destroy(c->conn);
523
+	}
398 524
 	if(c->opts != NULL) {
399 525
 		natsOptions_Destroy(c->opts);
400 526
 	}
... ...
@@ -411,6 +537,7 @@ int nats_destroy_workers()
411 537
 {
412 538
 	int i;
413 539
 	nats_consumer_worker_t *worker;
540
+	nats_pub_worker_t *pub_worker;
414 541
 	for(i = 0; i < _nats_proc_count; i++) {
415 542
 		worker = &nats_workers[i];
416 543
 		if(worker != NULL) {
... ...
@@ -418,10 +545,6 @@ int nats_destroy_workers()
418 545
 				natsSubscription_Unsubscribe(worker->subscription);
419 546
 				natsSubscription_Destroy(worker->subscription);
420 547
 			}
421
-			if(worker->conn != NULL) {
422
-				natsConnection_Close(worker->conn);
423
-				natsConnection_Destroy(worker->conn);
424
-			}
425 548
 			if(worker->uvLoop != NULL) {
426 549
 				uv_loop_close(worker->uvLoop);
427 550
 			}
... ...
@@ -431,12 +554,30 @@ int nats_destroy_workers()
431 554
 			if(worker->queue_group != NULL) {
432 555
 				shm_free(worker->queue_group);
433 556
 			}
557
+			if(worker->nc != NULL) {
558
+				if(nats_cleanup_connection(worker->nc) < 0) {
559
+					LM_ERR("could not cleanup worker connection\n");
560
+				}
561
+			}
434 562
 			if(worker->on_message != NULL) {
435 563
 				shm_free(worker->on_message);
436 564
 			}
437 565
 			shm_free(worker);
438 566
 		}
439 567
 	}
568
+
569
+	for(i = 0; i < nats_pub_workers_num; i++) {
570
+		pub_worker = &nats_pub_workers[i];
571
+		if(pub_worker != NULL) {
572
+			if(pub_worker->nc != NULL) {
573
+				if(nats_cleanup_connection(pub_worker->nc) < 0) {
574
+					LM_ERR("could not cleanup worker connection\n");
575
+				}
576
+			}
577
+			uv_poll_stop(&pub_worker->poll);
578
+			shm_free(pub_worker);
579
+		}
580
+	}
440 581
 	return 0;
441 582
 }
442 583
 
... ...
@@ -451,12 +592,15 @@ static void mod_destroy(void)
451 592
 	if(nats_cleanup_init_sub() < 0) {
452 593
 		LM_INFO("could not cleanup init data\n");
453 594
 	}
454
-	if(nats_cleanup_connection(_nats_connection) < 0) {
455
-		LM_INFO("could not cleanup connection\n");
456
-	}
457 595
 	if(nats_cleanup_init_servers() < 0) {
458 596
 		LM_INFO("could not cleanup init server data\n");
459 597
 	}
598
+	if(nats_pub_worker_pipes_fds) {
599
+		shm_free(nats_pub_worker_pipes_fds);
600
+	}
601
+	if(nats_pub_worker_pipes) {
602
+		shm_free(nats_pub_worker_pipes);
603
+	}
460 604
 }
461 605
 
462 606
 int _init_nats_server_url_add(modparam_t type, void *val)
... ...
@@ -26,63 +26,20 @@
26 26
 #define __NATS_MOD_H_
27 27
 
28 28
 #include <stdio.h>
29
-#include <nats/nats.h>
30 29
 #include <nats/adapters/libuv.h>
31 30
 #include "../json/api.h"
32 31
 #include "../../core/cfg/cfg_struct.h"
33 32
 #include "../../core/fmsg.h"
34 33
 
35
-#define NATS_DEFAULT_URL "nats://localhost:4222"
36
-#define NATS_MAX_SERVERS 10
37
-#define NATS_URL_MAX_SIZE 256
38
-
39
-typedef struct _nats_evroutes
40
-{
41
-	int connected;
42
-	int disconnected;
43
-} nats_evroutes_t;
44
-static nats_evroutes_t _nats_rts;
45
-
46
-typedef struct _init_nats_sub
47
-{
48
-	char *sub;
49
-	char *queue_group;
50
-	struct _init_nats_sub *next;
51
-} init_nats_sub, *init_nats_sub_ptr;
52
-
53
-typedef struct _init_nats_server
54
-{
55
-	char *url;
56
-	struct _init_nats_server *next;
57
-} init_nats_server, *init_nats_server_ptr;
58
-
59
-typedef struct _nats_on_message
60
-{
61
-	int rt;
62
-} nats_on_message, *nats_on_message_ptr;
63
-
64
-typedef struct _nats_connection
65
-{
66
-	natsOptions *opts;
67
-	char *servers[NATS_MAX_SERVERS];
68
-} nats_connection, *nats_connection_ptr;
69
-
70
-struct nats_consumer_worker
71
-{
72
-	char *subject;
73
-	char *queue_group;
74
-	int pid;
75
-	natsConnection *conn;
76
-	natsSubscription *subscription;
77
-	uv_loop_t *uvLoop;
78
-	nats_on_message_ptr on_message;
79
-};
80
-typedef struct nats_consumer_worker nats_consumer_worker_t;
81
-
82 34
 static int mod_init(void);
83 35
 static int mod_child_init(int);
84 36
 static void mod_destroy(void);
85 37
 
38
+extern int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload);
39
+extern int fixup_publish_get_value(void **param, int param_no);
40
+extern int fixup_publish_get_value_free(void **param, int param_no);
41
+extern void _nats_pub_worker_cb(uv_poll_t *handle, int status, int events);
42
+
86 43
 int nats_run_cfg_route(int rt);
87 44
 void nats_init_environment();
88 45
 
... ...
@@ -100,7 +57,8 @@ int init_nats_sub_add(char *sub);
100 57
 int nats_cleanup_init_sub();
101 58
 
102 59
 void nats_consumer_worker_proc(
103
-		nats_consumer_worker_t *worker, nats_connection_ptr c);
60
+		nats_consumer_worker_t *worker);
104 61
 int nats_pv_get_event_payload(struct sip_msg *, pv_param_t *, pv_value_t *);
105 62
 
63
+
106 64
 #endif
107 65
new file mode 100644
... ...
@@ -0,0 +1,130 @@
1
+/*
2
+ * NATS module interface
3
+ *
4
+ * Copyright (C) 2021 Voxcom Inc
5
+ *
6
+ * This file is part of Kamailio, a free SIP server.
7
+ *
8
+ * Kamailio is free software; you can redistribute it and/or modify
9
+ * it under the terms of the GNU General Public License as published by
10
+ * the Free Software Foundation; either version 2 of the License, or
11
+ * (at your option) any later version
12
+ *
13
+ * Kamailio is distributed in the hope that it will be useful,
14
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
+ * GNU General Public License for more details.
17
+ *
18
+ * You should have received a copy of the GNU General Public License
19
+ * along with this program; if not, write to the Free Software
20
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21
+ *
22
+ *
23
+ */
24
+
25
+#include "defs.h"
26
+#include "nats_pub.h"
27
+
28
+extern int *nats_pub_worker_pipes;
29
+extern int nats_pub_workers_num;
30
+extern nats_pub_worker_t *nats_pub_workers;
31
+int pub_worker = 0;
32
+
33
+int fixup_publish_get_value(void **param, int param_no)
34
+{
35
+	if(param_no == 1 || param_no == 2) {
36
+		return fixup_spve_null(param, 1);
37
+	}
38
+	LM_ERR("invalid parameter number <%d>\n", param_no);
39
+	return -1;
40
+}
41
+
42
+int fixup_publish_get_value_free(void **param, int param_no)
43
+{
44
+	if(param_no == 1 || param_no == 2) {
45
+		fixup_free_spve_null(param, 1);
46
+		return 0;
47
+	}
48
+	LM_ERR("invalid parameter number <%d>\n", param_no);
49
+	return -1;
50
+}
51
+
52
+nats_pub_delivery_ptr _nats_pub_delivery_new(str subject, str payload)
53
+{
54
+	nats_pub_delivery_ptr p =
55
+			(nats_pub_delivery_ptr)shm_malloc(sizeof(nats_pub_delivery));
56
+	memset(p, 0, sizeof(nats_pub_delivery));
57
+
58
+	p->subject = shm_malloc(subject.len + 1);
59
+	strcpy(p->subject, subject.s);
60
+	p->subject[subject.len] = '\0';
61
+
62
+	p->payload = shm_malloc(payload.len + 1);
63
+	strcpy(p->payload, payload.s);
64
+	p->payload[payload.len] = '\0';
65
+
66
+	return p;
67
+}
68
+
69
+static int _w_nats_publish_f(str subj, str payload, int worker)
70
+{
71
+	nats_pub_delivery_ptr ptr = _nats_pub_delivery_new(subj, payload);
72
+	if(write(nats_pub_worker_pipes[worker], &ptr, sizeof(ptr)) != sizeof(ptr)) {
73
+		LM_ERR("failed to publish message %d, write to "
74
+			   "command pipe: %s\n",
75
+				getpid(), strerror(errno));
76
+	}
77
+	return 1;
78
+}
79
+
80
+int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload)
81
+{
82
+	str subj_s = STR_NULL;
83
+	str payload_s = STR_NULL;
84
+	if(fixup_get_svalue(msg, (gparam_t *)subj, &subj_s) < 0) {
85
+		LM_ERR("failed to get subj value\n");
86
+		return -1;
87
+	}
88
+	if(fixup_get_svalue(msg, (gparam_t *)payload, &payload_s) < 0) {
89
+		LM_ERR("failed to get subj value\n");
90
+		return -1;
91
+	}
92
+
93
+	// round-robin pub workers
94
+	pub_worker++;
95
+	if(pub_worker >= nats_pub_workers_num) {
96
+		pub_worker = 0;
97
+	}
98
+
99
+	return _w_nats_publish_f(subj_s, payload_s, pub_worker);
100
+}
101
+
102
+void _nats_pub_worker_cb(uv_poll_t *handle, int status, int events)
103
+{
104
+	natsStatus s = NATS_OK;
105
+	nats_pub_delivery_ptr ptr;
106
+	nats_pub_worker_t *worker =
107
+			(nats_pub_worker_t *)uv_handle_get_data((uv_handle_t *)handle);
108
+
109
+	if(read(worker->fd, &ptr, sizeof(ptr)) != sizeof(ptr)) {
110
+		LM_ERR("failed to read from command pipe: %s\n", strerror(errno));
111
+		return;
112
+	}
113
+	if((s = natsConnection_PublishString(worker->nc->conn, ptr->subject, ptr->payload))
114
+			!= NATS_OK) {
115
+		LM_ERR("could not publish to subject [%s] payload [%s] error [%s]\n", ptr->subject, ptr->payload,
116
+				natsStatus_GetText(s));
117
+	}
118
+	nats_pub_free_delivery_ptr(ptr);
119
+}
120
+
121
+void nats_pub_free_delivery_ptr(nats_pub_delivery_ptr ptr)
122
+{
123
+	if(ptr == NULL)
124
+		return;
125
+	if(ptr->subject)
126
+		shm_free(ptr->subject);
127
+	if(ptr->payload)
128
+		shm_free(ptr->payload);
129
+	shm_free(ptr);
130
+}
0 131
new file mode 100644
... ...
@@ -0,0 +1,44 @@
1
+/*
2
+ * NATS module interface
3
+ *
4
+ * Copyright (C) 2021 Voxcom Inc
5
+ *
6
+ * This file is part of Kamailio, a free SIP server.
7
+ *
8
+ * Kamailio is free software; you can redistribute it and/or modify
9
+ * it under the terms of the GNU General Public License as published by
10
+ * the Free Software Foundation; either version 2 of the License, or
11
+ * (at your option) any later version
12
+ *
13
+ * Kamailio is distributed in the hope that it will be useful,
14
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
+ * GNU General Public License for more details.
17
+ *
18
+ * You should have received a copy of the GNU General Public License
19
+ * along with this program; if not, write to the Free Software
20
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21
+ *
22
+ *
23
+ */
24
+
25
+#ifndef __NATS_PUB_H_
26
+#define __NATS_PUB_H_
27
+
28
+#include <fcntl.h>
29
+#include "../../core/fmsg.h"
30
+#include "../../core/mod_fix.h"
31
+
32
+typedef struct _nats_pub_delivery
33
+{
34
+	char *subject;
35
+	char *payload;
36
+} nats_pub_delivery, *nats_pub_delivery_ptr;
37
+
38
+nats_pub_delivery_ptr _nats_pub_delivery_new(str subject, str payload);
39
+void nats_pub_free_delivery_ptr(nats_pub_delivery_ptr ptr);
40
+int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload);
41
+int fixup_publish_get_value(void **param, int param_no);
42
+int fixup_publish_get_value_free(void **param, int param_no);
43
+
44
+#endif