Browse code

sworker: added parameter to be able to pass extra data to worker

Daniel-Constantin Mierla authored on 24/09/2021 08:31:12
Showing 1 changed files
... ...
@@ -44,12 +44,15 @@ static int w_sworker_task(sip_msg_t *msg, char *pgname, char *p2);
44 44
 static int w_sworker_active(sip_msg_t *msg, char *p1, char *p2);
45 45
 
46 46
 static int _sworker_active = 0;
47
+static str _sworker_xdata = STR_NULL;
48
+static pv_spec_t *_sworker_xdata_spec = NULL;
47 49
 
48 50
 /* clang-format off */
49 51
 typedef struct sworker_task_param {
50 52
 	char *buf;
51 53
 	int len;
52 54
 	receive_info_t rcv;
55
+	str xdata;
53 56
 } sworker_task_param_t;
54 57
 
55 58
 static cmd_export_t cmds[]={
... ...
@@ -60,17 +63,22 @@ static cmd_export_t cmds[]={
60 63
 	{0, 0, 0, 0, 0, 0}
61 64
 };
62 65
 
66
+static param_export_t params[]={
67
+	{"xdata",    PARAM_STR,   &_sworker_xdata},
68
+	{0, 0, 0}
69
+};
70
+
63 71
 struct module_exports exports = {
64
-	"sworker",
72
+	"sworker",       /* module name */
65 73
 	DEFAULT_DLFLAGS, /* dlopen flags */
66
-	cmds,
67
-	0,
68
-	0,              /* exported RPC methods */
69
-	0,              /* exported pseudo-variables */
70
-	0,              /* response function */
71
-	mod_init,       /* module initialization function */
72
-	child_init,     /* per child init function */
73
-	mod_destroy    	/* destroy function */
74
+	cmds,            /* exported functions */
75
+	params,          /* exported parameters */
76
+	0,               /* exported RPC methods */
77
+	0,               /* exported pseudo-variables */
78
+	0,               /* response function */
79
+	mod_init,        /* module initialization function */
80
+	child_init,      /* per child init function */
81
+	mod_destroy      /* destroy function */
74 82
 };
75 83
 /* clang-format on */
76 84
 
... ...
@@ -80,6 +88,19 @@ struct module_exports exports = {
80 88
  */
81 89
 static int mod_init(void)
82 90
 {
91
+	if(_sworker_xdata.s!=NULL && _sworker_xdata.len>0) {
92
+		_sworker_xdata_spec = pv_cache_get(&_sworker_xdata);
93
+		if(_sworker_xdata_spec==NULL) {
94
+			LM_ERR("cannot get pv spec for [%.*s]\n",
95
+					_sworker_xdata.len, _sworker_xdata.s);
96
+			return -1;
97
+		}
98
+		if(_sworker_xdata_spec->setf==NULL) {
99
+			LM_ERR("read only output variable [%.*s]\n",
100
+					_sworker_xdata.len, _sworker_xdata.s);
101
+			return -1;
102
+		}
103
+	}
83 104
 	return 0;
84 105
 }
85 106
 
... ...
@@ -110,6 +131,7 @@ void sworker_exec_task(void *param)
110 131
 	static char buf[BUF_SIZE+1];
111 132
 	receive_info_t rcvi;
112 133
 	int len;
134
+	pv_value_t val;
113 135
 
114 136
 	stp = (sworker_task_param_t *)param;
115 137
 
... ...
@@ -124,6 +146,23 @@ void sworker_exec_task(void *param)
124 146
 	memcpy(&rcvi, &stp->rcv, sizeof(receive_info_t));
125 147
 	rcvi.rflags |= RECV_F_INTERNAL;
126 148
 
149
+	if(_sworker_xdata_spec!=NULL) {
150
+		if(stp->xdata.len>0) {
151
+			memset(&val, 0, sizeof(pv_value_t));
152
+			val.flags |= PV_VAL_STR;
153
+			val.rs = stp->xdata;
154
+			if(pv_set_spec_value(NULL, _sworker_xdata_spec, 0, &val)!=0) {
155
+				LM_ERR("failed to set the xdata variable\n");
156
+				return;
157
+			}
158
+		} else {
159
+			if(pv_set_spec_value(NULL, _sworker_xdata_spec, 0, NULL)!=0) {
160
+				LM_ERR("failed to reset the xdata variable\n");
161
+				return;
162
+			}
163
+		}
164
+	}
165
+
127 166
 	_sworker_active = 1;
128 167
 	receive_msg(buf, len, &rcvi);
129 168
 	_sworker_active = 0;
... ...
@@ -137,9 +176,23 @@ int sworker_send_task(sip_msg_t *msg, str *gname)
137 176
 	async_task_t *at = NULL;
138 177
 	sworker_task_param_t *stp = NULL;
139 178
 	int dsize;
179
+	pv_value_t val;
140 180
 
181
+	memset(&val, 0, sizeof(pv_value_t));
141 182
 	dsize = sizeof(async_task_t) + sizeof(sworker_task_param_t)
142 183
 		+ (msg->len+1)*sizeof(char);
184
+	if(_sworker_xdata_spec!=NULL) {
185
+		if(pv_get_spec_value(msg, _sworker_xdata_spec, &val)!=0) {
186
+			LM_ERR("failed to get xdata value\n");
187
+			return -1;
188
+		}
189
+		if((val.flags & PV_VAL_STR) && (val.rs.len>0)) {
190
+			dsize += val.rs.len + 1;
191
+		} else {
192
+			LM_DBG("xdata does not have a string value - skipping\n");
193
+			val.rs.len = 0;
194
+		}
195
+	}
143 196
 	at = (async_task_t *)shm_malloc(dsize);
144 197
 	if(at == NULL) {
145 198
 		LM_ERR("no more shm memory\n");
... ...
@@ -153,6 +206,12 @@ int sworker_send_task(sip_msg_t *msg, str *gname)
153 206
 	memcpy(stp->buf, msg->buf, msg->len);
154 207
 	stp->len = msg->len;
155 208
 	memcpy(&stp->rcv, &msg->rcv, sizeof(receive_info_t));
209
+	if(val.rs.len>0) {
210
+		stp->xdata.s = (char*)stp+sizeof(sworker_task_param_t)+msg->len+1;
211
+		memcpy(stp->xdata.s, val.rs.s, val.rs.len);
212
+		stp->xdata.len = val.rs.len;
213
+		pv_value_destroy(&val);
214
+	}
156 215
 
157 216
 	return async_task_group_push(gname, at);
158 217
 }
Browse code

sworker: KSR.sworker.active() exported to kemi

- equivalent to existing sworker_active() config function

Daniel-Constantin Mierla authored on 30/07/2021 11:31:30
Showing 1 changed files
... ...
@@ -196,6 +196,17 @@ static int w_sworker_task(sip_msg_t *msg, char *pgname, char *p2)
196 196
 	return ki_sworker_task(msg, &gname);
197 197
 }
198 198
 
199
+/**
200
+ *
201
+ */
202
+static int ki_sworker_active(sip_msg_t *msg)
203
+{
204
+	if(_sworker_active==0) {
205
+		return -1;
206
+	}
207
+	return 1;
208
+}
209
+
199 210
 /**
200 211
  *
201 212
  */
... ...
@@ -217,6 +228,11 @@ static sr_kemi_t sr_kemi_sworker_exports[] = {
217 228
 		{ SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE,
218 229
 			SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
219 230
 	},
231
+	{ str_init("sworker"), str_init("active"),
232
+		SR_KEMIP_INT, ki_sworker_active,
233
+		{ SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE,
234
+			SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
235
+	},
220 236
 
221 237
 	{ {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
222 238
 };
Browse code

sworker: restrict use of sworker_task() in pre-processing phase

- allow execution only in event_route[core:pre-routing]

Daniel-Constantin Mierla authored on 26/11/2020 12:25:50
Showing 1 changed files
... ...
@@ -167,12 +167,15 @@ int ki_sworker_task(sip_msg_t *msg, str *gname)
167 167
 		return -1;
168 168
 	}
169 169
 
170
+	if(!(msg->rcv.rflags & RECV_F_PREROUTING)) {
171
+		LM_WARN("not used in pre-routing phase\n");
172
+		return -1;
173
+	}
170 174
 	if(sworker_send_task(msg, gname) < 0) {
171 175
 		return -1;
172 176
 	}
173 177
 
174
-	/* force exit in config */
175
-	return 0;
178
+	return 1;
176 179
 }
177 180
 
178 181
 /**
Browse code

sworker: set the internal received flag

Daniel-Constantin Mierla authored on 26/11/2020 08:49:37
Showing 1 changed files
... ...
@@ -122,6 +122,7 @@ void sworker_exec_task(void *param)
122 122
 	memcpy(buf, stp->buf, stp->len);
123 123
 	len = stp->len;
124 124
 	memcpy(&rcvi, &stp->rcv, sizeof(receive_info_t));
125
+	rcvi.rflags |= RECV_F_INTERNAL;
125 126
 
126 127
 	_sworker_active = 1;
127 128
 	receive_msg(buf, len, &rcvi);
Browse code

sworker: new module to delegate sip message processing to a group of workers

Daniel-Constantin Mierla authored on 25/11/2020 13:05:31
Showing 1 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,228 @@
1
+/**
2
+ * Copyright (C) 2020 Daniel-Constantin Mierla (asipto.com)
3
+ *
4
+ * This file is part of Kamailio, a free SIP server.
5
+ *
6
+ * This file is free software; you can redistribute it and/or modify
7
+ * it under the terms of the GNU General Public License as published by
8
+ * the Free Software Foundation; either version 2 of the License, or
9
+ * (at your option) any later version
10
+ *
11
+ *
12
+ * This file is distributed in the hope that it will be useful,
13
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
+ * GNU General Public License for more details.
16
+ *
17
+ * You should have received a copy of the GNU General Public License
18
+ * along with this program; if not, write to the Free Software
19
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20
+ *
21
+ */
22
+
23
+#include <stdio.h>
24
+#include <unistd.h>
25
+#include <stdlib.h>
26
+#include <string.h>
27
+
28
+#include "../../core/sr_module.h"
29
+#include "../../core/dprint.h"
30
+#include "../../core/ut.h"
31
+#include "../../core/fmsg.h"
32
+#include "../../core/receive.h"
33
+#include "../../core/mod_fix.h"
34
+#include "../../core/async_task.h"
35
+#include "../../core/kemi.h"
36
+
37
+MODULE_VERSION
38
+
39
+static int mod_init(void);
40
+static int child_init(int);
41
+static void mod_destroy(void);
42
+
43
+static int w_sworker_task(sip_msg_t *msg, char *pgname, char *p2);
44
+static int w_sworker_active(sip_msg_t *msg, char *p1, char *p2);
45
+
46
+static int _sworker_active = 0;
47
+
48
+/* clang-format off */
49
+typedef struct sworker_task_param {
50
+	char *buf;
51
+	int len;
52
+	receive_info_t rcv;
53
+} sworker_task_param_t;
54
+
55
+static cmd_export_t cmds[]={
56
+	{"sworker_task", (cmd_function)w_sworker_task, 1, fixup_spve_null,
57
+		fixup_free_spve_null, REQUEST_ROUTE|CORE_ONREPLY_ROUTE},
58
+	{"sworker_active", (cmd_function)w_sworker_active, 0, 0,
59
+		0, REQUEST_ROUTE|CORE_ONREPLY_ROUTE},
60
+	{0, 0, 0, 0, 0, 0}
61
+};
62
+
63
+struct module_exports exports = {
64
+	"sworker",
65
+	DEFAULT_DLFLAGS, /* dlopen flags */
66
+	cmds,
67
+	0,
68
+	0,              /* exported RPC methods */
69
+	0,              /* exported pseudo-variables */
70
+	0,              /* response function */
71
+	mod_init,       /* module initialization function */
72
+	child_init,     /* per child init function */
73
+	mod_destroy    	/* destroy function */
74
+};
75
+/* clang-format on */
76
+
77
+
78
+/**
79
+ * init module function
80
+ */
81
+static int mod_init(void)
82
+{
83
+	return 0;
84
+}
85
+
86
+/**
87
+ * @brief Initialize async module children
88
+ */
89
+static int child_init(int rank)
90
+{
91
+	return 0;
92
+}
93
+
94
+/**
95
+ * destroy module function
96
+ */
97
+static void mod_destroy(void)
98
+{
99
+}
100
+
101
+/**
102
+ *
103
+ */
104
+/**
105
+ *
106
+ */
107
+void sworker_exec_task(void *param)
108
+{
109
+	sworker_task_param_t *stp;
110
+	static char buf[BUF_SIZE+1];
111
+	receive_info_t rcvi;
112
+	int len;
113
+
114
+	stp = (sworker_task_param_t *)param;
115
+
116
+	LM_DBG("received task [%p] - msg len [%d]\n", stp, stp->len);
117
+	if(stp->len > BUF_SIZE) {
118
+		LM_ERR("message is too large [%d]\n", stp->len);
119
+		return;
120
+	}
121
+
122
+	memcpy(buf, stp->buf, stp->len);
123
+	len = stp->len;
124
+	memcpy(&rcvi, &stp->rcv, sizeof(receive_info_t));
125
+
126
+	_sworker_active = 1;
127
+	receive_msg(buf, len, &rcvi);
128
+	_sworker_active = 0;
129
+}
130
+
131
+/**
132
+ *
133
+ */
134
+int sworker_send_task(sip_msg_t *msg, str *gname)
135
+{
136
+	async_task_t *at = NULL;
137
+	sworker_task_param_t *stp = NULL;
138
+	int dsize;
139
+
140
+	dsize = sizeof(async_task_t) + sizeof(sworker_task_param_t)
141
+		+ (msg->len+1)*sizeof(char);
142
+	at = (async_task_t *)shm_malloc(dsize);
143
+	if(at == NULL) {
144
+		LM_ERR("no more shm memory\n");
145
+		return -1;
146
+	}
147
+	memset(at, 0, dsize);
148
+	at->exec = sworker_exec_task;
149
+	at->param = (char *)at + sizeof(async_task_t);
150
+	stp = (sworker_task_param_t *)at->param;
151
+	stp->buf = (char*)stp+sizeof(sworker_task_param_t);
152
+	memcpy(stp->buf, msg->buf, msg->len);
153
+	stp->len = msg->len;
154
+	memcpy(&stp->rcv, &msg->rcv, sizeof(receive_info_t));
155
+
156
+	return async_task_group_push(gname, at);
157
+}
158
+
159
+/**
160
+ *
161
+ */
162
+int ki_sworker_task(sip_msg_t *msg, str *gname)
163
+{
164
+	if(msg==NULL || faked_msg_match(msg)) {
165
+		LM_ERR("invalid usage for null or faked message\n");
166
+		return -1;
167
+	}
168
+
169
+	if(sworker_send_task(msg, gname) < 0) {
170
+		return -1;
171
+	}
172
+
173
+	/* force exit in config */
174
+	return 0;
175
+}
176
+
177
+/**
178
+ *
179
+ */
180
+static int w_sworker_task(sip_msg_t *msg, char *pgname, char *p2)
181
+{
182
+	str gname;
183
+
184
+	if(msg == NULL) {
185
+		return -1;
186
+	}
187
+
188
+	if(fixup_get_svalue(msg, (gparam_t *)pgname, &gname) != 0) {
189
+		LM_ERR("no async route block name\n");
190
+		return -1;
191
+	}
192
+	return ki_sworker_task(msg, &gname);
193
+}
194
+
195
+/**
196
+ *
197
+ */
198
+static int w_sworker_active(sip_msg_t *msg, char *p1, char *p2)
199
+{
200
+	if(_sworker_active==0) {
201
+		return -1;
202
+	}
203
+	return 1;
204
+}
205
+
206
+/**
207
+ *
208
+ */
209
+/* clang-format off */
210
+static sr_kemi_t sr_kemi_sworker_exports[] = {
211
+	{ str_init("sworker"), str_init("task"),
212
+		SR_KEMIP_INT, ki_sworker_task,
213
+		{ SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE,
214
+			SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
215
+	},
216
+
217
+	{ {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
218
+};
219
+/* clang-format on */
220
+
221
+/**
222
+ *
223
+ */
224
+int mod_register(char *path, int *dlflags, void *p1, void *p2)
225
+{
226
+	sr_kemi_modules_add(sr_kemi_sworker_exports);
227
+	return 0;
228
+}