Browse code

sworker: new module to delegate sip message processing to a group of workers

Daniel-Constantin Mierla authored on 25/11/2020 13:05:31
Showing 6 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,9 @@
1
+#
2
+# WARNING: do not run this directly, it should be run by the main Makefile
3
+
4
+include ../../Makefile.defs
5
+auto_gen=
6
+NAME=sworker.so
7
+LIBS=
8
+
9
+include ../../Makefile.modules
0 10
new file mode 100644
... ...
@@ -0,0 +1,119 @@
1
+SWORKER Module
2
+
3
+Daniel-Constantin Mierla
4
+
5
+   <miconda@gmail.com>
6
+
7
+Edited by
8
+
9
+Daniel-Constantin Mierla
10
+
11
+   <miconda@gmail.com>
12
+
13
+   Copyright � 2020 asipto.com
14
+     __________________________________________________________________
15
+
16
+   Table of Contents
17
+
18
+   1. Admin Guide
19
+
20
+        1. Overview
21
+        2. Dependencies
22
+
23
+              2.1. Kamailio Modules
24
+              2.2. External Libraries or Applications
25
+
26
+        3. Functions
27
+
28
+              3.1. sworker_active()
29
+              3.2. swork_task(gname)
30
+
31
+   List of Examples
32
+
33
+   1.1. sworker_active() usage
34
+   1.2. sworker_task() usage
35
+
36
+Chapter 1. Admin Guide
37
+
38
+   Table of Contents
39
+
40
+   1. Overview
41
+   2. Dependencies
42
+
43
+        2.1. Kamailio Modules
44
+        2.2. External Libraries or Applications
45
+
46
+   3. Functions
47
+
48
+        3.1. sworker_active()
49
+        3.2. swork_task(gname)
50
+
51
+1. Overview
52
+
53
+   This module can delegate processing of SIP requests to a group of
54
+   workers in the configuration file. The async workers have to defined
55
+   with the global parameter.
56
+
57
+   It does not create the transaction and nor suspend it.
58
+
59
+2. Dependencies
60
+
61
+   2.1. Kamailio Modules
62
+   2.2. External Libraries or Applications
63
+
64
+2.1. Kamailio Modules
65
+
66
+   The following modules must be loaded before this module:
67
+     * None.
68
+
69
+2.2. External Libraries or Applications
70
+
71
+   The following libraries or applications must be installed before
72
+   running Kamailio with this module loaded:
73
+     * None
74
+
75
+3. Functions
76
+
77
+   3.1. sworker_active()
78
+   3.2. swork_task(gname)
79
+
80
+3.1. sworker_active()
81
+
82
+   Return 1 (true) if the processing happens in an asyn process, or -1
83
+   (false) if the processing is happening in a SIP receiving process.
84
+
85
+   This function can be used from REQUEST_ROUTE|CORE_REPLY_ROUTE.
86
+
87
+   Example 1.1. sworker_active() usage
88
+...
89
+request_route {
90
+    ...
91
+    if(sworker_active()) {
92
+    }
93
+    ...
94
+}
95
+...
96
+
97
+3.2. swork_task(gname)
98
+
99
+   Delegate the processing of SIP message to a group of async workers.
100
+
101
+   The parameter gname provides the name of the group workers, it can
102
+   contain pseudo-variables.
103
+
104
+   The function returns 0 (exit) in case the task is delegated.
105
+
106
+   This function can be used from REQUEST_ROUTE|CORE_REPLY_ROUTE.
107
+
108
+   Example 1.2. sworker_task() usage
109
+...
110
+request_route {
111
+        if(!sworker_active()) {
112
+                xinfo("===== delegate processing [$Tf] [$si:$sp]\n");
113
+                sworker_task("default");
114
+                exit;
115
+        }
116
+        xinfo("===== processing continues [$Tf] [$si:$sp]\n");
117
+    ...
118
+}
119
+...
0 120
new file mode 100644
... ...
@@ -0,0 +1,4 @@
1
+docs = sworker.xml
2
+
3
+docbook_dir = ../../../../doc/docbook
4
+include $(docbook_dir)/Makefile.module
0 5
new file mode 100644
... ...
@@ -0,0 +1,36 @@
1
+<?xml version="1.0" encoding='ISO-8859-1'?>
2
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
3
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
4
+
5
+<!-- Include general documentation entities -->
6
+<!ENTITY % docentities SYSTEM "../../../../doc/docbook/entities.xml">
7
+%docentities;
8
+
9
+]>
10
+
11
+<book>
12
+    <bookinfo>
13
+	<title>SWORKER  Module</title>
14
+	<productname class="trade">kamailio.org</productname>
15
+	<authorgroup>
16
+	    <author>
17
+		<firstname>Daniel-Constantin</firstname>
18
+		<surname>Mierla</surname>
19
+		<email>miconda@gmail.com</email>
20
+	    </author>
21
+	    <editor>
22
+		<firstname>Daniel-Constantin</firstname>
23
+		<surname>Mierla</surname>
24
+		<email>miconda@gmail.com</email>
25
+	    </editor>
26
+	</authorgroup>
27
+	<copyright>
28
+	    <year>2020</year>
29
+	    <holder>asipto.com</holder>
30
+	</copyright>
31
+    </bookinfo>
32
+    <toc></toc>
33
+
34
+    <xi:include  xmlns:xi="http://www.w3.org/2001/XInclude" href="sworker_admin.xml"/>
35
+
36
+</book>
0 37
new file mode 100644
... ...
@@ -0,0 +1,122 @@
1
+<?xml version="1.0" encoding='ISO-8859-1'?>
2
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
3
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
4
+
5
+<!-- Include general documentation entities -->
6
+<!ENTITY % docentities SYSTEM "../../../../doc/docbook/entities.xml">
7
+%docentities;
8
+
9
+]>
10
+<!-- Module User's Guide -->
11
+
12
+<chapter>
13
+
14
+	<title>&adminguide;</title>
15
+
16
+	<section>
17
+	<title>Overview</title>
18
+	<para>
19
+		This module can delegate processing of SIP requests to a group of
20
+		workers in the configuration file. The async workers have to defined
21
+		with the global parameter.
22
+	</para>
23
+	<para>
24
+		It does not create the transaction and nor suspend it.
25
+	</para>
26
+	</section>
27
+
28
+	<section>
29
+	<title>Dependencies</title>
30
+	<section>
31
+		<title>&kamailio; Modules</title>
32
+		<para>
33
+		The following modules must be loaded before this module:
34
+			<itemizedlist>
35
+			<listitem>
36
+			<para>
37
+				<emphasis>None</emphasis>.
38
+			</para>
39
+			</listitem>
40
+			</itemizedlist>
41
+		</para>
42
+	</section>
43
+	<section>
44
+		<title>External Libraries or Applications</title>
45
+		<para>
46
+		The following libraries or applications must be installed before running
47
+		&kamailio; with this module loaded:
48
+			<itemizedlist>
49
+			<listitem>
50
+			<para>
51
+				<emphasis>None</emphasis>
52
+			</para>
53
+			</listitem>
54
+			</itemizedlist>
55
+		</para>
56
+	</section>
57
+	</section>
58
+
59
+	<section>
60
+	<title>Functions</title>
61
+	<section id="sworker.f.sworker_active">
62
+		<title>
63
+		<function moreinfo="none">sworker_active()</function>
64
+		</title>
65
+		<para>
66
+		Return 1 (true) if the processing happens in an asyn process, or -1
67
+		(false) if the processing is happening in a SIP receiving process.
68
+		</para>
69
+		<para>
70
+		This function can be used from REQUEST_ROUTE|CORE_REPLY_ROUTE.
71
+		</para>
72
+		<example>
73
+		<title><function>sworker_active()</function> usage</title>
74
+		<programlisting format="linespecific">
75
+...
76
+request_route {
77
+    ...
78
+    if(sworker_active()) {
79
+    }
80
+    ...
81
+}
82
+...
83
+</programlisting>
84
+		</example>
85
+	</section>
86
+	<section id="sworker.f.swork_task">
87
+		<title>
88
+		<function moreinfo="none">swork_task(gname)</function>
89
+		</title>
90
+		<para>
91
+		Delegate the processing of SIP message to a group of async workers.
92
+		</para>
93
+		<para>
94
+		The parameter gname provides the name of the group workers, it can
95
+		contain pseudo-variables.
96
+		</para>
97
+		<para>
98
+		The function returns 0 (exit) in case the task is delegated.
99
+		</para>
100
+		<para>
101
+		This function can be used from REQUEST_ROUTE|CORE_REPLY_ROUTE.
102
+		</para>
103
+		<example>
104
+		<title><function>sworker_task()</function> usage</title>
105
+		<programlisting format="linespecific">
106
+...
107
+request_route {
108
+	if(!sworker_active()) {
109
+		xinfo("===== delegate processing [$Tf] [$si:$sp]\n");
110
+		sworker_task("default");
111
+		exit;
112
+	}
113
+	xinfo("===== processing continues [$Tf] [$si:$sp]\n");
114
+    ...
115
+}
116
+...
117
+</programlisting>
118
+		</example>
119
+	</section>
120
+
121
+	</section>
122
+</chapter>
0 123
new file mode 100644
... ...
@@ -0,0 +1,228 @@
1
+/**
2
+ * Copyright (C) 2020 Daniel-Constantin Mierla (asipto.com)
3
+ *
4
+ * This file is part of Kamailio, a free SIP server.
5
+ *
6
+ * This file is free software; you can redistribute it and/or modify
7
+ * it under the terms of the GNU General Public License as published by
8
+ * the Free Software Foundation; either version 2 of the License, or
9
+ * (at your option) any later version
10
+ *
11
+ *
12
+ * This file is distributed in the hope that it will be useful,
13
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
+ * GNU General Public License for more details.
16
+ *
17
+ * You should have received a copy of the GNU General Public License
18
+ * along with this program; if not, write to the Free Software
19
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20
+ *
21
+ */
22
+
23
+#include <stdio.h>
24
+#include <unistd.h>
25
+#include <stdlib.h>
26
+#include <string.h>
27
+
28
+#include "../../core/sr_module.h"
29
+#include "../../core/dprint.h"
30
+#include "../../core/ut.h"
31
+#include "../../core/fmsg.h"
32
+#include "../../core/receive.h"
33
+#include "../../core/mod_fix.h"
34
+#include "../../core/async_task.h"
35
+#include "../../core/kemi.h"
36
+
37
+MODULE_VERSION
38
+
39
+static int mod_init(void);
40
+static int child_init(int);
41
+static void mod_destroy(void);
42
+
43
+static int w_sworker_task(sip_msg_t *msg, char *pgname, char *p2);
44
+static int w_sworker_active(sip_msg_t *msg, char *p1, char *p2);
45
+
46
+static int _sworker_active = 0;
47
+
48
+/* clang-format off */
49
+typedef struct sworker_task_param {
50
+	char *buf;
51
+	int len;
52
+	receive_info_t rcv;
53
+} sworker_task_param_t;
54
+
55
+static cmd_export_t cmds[]={
56
+	{"sworker_task", (cmd_function)w_sworker_task, 1, fixup_spve_null,
57
+		fixup_free_spve_null, REQUEST_ROUTE|CORE_ONREPLY_ROUTE},
58
+	{"sworker_active", (cmd_function)w_sworker_active, 0, 0,
59
+		0, REQUEST_ROUTE|CORE_ONREPLY_ROUTE},
60
+	{0, 0, 0, 0, 0, 0}
61
+};
62
+
63
+struct module_exports exports = {
64
+	"sworker",
65
+	DEFAULT_DLFLAGS, /* dlopen flags */
66
+	cmds,
67
+	0,
68
+	0,              /* exported RPC methods */
69
+	0,              /* exported pseudo-variables */
70
+	0,              /* response function */
71
+	mod_init,       /* module initialization function */
72
+	child_init,     /* per child init function */
73
+	mod_destroy    	/* destroy function */
74
+};
75
+/* clang-format on */
76
+
77
+
78
+/**
79
+ * init module function
80
+ */
81
+static int mod_init(void)
82
+{
83
+	return 0;
84
+}
85
+
86
+/**
87
+ * @brief Initialize async module children
88
+ */
89
+static int child_init(int rank)
90
+{
91
+	return 0;
92
+}
93
+
94
+/**
95
+ * destroy module function
96
+ */
97
+static void mod_destroy(void)
98
+{
99
+}
100
+
101
+/**
102
+ *
103
+ */
104
+/**
105
+ *
106
+ */
107
+void sworker_exec_task(void *param)
108
+{
109
+	sworker_task_param_t *stp;
110
+	static char buf[BUF_SIZE+1];
111
+	receive_info_t rcvi;
112
+	int len;
113
+
114
+	stp = (sworker_task_param_t *)param;
115
+
116
+	LM_DBG("received task [%p] - msg len [%d]\n", stp, stp->len);
117
+	if(stp->len > BUF_SIZE) {
118
+		LM_ERR("message is too large [%d]\n", stp->len);
119
+		return;
120
+	}
121
+
122
+	memcpy(buf, stp->buf, stp->len);
123
+	len = stp->len;
124
+	memcpy(&rcvi, &stp->rcv, sizeof(receive_info_t));
125
+
126
+	_sworker_active = 1;
127
+	receive_msg(buf, len, &rcvi);
128
+	_sworker_active = 0;
129
+}
130
+
131
+/**
132
+ *
133
+ */
134
+int sworker_send_task(sip_msg_t *msg, str *gname)
135
+{
136
+	async_task_t *at = NULL;
137
+	sworker_task_param_t *stp = NULL;
138
+	int dsize;
139
+
140
+	dsize = sizeof(async_task_t) + sizeof(sworker_task_param_t)
141
+		+ (msg->len+1)*sizeof(char);
142
+	at = (async_task_t *)shm_malloc(dsize);
143
+	if(at == NULL) {
144
+		LM_ERR("no more shm memory\n");
145
+		return -1;
146
+	}
147
+	memset(at, 0, dsize);
148
+	at->exec = sworker_exec_task;
149
+	at->param = (char *)at + sizeof(async_task_t);
150
+	stp = (sworker_task_param_t *)at->param;
151
+	stp->buf = (char*)stp+sizeof(sworker_task_param_t);
152
+	memcpy(stp->buf, msg->buf, msg->len);
153
+	stp->len = msg->len;
154
+	memcpy(&stp->rcv, &msg->rcv, sizeof(receive_info_t));
155
+
156
+	return async_task_group_push(gname, at);
157
+}
158
+
159
+/**
160
+ *
161
+ */
162
+int ki_sworker_task(sip_msg_t *msg, str *gname)
163
+{
164
+	if(msg==NULL || faked_msg_match(msg)) {
165
+		LM_ERR("invalid usage for null or faked message\n");
166
+		return -1;
167
+	}
168
+
169
+	if(sworker_send_task(msg, gname) < 0) {
170
+		return -1;
171
+	}
172
+
173
+	/* force exit in config */
174
+	return 0;
175
+}
176
+
177
+/**
178
+ *
179
+ */
180
+static int w_sworker_task(sip_msg_t *msg, char *pgname, char *p2)
181
+{
182
+	str gname;
183
+
184
+	if(msg == NULL) {
185
+		return -1;
186
+	}
187
+
188
+	if(fixup_get_svalue(msg, (gparam_t *)pgname, &gname) != 0) {
189
+		LM_ERR("no async route block name\n");
190
+		return -1;
191
+	}
192
+	return ki_sworker_task(msg, &gname);
193
+}
194
+
195
+/**
196
+ *
197
+ */
198
+static int w_sworker_active(sip_msg_t *msg, char *p1, char *p2)
199
+{
200
+	if(_sworker_active==0) {
201
+		return -1;
202
+	}
203
+	return 1;
204
+}
205
+
206
+/**
207
+ *
208
+ */
209
+/* clang-format off */
210
+static sr_kemi_t sr_kemi_sworker_exports[] = {
211
+	{ str_init("sworker"), str_init("task"),
212
+		SR_KEMIP_INT, ki_sworker_task,
213
+		{ SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE,
214
+			SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
215
+	},
216
+
217
+	{ {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
218
+};
219
+/* clang-format on */
220
+
221
+/**
222
+ *
223
+ */
224
+int mod_register(char *path, int *dlflags, void *p1, void *p2)
225
+{
226
+	sr_kemi_modules_add(sr_kemi_sworker_exports);
227
+	return 0;
228
+}