Browse code

htable: initial dmq integration

Charles Chance authored on 07/10/2013 06:44:38
Showing 8 changed files
... ...
@@ -16,4 +16,5 @@ SERLIBPATH=../../lib
16 16
 SER_LIBS+=$(SERLIBPATH)/kmi/kmi
17 17
 SER_LIBS+=$(SERLIBPATH)/srdb1/srdb1
18 18
 SER_LIBS+=$(SERLIBPATH)/kcore/kcore
19
+SER_LIBS+=$(SERLIBPATH)/srutils/srutils
19 20
 include ../../Makefile.modules
... ...
@@ -28,6 +28,7 @@
28 28
 
29 29
 #include "ht_api.h"
30 30
 #include "api.h"
31
+#include "ht_dmq.h"
31 32
 
32 33
 /**
33 34
  *
... ...
@@ -39,6 +40,11 @@ int ht_api_set_cell(str *hname, str *name, int type,
39 40
 	ht = ht_get_table(hname);
40 41
 	if(ht==NULL)
41 42
 		return -1;
43
+
44
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL, hname, name, type, val, mode)!=0) {
45
+		LM_ERR("dmq relication failed\n");
46
+	}
47
+
42 48
 	return ht_set_cell(ht, name, type, val, mode);
43 49
 }
44 50
 
... ...
@@ -51,6 +57,9 @@ int ht_api_del_cell(str *hname, str *name)
51 57
 	ht = ht_get_table(hname);
52 58
 	if(ht==NULL)
53 59
 		return -1;
60
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_DEL_CELL, hname, name, 0, NULL, 0)!=0) {
61
+		LM_ERR("dmq relication failed\n");
62
+	}
54 63
 	return ht_del_cell(ht, name);
55 64
 }
56 65
 
... ...
@@ -64,6 +73,9 @@ int ht_api_set_cell_expire(str *hname, str *name,
64 73
 	ht = ht_get_table(hname);
65 74
 	if(ht==NULL)
66 75
 		return -1;
76
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL_EXPIRE, hname, name, type, val, 0)!=0) {
77
+		LM_ERR("dmq relication failed\n");
78
+	}
67 79
 	return ht_set_cell_expire(ht, name, type, val);
68 80
 }
69 81
 
... ...
@@ -86,9 +98,17 @@ int ht_api_get_cell_expire(str *hname, str *name,
86 98
 int ht_api_rm_cell_re(str *hname, str *sre, int mode)
87 99
 {
88 100
 	ht_t* ht;
101
+	int_str isval;
89 102
 	ht = ht_get_table(hname);
90 103
 	if(ht==NULL)
91 104
 		return -1;
105
+	if (ht->dmqreplicate>0) {
106
+		isval.s.s = sre->s;
107
+		isval.s.len = sre->len;
108
+		if (ht_dmq_replicate_action(HT_DMQ_RM_CELL_RE, hname, NULL, AVP_VAL_STR, &isval, mode)!=0) {
109
+			LM_ERR("dmq relication failed\n");
110
+		}
111
+	}
92 112
 	if(ht_rm_cell_re(sre, ht, mode /* 0 - name; 1 - value */)<0)
93 113
 		return -1;
94 114
 	return 0;
... ...
@@ -218,7 +218,7 @@ ht_t* ht_get_table(str *name)
218 218
 }
219 219
 
220 220
 int ht_add_table(str *name, int autoexp, str *dbtable, int size, int dbmode,
221
-		int itype, int_str *ival, int updateexpire)
221
+		int itype, int_str *ival, int updateexpire, int dmqreplicate)
222 222
 {
223 223
 	unsigned int htid;
224 224
 	ht_t *ht;
... ...
@@ -261,7 +261,7 @@ int ht_add_table(str *name, int autoexp, str *dbtable, int size, int dbmode,
261 261
 	ht->flags = itype;
262 262
 	if(ival!=NULL)
263 263
 		ht->initval = *ival;
264
-
264
+	ht->dmqreplicate = dmqreplicate;
265 265
 	ht->next = _ht_root;
266 266
 	_ht_root = ht;
267 267
 	return 0;
... ...
@@ -761,6 +761,7 @@ int ht_table_spec(char *spec)
761 761
 	unsigned int size = 4;
762 762
 	unsigned int dbmode = 0;
763 763
 	unsigned int updateexpire = 1;
764
+	unsigned int dmqreplicate = 0;
764 765
 	str in;
765 766
 	str tok;
766 767
 	param_t *pit=NULL;
... ...
@@ -817,11 +818,16 @@ int ht_table_spec(char *spec)
817 818
 				goto error;
818 819
 
819 820
 			LM_DBG("htable [%.*s] - updateexpire [%u]\n", name.len, name.s, updateexpire); 
821
+		} else if(pit->name.len == 12 && strncmp(pit->name.s, "dmqreplicate", 12) == 0) {
822
+			if(str2int(&tok, &dmqreplicate) != 0)
823
+				goto error;
824
+
825
+			LM_DBG("htable [%.*s] - dmqreplicate [%u]\n", name.len, name.s, dmqreplicate); 
820 826
 		} else { goto error; }
821 827
 	}
822 828
 
823 829
 	return ht_add_table(&name, autoexpire, &dbtable, size, dbmode,
824
-			itype, &ival, updateexpire);
830
+			itype, &ival, updateexpire, dmqreplicate);
825 831
 
826 832
 error:
827 833
 	LM_ERR("invalid htable parameter [%.*s]\n", in.len, in.s);
... ...
@@ -62,6 +62,7 @@ typedef struct _ht
62 62
 	int_str initval;
63 63
 	int updateexpire;
64 64
 	unsigned int htsize;
65
+	int dmqreplicate;
65 66
 	ht_entry_t *entries;
66 67
 	struct _ht *next;
67 68
 } ht_t;
... ...
@@ -73,7 +74,7 @@ typedef struct _ht_pv {
73 74
 } ht_pv_t, *ht_pv_p;
74 75
 
75 76
 int ht_add_table(str *name, int autoexp, str *dbtable, int size, int dbmode,
76
-		int itype, int_str *ival, int updateexpire);
77
+		int itype, int_str *ival, int updateexpire, int dmqreplicate);
77 78
 int ht_init_tables(void);
78 79
 int ht_destroy(void);
79 80
 int ht_set_cell(ht_t *ht, str *name, int type, int_str *val, int mode);
80 81
new file mode 100644
... ...
@@ -0,0 +1,282 @@
1
+/**
2
+ * 
3
+ * Copyright (C) 2013 Charles Chance (Sipcentric Ltd)
4
+ *
5
+ * This file is part of Kamailio, a free SIP server.
6
+ *
7
+ * Kamailio is free software; you can redistribute it and/or modify
8
+ * it under the terms of the GNU General Public License as published by
9
+ * the Free Software Foundation; either version 2 of the License, or
10
+ * (at your option) any later version
11
+ *
12
+ * Kamailio is distributed in the hope that it will be useful,
13
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
+ * GNU General Public License for more details.
16
+ *
17
+ * You should have received a copy of the GNU General Public License 
18
+ * along with this program; if not, write to the Free Software 
19
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20
+ *
21
+ */
22
+
23
+
24
+#include "ht_dmq.h"
25
+#include "ht_api.h"
26
+
27
+static str ht_dmq_content_type = str_init("application/json");
28
+static str dmq_200_rpl  = str_init("OK");
29
+static str dmq_400_rpl  = str_init("Bad Request");
30
+static str dmq_500_rpl  = str_init("Server Internal Error");
31
+
32
+typedef struct _ht_dmq_repdata {
33
+	int action;
34
+	str htname;
35
+	str cname;
36
+	int type;
37
+	int intval;
38
+	str strval;
39
+	int expire;
40
+} ht_dmq_repdata_t;
41
+
42
+dmq_api_t ht_dmqb;
43
+dmq_peer_t* ht_dmq_peer = NULL;
44
+dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0};
45
+
46
+/**
47
+ * @brief add notification peer
48
+ */
49
+int ht_dmq_initialize()
50
+{
51
+	dmq_peer_t not_peer;
52
+
53
+        /* load the DMQ API */
54
+        if (dmq_load_api(&ht_dmqb)!=0) {
55
+                LM_ERR("cannot load dmq api\n");
56
+                return -1;
57
+        } else {
58
+                LM_DBG("loaded dmq api\n");
59
+        }
60
+
61
+	not_peer.callback = ht_dmq_handle_msg;
62
+	not_peer.description.s = "htable";
63
+	not_peer.description.len = 6;
64
+	not_peer.peer_id.s = "htable";
65
+	not_peer.peer_id.len = 6;
66
+	ht_dmq_peer = ht_dmqb.register_dmq_peer(&not_peer);
67
+	if(!ht_dmq_peer) {
68
+		LM_ERR("error in register_dmq_peer\n");
69
+		goto error;
70
+	} else {
71
+		LM_DBG("dmq peer registered\n");
72
+	}
73
+	return 0;
74
+error:
75
+	return -1;
76
+}
77
+
78
+int ht_dmq_broadcast(str* body) {
79
+        if (!ht_dmq_peer) {
80
+                LM_ERR("ht_dmq_peer is null!\n");
81
+                return -1;
82
+        }
83
+        LM_DBG("sending broadcast...\n");
84
+        ht_dmqb.bcast_message(ht_dmq_peer, body, 0, &ht_dmq_resp_callback, 1, &ht_dmq_content_type);
85
+        return 0;
86
+}
87
+
88
+/**
89
+ * @brief ht dmq callback
90
+ */
91
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
92
+{
93
+	int content_length;
94
+	str body;
95
+	ht_dmq_action_t action = HT_DMQ_NONE;
96
+	str htname, cname;
97
+	int type = 0, mode = 0;
98
+	int_str val;
99
+	srjson_doc_t jdoc;
100
+	srjson_t *it = NULL;
101
+
102
+	/* received dmq message */
103
+	LM_DBG("dmq message received\n");
104
+	/* parse the message headers */
105
+	if(parse_headers(msg, HDR_EOH_F, 0) < 0) {
106
+		LM_ERR("error parsing message headers\n");
107
+		goto error;
108
+	}
109
+	
110
+	if(!msg->content_length) {
111
+		LM_ERR("no content length header found\n");
112
+		goto invalid;
113
+	}
114
+	content_length = get_content_length(msg);
115
+	if(!content_length) {
116
+		LM_DBG("content length is 0\n");
117
+		goto invalid;
118
+	}
119
+
120
+	body.s = get_body(msg);
121
+	body.len = content_length;
122
+
123
+	if (!body.s) {
124
+		LM_ERR("unable to get body\n");
125
+		goto error;
126
+	}
127
+
128
+	/* parse body */
129
+	LM_DBG("body: %.*s\n", body.len, body.s);	
130
+
131
+	srjson_InitDoc(&jdoc, NULL);
132
+	jdoc.buf = body;
133
+
134
+	if(jdoc.root == NULL) {
135
+		jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
136
+		if(jdoc.root == NULL)
137
+		{
138
+			LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
139
+			goto invalid;
140
+		}
141
+	}
142
+
143
+	for(it=jdoc.root->child; it; it = it->next)
144
+	{
145
+		LM_DBG("found field: %s\n", it->string);
146
+		if (strcmp(it->string, "action")==0) {
147
+			action = it->valueint;
148
+		} else if (strcmp(it->string, "htname")==0) {
149
+			htname.s = it->valuestring;
150
+			htname.len = strlen(htname.s);
151
+		} else if (strcmp(it->string, "cname")==0) {
152
+			cname.s = it->valuestring;
153
+			cname.len = strlen(cname.s);
154
+		} else if (strcmp(it->string, "type")==0) {
155
+			type = it->valueint;
156
+		} else if (strcmp(it->string, "strval")==0) {
157
+			val.s.s = it->valuestring;
158
+			val.s.len = strlen(val.s.s);
159
+		} else if (strcmp(it->string, "intval")==0) {
160
+			val.n = it->valueint;
161
+		} else if (strcmp(it->string, "mode")==0) {
162
+			mode = it->valueint;
163
+		} else {
164
+			LM_ERR("unrecognized field in json object\n");
165
+			goto invalid;
166
+		}
167
+	}	
168
+
169
+	if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
170
+		LM_ERR("failed to replay action\n");
171
+		goto error;
172
+	}
173
+
174
+	srjson_DestroyDoc(&jdoc);
175
+	resp->reason = dmq_200_rpl;
176
+	resp->resp_code = 200;
177
+	return 0;
178
+
179
+invalid:
180
+	srjson_DestroyDoc(&jdoc);
181
+	resp->reason = dmq_400_rpl;
182
+	resp->resp_code = 400;
183
+	return 0;
184
+
185
+error:
186
+	srjson_DestroyDoc(&jdoc);
187
+	resp->reason = dmq_500_rpl;
188
+	resp->resp_code = 500;	
189
+	return 0;
190
+}
191
+
192
+int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode) {
193
+
194
+	srjson_doc_t jdoc;
195
+
196
+        LM_DBG("replicating action to dmq peers...\n");
197
+
198
+	srjson_InitDoc(&jdoc, NULL);
199
+
200
+	jdoc.root = srjson_CreateObject(&jdoc);
201
+	if(jdoc.root==NULL) {
202
+		LM_ERR("cannot create json root\n");
203
+		goto error;
204
+	}
205
+
206
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action);
207
+	srjson_AddStrToObject(&jdoc, jdoc.root, "htname", htname->s, htname->len);
208
+	if (cname!=NULL) {
209
+		srjson_AddStrToObject(&jdoc, jdoc.root, "cname", cname->s, cname->len);
210
+	}
211
+
212
+	if (action==HT_DMQ_SET_CELL || action==HT_DMQ_SET_CELL_EXPIRE || action==HT_DMQ_RM_CELL_RE) {
213
+		srjson_AddNumberToObject(&jdoc, jdoc.root, "type", type);
214
+		if (type&AVP_VAL_STR) {
215
+			srjson_AddStrToObject(&jdoc, jdoc.root, "strval", val->s.s, val->s.len);
216
+		} else {
217
+			srjson_AddNumberToObject(&jdoc, jdoc.root, "intval", val->n);
218
+		}
219
+	}
220
+
221
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "mode", mode);	
222
+
223
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
224
+	if(jdoc.buf.s!=NULL) {
225
+		jdoc.buf.len = strlen(jdoc.buf.s);
226
+		LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
227
+		if (ht_dmq_broadcast(&jdoc.buf)!=0) {
228
+			goto error;
229
+		}
230
+		jdoc.free_fn(jdoc.buf.s);
231
+		jdoc.buf.s = NULL;
232
+	} else {
233
+		LM_ERR("unable to serialize data\n");
234
+		goto error;
235
+	}
236
+
237
+	srjson_DestroyDoc(&jdoc);
238
+	return 0;
239
+
240
+error:
241
+	if(jdoc.buf.s!=NULL) {
242
+		jdoc.free_fn(jdoc.buf.s);
243
+		jdoc.buf.s = NULL;
244
+	}
245
+	srjson_DestroyDoc(&jdoc);
246
+	return -1;
247
+}
248
+
249
+int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode) {
250
+
251
+	ht_t* ht;
252
+	ht = ht_get_table(htname);
253
+	if(ht==NULL) {
254
+		LM_ERR("unable to get table\n");
255
+		return -1;
256
+	}
257
+
258
+        LM_DBG("replaying action %d on %.*s=>%.*s...\n", action, htname->len, htname->s, cname->len, cname->s);
259
+
260
+	if (action==HT_DMQ_SET_CELL) {
261
+		return ht_set_cell(ht, cname, type, val, mode);
262
+	} else if (action==HT_DMQ_SET_CELL_EXPIRE) {
263
+		return ht_set_cell_expire(ht, cname, 0, val);
264
+	} else if (action==HT_DMQ_DEL_CELL) {
265
+		return ht_del_cell(ht, cname);
266
+	} else if (action==HT_DMQ_RM_CELL_RE) {
267
+		return ht_rm_cell_re(&val->s, ht, mode);
268
+	} else {
269
+		LM_ERR("unrecognized action");
270
+		return -1;
271
+	}
272
+}
273
+
274
+/**
275
+ * @brief dmq response callback
276
+ */
277
+int ht_dmq_resp_callback_f(struct sip_msg* msg, int code,
278
+		dmq_node_t* node, void* param)
279
+{
280
+	LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
281
+	return 0;
282
+}
0 283
new file mode 100644
... ...
@@ -0,0 +1,48 @@
1
+/**
2
+ *
3
+ * Copyright (C) 2013 Charles Chance (Sipcentric Ltd)
4
+ *
5
+ * This file is part of Kamailio, a free SIP server.
6
+ *
7
+ * Kamailio is free software; you can redistribute it and/or modify
8
+ * it under the terms of the GNU General Public License as published by
9
+ * the Free Software Foundation; either version 2 of the License, or
10
+ * (at your option) any later version
11
+ *
12
+ * Kamailio is distributed in the hope that it will be useful,
13
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
+ * GNU General Public License for more details.
16
+ *
17
+ * You should have received a copy of the GNU General Public License 
18
+ * along with this program; if not, write to the Free Software 
19
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20
+ */
21
+
22
+#ifndef _HT_DMQ_H_
23
+#define _HT_DMQ_H_
24
+
25
+#include "../dmq/bind_dmq.h"
26
+#include "../../lib/srutils/srjson.h"
27
+#include "../../parser/msg_parser.h"
28
+#include "../../parser/parse_content.h"
29
+
30
+extern dmq_api_t ht_dmqb;
31
+extern dmq_peer_t* ht_dmq_peer;
32
+extern dmq_resp_cback_t ht_dmq_resp_callback;
33
+
34
+typedef enum {
35
+		HT_DMQ_NONE,
36
+        HT_DMQ_SET_CELL,
37
+        HT_DMQ_SET_CELL_EXPIRE,
38
+        HT_DMQ_DEL_CELL,
39
+        HT_DMQ_RM_CELL_RE
40
+} ht_dmq_action_t;
41
+
42
+int ht_dmq_initialize();
43
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp);
44
+int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
45
+int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
46
+int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
47
+
48
+#endif
... ...
@@ -22,6 +22,7 @@
22 22
 		       
23 23
 #include "ht_api.h"
24 24
 #include "ht_var.h"
25
+#include "ht_dmq.h"
25 26
 
26 27
 /* pkg copy */
27 28
 ht_cell_t *_htc_local=NULL;
... ...
@@ -90,6 +91,9 @@ int pv_set_ht_cell(struct sip_msg* msg, pv_param_t *param,
90 91
 	if((val==NULL) || (val->flags&PV_VAL_NULL))
91 92
 	{
92 93
 		/* delete it */
94
+		if (hpv->ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_DEL_CELL, &hpv->htname, &htname, 0, NULL, 0)!=0) {
95
+			LM_ERR("dmq relication failed\n");
96
+		}
93 97
 		ht_del_cell(hpv->ht, &htname);
94 98
 		return 0;
95 99
 	}
... ...
@@ -97,6 +101,9 @@ int pv_set_ht_cell(struct sip_msg* msg, pv_param_t *param,
97 101
 	if(val->flags&PV_TYPE_INT)
98 102
 	{
99 103
 		isval.n = val->ri;
104
+		if (hpv->ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL, &hpv->htname, &htname, 0, &isval, 1)!=0) {
105
+			LM_ERR("dmq relication failed\n");
106
+		}
100 107
 		if(ht_set_cell(hpv->ht, &htname, 0, &isval, 1)!=0)
101 108
 		{
102 109
 			LM_ERR("cannot set $ht(%.*s)\n", htname.len, htname.s);
... ...
@@ -104,6 +111,9 @@ int pv_set_ht_cell(struct sip_msg* msg, pv_param_t *param,
104 111
 		}
105 112
 	} else {
106 113
 		isval.s = val->rs;
114
+		if (hpv->ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL, &hpv->htname, &htname, AVP_VAL_STR, &isval, 1)!=0) {
115
+			LM_ERR("dmq relication failed\n");
116
+		}
107 117
 		if(ht_set_cell(hpv->ht, &htname, AVP_VAL_STR, &isval, 1)!=0)
108 118
 		{
109 119
 			LM_ERR("cannot set $ht(%.*s)\n", htname.len, htname.s);
... ...
@@ -229,6 +239,9 @@ int pv_set_ht_cell_expire(struct sip_msg* msg, pv_param_t *param,
229 239
 		if(val->flags&PV_TYPE_INT)
230 240
 			isval.n = val->ri;
231 241
 	}
242
+	if (hpv->ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL_EXPIRE, &hpv->htname, &htname, 0, &isval, 0)!=0) {
243
+		LM_ERR("dmq relication failed\n");
244
+	}	
232 245
 	if(ht_set_cell_expire(hpv->ht, &htname, 0, &isval)!=0)
233 246
 	{
234 247
 		LM_ERR("cannot set $ht(%.*s)\n", htname.len, htname.s);
... ...
@@ -327,6 +340,11 @@ int pv_get_ht_add(struct sip_msg *msg,  pv_param_t *param,
327 340
 		return pv_get_null(msg, param, res);
328 341
 
329 342
 	/* integer */
343
+	if (hpv->ht->dmqreplicate>0) {
344
+		if (ht_dmq_replicate_action(HT_DMQ_SET_CELL, &hpv->htname, &htname, 0, &htc->value, 1)!=0) {
345
+			LM_ERR("dmq relication failed\n");
346
+		}
347
+	}	
330 348
 	return pv_get_sintval(msg, param, res, htc->value.n);
331 349
 }
332 350
 
... ...
@@ -44,12 +44,14 @@
44 44
 #include "ht_db.h"
45 45
 #include "ht_var.h"
46 46
 #include "api.h"
47
+#include "ht_dmq.h"
47 48
 
48 49
 
49 50
 MODULE_VERSION
50 51
 
51 52
 int  ht_timer_interval = 20;
52 53
 int  ht_db_expires_flag = 0;
54
+int  ht_enable_dmq = 0;
53 55
 
54 56
 static int htable_init_rpc(void);
55 57
 
... ...
@@ -124,6 +126,7 @@ static param_export_t params[]={
124 126
 	{"fetch_rows",         INT_PARAM, &ht_fetch_rows},
125 127
 	{"timer_interval",     INT_PARAM, &ht_timer_interval},
126 128
 	{"db_expires",         INT_PARAM, &ht_db_expires_flag},
129
+	{"enable_dmq",         INT_PARAM, &ht_enable_dmq},
127 130
 	{0,0,0}
128 131
 };
129 132
 
... ...
@@ -188,6 +191,12 @@ static int mod_init(void)
188 191
 			return -1;
189 192
 		}
190 193
 	}
194
+
195
+	if (ht_enable_dmq>0 && ht_dmq_initialize()!=0) {
196
+		LM_ERR("failed to initialize dmq integration\n");
197
+		return -1;
198
+	}
199
+
191 200
 	return 0;
192 201
 }
193 202
 
... ...
@@ -286,6 +295,7 @@ static int ht_rm_name_re(struct sip_msg* msg, char* key, char* foo)
286 295
 	str sre;
287 296
 	pv_spec_t *sp;
288 297
 	sp = (pv_spec_t*)key;
298
+	int_str isval;
289 299
 
290 300
 	hpv = (ht_pv_t*)sp->pvp.pvn.u.dname;
291 301
 
... ...
@@ -300,6 +310,12 @@ static int ht_rm_name_re(struct sip_msg* msg, char* key, char* foo)
300 310
 		LM_ERR("cannot get $ht expression\n");
301 311
 		return -1;
302 312
 	}
313
+	if (hpv->ht->dmqreplicate>0) {
314
+		isval.s = sre;
315
+		if (ht_dmq_replicate_action(HT_DMQ_RM_CELL_RE, &hpv->htname, NULL, AVP_VAL_STR, &isval, 0)!=0) {
316
+			LM_ERR("dmq relication failed\n");
317
+		}
318
+	}
303 319
 	if(ht_rm_cell_re(&sre, hpv->ht, 0)<0)
304 320
 		return -1;
305 321
 	return 1;
... ...
@@ -311,6 +327,7 @@ static int ht_rm_value_re(struct sip_msg* msg, char* key, char* foo)
311 327
 	str sre;
312 328
 	pv_spec_t *sp;
313 329
 	sp = (pv_spec_t*)key;
330
+	int_str isval;
314 331
 
315 332
 	hpv = (ht_pv_t*)sp->pvp.pvn.u.dname;
316 333
 
... ...
@@ -326,6 +343,12 @@ static int ht_rm_value_re(struct sip_msg* msg, char* key, char* foo)
326 343
 		return -1;
327 344
 	}
328 345
 
346
+	if (hpv->ht->dmqreplicate>0) {
347
+		isval.s = sre;
348
+		if (ht_dmq_replicate_action(HT_DMQ_RM_CELL_RE, &hpv->htname, NULL, AVP_VAL_STR, &isval, 1)!=0) {
349
+			LM_ERR("dmq relication failed\n");
350
+		}
351
+	}
329 352
 	if(ht_rm_cell_re(&sre, hpv->ht, 1)<0)
330 353
 		return -1;
331 354
 	return 1;
... ...
@@ -531,6 +554,10 @@ static struct mi_root* ht_mi_delete(struct mi_root* cmd_tree, void* param) {
531 554
 	if (!ht)
532 555
 		return init_mi_tree(404, MI_BAD_PARM_S, MI_BAD_PARM_LEN);
533 556
 
557
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_DEL_CELL, &ht->name, key, 0, NULL, 0)!=0) {
558
+		LM_ERR("dmq relication failed\n");
559
+	}
560
+
534 561
 	ht_del_cell(ht, key);
535 562
 
536 563
 	return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
... ...
@@ -655,6 +682,10 @@ static void htable_rpc_delete(rpc_t* rpc, void* c) {
655 682
 		return;
656 683
 	}
657 684
 
685
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_DEL_CELL, &ht->name, &keyname, 0, NULL, 0)!=0) {
686
+		LM_ERR("dmq relication failed\n");
687
+	}
688
+
658 689
 	ht_del_cell(ht, &keyname);
659 690
 }
660 691
 
... ...
@@ -737,6 +768,10 @@ static void htable_rpc_sets(rpc_t* rpc, void* c) {
737 768
 		return;
738 769
 	}
739 770
 	
771
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL, &ht->name, &keyname, AVP_VAL_STR, &keyvalue, 1)!=0) {
772
+		LM_ERR("dmq relication failed\n");
773
+	}
774
+
740 775
 	if(ht_set_cell(ht, &keyname, AVP_VAL_STR, &keyvalue, 1)!=0)
741 776
 	{
742 777
 		LM_ERR("cannot set $ht(%.*s=>%.*s)\n", htname.len, htname.s,
... ...
@@ -766,6 +801,10 @@ static void htable_rpc_seti(rpc_t* rpc, void* c) {
766 801
 		rpc->fault(c, 500, "No such htable");
767 802
 		return;
768 803
 	}
804
+
805
+	if (ht->dmqreplicate>0 && ht_dmq_replicate_action(HT_DMQ_SET_CELL, &ht->name, &keyname, 0, &keyvalue, 1)!=0) {
806
+		LM_ERR("dmq relication failed\n");
807
+	}
769 808
 	
770 809
 	if(ht_set_cell(ht, &keyname, 0, &keyvalue, 1)!=0)
771 810
 	{
... ...
@@ -886,13 +925,14 @@ static void  htable_rpc_list(rpc_t* rpc, void* c)
886 925
 			dbname[0] = '\0';
887 926
 		}
888 927
 
889
-		if(rpc->struct_add(th, "Ssdddd",
928
+		if(rpc->struct_add(th, "Ssddddd",
890 929
 						"name", &ht->name,	/* String */
891 930
 						"dbtable", &dbname ,	/* Char * */
892 931
 						"dbmode", (int)  ht->dbmode,		/* u int */
893 932
 						"expire", (int) ht->htexpire,		/* u int */
894 933
 						"updateexpire", ht->updateexpire,	/* int */
895
-						"size", (int) ht->htsize		/* u int */
934
+						"size", (int) ht->htsize,			/* u int */
935
+						"dmqreplicate", ht->dmqreplicate	/* int */
896 936
 						) < 0) {
897 937
 			rpc->fault(c, 500, "Internal error creating data rpc");
898 938
 			goto error;