Browse code

Merge pull request #2359 from kamailio/jchavanton/mqueue_db_persistent

mqueue: add support for db persistency

Julien Chavanton authored on 02/07/2020 17:03:56 • GitHub committed on 02/07/2020 17:03:56
Showing 8 changed files
... ...
@@ -6,4 +6,6 @@ auto_gen=
6 6
 NAME=mqueue.so
7 7
 LIBS=
8 8
 
9
+SERLIBPATH=../../lib
10
+SER_LIBS+=$(SERLIBPATH)/srdb1/srdb1
9 11
 include ../../Makefile.modules
... ...
@@ -46,7 +46,7 @@
46 46
 	    <holder>Elena-Ramona Modroiu (asipto.com)</holder>
47 47
 	</copyright>
48 48
 	<copyright>
49
-	    <year>2018</year>
49
+	    <year>2018-2020</year>
50 50
 	    <holder>Julien chavanton, Flowroute</holder>
51 51
 	</copyright>
52 52
     </bookinfo>
... ...
@@ -61,6 +61,32 @@
61 61
     <section>
62 62
 	<title>Parameters</title>
63 63
 
64
+	<section id="mqueue.p.db_url">
65
+		<title><varname>db_url</varname> (str)</title>
66
+		<para>
67
+			The <acronym>URL</acronym> to connect to database for loading values in mqueue table at start up and/or saving values at shutdown.
68
+		</para>
69
+		<para>
70
+		<emphasis>
71
+			Default value is NULL (do not connect).
72
+		</emphasis>
73
+		</para>
74
+		<example>
75
+		<title>Set <varname>db_url</varname> parameter</title>
76
+		<programlisting format="linespecific">
77
+...
78
+modparam("mqueue", "db_url", "&defaultdb;")
79
+
80
+# Example of table in sqlite, you have the set the fields to support the length according to the data that will be present in the mqueue
81
+CREATE TABLE mqueue_name (
82
+id INTEGER PRIMARY KEY AUTOINCREMENT,
83
+key character varying(64) DEFAULT "" NOT NULL,
84
+val character varying(4096) DEFAULT "" NOT NULL
85
+);
86
+...
87
+</programlisting>
88
+		</example>
89
+	</section>
64 90
 	<section id="mqueue.p.mqueue">
65 91
 	    <title><varname>mqueue</varname> (string)</title>
66 92
 	    <para>
... ...
@@ -95,6 +121,16 @@
95 121
 				If not set the queue will be limitless.
96 122
 				</para>
97 123
 			</listitem>
124
+			<listitem>
125
+				<para>
126
+				<emphasis>dbmode</emphasis>: If set to 1, the content of the queue
127
+				is written to database table when the SIP server is stopped
128
+				(i.e., ensure persistency over restarts).
129
+				If set to 2, it is written at shutdown but not read at startup.
130
+				If set to 3, it is read at sartup but not written at shutdown.
131
+				Default value is 0 (no db table interaction).
132
+				</para>
133
+			</listitem>
98 134
 			</itemizedlist>
99 135
 		</listitem>
100 136
 		</itemizedlist>
... ...
@@ -34,7 +34,7 @@
34 34
 #include "../../core/fmsg.h"
35 35
 
36 36
 #include "mqueue_api.h"
37
-
37
+#include "mqueue_db.h"
38 38
 
39 39
 /**
40 40
  *
... ...
@@ -71,6 +71,11 @@ void mq_destroy(void)
71 71
 	mh = _mq_head_list;
72 72
 	while(mh!=NULL)
73 73
 	{
74
+		if(mh->dbmode == 1 || mh->dbmode == 3)
75
+		{
76
+			LM_INFO("mqueue[%.*s] dbmode[%d]\n", mh->name.len, mh->name.s, mh->dbmode);
77
+			mqueue_db_save_queue(&mh->name);
78
+		}
74 79
 		mi = mh->ifirst;
75 80
 		while(mi!=NULL)
76 81
 		{
... ...
@@ -180,6 +185,27 @@ mq_head_t *mq_head_get(str *name)
180 185
 	return NULL;
181 186
 }
182 187
 
188
+/**
189
+ *
190
+ */
191
+int mq_set_dbmode(str *name, int dbmode)
192
+{
193
+	mq_head_t *mh = NULL;
194
+
195
+	mh = _mq_head_list;
196
+	while(mh!=NULL)
197
+	{
198
+		if(name->len == mh->name.len
199
+				&& strncmp(mh->name.s, name->s, name->len)==0)
200
+		{
201
+			mh->dbmode = dbmode;
202
+			return 0;
203
+		}
204
+		mh = mh->next;
205
+	}
206
+	return -1;
207
+}
208
+
183 209
 /**
184 210
  *
185 211
  */
... ...
@@ -44,6 +44,7 @@ typedef struct _mq_head
44 44
 	str name;
45 45
 	int msize;
46 46
 	int csize;
47
+	int dbmode;
47 48
 	gen_lock_t lock;
48 49
 	mq_item_t *ifirst;
49 50
 	mq_item_t *ilast;
... ...
@@ -61,7 +62,6 @@ typedef struct _mq_pv
61 62
 } mq_pv_t;
62 63
 
63 64
 mq_pv_t *mq_pv_get(str *name);
64
-
65 65
 int pv_parse_mq_name(pv_spec_p sp, str *in);
66 66
 int pv_get_mqk(struct sip_msg *msg, pv_param_t *param,
67 67
 		pv_value_t *res);
... ...
@@ -79,6 +79,7 @@ void mq_pv_free(str *name);
79 79
 int mq_item_add(str *qname, str *key, str *val);
80 80
 
81 81
 int _mq_get_csize(str *);
82
+int mq_set_dbmode(str *, int dbmode);
82 83
 
83 84
 #endif
84 85
 
85 86
new file mode 100644
... ...
@@ -0,0 +1,324 @@
1
+/**
2
+ * Copyright (C) 2020 Julien Chavanton
3
+ *
4
+ * This file is part of Kamailio, a free SIP server.
5
+ *
6
+ * This file is free software; you can redistribute it and/or modify
7
+ * it under the terms of the GNU General Public License as published by
8
+ * the Free Software Foundation; either version 2 of the License, or
9
+ * (at your option) any later version
10
+ *
11
+ *
12
+ * This file is distributed in the hope that it will be useful,
13
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
+ * GNU General Public License for more details.
16
+ *
17
+ * You should have received a copy of the GNU General Public License
18
+ * along with this program; if not, write to the Free Software
19
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20
+ *
21
+ */
22
+
23
+#include "../../lib/srdb1/db.h"
24
+#include "mqueue_api.h"
25
+
26
+/** database connection */
27
+db1_con_t *mqueue_db_con = NULL;
28
+db_func_t mq_dbf;
29
+
30
+/** db parameters */
31
+str mqueue_db_url = {0, 0};
32
+str mq_db_key_column = str_init("key");
33
+str mq_db_val_column = str_init("val");
34
+str mq_db_id_column  = str_init("id");
35
+
36
+/**
37
+ * initialize database connection
38
+ */
39
+int mqueue_db_init_con(void)
40
+{
41
+	if (mqueue_db_url.len<=0) {
42
+		LM_ERR("failed to connect to the database, no db url\n");
43
+		return -1;
44
+	}
45
+	/* binding to DB module */
46
+	if (db_bind_mod(&mqueue_db_url, &mq_dbf))
47
+	{
48
+		LM_ERR("database module not found\n");
49
+		return -1;
50
+	}
51
+
52
+	if (!DB_CAPABILITY(mq_dbf, DB_CAP_ALL))
53
+	{
54
+		LM_ERR("database module does not "
55
+				"implement all functions needed by the module\n");
56
+		return -1;
57
+	}
58
+	return 0;
59
+}
60
+
61
+/**
62
+ * open database connection
63
+ */
64
+int mqueue_db_open_con(void) {
65
+	if (mqueue_db_init_con()==0) {
66
+		mqueue_db_con = mq_dbf.init(&mqueue_db_url);
67
+		if (mqueue_db_con==NULL) {
68
+			LM_ERR("failed to connect to the database\n");
69
+			return -1;
70
+		}
71
+		LM_DBG("database connection opened successfully\n");
72
+		return 0;
73
+	}
74
+	return 0;
75
+}
76
+
77
+/**
78
+ * close database connection
79
+ */
80
+int mqueue_db_close_con(void)
81
+{
82
+	if (mqueue_db_con!=NULL && mq_dbf.close!=NULL)
83
+		mq_dbf.close(mqueue_db_con);
84
+	mqueue_db_con=NULL;
85
+	return 0;
86
+}
87
+
88
+int mqueue_db_load_queue(str *name)
89
+{
90
+	int ncols=2;
91
+	db1_res_t* db_res = NULL;
92
+	db_key_t db_cols[2] = {&mq_db_key_column, &mq_db_val_column};
93
+	db_key_t db_ord = &mq_db_id_column;
94
+	int mq_fetch_rows = 100;
95
+	int ret = 0;
96
+	str val = str_init("");
97
+	str key = str_init("");
98
+	int i;
99
+	int cnt=0;
100
+
101
+	if (mqueue_db_open_con() != 0) {
102
+		LM_ERR("no db connection\n");
103
+		return -1;
104
+	}
105
+
106
+	if (mq_dbf.use_table(mqueue_db_con, name) < 0)
107
+	{
108
+		LM_ERR("failed to use_table\n");
109
+		goto error;
110
+	}
111
+
112
+	LM_INFO("=============== loading queue table [%.*s] from database\n",
113
+			name->len, name->s);
114
+
115
+	if (DB_CAPABILITY(mq_dbf, DB_CAP_FETCH)) {
116
+		if(mq_dbf.query(mqueue_db_con,0,0,0,db_cols,0,ncols,db_ord,0) < 0)
117
+		{
118
+			LM_ERR("Error while querying db\n");
119
+			goto error;
120
+		}
121
+		if(mq_dbf.fetch_result(mqueue_db_con, &db_res, mq_fetch_rows)<0)
122
+		{
123
+			LM_ERR("Error while fetching result\n");
124
+			if (db_res)
125
+				mq_dbf.free_result(mqueue_db_con, db_res);
126
+			goto error;
127
+		} else {
128
+			if(RES_ROW_N(db_res)==0)
129
+			{
130
+				mq_dbf.free_result(mqueue_db_con, db_res);
131
+				LM_DBG("Nothing to be loaded in queue\n");
132
+				mqueue_db_close_con();
133
+				return 0;
134
+			}
135
+		}
136
+	} else {
137
+		if((ret=mq_dbf.query(mqueue_db_con, NULL, NULL, NULL, db_cols,
138
+				0, ncols, 0, &db_res))!=0
139
+			|| RES_ROW_N(db_res)<=0 )
140
+		{
141
+			if(ret==0)
142
+			{
143
+				mq_dbf.free_result(mqueue_db_con, db_res);
144
+				mqueue_db_close_con();
145
+				return 0;
146
+			} else {
147
+				goto error;
148
+			}
149
+		}
150
+	}
151
+
152
+	do {
153
+		for(i=0; i<RES_ROW_N(db_res); i++)
154
+		{
155
+			if(VAL_NULL(&RES_ROWS(db_res)[i].values[0])) {
156
+				LM_ERR("mqueue [%.*s] row [%d] has NULL key string\n",
157
+					name->len, name->s, i);
158
+				goto error;
159
+			}
160
+			if(VAL_NULL(&RES_ROWS(db_res)[i].values[1])) {
161
+				LM_ERR("mqueue [%.*s] row [%d] has NULL value string\n",
162
+					name->len, name->s, i);
163
+				goto error;
164
+			}
165
+			switch(RES_ROWS(db_res)[i].values[0].type) {
166
+			case DB1_STR:
167
+				key.s = (RES_ROWS(db_res)[i].values[0].val.str_val.s);
168
+				if(key.s==NULL) {
169
+					LM_ERR("mqueue [%.*s] row [%d] has NULL key\n",
170
+						name->len, name->s, i);
171
+					goto error;
172
+				}
173
+				key.len = (RES_ROWS(db_res)[i].values[0].val.str_val.len);
174
+				break;
175
+			case DB1_BLOB:
176
+				key.s = (RES_ROWS(db_res)[i].values[0].val.blob_val.s);
177
+				if(key.s==NULL) {
178
+					LM_ERR("mqueue [%.*s] row [%d] has NULL key\n",
179
+						name->len, name->s, i);
180
+					goto error;
181
+				}
182
+				key.len = (RES_ROWS(db_res)[i].values[0].val.blob_val.len);
183
+				break;
184
+			case DB1_STRING:
185
+				key.s = (char*)(RES_ROWS(db_res)[i].values[0].val.string_val);
186
+				if(key.s==NULL) {
187
+					LM_ERR("mqueue [%.*s] row [%d] has NULL key\n",
188
+						name->len, name->s, i);
189
+					goto error;
190
+				}
191
+				key.len = strlen(key.s);
192
+				break;
193
+			default:
194
+				LM_ERR("key type must be string (type=%d)\n",
195
+						RES_ROWS(db_res)[i].values[0].type);
196
+				goto error;
197
+			}
198
+			switch(RES_ROWS(db_res)[i].values[1].type) {
199
+			case DB1_STR:
200
+				val.s = (RES_ROWS(db_res)[i].values[1].val.str_val.s);
201
+				if(val.s==NULL) {
202
+					LM_ERR("mqueue [%.*s] row [%d] has NULL value\n",
203
+						name->len, name->s, i);
204
+					goto error;
205
+				}
206
+				val.len = (RES_ROWS(db_res)[i].values[1].val.str_val.len);
207
+				break;
208
+			case DB1_BLOB:
209
+				val.s = (RES_ROWS(db_res)[i].values[1].val.blob_val.s);
210
+				if(val.s==NULL) {
211
+					LM_ERR("mqueue [%.*s] row [%d] has NULL value\n",
212
+						name->len, name->s, i);
213
+					goto error;
214
+				}
215
+				val.len = (RES_ROWS(db_res)[i].values[1].val.blob_val.len);
216
+				break;
217
+			case DB1_STRING:
218
+				val.s = (char*)(RES_ROWS(db_res)[i].values[1].val.string_val);
219
+				if(val.s==NULL) {
220
+					LM_ERR("mqueue [%.*s] row [%d] has NULL value\n",
221
+						name->len, name->s, i);
222
+					goto error;
223
+				}
224
+				val.len = strlen(val.s);
225
+				break;
226
+			default:
227
+				LM_ERR("key type must be string (type=%d)\n",
228
+						RES_ROWS(db_res)[i].values[1].type);
229
+				goto error;
230
+			}
231
+			cnt++;
232
+			LM_DBG("adding item[%d] key[%.*s] value[%.*s]\n", cnt, key.len, key.s, val.len, val.s);
233
+			mq_item_add(name, &key, &val);
234
+		}
235
+
236
+		if (DB_CAPABILITY(mq_dbf, DB_CAP_FETCH)) {
237
+			if(mq_dbf.fetch_result(mqueue_db_con, &db_res, mq_fetch_rows)<0) {
238
+				LM_ERR("Error while fetching!\n");
239
+				goto error;
240
+			}
241
+		} else {
242
+			break;
243
+		}
244
+	}  while(RES_ROW_N(db_res)>0);
245
+
246
+	mq_dbf.free_result(mqueue_db_con, db_res);
247
+
248
+	if (mq_dbf.delete(mqueue_db_con, 0, 0, 0, 0) < 0) {
249
+		LM_ERR("failed to clear table\n");
250
+		goto error;
251
+	}
252
+
253
+	LM_DBG("loaded %d values in queue\n", cnt);
254
+	mqueue_db_close_con();
255
+	return 0;
256
+error:
257
+	mqueue_db_close_con();
258
+	return -1;
259
+}
260
+
261
+int mqueue_db_save_queue(str *name)
262
+{
263
+	int ncols=2;
264
+	db_key_t db_cols[2] = {&mq_db_key_column, &mq_db_val_column};
265
+	db_val_t db_vals[2];
266
+	int i;
267
+	int mqueue_sz = 0;
268
+	int ret = 0;
269
+
270
+	if (mqueue_db_open_con() != 0) {
271
+		LM_ERR("no db connection\n");
272
+		return -1;
273
+	}
274
+
275
+	if (mq_dbf.use_table(mqueue_db_con, name) < 0)
276
+	{
277
+		LM_ERR("failed to use_table\n");
278
+		goto error;
279
+	}
280
+
281
+	if (name->len <= 0 || name->s == NULL) {
282
+		LM_ERR("bad mqueue name\n");
283
+		goto error;
284
+	}
285
+
286
+	mqueue_sz = _mq_get_csize(name);
287
+
288
+	if (mqueue_sz < 0) {
289
+		LM_ERR("no such mqueue\n");
290
+		goto error;
291
+	}
292
+	for(i=0;i<mqueue_sz;i++) {
293
+		ret = mq_head_fetch(name);
294
+		if (ret != 0) break;
295
+		str *key = NULL;
296
+		str *val = NULL;
297
+		key = get_mqk(name);
298
+		val = get_mqv(name);
299
+		LM_DBG("inserting mqueue[%.*s] name[%.*s] value[%.*s]\n",
300
+				name->len, name->s, key->len, key->s, val->len, val->s);
301
+		db_vals[0].type = DB1_STR;
302
+		db_vals[0].nul  = 0;
303
+		db_vals[0].val.str_val.s   = key->s;
304
+		db_vals[0].val.str_val.len = key->len;
305
+		db_vals[1].type = DB1_STR;
306
+		db_vals[1].nul  = 0;
307
+		db_vals[1].val.str_val.s   = val->s;
308
+		db_vals[1].val.str_val.len = val->len;
309
+		if(mq_dbf.insert(mqueue_db_con, db_cols, db_vals, ncols) < 0)
310
+		{
311
+			LM_ERR("failed to store key [%.*s] val [%.*s]\n",
312
+					key->len, key->s,
313
+					val->len, val->s);
314
+		}
315
+	}
316
+
317
+	LM_INFO("queue [%.*s] saved in db\n",
318
+			name->len, name->s);
319
+	mqueue_db_close_con();
320
+	return 0;
321
+error:
322
+	mqueue_db_close_con();
323
+	return -1;
324
+}
0 325
new file mode 100644
... ...
@@ -0,0 +1,33 @@
1
+/**
2
+ * Copyright (C) 2020 Julien Chavanton
3
+ *
4
+ * This file is part of Kamailio, a free SIP server.
5
+ *
6
+ * This file is free software; you can redistribute it and/or modify
7
+ * it under the terms of the GNU General Public License as published by
8
+ * the Free Software Foundation; either version 2 of the License, or
9
+ * (at your option) any later version
10
+ *
11
+ *
12
+ * This file is distributed in the hope that it will be useful,
13
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
+ * GNU General Public License for more details.
16
+ *
17
+ * You should have received a copy of the GNU General Public License
18
+ * along with this program; if not, write to the Free Software
19
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20
+ *
21
+ */
22
+
23
+#ifndef _MQUEUE_DB_H_
24
+#define _MQUEUE_DB_H_
25
+
26
+#include "../../lib/srdb1/db.h"
27
+#include "mqueue_api.h"
28
+
29
+extern str mqueue_db_url;
30
+
31
+int mqueue_db_load_queue(str *name);
32
+int mqueue_db_save_queue(str *name);
33
+#endif
... ...
@@ -37,6 +37,7 @@
37 37
 #include "../../core/kemi.h"
38 38
 
39 39
 #include "mqueue_api.h"
40
+#include "mqueue_db.h"
40 41
 #include "api.h"
41 42
 
42 43
 MODULE_VERSION
... ...
@@ -54,6 +55,7 @@ static int bind_mq(mq_api_t* api);
54 55
 
55 56
 static int mqueue_rpc_init(void);
56 57
 
58
+
57 59
 static pv_export_t mod_pvs[] = {
58 60
 	{ {"mqk", sizeof("mqk")-1}, PVT_OTHER, pv_get_mqk, 0,
59 61
 		pv_parse_mq_name, 0, 0, 0 },
... ...
@@ -80,6 +82,7 @@ static cmd_export_t cmds[]={
80 82
 };
81 83
 
82 84
 static param_export_t params[]={
85
+	{"db_url",          PARAM_STR, &mqueue_db_url},
83 86
 	{"mqueue",          PARAM_STRING|USE_FUNC_PARAM, (void*)mq_param},
84 87
 	{0, 0, 0}
85 88
 };
... ...
@@ -205,6 +208,7 @@ int mq_param(modparam_t type, void *val)
205 208
 	param_t *pit=NULL;
206 209
 	str qname = {0, 0};
207 210
 	int msize = 0;
211
+	int dbmode = 0;
208 212
 
209 213
 	if(val==NULL)
210 214
 		return -1;
... ...
@@ -229,6 +233,9 @@ int mq_param(modparam_t type, void *val)
229 233
 		} else if(pit->name.len==4
230 234
 				&& strncasecmp(pit->name.s, "size", 4)==0) {
231 235
 			str2sint(&pit->body, &msize);
236
+		} else if(pit->name.len==6
237
+				&& strncasecmp(pit->name.s, "dbmode", 6)==0) {
238
+			str2sint(&pit->body, &dbmode);
232 239
 		}  else {
233 240
 			LM_ERR("unknown param: %.*s\n", pit->name.len, pit->name.s);
234 241
 			free_params(params_list);
... ...
@@ -247,6 +254,16 @@ int mq_param(modparam_t type, void *val)
247 254
 		free_params(params_list);
248 255
 		return -1;
249 256
 	}
257
+	LM_INFO("mqueue param: [%.*s|%d]\n", qname.len, qname.s, dbmode);
258
+	if(dbmode == 1 || dbmode == 2) {
259
+		if(mqueue_db_load_queue(&qname)<0)
260
+		{
261
+			LM_ERR("error loading mqueue: %.*s from DB\n", qname.len, qname.s);
262
+			free_params(params_list);
263
+			return -1;
264
+		}
265
+	}
266
+	mq_set_dbmode(&qname, dbmode);
250 267
 	free_params(params_list);
251 268
 	return 0;
252 269
 }