Browse code

nats: new nats message consumer module

Emmanuel Schmidbauer authored on 27/06/2021 11:58:58
Showing 6 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,15 @@
1
+#
2
+# NATS module
3
+#
4
+#
5
+# WARNING: do not run this directly, it should be run by the master Makefile
6
+
7
+include ../../Makefile.defs
8
+
9
+auto_gen=
10
+NAME=nats.so
11
+
12
+LIBS=-lnats -luv
13
+DEFS+=-I$(LOCALBASE)/include -I/usr/local/include
14
+
15
+include ../../Makefile.modules
0 16
new file mode 100644
... ...
@@ -0,0 +1,4 @@
1
+docs = nats.xml
2
+
3
+docbook_dir = ../../../../doc/docbook
4
+include $(docbook_dir)/Makefile.module
0 5
new file mode 100644
... ...
@@ -0,0 +1,42 @@
1
+<?xml version="1.0" encoding='ISO-8859-1'?>
2
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN" "http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
3
+	<!-- Include general documentation entities -->
4
+	<!ENTITY % docentities SYSTEM "../../../../doc/docbook/entities.xml">
5
+	%docentities;
6
+]>
7
+
8
+<book xmlns:xi="http://www.w3.org/2001/XInclude">
9
+	<bookinfo>
10
+		<title>NATS Module</title>
11
+		<productname class="trade">&kamailioname;</productname>
12
+		<authorgroup>
13
+			<author>
14
+				<firstname>Emmanuel</firstname>
15
+				<surname>Schmidbauer</surname>
16
+				<email>eschmidbauer@gmail.com</email>
17
+			</author>
18
+			<author>
19
+				<firstname>Joe</firstname>
20
+				<surname>Mordica</surname>
21
+				<email>joe@voxo.co</email>
22
+			</author>
23
+			<editor>
24
+				<firstname>Emmanuel</firstname>
25
+				<surname>Schmidbauer</surname>
26
+				<email>eschmidbauer@gmail.com</email>
27
+			</editor>
28
+		</authorgroup>
29
+		<copyright>
30
+			<year>2021</year>
31
+			<holder>Voxcom Inc</holder>
32
+		</copyright>
33
+		<copyright>
34
+			<year>2021</year>
35
+			<holder>VOXO</holder>
36
+		</copyright>
37
+	</bookinfo>
38
+	<toc></toc>
39
+
40
+	<xi:include href="nats_admin.xml" />
41
+
42
+</book>
0 43
\ No newline at end of file
1 44
new file mode 100644
... ...
@@ -0,0 +1,207 @@
1
+<?xml version="1.0" encoding='ISO-8859-1'?>
2
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN" "http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
3
+	<!-- Include general documentation entities -->
4
+	<!ENTITY % docentities SYSTEM "../../../../doc/docbook/entities.xml">
5
+	%docentities;
6
+]>
7
+<!-- Module User's Guide -->
8
+
9
+<chapter xmlns:xi="http://www.w3.org/2001/XInclude">
10
+	<title>&adminguide;</title>
11
+
12
+
13
+	<section>
14
+		<title>Overview</title>
15
+		<para>
16
+			The module provides an NATS consumer for &kamailio;.
17
+			NATS is a real time distributed messaging platform, more details about it
18
+			can be found at
19
+			<ulink url="https://nats.io">nats.io</ulink>
20
+			.
21
+		</para>
22
+		<para>
23
+			From a high-level perspective, the module may be used for:
24
+			<itemizedlist>
25
+				<listitem>
26
+					<para>
27
+						Provide a real-time distributed messaging layer in &kamailio;
28
+					</para>
29
+				</listitem>
30
+			</itemizedlist>
31
+		</para>
32
+
33
+
34
+		<para>
35
+			Supported NATS operations are:
36
+			<itemizedlist>
37
+				<listitem>
38
+					<para>
39
+						Subscribe to a Subject and Queue Group
40
+					</para>
41
+				</listitem>
42
+			</itemizedlist>
43
+		</para>
44
+
45
+	</section>
46
+	<section>
47
+		<title>How it works</title>
48
+		<para>
49
+			The module creates invokes a consumer process for each defined `subject_queue_group`. The messages are visible in event routes matching the "subject" name.
50
+		</para>
51
+	</section>
52
+
53
+	<section>
54
+		<title>Dependencies</title>
55
+		<section>
56
+			<title>&kamailio; Modules</title>
57
+			<para>
58
+				The following modules must be loaded before this module:
59
+				<itemizedlist>
60
+					<listitem>
61
+						<para>
62
+							<emphasis>none</emphasis>
63
+							.
64
+						</para>
65
+					</listitem>
66
+				</itemizedlist>
67
+			</para>
68
+		</section>
69
+		<section>
70
+			<title>External Libraries or Applications</title>
71
+			<para>
72
+				The following libraries or applications must be installed
73
+				<itemizedlist>
74
+					<listitem>
75
+						<para>
76
+							<emphasis>libuv</emphasis>
77
+						</para>
78
+					</listitem>
79
+					<listitem>
80
+						<para>
81
+							<emphasis>nats.c</emphasis>
82
+							-
83
+							<ulink url="https://github.com/nats-io/nats.c/releases">https://github.com/nats-io/nats.c/releases</ulink>
84
+						</para>
85
+					</listitem>
86
+				</itemizedlist>
87
+			</para>
88
+		</section>
89
+	</section>
90
+
91
+
92
+	<section>
93
+		<title>Parameters</title>
94
+		<section>
95
+			<title>
96
+				<varname>nats_url</varname>
97
+				(str)
98
+			</title>
99
+			<para>
100
+				The nats url.
101
+			</para>
102
+			<para>
103
+				Usage: nats related.
104
+			</para>
105
+			<para>
106
+				<emphasis>Default value is nats://127.0.0.1:4222</emphasis>
107
+			</para>
108
+			<example>
109
+				<title>
110
+					Set
111
+					<varname>nats_url</varname>
112
+					parameter
113
+				</title>
114
+				<programlisting format="linespecific">
115
+...
116
+modparam("nats", "nats_url", "nats://127.0.0.1:4222")
117
+modparam("nats", "nats_url", "nats://user1:pass1127.0.1.2:4222") // with auth
118
+modparam("nats", "nats_url", "nats://127.1.2.3:4222")
119
+...
120
+</programlisting>
121
+			</example>
122
+		</section>
123
+		<section>
124
+			<title>
125
+				<varname>subject_queue_group</varname>
126
+				(str)
127
+			</title>
128
+			<para>
129
+				The NATS Subject and Queue Group. Separated by ":"
130
+			</para>
131
+			<para>
132
+				Usage: nats related.
133
+			</para>
134
+			<para>
135
+				<emphasis>Default value is not set.</emphasis>
136
+			</para>
137
+			<example>
138
+				<title>
139
+					Set
140
+					<varname>subject_queue_group</varname>
141
+					parameter
142
+				</title>
143
+				<programlisting format="linespecific">
144
+...
145
+modparam("nats", "subject_queue_group", "Kamailio-World:2020")
146
+modparam("nats", "subject_queue_group", "Kamailio-World:2021") // this will create two processes for the Kamailio-World subject
147
+modparam("nats", "subject_queue_group", "MyQueue1:2021")
148
+modparam("nats", "subject_queue_group", "MyQueue2:2021")
149
+...
150
+				</programlisting>
151
+			</example>
152
+		</section>
153
+	</section>
154
+	<section>
155
+		<title>Pseudo Variables</title>
156
+		<itemizedlist>
157
+			<listitem>
158
+				<para>
159
+					<emphasis>$natsData</emphasis>
160
+					Contains the payload of a consumed message
161
+				</para>
162
+			</listitem>
163
+			<example>
164
+				<title>Example usage of $natsData pseudo variable</title>
165
+				<programlisting format="linespecific">
166
+	...
167
+	xlog("L_INFO", "received payload $natsData");
168
+}
169
+
170
+</programlisting>
171
+			</example>
172
+		</itemizedlist>
173
+	</section>
174
+
175
+	<section>
176
+		<title>Event Routes</title>
177
+		<para>
178
+				The worker process issues an event-route where we can act on the received payload. The name of the event-route name must match the subject of the message.
179
+			</para>
180
+		<example>
181
+			<title>Define the event routes</title>
182
+			<programlisting format="linespecific">
183
+...
184
+modparam("nats", "subject_queue_group", "Kamailio-World:2021")
185
+modparam("nats", "subject_queue_group", "MyQueue1:2021")
186
+...
187
+
188
+event_route[nats:Kamailio-World]
189
+{
190
+	if ($(natsData{json.parse,Event-Package}) == "dialog") {
191
+		xlog("L_INFO", "received $(natsData{json.parse,Event-Package}) update for $(natsData{json.parse,From})");
192
+		pua_json_publish($natsData);
193
+	}
194
+}
195
+
196
+event_route[nats:MyQueue1]
197
+{
198
+	xlog("L_INFO", "received $(natsData{json.parse,Event-Package}) update for $(natsData{json.parse,From})");
199
+	...
200
+}
201
+
202
+</programlisting>
203
+		</example>
204
+	</section>
205
+
206
+
207
+</chapter>
0 208
\ No newline at end of file
1 209
new file mode 100644
... ...
@@ -0,0 +1,569 @@
1
+/*
2
+ * NATS module interface
3
+ *
4
+ * Copyright (C) 2021 Voxcom Inc
5
+ *
6
+ * This file is part of Kamailio, a free SIP server.
7
+ *
8
+ * Kamailio is free software; you can redistribute it and/or modify
9
+ * it under the terms of the GNU General Public License as published by
10
+ * the Free Software Foundation; either version 2 of the License, or
11
+ * (at your option) any later version
12
+ *
13
+ * Kamailio is distributed in the hope that it will be useful,
14
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
+ * GNU General Public License for more details.
17
+ *
18
+ * You should have received a copy of the GNU General Public License
19
+ * along with this program; if not, write to the Free Software
20
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21
+ *
22
+ *
23
+ */
24
+
25
+#include "nats_mod.h"
26
+
27
+MODULE_VERSION
28
+
29
+init_nats_sub_ptr _init_nats_sc = NULL;
30
+init_nats_server_ptr _init_nats_srv = NULL;
31
+nats_consumer_worker_t *nats_workers = NULL;
32
+int _nats_proc_count;
33
+char *eventData = NULL;
34
+
35
+static pv_export_t nats_mod_pvs[] = {
36
+		{{"natsData", (sizeof("natsData") - 1)}, PVT_OTHER,
37
+				nats_pv_get_event_payload, 0, 0, 0, 0, 0},
38
+		{{0, 0}, 0, 0, 0, 0, 0, 0, 0}};
39
+
40
+static param_export_t params[] = {{"nats_url", PARAM_STRING | USE_FUNC_PARAM,
41
+										  (void *)_init_nats_server_url_add},
42
+		{"subject_queue_group", PARAM_STRING | USE_FUNC_PARAM,
43
+				(void *)_init_nats_sub_add}};
44
+
45
+struct module_exports exports = {
46
+		"nats", DEFAULT_DLFLAGS, /* dlopen flags */
47
+		0,						 /* Exported functions */
48
+		params,					 /* Exported parameters */
49
+		0,						 /* exported MI functions */
50
+		nats_mod_pvs,			 /* exported pseudo-variables */
51
+		0,						 /* response function*/
52
+		mod_init,				 /* module initialization function */
53
+		mod_child_init,			 /* per-child init function */
54
+		mod_destroy				 /* destroy function */
55
+};
56
+
57
+static void onMsg(
58
+		natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
59
+{
60
+	nats_on_message_ptr on_message = (nats_on_message_ptr)closure;
61
+	char *s = (char *)natsMsg_GetSubject(msg);
62
+	char *data = (char *)natsMsg_GetData(msg);
63
+	if(on_message->rt < 0 || event_rt.rlist[on_message->rt] == NULL) {
64
+		LM_INFO("event-route [nats:%s] does not exist\n", s);
65
+		goto end;
66
+	}
67
+	eventData = data;
68
+	nats_run_cfg_route(on_message->rt);
69
+
70
+end:
71
+	eventData = NULL;
72
+	natsMsg_Destroy(msg);
73
+}
74
+
75
+static void connectedCB(natsConnection *nc, void *closure)
76
+{
77
+	char url[NATS_URL_MAX_SIZE];
78
+	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
79
+	nats_run_cfg_route(_nats_rts.connected);
80
+}
81
+
82
+static void disconnectedCb(natsConnection *nc, void *closure)
83
+{
84
+	char url[NATS_URL_MAX_SIZE];
85
+	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
86
+	nats_run_cfg_route(_nats_rts.disconnected);
87
+}
88
+
89
+static void reconnectedCb(natsConnection *nc, void *closure)
90
+{
91
+	char url[NATS_URL_MAX_SIZE];
92
+	natsConnection_GetConnectedUrl(nc, url, sizeof(url));
93
+	nats_run_cfg_route(_nats_rts.connected);
94
+}
95
+
96
+static void closedCB(natsConnection *nc, void *closure)
97
+{
98
+	bool *closed = (bool *)closure;
99
+	const char *err = NULL;
100
+	natsConnection_GetLastError(nc, &err);
101
+	LM_INFO("connect failed: %s\n", err);
102
+	*closed = true;
103
+}
104
+
105
+void nats_consumer_worker_proc(
106
+		nats_consumer_worker_t *worker, const char *servers[])
107
+{
108
+	natsStatus s;
109
+	bool closed = false;
110
+
111
+	LM_INFO("nats worker connecting to subject [%s] queue group [%s]\n",
112
+			worker->subject, worker->queue_group);
113
+
114
+	s = natsOptions_Create(&worker->opts);
115
+	if(s != NATS_OK) {
116
+		LM_ERR("could not create nats options [%s]\n", natsStatus_GetText(s));
117
+		return;
118
+	}
119
+	// use these defaults
120
+	natsOptions_SetAllowReconnect(worker->opts, true);
121
+	natsOptions_SetSecure(worker->opts, false);
122
+	natsOptions_SetMaxReconnect(worker->opts, 10000);
123
+	natsOptions_SetReconnectWait(worker->opts, 2 * 1000);	  // 2s
124
+	natsOptions_SetPingInterval(worker->opts, 2 * 60 * 1000); // 2m
125
+	natsOptions_SetMaxPingsOut(worker->opts, 2);
126
+	natsOptions_SetIOBufSize(worker->opts, 32 * 1024); // 32 KB
127
+	natsOptions_SetMaxPendingMsgs(worker->opts, 65536);
128
+	natsOptions_SetTimeout(worker->opts, 2 * 1000);					// 2s
129
+	natsOptions_SetReconnectBufSize(worker->opts, 8 * 1024 * 1024); // 8 MB;
130
+	natsOptions_SetReconnectJitter(worker->opts, 100, 1000); // 100ms, 1s;
131
+	s = natsOptions_SetServers(worker->opts, servers, 1);
132
+	if(s != NATS_OK) {
133
+		LM_ERR("could not set nats server [%s]\n", natsStatus_GetText(s));
134
+	}
135
+	s = natsOptions_SetDisconnectedCB(worker->opts, disconnectedCb, NULL);
136
+	if(s != NATS_OK) {
137
+		LM_ERR("could not set disconnect callback [%s]\n",
138
+				natsStatus_GetText(s));
139
+	}
140
+	s = natsOptions_SetReconnectedCB(worker->opts, reconnectedCb, NULL);
141
+	if(s != NATS_OK) {
142
+		LM_ERR("could not set reconnect callback [%s]\n",
143
+				natsStatus_GetText(s));
144
+	}
145
+	s = natsOptions_SetRetryOnFailedConnect(
146
+			worker->opts, true, connectedCB, NULL);
147
+	if(s != NATS_OK) {
148
+		LM_ERR("could not set retry on failed callback [%s]\n",
149
+				natsStatus_GetText(s));
150
+	}
151
+	s = natsOptions_SetClosedCB(worker->opts, closedCB, (void *)&closed);
152
+	if(s != NATS_OK) {
153
+		LM_ERR("could not set closed callback [%s]\n", natsStatus_GetText(s));
154
+	}
155
+
156
+	s = natsConnection_Connect(&worker->conn, worker->opts);
157
+	if(s != NATS_OK) {
158
+		LM_ERR("could not connect [%s]\n", natsStatus_GetText(s));
159
+	}
160
+	// create a loop
161
+	natsLibuv_Init();
162
+	worker->uvLoop = uv_default_loop();
163
+	if(worker->uvLoop != NULL) {
164
+		natsLibuv_SetThreadLocalLoop(worker->uvLoop);
165
+	} else {
166
+		s = NATS_ERR;
167
+	}
168
+
169
+	s = natsOptions_SetEventLoop(worker->opts, (void *)worker->uvLoop,
170
+			natsLibuv_Attach, natsLibuv_Read, natsLibuv_Write,
171
+			natsLibuv_Detach);
172
+	if(s != NATS_OK) {
173
+		LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s));
174
+	}
175
+
176
+	if(s) {
177
+		LM_ERR("error setting options [%s]\n", natsStatus_GetText(s));
178
+	}
179
+
180
+	s = natsConnection_QueueSubscribe(&worker->subscription, worker->conn,
181
+			worker->subject, worker->queue_group, onMsg, worker->on_message);
182
+	if(s != NATS_OK) {
183
+		LM_ERR("could not subscribe [%s]\n", natsStatus_GetText(s));
184
+	}
185
+
186
+	s = natsSubscription_SetPendingLimits(worker->subscription, -1, -1);
187
+	if(s != NATS_OK) {
188
+		LM_ERR("could not set pending limits [%s]\n", natsStatus_GetText(s));
189
+	}
190
+
191
+	// Run the event loop.
192
+	// This call will return when the connection is closed (either after
193
+	// receiving all messages, or disconnected and unable to reconnect).
194
+	if(s == NATS_OK) {
195
+		uv_run(worker->uvLoop, UV_RUN_DEFAULT);
196
+	}
197
+	if(s != NATS_OK) {
198
+		LM_ERR("nats error [%s]\n", natsStatus_GetText(s));
199
+	}
200
+}
201
+
202
+static int mod_init(void)
203
+{
204
+	if(faked_msg_init() < 0) {
205
+		LM_ERR("failed to init faked sip message\n");
206
+		return -1;
207
+	}
208
+	nats_init_environment();
209
+	register_procs(_nats_proc_count);
210
+	nats_workers =
211
+			shm_malloc(_nats_proc_count * sizeof(nats_consumer_worker_t));
212
+	if(nats_workers == NULL) {
213
+		LM_ERR("error in shm_malloc\n");
214
+		return -1;
215
+	}
216
+	memset(nats_workers, 0, _nats_proc_count * sizeof(nats_consumer_worker_t));
217
+	return 0;
218
+}
219
+
220
+int init_worker(
221
+		nats_consumer_worker_t *worker, char *subject, char *queue_group)
222
+{
223
+	int buffsize = strlen(subject) + 6;
224
+	char routename[buffsize];
225
+	int rt;
226
+	int len;
227
+	char *sc;
228
+	int num_servers = 0;
229
+	init_nats_server_ptr s0;
230
+
231
+	memset(worker, 0, sizeof(*worker));
232
+	worker->subject = shm_malloc(strlen(subject) + 1);
233
+	strcpy(worker->subject, subject);
234
+	worker->subject[strlen(subject)] = '\0';
235
+	worker->queue_group = shm_malloc(strlen(queue_group) + 1);
236
+	strcpy(worker->queue_group, queue_group);
237
+	worker->queue_group[strlen(queue_group)] = '\0';
238
+	memset(worker->init_nats_servers, 0, sizeof(worker->init_nats_servers));
239
+	worker->on_message =
240
+			(nats_on_message_ptr)shm_malloc(sizeof(nats_on_message));
241
+	memset(worker->on_message, 0, sizeof(nats_on_message));
242
+
243
+	s0 = _init_nats_srv;
244
+	while(s0) {
245
+		if(s0->url != NULL && num_servers < NATS_MAX_SERVERS) {
246
+			len = strlen(s0->url);
247
+			sc = shm_malloc(len + 1);
248
+			strcpy(sc, s0->url);
249
+			sc[len] = '\0';
250
+			worker->init_nats_servers[num_servers++] = sc;
251
+		}
252
+		s0 = s0->next;
253
+	}
254
+	if(num_servers == 0) {
255
+		worker->init_nats_servers[0] = NATS_DEFAULT_URL;
256
+		LM_INFO("using default server [%s]\n", NATS_DEFAULT_URL);
257
+	}
258
+
259
+	snprintf(routename, buffsize, "nats:%s", subject);
260
+	routename[buffsize] = '\0';
261
+
262
+	rt = route_get(&event_rt, routename);
263
+	if(rt < 0 || event_rt.rlist[rt] == NULL) {
264
+		LM_INFO("route [%s] does not exist\n", routename);
265
+		worker->on_message->rt = -1;
266
+		return 0;
267
+	}
268
+	worker->on_message->rt = rt;
269
+	return 0;
270
+}
271
+
272
+void worker_loop(int id)
273
+{
274
+	nats_consumer_worker_t *worker = &nats_workers[id];
275
+	nats_consumer_worker_proc(worker, (const char **)worker->init_nats_servers);
276
+	for(;;) {
277
+		sleep(1000);
278
+	}
279
+}
280
+
281
+/**
282
+ * @brief Initialize async module children
283
+ */
284
+static int mod_child_init(int rank)
285
+{
286
+	init_nats_sub_ptr n;
287
+	int i = 0;
288
+	int newpid;
289
+
290
+	if(rank == PROC_INIT) {
291
+		n = _init_nats_sc;
292
+		while(n) {
293
+			if(init_worker(&nats_workers[i], n->sub, n->queue_group) < 0) {
294
+				LM_ERR("failed to init struct for worker[%d]\n", i);
295
+				return -1;
296
+			}
297
+			n = n->next;
298
+			i++;
299
+		}
300
+		if(nats_cleanup_init_sub() < 0) {
301
+			LM_INFO("could not cleanup init data\n");
302
+		}
303
+		if(nats_cleanup_init_servers() < 0) {
304
+			LM_INFO("could not cleanup init server data\n");
305
+		}
306
+		return 0;
307
+	}
308
+
309
+	if(rank == PROC_MAIN) {
310
+		for(i = 0; i < _nats_proc_count; i++) {
311
+			newpid = fork_process(PROC_RPC, "NATS WORKER", 1);
312
+			if(newpid < 0) {
313
+				LM_ERR("failed to fork worker process %d\n", i);
314
+				return -1;
315
+			} else if(newpid == 0) {
316
+				worker_loop(i);
317
+			} else {
318
+				nats_workers[i].pid = newpid;
319
+			}
320
+		}
321
+		return 0;
322
+	}
323
+
324
+	return 0;
325
+}
326
+
327
+int nats_cleanup_init_sub()
328
+{
329
+	init_nats_sub_ptr n0;
330
+	init_nats_sub_ptr n1;
331
+	n0 = _init_nats_sc;
332
+	while(n0) {
333
+		n1 = n0->next;
334
+		if(n0->sub != NULL) {
335
+			shm_free(n0->sub);
336
+		}
337
+		if(n0->queue_group != NULL) {
338
+			shm_free(n0->queue_group);
339
+		}
340
+		shm_free(n0);
341
+		n0 = n1;
342
+	}
343
+	_init_nats_sc = NULL;
344
+	return 0;
345
+}
346
+
347
+int nats_cleanup_init_servers()
348
+{
349
+	init_nats_server_ptr s0;
350
+	init_nats_server_ptr s1;
351
+	s0 = _init_nats_srv;
352
+	while(s0) {
353
+		s1 = s0->next;
354
+		if(s0->url != NULL) {
355
+			shm_free(s0->url);
356
+		}
357
+		shm_free(s0);
358
+		s0 = s1;
359
+	}
360
+	_init_nats_srv = NULL;
361
+	return 0;
362
+}
363
+
364
+int nats_destroy_workers()
365
+{
366
+	int i;
367
+	int s;
368
+	nats_consumer_worker_t *worker;
369
+	for(i = 0; i < _nats_proc_count; i++) {
370
+		worker = &nats_workers[i];
371
+		natsSubscription_Unsubscribe(worker->subscription);
372
+		natsSubscription_Destroy(worker->subscription);
373
+		natsConnection_Close(worker->conn);
374
+		natsConnection_Destroy(worker->conn);
375
+		natsOptions_Destroy(worker->opts);
376
+		if(worker->uvLoop != NULL) {
377
+			uv_loop_close(worker->uvLoop);
378
+		}
379
+		nats_Close();
380
+		if(worker->subject != NULL) {
381
+			shm_free(worker->subject);
382
+		}
383
+		if(worker->queue_group != NULL) {
384
+			shm_free(worker->queue_group);
385
+		}
386
+		if(worker->on_message != NULL) {
387
+			shm_free(worker->on_message);
388
+		}
389
+		for(s = 0; s < NATS_MAX_SERVERS; s++) {
390
+			if(worker->init_nats_servers[s]) {
391
+				shm_free(worker->init_nats_servers[s]);
392
+			}
393
+		}
394
+		shm_free(worker);
395
+	}
396
+	return 0;
397
+}
398
+
399
+/**
400
+ * destroy module function
401
+ */
402
+static void mod_destroy(void)
403
+{
404
+	if(nats_destroy_workers() < 0) {
405
+		LM_ERR("could not cleanup workers\n");
406
+	}
407
+}
408
+
409
+int _init_nats_server_url_add(modparam_t type, void *val)
410
+{
411
+	char *url = (char *)val;
412
+	int len = strlen(url);
413
+	char *value;
414
+	if(len > NATS_URL_MAX_SIZE) {
415
+		LM_ERR("connection url exceeds max size %d\n", NATS_URL_MAX_SIZE);
416
+		return -1;
417
+	}
418
+	if(strncmp(url, "nats://", 7)) {
419
+		LM_ERR("invalid nats url [%s]\n", url);
420
+		return -1;
421
+	}
422
+	value = pkg_malloc(len + 1);
423
+	strcpy(value, url);
424
+	value[len] = '\0';
425
+	if(init_nats_server_url_add(url) < 0) {
426
+		LM_ERR("could not add server\n");
427
+	}
428
+	pkg_free(value);
429
+	return 0;
430
+}
431
+
432
+int _init_nats_sub_add(modparam_t type, void *val)
433
+{
434
+	char *sub = (char *)val;
435
+	int len = strlen(sub);
436
+	char *s = pkg_malloc(len + 1);
437
+	strcpy(s, sub);
438
+	s[len] = '\0';
439
+	if(init_nats_sub_add(s) < 0) {
440
+		LM_ERR("could not add init data\n");
441
+	}
442
+	pkg_free(s);
443
+	return 0;
444
+}
445
+
446
+/**
447
+ * Invoke a event route block
448
+ */
449
+int nats_run_cfg_route(int rt)
450
+{
451
+	struct run_act_ctx ctx;
452
+	sip_msg_t *fmsg;
453
+	sip_msg_t tmsg;
454
+
455
+	// check for valid route pointer
456
+	if(rt < 0) {
457
+		return 0;
458
+	}
459
+
460
+	fmsg = faked_msg_next();
461
+	memcpy(&tmsg, fmsg, sizeof(sip_msg_t));
462
+	fmsg = &tmsg;
463
+	set_route_type(EVENT_ROUTE);
464
+	init_run_actions_ctx(&ctx);
465
+	run_top_route(event_rt.rlist[rt], fmsg, 0);
466
+	return 0;
467
+}
468
+
469
+void nats_init_environment()
470
+{
471
+	memset(&_nats_rts, 0, sizeof(nats_evroutes_t));
472
+	_nats_rts.connected = route_lookup(&event_rt, "nats:connected");
473
+	if(_nats_rts.connected < 0 || event_rt.rlist[_nats_rts.connected] == NULL)
474
+		_nats_rts.connected = -1;
475
+
476
+	_nats_rts.disconnected = route_lookup(&event_rt, "nats:disconnected");
477
+	if(_nats_rts.disconnected < 0
478
+			|| event_rt.rlist[_nats_rts.disconnected] == NULL)
479
+		_nats_rts.disconnected = -1;
480
+}
481
+
482
+init_nats_server_ptr _init_nats_server_list_new(char *url)
483
+{
484
+	init_nats_server_ptr p =
485
+			(init_nats_server_ptr)shm_malloc(sizeof(init_nats_server));
486
+	memset(p, 0, sizeof(init_nats_server));
487
+	p->url = shm_malloc(strlen(url) + 1);
488
+	strcpy(p->url, url);
489
+	p->url[strlen(url)] = '\0';
490
+	return p;
491
+}
492
+
493
+int init_nats_server_url_add(char *url)
494
+{
495
+	init_nats_server_ptr n;
496
+	n = _init_nats_srv;
497
+	while(n != NULL) {
498
+		n = n->next;
499
+	}
500
+	n = _init_nats_server_list_new(url);
501
+	n->next = _init_nats_srv;
502
+	_init_nats_srv = n;
503
+	return 0;
504
+}
505
+
506
+init_nats_sub_ptr _init_nats_sub_new(char *sub, char *queue_group)
507
+{
508
+	init_nats_sub_ptr p = (init_nats_sub_ptr)shm_malloc(sizeof(init_nats_sub));
509
+	memset(p, 0, sizeof(init_nats_sub));
510
+	p->sub = shm_malloc(strlen(sub) + 1);
511
+	strcpy(p->sub, sub);
512
+	p->sub[strlen(sub)] = '\0';
513
+	p->queue_group = shm_malloc(strlen(queue_group) + 1);
514
+	strcpy(p->queue_group, queue_group);
515
+	p->queue_group[strlen(queue_group)] = '\0';
516
+	return p;
517
+}
518
+
519
+int init_nats_sub_add(char *sc)
520
+{
521
+	int len;
522
+	char *s;
523
+	char *c;
524
+	init_nats_sub_ptr n;
525
+
526
+	if(sc == NULL) {
527
+		return -1;
528
+	}
529
+
530
+	len = strlen(sc);
531
+	s = pkg_malloc(len + 1);
532
+	strcpy(s, sc);
533
+	s[len] = '\0';
534
+
535
+	if((c = strchr(s, ':')) != 0) {
536
+		*c = 0;
537
+		for(c = c + 1; !*c; c++)
538
+			;
539
+	}
540
+	if(s == NULL) {
541
+		goto error;
542
+		return -1;
543
+	}
544
+	if(c == NULL) {
545
+		goto error;
546
+		return -1;
547
+	}
548
+
549
+	n = _init_nats_sc;
550
+	while(n != NULL) {
551
+		n = n->next;
552
+	}
553
+	n = _init_nats_sub_new(s, c);
554
+	n->next = _init_nats_sc;
555
+	_init_nats_sc = n;
556
+	_nats_proc_count++;
557
+
558
+
559
+error:
560
+	pkg_free(s);
561
+	return 0;
562
+}
563
+
564
+int nats_pv_get_event_payload(
565
+		struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
566
+{
567
+	return eventData == NULL ? pv_get_null(msg, param, res)
568
+							 : pv_get_strzval(msg, param, res, eventData);
569
+}
0 570
new file mode 100644
... ...
@@ -0,0 +1,99 @@
1
+/*
2
+ * NATS module interface
3
+ *
4
+ * Copyright (C) 2021 Voxcom Inc
5
+ *
6
+ * This file is part of Kamailio, a free SIP server.
7
+ *
8
+ * Kamailio is free software; you can redistribute it and/or modify
9
+ * it under the terms of the GNU General Public License as published by
10
+ * the Free Software Foundation; either version 2 of the License, or
11
+ * (at your option) any later version
12
+ *
13
+ * Kamailio is distributed in the hope that it will be useful,
14
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
+ * GNU General Public License for more details.
17
+ *
18
+ * You should have received a copy of the GNU General Public License
19
+ * along with this program; if not, write to the Free Software
20
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21
+ *
22
+ *
23
+ */
24
+
25
+#ifndef __NATS_MOD_H_
26
+#define __NATS_MOD_H_
27
+
28
+#include <stdio.h>
29
+#include <nats/nats.h>
30
+#include <nats/adapters/libuv.h>
31
+#include "../json/api.h"
32
+#include "../../core/cfg/cfg_struct.h"
33
+#include "../../core/fmsg.h"
34
+
35
+#define NATS_DEFAULT_URL "nats://localhost:4222"
36
+#define NATS_MAX_SERVERS 10
37
+#define NATS_URL_MAX_SIZE 256
38
+
39
+typedef struct _nats_evroutes
40
+{
41
+	int connected;
42
+	int disconnected;
43
+} nats_evroutes_t;
44
+static nats_evroutes_t _nats_rts;
45
+
46
+typedef struct _init_nats_sub
47
+{
48
+	char *sub;
49
+	char *queue_group;
50
+	struct _init_nats_sub *next;
51
+} init_nats_sub, *init_nats_sub_ptr;
52
+
53
+typedef struct _init_nats_server
54
+{
55
+	char *url;
56
+	struct _init_nats_server *next;
57
+} init_nats_server, *init_nats_server_ptr;
58
+
59
+typedef struct _nats_on_message
60
+{
61
+	int rt;
62
+} nats_on_message, *nats_on_message_ptr;
63
+
64
+struct nats_consumer_worker
65
+{
66
+	char *subject;
67
+	char *queue_group;
68
+	int pid;
69
+	natsConnection *conn;
70
+	natsOptions *opts;
71
+	natsSubscription *subscription;
72
+	uv_loop_t *uvLoop;
73
+	nats_on_message_ptr on_message;
74
+	char *init_nats_servers[NATS_MAX_SERVERS];
75
+};
76
+typedef struct nats_consumer_worker nats_consumer_worker_t;
77
+
78
+static int mod_init(void);
79
+static int mod_child_init(int);
80
+static void mod_destroy(void);
81
+
82
+int nats_run_cfg_route(int rt);
83
+void nats_init_environment();
84
+
85
+int _init_nats_server_url_add(modparam_t type, void *val);
86
+init_nats_server_ptr _init_nats_server_list_new(char *url);
87
+int init_nats_server_url_add(char *url);
88
+int nats_cleanup_init_servers();
89
+
90
+int _init_nats_sub_add(modparam_t type, void *val);
91
+init_nats_sub_ptr _init_nats_sub_new(char *sub, char *queue_group);
92
+int init_nats_sub_add(char *sub);
93
+int nats_cleanup_init_sub();
94
+
95
+void nats_consumer_worker_proc(
96
+		nats_consumer_worker_t *worker, const char *init_nats_servers[]);
97
+int nats_pv_get_event_payload(struct sip_msg *, pv_param_t *, pv_value_t *);
98
+
99
+#endif