Browse code

Merge pull request #1402 from kamailio/cchance/presence_dmq

presence: dmq integration

Charles Chance authored on 25/01/2018 18:39:28 • GitHub committed on 25/01/2018 18:39:28
Showing 28 changed files
... ...
@@ -9,7 +9,7 @@
9 9
 
10 10
 <table id="presentity" xmlns:db="http://docbook.org/ns/docbook">
11 11
     <name>presentity</name>
12
-    <version>4</version>
12
+    <version>5</version>
13 13
     <type db="mysql">&MYSQL_TABLE_TYPE;</type>
14 14
     <description>
15 15
 		<db:para>
... ...
@@ -98,6 +98,14 @@
98 98
         <description>Priority of the record</description>
99 99
 	</column>
100 100
 
101
+    <column id="ruid">
102
+        <name>ruid</name>
103
+        <type>string</type>
104
+        <size>64</size>
105
+        <null/>
106
+        <description>Record internal unique id</description>
107
+    </column>
108
+
101 109
     <index>
102 110
         <name>presentity_idx</name>
103 111
         <colref linkend="username"/>
... ...
@@ -107,6 +115,12 @@
107 115
         <unique/>
108 116
     </index>
109 117
 
118
+    <index>
119
+        <name>ruid_idx</name>
120
+        <colref linkend="ruid"/>
121
+        <unique/>
122
+    </index>
123
+
110 124
     <index>
111 125
         <name>presentity_expires</name>
112 126
         <colref linkend="expires"/>
... ...
@@ -349,7 +349,7 @@ static void dmq_rpc_list_nodes(rpc_t *rpc, void *c)
349 349
 			goto error;
350 350
 		if(rpc->struct_add(h, "SSsSdd", "host", &cur->uri.host, "port",
351 351
 				   &cur->uri.port, "resolved_ip", ip, "status",
352
-				   get_status_str(cur->status), "last_notification",
352
+				   dmq_get_status_str(cur->status), "last_notification",
353 353
 				   cur->last_notification, "local", cur->local)
354 354
 				< 0)
355 355
 			goto error;
... ...
@@ -39,7 +39,7 @@ str dmq_node_timeout_str = str_init("timeout");
39 39
 /**
40 40
  * @brief get the string status of the node
41 41
  */
42
-str *get_status_str(int status)
42
+str *dmq_get_status_str(int status)
43 43
 {
44 44
 	switch(status) {
45 45
 		case DMQ_NODE_ACTIVE: {
... ...
@@ -389,8 +389,8 @@ int build_node_str(dmq_node_t *node, char *buf, int buflen)
389 389
 	len += 1;
390 390
 	memcpy(buf + len, "status=", 7);
391 391
 	len += 7;
392
-	memcpy(buf + len, get_status_str(node->status)->s,
393
-			get_status_str(node->status)->len);
394
-	len += get_status_str(node->status)->len;
392
+	memcpy(buf + len, dmq_get_status_str(node->status)->s,
393
+			dmq_get_status_str(node->status)->len);
394
+	len += dmq_get_status_str(node->status)->len;
395 395
 	return len;
396 396
 }
... ...
@@ -75,7 +75,7 @@ void shm_free_node(dmq_node_t *node);
75 75
 void pkg_free_node(dmq_node_t *node);
76 76
 int set_dmq_node_params(dmq_node_t *node, param_t *params);
77 77
 
78
-str *get_status_str(int status);
78
+str *dmq_get_status_str(int status);
79 79
 int build_node_str(dmq_node_t *node, char *buf, int buflen);
80 80
 
81 81
 extern dmq_node_t *self_node;
... ...
@@ -22,4 +22,5 @@ DEFS+=-DKAMAILIO_MOD_INTERFACE
22 22
 
23 23
 SERLIBPATH=../../lib
24 24
 SER_LIBS+=$(SERLIBPATH)/srdb1/srdb1
25
+SER_LIBS+=$(SERLIBPATH)/srutils/srutils
25 26
 include ../../Makefile.modules
... ...
@@ -62,6 +62,11 @@
62 62
 				<emphasis>tm</emphasis>.
63 63
 			</para>
64 64
 			</listitem>
65
+			<listitem>
66
+			<para>
67
+				<emphasis>dmq (only if replication is enabled)</emphasis>.
68
+			</para>
69
+			</listitem>
65 70
 			</itemizedlist>
66 71
 		</para>
67 72
 	</section>
... ...
@@ -897,6 +902,33 @@ modparam("presence", "retrieve_order_by", "priority, received_time")
897 902
     </example>
898 903
 </section>
899 904
 
905
+<section id="presence.p.enable_dmq">
906
+	<title><varname>enable_dmq</varname> (integer)</title>
907
+	<para>
908
+		If set to 1, will enable DMQ replication of presentities between nodes. Use this instead of a shared DB
909
+		to share state across a cluster and update local watchers in realtime (subs_db_mode < 3) or on next
910
+		notifier run (subs_db_mode = 3).
911
+	</para>
912
+	<para>
913
+		<emphasis>
914
+			If this parameter is enabled, the DMQ module must be loaded first - otherwise, startup will fail.
915
+		</emphasis>
916
+	</para>
917
+	<para>
918
+		<emphasis>
919
+			Default value is 0.
920
+		</emphasis>
921
+	</para>
922
+	<example>
923
+		<title>Set <varname>enable_dmq</varname> parameter</title>
924
+		<programlisting format="linespecific">
925
+			...
926
+			modparam("presence", "enable_dmq", 1)
927
+			...
928
+		</programlisting>
929
+	</example>
930
+</section>
931
+
900 932
 </section>
901 933
 
902 934
 <section>
... ...
@@ -79,6 +79,7 @@ str str_watcher_domain_col = str_init("watcher_domain");
79 79
 str str_event_id_col = str_init("event_id");
80 80
 str str_event_col = str_init("event");
81 81
 str str_etag_col = str_init("etag");
82
+str str_ruid_col = str_init("ruid");
82 83
 str str_from_tag_col = str_init("from_tag");
83 84
 str str_to_tag_col = str_init("to_tag");
84 85
 str str_callid_col = str_init("callid");
... ...
@@ -76,6 +76,7 @@ extern str str_watcher_domain_col;
76 76
 extern str str_event_id_col;
77 77
 extern str str_event_col;
78 78
 extern str str_etag_col;
79
+extern str str_ruid_col;
79 80
 extern str str_from_tag_col;
80 81
 extern str str_to_tag_col;
81 82
 extern str str_callid_col;
... ...
@@ -68,6 +68,7 @@
68 68
 #include "event_list.h"
69 69
 #include "bind_presence.h"
70 70
 #include "notify.h"
71
+#include "presence_dmq.h"
71 72
 #include "../../core/mod_fix.h"
72 73
 #include "../../core/kemi.h"
73 74
 #include "../../core/timer_proc.h"
... ...
@@ -165,6 +166,7 @@ int pres_startup_mode = 1;
165 166
 str pres_xavp_cfg = {0};
166 167
 int pres_retrieve_order = 0;
167 168
 str pres_retrieve_order_by = str_init("priority");
169
+int pres_enable_dmq = 0;
168 170
 
169 171
 int db_table_lock_type = 1;
170 172
 db_locking_t db_table_lock = DB_LOCKING_WRITE;
... ...
@@ -174,6 +176,8 @@ int *pres_notifier_id = NULL;
174 176
 int phtable_size= 9;
175 177
 phtable_t* pres_htable=NULL;
176 178
 
179
+sruid_t pres_sruid;
180
+
177 181
 static cmd_export_t cmds[]=
178 182
 {
179 183
 	{"handle_publish",        (cmd_function)w_handle_publish,        0,
... ...
@@ -233,7 +237,8 @@ static param_export_t params[]={
233 237
 	{ "retrieve_order",         PARAM_INT, &pres_retrieve_order},
234 238
 	{ "retrieve_order_by",      PARAM_STR, &pres_retrieve_order_by},
235 239
 	{ "sip_uri_match",          PARAM_INT, &pres_uri_match},
236
-    { "cseq_offset",            PARAM_INT, &pres_cseq_offset},
240
+	{ "cseq_offset",            PARAM_INT, &pres_cseq_offset},
241
+	{ "enable_dmq",             PARAM_INT, &pres_enable_dmq},
237 242
 	{0,0,0}
238 243
 };
239 244
 
... ...
@@ -293,6 +298,10 @@ static int mod_init(void)
293 298
 		return 0;
294 299
 	}
295 300
 
301
+	if(sruid_init(&pres_sruid, '-', "pres", SRUID_INC) < 0) {
302
+		return -1;
303
+	}
304
+
296 305
 	if(expires_offset<0)
297 306
 		expires_offset = 0;
298 307
 
... ...
@@ -463,6 +472,11 @@ static int mod_init(void)
463 472
 	if (goto_on_notify_reply>=0 && event_rt.rlist[goto_on_notify_reply]==0)
464 473
 		goto_on_notify_reply=-1; /* disable */
465 474
 
475
+	if (pres_enable_dmq>0 && pres_dmq_initialize()!=0) {
476
+		LM_ERR("failed to initialize dmq integration\n");
477
+		return -1;
478
+	}
479
+
466 480
 	return 0;
467 481
 }
468 482
 
... ...
@@ -479,6 +493,10 @@ static int child_init(int rank)
479 493
 	if(library_mode)
480 494
 		return 0;
481 495
 
496
+	if(sruid_init(&pres_sruid, '-', "pres", SRUID_INC) < 0) {
497
+		return -1;
498
+	}
499
+
482 500
 	if (rank == PROC_MAIN)
483 501
 	{
484 502
 		int i;
... ...
@@ -34,6 +34,7 @@
34 34
 #include "../../modules/sl/sl.h"
35 35
 #include "../../lib/srdb1/db.h"
36 36
 #include "../../core/parser/parse_from.h"
37
+#include "../../lib/srutils/sruid.h"
37 38
 #include "event_list.h"
38 39
 #include "hash.h"
39 40
 
... ...
@@ -94,10 +95,13 @@ extern int pres_startup_mode;
94 95
 extern str pres_xavp_cfg;
95 96
 extern int pres_retrieve_order;
96 97
 extern str pres_retrieve_order_by;
98
+extern int pres_enable_dmq;
97 99
 
98 100
 extern int phtable_size;
99 101
 extern phtable_t* pres_htable;
100 102
 
103
+extern sruid_t pres_sruid;
104
+
101 105
 extern db_locking_t db_table_lock;
102 106
 
103 107
 int update_watchers_status(str pres_uri, pres_ev_t* ev, str* rules_doc);
104 108
new file mode 100644
... ...
@@ -0,0 +1,489 @@
1
+/**
2
+*
3
+* Copyright (C) 2018 Charles Chance (Sipcentric Ltd)
4
+*
5
+* This file is part of Kamailio, a free SIP server.
6
+*
7
+* Kamailio is free software; you can redistribute it and/or modify
8
+* it under the terms of the GNU General Public License as published by
9
+* the Free Software Foundation; either version 2 of the License, or
10
+* (at your option) any later version
11
+*
12
+* Kamailio is distributed in the hope that it will be useful,
13
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
+* GNU General Public License for more details.
16
+*
17
+* You should have received a copy of the GNU General Public License
18
+* along with this program; if not, write to the Free Software
19
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20
+*
21
+*/
22
+
23
+#include "presence_dmq.h"
24
+
25
+static str pres_dmq_content_type = str_init("application/json");
26
+static str pres_dmq_200_rpl = str_init("OK");
27
+static str pres_dmq_400_rpl = str_init("Bad Request");
28
+static str pres_dmq_500_rpl = str_init("Server Internal Error");
29
+
30
+static int *pres_dmq_proc_init = 0;
31
+static int *pres_dmq_recv = 0;
32
+
33
+dmq_api_t pres_dmqb;
34
+dmq_peer_t *pres_dmq_peer = NULL;
35
+dmq_resp_cback_t pres_dmq_resp_callback = {&pres_dmq_resp_callback_f, 0};
36
+
37
+int pres_dmq_send_all_presentities();
38
+int pres_dmq_request_sync();
39
+
40
+/**
41
+* @brief add notification peer
42
+*/
43
+int pres_dmq_initialize()
44
+{
45
+	dmq_peer_t not_peer;
46
+
47
+	/* load the DMQ API */
48
+	if(dmq_load_api(&pres_dmqb) != 0) {
49
+		LM_ERR("cannot load dmq api\n");
50
+		return -1;
51
+	} else {
52
+		LM_DBG("loaded dmq api\n");
53
+	}
54
+
55
+	not_peer.callback = pres_dmq_handle_msg;
56
+	not_peer.init_callback = pres_dmq_request_sync;
57
+	not_peer.description.s = "presence";
58
+	not_peer.description.len = 8;
59
+	not_peer.peer_id.s = "presence";
60
+	not_peer.peer_id.len = 8;
61
+	pres_dmq_peer = pres_dmqb.register_dmq_peer(&not_peer);
62
+	if(!pres_dmq_peer) {
63
+		LM_ERR("error in register_dmq_peer\n");
64
+		goto error;
65
+	} else {
66
+		LM_DBG("dmq peer registered\n");
67
+	}
68
+	return 0;
69
+error:
70
+	return -1;
71
+}
72
+
73
+static int pres_dmq_init_proc()
74
+{
75
+	// TODO: tidy up
76
+
77
+	if(!pres_dmq_proc_init) {
78
+		LM_DBG("Initializing pres_dmq_proc_init for pid (%d)\n", my_pid());
79
+		pres_dmq_proc_init = (int *)pkg_malloc(sizeof(int));
80
+		if(!pres_dmq_proc_init) {
81
+			LM_ERR("no more pkg memory\n");
82
+			return -1;
83
+		}
84
+		*pres_dmq_proc_init = 0;
85
+	}
86
+
87
+	if(!pres_dmq_recv) {
88
+		LM_DBG("Initializing pres_dmq_recv for pid (%d)\n", my_pid());
89
+		pres_dmq_recv = (int *)pkg_malloc(sizeof(int));
90
+		if(!pres_dmq_recv) {
91
+			LM_ERR("no more pkg memory\n");
92
+			return -1;
93
+		}
94
+		*pres_dmq_recv = 0;
95
+	}
96
+
97
+	if(pres_sruid.pid == 0) {
98
+		LM_DBG("Initializing pres_sruid for pid (%d)\n", my_pid());
99
+		if(sruid_init(&pres_sruid, '-', "pres", SRUID_INC) < 0) {
100
+			return -1;
101
+		}
102
+	}
103
+
104
+	if(!pa_db) {
105
+		LM_DBG("Initializing presence DB connection for pid (%d)\n", my_pid());
106
+
107
+		if(pa_dbf.init == 0) {
108
+			LM_ERR("dmq_worker_init: database not bound\n");
109
+			return -1;
110
+		}
111
+
112
+		/* Do not pool the connections where possible when running notifier
113
+		* processes. */
114
+		if(pres_notifier_processes > 0 && pa_dbf.init2)
115
+			pa_db = pa_dbf.init2(&db_url, DB_POOLING_NONE);
116
+		else
117
+			pa_db = pa_dbf.init(&db_url);
118
+
119
+		if(!pa_db) {
120
+			LM_ERR("dmq_worker_init: unsuccessful database connection\n");
121
+			return -1;
122
+		}
123
+	}
124
+
125
+	*pres_dmq_proc_init = 1;
126
+
127
+	LM_DBG("process initialization complete\n");
128
+
129
+	return 0;
130
+}
131
+
132
+int pres_dmq_send(str *body, dmq_node_t *node)
133
+{
134
+	if(!pres_dmq_peer) {
135
+		LM_ERR("pres_dmq_peer is null!\n");
136
+		return -1;
137
+	}
138
+	if(node) {
139
+		LM_DBG("sending dmq message ...\n");
140
+		pres_dmqb.send_message(pres_dmq_peer, body, node,
141
+				&pres_dmq_resp_callback, 1, &pres_dmq_content_type);
142
+	} else {
143
+		LM_DBG("sending dmq broadcast...\n");
144
+		pres_dmqb.bcast_message(pres_dmq_peer, body, 0, &pres_dmq_resp_callback,
145
+				1, &pres_dmq_content_type);
146
+	}
147
+	return 0;
148
+}
149
+
150
+/**
151
+ * @brief extract presentity from json object
152
+*/
153
+presentity_t *pres_parse_json_presentity(srjson_t *in)
154
+{
155
+
156
+	int p_expires = 0, p_recv = 0;
157
+	str p_domain = STR_NULL, p_user = STR_NULL, p_etag = STR_NULL,
158
+		p_sender = STR_NULL, p_event_str = STR_NULL;
159
+	srjson_t *p_it;
160
+	pres_ev_t *p_event = NULL;
161
+	presentity_t *presentity = NULL;
162
+
163
+	LM_DBG("extracting presentity\n");
164
+
165
+	for(p_it = in->child; p_it; p_it = p_it->next) {
166
+		if(strcmp(p_it->string, "domain") == 0) {
167
+			p_domain.s = p_it->valuestring;
168
+			p_domain.len = strlen(p_it->valuestring);
169
+		} else if(strcmp(p_it->string, "user") == 0) {
170
+			p_user.s = p_it->valuestring;
171
+			p_user.len = strlen(p_it->valuestring);
172
+		} else if(strcmp(p_it->string, "etag") == 0) {
173
+			p_etag.s = p_it->valuestring;
174
+			p_etag.len = strlen(p_it->valuestring);
175
+		} else if(strcmp(p_it->string, "expires") == 0) {
176
+			p_expires = SRJSON_GET_INT(p_it);
177
+		} else if(strcmp(p_it->string, "recv") == 0) {
178
+			p_recv = SRJSON_GET_INT(p_it);
179
+		} else if(strcmp(p_it->string, "sender") == 0) {
180
+			p_sender.s = p_it->valuestring;
181
+			p_sender.len = strlen(p_it->valuestring);
182
+		} else if(strcmp(p_it->string, "event") == 0) {
183
+			p_event_str.s = p_it->valuestring;
184
+			p_event_str.len = strlen(p_it->valuestring);
185
+			p_event = contains_event(&p_event_str, 0);
186
+			if(!p_event) {
187
+				LM_ERR("unsupported event %s\n", p_it->valuestring);
188
+				return NULL;
189
+			}
190
+		} else {
191
+			LM_ERR("unrecognized field in json object\n");
192
+			return NULL;
193
+		}
194
+	}
195
+
196
+	LM_DBG("building presentity from domain: %.*s, user: %.*s, expires: %d, "
197
+		   "event: "
198
+		   "%.*s, etag: %.*s, sender: %.*s",
199
+			p_domain.len, p_domain.s, p_user.len, p_user.s, p_expires,
200
+			p_event->name.len, p_event->name.s, p_etag.len, p_etag.s,
201
+			p_sender.len, p_sender.s);
202
+
203
+	presentity = new_presentity(
204
+			&p_domain, &p_user, p_expires, p_event, &p_etag, &p_sender);
205
+
206
+	if(!presentity)
207
+		return NULL;
208
+
209
+	if(p_recv > 0)
210
+		presentity->received_time = p_recv;
211
+
212
+	return presentity;
213
+}
214
+
215
+/**
216
+* @brief presence dmq callback
217
+*/
218
+int pres_dmq_handle_msg(
219
+		struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *node)
220
+{
221
+	int content_length = 0, t_new = 0, sent_reply = 0;
222
+	str cur_etag = STR_NULL, body = STR_NULL, p_body = STR_NULL,
223
+		ruid = STR_NULL;
224
+	char *sphere = NULL;
225
+	srjson_doc_t jdoc;
226
+	srjson_t *it = NULL;
227
+	presentity_t *presentity = NULL;
228
+
229
+	pres_dmq_action_t action = PRES_DMQ_NONE;
230
+
231
+	/* received dmq message */
232
+	LM_DBG("dmq message received\n");
233
+
234
+	if(!pres_dmq_proc_init && pres_dmq_init_proc() < 0) {
235
+		return 0;
236
+	}
237
+
238
+	*pres_dmq_recv = 1;
239
+
240
+	if(!msg->content_length) {
241
+		LM_ERR("no content length header found\n");
242
+		goto invalid;
243
+	}
244
+	content_length = get_content_length(msg);
245
+	if(!content_length) {
246
+		LM_DBG("content length is 0\n");
247
+		goto invalid;
248
+	}
249
+
250
+	body.s = get_body(msg);
251
+	body.len = content_length;
252
+
253
+	if(!body.s) {
254
+		LM_ERR("unable to get body\n");
255
+		goto error;
256
+	}
257
+
258
+	/* parse body */
259
+	LM_DBG("body: %.*s\n", body.len, body.s);
260
+
261
+	srjson_InitDoc(&jdoc, NULL);
262
+	jdoc.buf = body;
263
+
264
+	if(jdoc.root == NULL) {
265
+		jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
266
+		if(jdoc.root == NULL) {
267
+			LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
268
+			goto invalid;
269
+		}
270
+	}
271
+
272
+	/* iterate over keys */
273
+	for(it = jdoc.root->child; it; it = it->next) {
274
+		LM_DBG("found field: %s\n", it->string);
275
+		if(strcmp(it->string, "action") == 0) {
276
+			action = SRJSON_GET_INT(it);
277
+		} else if(strcmp(it->string, "presentity") == 0) {
278
+			presentity = pres_parse_json_presentity(it);
279
+			if(!presentity) {
280
+				LM_ERR("failed to construct presentity from json\n");
281
+				goto invalid;
282
+			}
283
+		} else if(strcmp(it->string, "t_new") == 0) {
284
+			t_new = SRJSON_GET_INT(it);
285
+		} else if(strcmp(it->string, "cur_etag") == 0) {
286
+			cur_etag.s = it->valuestring;
287
+			cur_etag.len = strlen(it->valuestring);
288
+		} else if(strcmp(it->string, "sphere") == 0) {
289
+			sphere = it->valuestring;
290
+		} else if(strcmp(it->string, "ruid") == 0) {
291
+			ruid.s = it->valuestring;
292
+			ruid.len = strlen(it->valuestring);
293
+		} else if(strcmp(it->string, "body") == 0) {
294
+			p_body.s = it->valuestring;
295
+			p_body.len = strlen(it->valuestring);
296
+		} else {
297
+			LM_ERR("unrecognized field in json object\n");
298
+			goto invalid;
299
+		}
300
+	}
301
+
302
+	switch(action) {
303
+		case PRES_DMQ_UPDATE_PRESENTITY:
304
+			if(update_presentity(NULL, presentity, &p_body, t_new, &sent_reply,
305
+					   sphere, &cur_etag, &ruid)
306
+					< 0) {
307
+				goto error;
308
+			}
309
+			break;
310
+		case PRES_DMQ_SYNC:
311
+		case PRES_DMQ_NONE:
312
+			break;
313
+	}
314
+
315
+	resp->reason = pres_dmq_200_rpl;
316
+	resp->resp_code = 200;
317
+	goto cleanup;
318
+
319
+invalid:
320
+	resp->reason = pres_dmq_400_rpl;
321
+	resp->resp_code = 400;
322
+	goto cleanup;
323
+
324
+error:
325
+	resp->reason = pres_dmq_500_rpl;
326
+	resp->resp_code = 500;
327
+
328
+cleanup:
329
+	*pres_dmq_recv = 0;
330
+	srjson_DestroyDoc(&jdoc);
331
+	if(presentity)
332
+		pkg_free(presentity);
333
+
334
+	return 0;
335
+}
336
+
337
+
338
+int pres_dmq_request_sync()
339
+{
340
+	srjson_doc_t jdoc;
341
+
342
+	LM_DBG("requesting sync from dmq peers\n");
343
+
344
+	srjson_InitDoc(&jdoc, NULL);
345
+
346
+	jdoc.root = srjson_CreateObject(&jdoc);
347
+	if(jdoc.root == NULL) {
348
+		LM_ERR("cannot create json root\n");
349
+		goto error;
350
+	}
351
+
352
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", PRES_DMQ_SYNC);
353
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
354
+	if(jdoc.buf.s == NULL) {
355
+		LM_ERR("unable to serialize data\n");
356
+		goto error;
357
+	}
358
+	jdoc.buf.len = strlen(jdoc.buf.s);
359
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
360
+	if(pres_dmq_send(&jdoc.buf, 0) != 0) {
361
+		goto error;
362
+	}
363
+
364
+	jdoc.free_fn(jdoc.buf.s);
365
+	jdoc.buf.s = NULL;
366
+	srjson_DestroyDoc(&jdoc);
367
+	return 0;
368
+
369
+error:
370
+	if(jdoc.buf.s != NULL) {
371
+		jdoc.free_fn(jdoc.buf.s);
372
+		jdoc.buf.s = NULL;
373
+	}
374
+	srjson_DestroyDoc(&jdoc);
375
+	return -1;
376
+}
377
+
378
+
379
+int pres_dmq_replicate_presentity(presentity_t *presentity, str *body,
380
+		int t_new, str *cur_etag, char *sphere, str *ruid, dmq_node_t *node)
381
+{
382
+
383
+	srjson_doc_t jdoc;
384
+	srjson_t *p_json;
385
+
386
+	LM_DBG("replicating presentity record - old etag %.*s, new etag %.*s, ruid "
387
+		   "%.*s\n",
388
+			presentity->etag.len, presentity->etag.s, cur_etag->len,
389
+			cur_etag->s, ruid->len, ruid->s);
390
+
391
+	if(!pres_dmq_proc_init && pres_dmq_init_proc() < 0) {
392
+		return -1;
393
+	}
394
+
395
+	if(*pres_dmq_recv) {
396
+		return 0;
397
+	}
398
+
399
+	srjson_InitDoc(&jdoc, NULL);
400
+
401
+	jdoc.root = srjson_CreateObject(&jdoc);
402
+	if(jdoc.root == NULL) {
403
+		LM_ERR("cannot create json root\n");
404
+		goto error;
405
+	}
406
+
407
+	// action
408
+	srjson_AddNumberToObject(
409
+			&jdoc, jdoc.root, "action", PRES_DMQ_UPDATE_PRESENTITY);
410
+	// presentity
411
+	p_json = srjson_CreateObject(&jdoc);
412
+	srjson_AddStrToObject(&jdoc, p_json, "domain", presentity->domain.s,
413
+			presentity->domain.len);
414
+	srjson_AddStrToObject(
415
+			&jdoc, p_json, "user", presentity->user.s, presentity->user.len);
416
+	srjson_AddStrToObject(
417
+			&jdoc, p_json, "etag", presentity->etag.s, presentity->etag.len);
418
+	srjson_AddNumberToObject(&jdoc, p_json, "expires", presentity->expires);
419
+	srjson_AddNumberToObject(&jdoc, p_json, "recv", presentity->received_time);
420
+	if(presentity->sender) {
421
+		srjson_AddStrToObject(&jdoc, p_json, "sender", presentity->sender->s,
422
+				presentity->sender->len);
423
+	}
424
+	srjson_AddStrToObject(&jdoc, p_json, "event", presentity->event->name.s,
425
+			presentity->event->name.len);
426
+	srjson_AddItemToObject(&jdoc, jdoc.root, "presentity", p_json);
427
+	// t_new
428
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "t_new", t_new);
429
+	// cur_etag
430
+	if(cur_etag) {
431
+		srjson_AddStrToObject(
432
+				&jdoc, jdoc.root, "cur_etag", cur_etag->s, cur_etag->len);
433
+	}
434
+	// sphere
435
+	if(sphere) {
436
+		srjson_AddStringToObject(&jdoc, jdoc.root, "sphere", sphere);
437
+	}
438
+	// ruid
439
+	if(ruid) {
440
+		srjson_AddStrToObject(&jdoc, jdoc.root, "ruid", ruid->s, ruid->len);
441
+	}
442
+	// body
443
+	if(body) {
444
+		srjson_AddStrToObject(&jdoc, jdoc.root, "body", body->s, body->len);
445
+	}
446
+
447
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
448
+	if(jdoc.buf.s == NULL) {
449
+		LM_ERR("unable to serialize data\n");
450
+		goto error;
451
+	}
452
+	jdoc.buf.len = strlen(jdoc.buf.s);
453
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
454
+	if(pres_dmq_send(&jdoc.buf, node) != 0) {
455
+		goto error;
456
+	}
457
+
458
+	jdoc.free_fn(jdoc.buf.s);
459
+	jdoc.buf.s = NULL;
460
+	srjson_DestroyDoc(&jdoc);
461
+	return 0;
462
+
463
+error:
464
+	if(jdoc.buf.s != NULL) {
465
+		jdoc.free_fn(jdoc.buf.s);
466
+		jdoc.buf.s = NULL;
467
+	}
468
+	srjson_DestroyDoc(&jdoc);
469
+	return -1;
470
+}
471
+
472
+
473
+int pres_dmq_send_all_presentities(dmq_node_t *dmq_node)
474
+{
475
+	// TODO: implement send all presentities
476
+
477
+	return 0;
478
+}
479
+
480
+
481
+/**
482
+* @brief dmq response callback
483
+*/
484
+int pres_dmq_resp_callback_f(
485
+		struct sip_msg *msg, int code, dmq_node_t *node, void *param)
486
+{
487
+	LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
488
+	return 0;
489
+}
0 490
new file mode 100644
... ...
@@ -0,0 +1,50 @@
1
+/**
2
+ *
3
+ * Copyright (C) 2018 Charles Chance (Sipcentric Ltd)
4
+ *
5
+ * This file is part of Kamailio, a free SIP server.
6
+ *
7
+ * Kamailio is free software; you can redistribute it and/or modify
8
+ * it under the terms of the GNU General Public License as published by
9
+ * the Free Software Foundation; either version 2 of the License, or
10
+ * (at your option) any later version
11
+ *
12
+ * Kamailio is distributed in the hope that it will be useful,
13
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
+ * GNU General Public License for more details.
16
+ *
17
+ * You should have received a copy of the GNU General Public License 
18
+ * along with this program; if not, write to the Free Software 
19
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20
+ */
21
+
22
+#ifndef _PRESENCE_DMQ_H_
23
+#define _PRESENCE_DMQ_H_
24
+
25
+#include "presence.h"
26
+#include "presentity.h"
27
+#include "../dmq/bind_dmq.h"
28
+#include "../../lib/srutils/srjson.h"
29
+#include "../../core/strutils.h"
30
+#include "../../core/parser/msg_parser.h"
31
+#include "../../core/parser/parse_content.h"
32
+
33
+extern dmq_api_t pres_dmqb;
34
+extern dmq_peer_t* pres_dmq_peer;
35
+extern dmq_resp_cback_t pres_dmq_resp_callback;
36
+
37
+typedef enum {
38
+	PRES_DMQ_NONE,
39
+	PRES_DMQ_UPDATE_PRESENTITY,
40
+	PRES_DMQ_SYNC,
41
+} pres_dmq_action_t;
42
+
43
+int pres_dmq_initialize();
44
+int pres_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp,
45
+		dmq_node_t* node);
46
+int pres_dmq_replicate_presentity(presentity_t* presentity, str* body, int new_t, 
47
+		str* cur_etag, char* sphere, str* ruid, dmq_node_t* node);
48
+int pres_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node,
49
+		void* param);
50
+#endif
... ...
@@ -44,6 +44,7 @@
44 44
 #include "publish.h"
45 45
 #include "hash.h"
46 46
 #include "utils_func.h"
47
+#include "presence_dmq.h"
47 48
 
48 49
 
49 50
 /* base priority value (20150101T000000) */
... ...
@@ -425,7 +426,7 @@ int delete_presentity_if_dialog_id_exists(presentity_t* presentity, char* dialog
425 426
 
426 427
 				LM_WARN("Presentity already exists - deleting it\n");
427 428
 
428
-				if(delete_presentity(&old_presentity)<0) {
429
+				if(delete_presentity(&old_presentity, NULL)<0) {
429 430
 					LM_ERR("failed to delete presentity\n");
430 431
 				}
431 432
 
... ...
@@ -555,20 +556,21 @@ int is_dialog_terminated(presentity_t* presentity)
555 556
 }
556 557
 
557 558
 int update_presentity(struct sip_msg* msg, presentity_t* presentity, str* body,
558
-		int new_t, int* sent_reply, char* sphere)
559
+		int new_t, int* sent_reply, char* sphere, str* etag_override, str* ruid)
559 560
 {
560
-	db_key_t query_cols[13], update_keys[9], result_cols[6];
561
-	db_op_t  query_ops[13];
562
-	db_val_t query_vals[13], update_vals[9];
561
+	db_key_t query_cols[14], rquery_cols[2], update_keys[9], result_cols[7];
562
+	db_op_t  query_ops[14], rquery_ops[2];
563
+	db_val_t query_vals[14], rquery_vals[2], update_vals[9];
563 564
 	db1_res_t *result= NULL;
564 565
 	int n_query_cols = 0;
566
+	int n_rquery_cols = 0;
565 567
 	int n_update_cols = 0;
566 568
 	char* dot= NULL;
567 569
 	str etag= {0, 0};
568 570
 	str cur_etag= {0, 0};
569 571
 	str* rules_doc= NULL;
570 572
 	str pres_uri= {0, 0};
571
-	int rez_body_col, rez_sender_col, n_result_cols= 0;
573
+	int rez_body_col, rez_sender_col, rez_ruid_col, n_result_cols= 0;
572 574
 	db_row_t *row = NULL ;
573 575
 	db_val_t *row_vals = NULL;
574 576
 	str old_body, sender;
... ...
@@ -578,6 +580,8 @@ int update_presentity(struct sip_msg* msg, presentity_t* presentity, str* body,
578 580
 	int db_record_exists = 0;
579 581
 	int num_watchers = 0;
580 582
 	char *old_dialog_id = NULL, *dialog_id = NULL;
583
+	str cur_ruid= {0, 0};
584
+	str p_ruid = {0, 0};
581 585
 
582 586
 	if (sent_reply) *sent_reply= 0;
583 587
 	if(pres_notifier_processes == 0 && presentity->event->req_auth)
... ...
@@ -628,9 +632,22 @@ int update_presentity(struct sip_msg* msg, presentity_t* presentity, str* body,
628 632
 
629 633
 	result_cols[rez_body_col= n_result_cols++] = &str_body_col;
630 634
 	result_cols[rez_sender_col= n_result_cols++] = &str_sender_col;
635
+	result_cols[rez_ruid_col= n_result_cols++] = &str_ruid_col;
631 636
 
632 637
 	if(new_t)
633 638
 	{
639
+		LM_DBG("new presentity with etag %.*s\n", presentity->etag.len, presentity->etag.s);
640
+
641
+		if (ruid) {
642
+			/* use the provided ruid */
643
+			p_ruid = *ruid;
644
+		} else {
645
+			/* generate a new ruid */
646
+			if(sruid_next(&pres_sruid)<0)
647
+				goto error;
648
+			p_ruid = pres_sruid.uid;
649
+		}
650
+
634 651
 		/* insert new record in hash_table */
635 652
 
636 653
 		if ( publ_cache_enabled &&
... ...
@@ -640,6 +657,8 @@ int update_presentity(struct sip_msg* msg, presentity_t* presentity, str* body,
640 657
 			goto error;
641 658
 		}
642 659
 
660
+		LM_DBG("new htable record added\n");
661
+
643 662
 		/* insert new record into database */
644 663
 		query_cols[n_query_cols] = &str_sender_col;
645 664
 		query_vals[n_query_cols].type = DB1_STR;
... ...
@@ -672,6 +691,12 @@ int update_presentity(struct sip_msg* msg, presentity_t* presentity, str* body,
672 691
 		query_vals[n_query_cols].val.int_val = presentity->priority;
673 692
 		n_query_cols++;
674 693
 
694
+		query_cols[n_query_cols] = &str_ruid_col;
695
+		query_vals[n_query_cols].type = DB1_STR;
696
+		query_vals[n_query_cols].nul = 0;
697
+		query_vals[n_query_cols].val.str_val = p_ruid;
698
+		n_query_cols++;
699
+
675 700
 		if (presentity->expires != -1)
676 701
 		{
677 702
 			/* A real PUBLISH */
... ...
@@ -765,6 +790,21 @@ int update_presentity(struct sip_msg* msg, presentity_t* presentity, str* body,
765 790
 	else
766 791
 	{
767 792
 
793
+		LM_DBG("updating existing presentity with etag %.*s\n", presentity->etag.len, presentity->etag.s);
794
+
795
+		if (ruid) {
796
+			p_ruid = *ruid;
797
+
798
+			rquery_cols[n_rquery_cols] = &str_ruid_col;
799
+			rquery_ops[n_rquery_cols] = OP_EQ;
800
+			rquery_vals[n_rquery_cols].type = DB1_STR;
801
+			rquery_vals[n_rquery_cols].nul = 0;
802
+			rquery_vals[n_rquery_cols].val.str_val = p_ruid;
803
+			n_rquery_cols++;
804
+
805
+			// TODO: check for out-of-sequence updates
806
+		}
807
+
768 808
 		if (pa_dbf.use_table(pa_db, &presentity_table) < 0)
769 809
 		{
770 810
 			LM_ERR("unsuccessful sql use table\n");
... ...
@@ -783,8 +823,10 @@ int update_presentity(struct sip_msg* msg, presentity_t* presentity, str* body,
783 823
 		if(EVENT_DIALOG_SLA(presentity->event->evp))
784 824
 		{
785 825
 
786
-			if (pa_dbf.query (pa_db, query_cols, query_ops, query_vals,
787
-					result_cols, n_query_cols, n_result_cols, 0, &result) < 0)
826
+			if (pa_dbf.query (pa_db, ruid?rquery_cols:query_cols, 
827
+					ruid?rquery_ops:query_ops, ruid?rquery_vals:query_vals,
828
+					result_cols, ruid?n_rquery_cols:n_query_cols, n_result_cols,
829
+					0, &result) < 0)
788 830
 			{
789 831
 				LM_ERR("unsuccessful sql query\n");
790 832
 				goto error;
... ...
@@ -800,6 +842,19 @@ int update_presentity(struct sip_msg* msg, presentity_t* presentity, str* body,
800 842
 			row = &result->rows[0];
801 843
 			row_vals = ROW_VALUES(row);
802 844
 
845
+			/* store current ruid if we don't already know it */
846
+			if (!p_ruid.s && row_vals[rez_ruid_col].val.string_val) {
847
+				cur_ruid.len = strlen((char *) row_vals[rez_ruid_col].val.string_val);
848
+				cur_ruid.s = (char *) pkg_malloc(sizeof(char) * cur_ruid.len);
849
+				if (!cur_ruid.s)
850
+				{
851
+					LM_ERR("no private memory\n");
852
+					goto error;
853
+				}
854
+				memcpy(cur_ruid.s, (char *) row_vals[rez_ruid_col].val.string_val, cur_ruid.len);
855
+				p_ruid = cur_ruid;
856
+			}
857
+
803 858
 			old_body.s = (char*)row_vals[rez_body_col].val.string_val;
804 859
 			old_body.len = strlen(old_body.s);
805 860
 			if(check_if_dialog(*body, &is_dialog, &dialog_id)< 0)
... ...
@@ -864,8 +919,10 @@ after_dialog_check:
864 919
 
865 920
 			if (!db_record_exists)
866 921
 			{
867
-				if (pa_dbf.query (pa_db, query_cols, query_ops, query_vals,
868
-					result_cols, n_query_cols, n_result_cols, 0, &result) < 0)
922
+				if (pa_dbf.query (pa_db, ruid?rquery_cols:query_cols, 
923
+						ruid?rquery_ops:query_ops, ruid?rquery_vals:query_vals,
924
+						result_cols, ruid?n_rquery_cols:n_query_cols, n_result_cols,
925
+						0, &result) < 0)
869 926
 				{
870 927
 					LM_ERR("unsuccessful sql query\n");
871 928
 					goto error;
... ...
@@ -878,6 +935,22 @@ after_dialog_check:
878 935
 
879 936
 				db_record_exists = 1;
880 937
 
938
+				row = &result->rows[0];
939
+				row_vals = ROW_VALUES(row);
940
+
941
+				/* store current ruid if we don't already know it */
942
+				if (!p_ruid.s && row_vals[rez_ruid_col].val.string_val) {
943
+					cur_ruid.len = strlen((char *) row_vals[rez_ruid_col].val.string_val);
944
+					cur_ruid.s = (char *) pkg_malloc(sizeof(char) * cur_ruid.len);
945
+					if (!cur_ruid.s)
946
+					{
947
+						LM_ERR("no private memory\n");
948
+						goto error;
949
+					}
950
+					memcpy(cur_ruid.s, (char *) row_vals[rez_ruid_col].val.string_val, cur_ruid.len);
951
+					p_ruid = cur_ruid;
952
+				}
953
+
881 954
 				pa_dbf.free_result(pa_db, result);
882 955
 				result = NULL;
883 956
 			}
... ...
@@ -899,7 +972,7 @@ after_dialog_check:
899 972
 
900 973
 				if (num_watchers > 0)
901 974
 				{
902
-					if (mark_presentity_for_delete(presentity) < 0)
975
+					if (mark_presentity_for_delete(presentity, &p_ruid) < 0)
903 976
 					{
904 977
 						LM_ERR("Marking presentities\n");
905 978
 						goto error;
... ...
@@ -917,7 +990,7 @@ after_dialog_check:
917 990
 
918 991
 			if (pres_notifier_processes == 0 || num_watchers == 0)
919 992
 			{
920
-				if (delete_presentity(presentity) < 0)
993
+				if (delete_presentity(presentity, &p_ruid) < 0)
921 994
 				{
922 995
 					LM_ERR("Deleting presentity\n");
923 996
 					goto error;
... ...
@@ -953,8 +1026,15 @@ after_dialog_check:
953 1026
 			goto done;
954 1027
 		}
955 1028
 
956
-		if(presentity->event->etag_not_new== 0)
1029
+		if(presentity->event->etag_not_new== 0 || etag_override)
957 1030
 		{
1031
+			if (etag_override) {
1032
+				/* use the supplied etag */
1033
+				LM_DBG("updating with supplied etag %.*s\n", etag_override->len, etag_override->s);
1034
+				cur_etag = *etag_override;
1035
+				goto after_etag_generation;
1036
+			}
1037
+
958 1038
 			/* generate another etag */
959 1039
 			unsigned int publ_nr;
960 1040
 			str str_publ_nr= {0, 0};
... ...
@@ -988,10 +1068,12 @@ after_dialog_check:
988 1068
 
989 1069
 			cur_etag= etag;
990 1070
 
1071
+after_etag_generation:
1072
+
991 1073
 			update_keys[n_update_cols] = &str_etag_col;
992 1074
 			update_vals[n_update_cols].type = DB1_STR;
993 1075
 			update_vals[n_update_cols].nul = 0;
994
-			update_vals[n_update_cols].val.str_val = etag;
1076
+			update_vals[n_update_cols].val.str_val = cur_etag;
995 1077
 			n_update_cols++;
996 1078
 
997 1079
 		}
... ...
@@ -1068,11 +1150,14 @@ after_dialog_check:
1068 1150
 			n_update_cols++;
1069 1151
 		}
1070 1152
 
1071
-		/* if there is no support for affected_rows and no previous query has been done, do query */
1072
-		if (!pa_dbf.affected_rows && !db_record_exists)
1153
+		/* if there is no support for affected_rows and no previous query has been done,
1154
+		 * or dmq replication is enabled and we don't already know the ruid, do query */
1155
+		if ((!pa_dbf.affected_rows && !db_record_exists) || (pres_enable_dmq > 0 && !p_ruid.s))
1073 1156
 		{
1074
-			if (pa_dbf.query (pa_db, query_cols, query_ops, query_vals,
1075
-					result_cols, n_query_cols, n_result_cols, 0, &result) < 0)
1157
+			if (pa_dbf.query (pa_db, ruid?rquery_cols:query_cols, 
1158
+					ruid?rquery_ops:query_ops, ruid?rquery_vals:query_vals,
1159
+					result_cols, ruid?n_rquery_cols:n_query_cols, n_result_cols,
1160
+					0, &result) < 0)
1076 1161
 			{
1077 1162
 				LM_ERR("unsuccessful sql query\n");
1078 1163
 				goto error;
... ...
@@ -1084,12 +1169,33 @@ after_dialog_check:
1084 1169
 				goto send_412;
1085 1170
 
1086 1171
 			db_record_exists = 1;
1172
+			affected_rows = result->n;
1173
+
1174
+			row = &result->rows[0];
1175
+			row_vals = ROW_VALUES(row);
1176
+
1177
+			/* store current ruid if we don't already know it */
1178
+			if (!p_ruid.s && row_vals[rez_ruid_col].val.string_val) {
1179
+				cur_ruid.len = strlen((char *) row_vals[rez_ruid_col].val.string_val);
1180
+				cur_ruid.s = (char *) pkg_malloc(sizeof(char) * cur_ruid.len);
1181
+				if (!cur_ruid.s)
1182
+				{
1183
+					LM_ERR("no private memory\n");
1184
+					goto error;
1185
+				}
1186
+				memcpy(cur_ruid.s, (char *) row_vals[rez_ruid_col].val.string_val, cur_ruid.len);
1187
+				p_ruid = cur_ruid;
1188
+
1189
+				LM_DBG("existing ruid %.*s\n", p_ruid.len, p_ruid.s);
1190
+			}
1191
+
1087 1192
 			pa_dbf.free_result(pa_db, result);
1088 1193
 			result = NULL;
1089 1194
 		}
1090 1195
 
1091
-		if( pa_dbf.update( pa_db,query_cols, query_ops, query_vals,
1092
-				update_keys, update_vals, n_query_cols, n_update_cols )<0)
1196
+		if (pa_dbf.update (pa_db, ruid?rquery_cols:query_cols, 
1197
+				ruid?rquery_ops:query_ops, ruid?rquery_vals:query_vals,
1198
+				update_keys, update_vals, ruid?n_rquery_cols:n_query_cols, n_update_cols) < 0)
1093 1199
 		{
1094 1200
 			LM_ERR("updating published info in database\n");
1095 1201
 			goto error;
... ...
@@ -1108,7 +1214,7 @@ after_dialog_check:
1108 1214
 
1109 1215
 
1110 1216
 		/*if either affected_rows (if exists) or select query show that there is no line in database*/
1111
-		if ((pa_dbf.affected_rows && !affected_rows) || (!pa_dbf.affected_rows && !db_record_exists))
1217
+		if ((pa_dbf.affected_rows && !affected_rows && !db_record_exists) || (!pa_dbf.affected_rows && !db_record_exists))
1112 1218
 			goto send_412;
1113 1219
 
1114 1220
 		/* send 200OK */
... ...
@@ -1148,6 +1254,15 @@ send_notify:
1148 1254
 	}
1149 1255
 
1150 1256
 done:
1257
+
1258
+	if (pres_enable_dmq>0) {
1259
+		pres_dmq_replicate_presentity(presentity, body, new_t, &cur_etag, sphere, &p_ruid, NULL);
1260
+	}
1261
+
1262
+	if(cur_ruid.s)
1263
+		pkg_free(cur_ruid.s);
1264
+	cur_ruid.s= NULL;
1265
+
1151 1266
 	if(rules_doc)
1152 1267
 	{
1153 1268
 		if(rules_doc->s)
... ...
@@ -1173,7 +1288,12 @@ done:
1173 1288
 
1174 1289
 send_412:
1175 1290
 
1176
-	LM_ERR("No E_Tag match %*s\n", presentity->etag.len, presentity->etag.s);
1291
+	if (!ruid) {
1292
+		LM_ERR("No E_Tag match %*s\n", presentity->etag.len, presentity->etag.s);
1293
+	} else {
1294
+		LM_ERR("No ruid match %*s\n", ruid->len, ruid->s);
1295
+	}
1296
+	
1177 1297
 	if (msg != NULL)
1178 1298
 	{
1179 1299
 		if (slb.freply(msg, 412, &pu_412_rpl) < 0)
... ...
@@ -1198,6 +1318,8 @@ error:
1198 1318
 	if(pres_uri.s) {
1199 1319
 		pkg_free(pres_uri.s);
1200 1320
 	}
1321
+	if(cur_ruid.s)
1322
+		pkg_free(cur_ruid.s);
1201 1323
 
1202 1324
 	if (pa_dbf.abort_transaction) {
1203 1325
 		if (pa_dbf.abort_transaction(pa_db) < 0) {
... ...
@@ -1521,7 +1643,7 @@ error:
1521 1643
 
1522 1644
 }
1523 1645
 
1524
-int mark_presentity_for_delete(presentity_t *pres)
1646
+int mark_presentity_for_delete(presentity_t *pres, str *ruid)
1525 1647
 {
1526 1648
 	db_key_t query_cols[4], result_cols[1], update_cols[3];
1527 1649
 	db_val_t query_vals[4], update_vals[3], *value;
... ...
@@ -1535,7 +1657,7 @@ int mark_presentity_for_delete(presentity_t *pres)
1535 1657
 	if (pres->event->agg_nbody == NULL)
1536 1658
 	{
1537 1659
 		/* Nothing clever to do here... just delete */
1538
-		if (delete_presentity(pres) < 0)
1660
+		if (delete_presentity(pres, NULL) < 0)
1539 1661
 		{
1540 1662
 			LM_ERR("deleting presentity\n");
1541 1663
 			goto error;
... ...
@@ -1549,29 +1671,37 @@ int mark_presentity_for_delete(presentity_t *pres)
1549 1671
 		goto error;
1550 1672
 	}
1551 1673
 
1552
-	query_cols[n_query_cols] = &str_username_col;
1553
-	query_vals[n_query_cols].type = DB1_STR;
1554
-	query_vals[n_query_cols].nul = 0;
1555
-	query_vals[n_query_cols].val.str_val = pres->user;
1556
-	n_query_cols++;
1674
+	if (!ruid) {
1675
+		query_cols[n_query_cols] = &str_username_col;
1676
+		query_vals[n_query_cols].type = DB1_STR;
1677
+		query_vals[n_query_cols].nul = 0;
1678
+		query_vals[n_query_cols].val.str_val = pres->user;
1679
+		n_query_cols++;
1557 1680
 
1558
-	query_cols[n_query_cols] = &str_domain_col;
1559
-	query_vals[n_query_cols].type = DB1_STR;
1560
-	query_vals[n_query_cols].nul = 0;
1561
-	query_vals[n_query_cols].val.str_val = pres->domain;
1562
-	n_query_cols++;
1681
+		query_cols[n_query_cols] = &str_domain_col;
1682
+		query_vals[n_query_cols].type = DB1_STR;
1683
+		query_vals[n_query_cols].nul = 0;
1684
+		query_vals[n_query_cols].val.str_val = pres->domain;
1685
+		n_query_cols++;
1563 1686
 
1564
-	query_cols[n_query_cols] = &str_event_col;
1565
-	query_vals[n_query_cols].type = DB1_STR;
1566
-	query_vals[n_query_cols].nul = 0;
1567
-	query_vals[n_query_cols].val.str_val = pres->event->name;
1568
-	n_query_cols++;
1687
+		query_cols[n_query_cols] = &str_event_col;
1688
+		query_vals[n_query_cols].type = DB1_STR;
1689
+		query_vals[n_query_cols].nul = 0;
1690
+		query_vals[n_query_cols].val.str_val = pres->event->name;
1691
+		n_query_cols++;
1569 1692
 
1570
-	query_cols[n_query_cols] = &str_etag_col;
1571
-	query_vals[n_query_cols].type = DB1_STR;
1572
-	query_vals[n_query_cols].nul = 0;
1573
-	query_vals[n_query_cols].val.str_val = pres->etag;
1574
-	n_query_cols++;
1693
+		query_cols[n_query_cols] = &str_etag_col;
1694
+		query_vals[n_query_cols].type = DB1_STR;
1695
+		query_vals[n_query_cols].nul = 0;
1696
+		query_vals[n_query_cols].val.str_val = pres->etag;
1697
+		n_query_cols++;
1698
+	} else {
1699
+		query_cols[n_query_cols] = &str_ruid_col;
1700
+		query_vals[n_query_cols].type = DB1_STR;
1701
+		query_vals[n_query_cols].nul = 0;
1702
+		query_vals[n_query_cols].val.str_val = *ruid;
1703
+		n_query_cols++;
1704
+	}
1575 1705
 
1576 1706
 	result_cols[0] = &str_body_col;
1577 1707
 
... ...
@@ -1601,7 +1731,7 @@ int mark_presentity_for_delete(presentity_t *pres)
1601 1731
 		 * it anyway */
1602 1732
 		LM_ERR("Found %d presentities - expected 1\n", RES_ROW_N(result));
1603 1733
 
1604
-		if (delete_presentity(pres) < 0)
1734
+		if (delete_presentity(pres, ruid) < 0)
1605 1735
 		{
1606 1736
 			LM_ERR("deleting presentity\n");
1607 1737
 			goto error;
... ...
@@ -1669,7 +1799,7 @@ error:
1669 1799
 	return ret;
1670 1800
 }
1671 1801
 
1672
-int delete_presentity(presentity_t *pres)
1802
+int delete_presentity(presentity_t *pres, str *ruid)
1673 1803
 {
1674 1804
 	db_key_t query_cols[4];
1675 1805
 	db_val_t query_vals[4];
... ...
@@ -1681,29 +1811,37 @@ int delete_presentity(presentity_t *pres)
1681 1811
 		goto error;
1682 1812
 	}
1683 1813
 
1684
-	query_cols[n_query_cols] = &str_username_col;
1685
-	query_vals[n_query_cols].type = DB1_STR;
1686
-	query_vals[n_query_cols].nul = 0;
1687
-	query_vals[n_query_cols].val.str_val = pres->user;
1688
-	n_query_cols++;
1814
+	if (!ruid) {
1815
+		query_cols[n_query_cols] = &str_username_col;
1816
+		query_vals[n_query_cols].type = DB1_STR;
1817
+		query_vals[n_query_cols].nul = 0;
1818
+		query_vals[n_query_cols].val.str_val = pres->user;
1819
+		n_query_cols++;
1689 1820
 
1690
-	query_cols[n_query_cols] = &str_domain_col;
1691
-	query_vals[n_query_cols].type = DB1_STR;
1692
-	query_vals[n_query_cols].nul = 0;
1693
-	query_vals[n_query_cols].val.str_val = pres->domain;
1694
-	n_query_cols++;
1821
+		query_cols[n_query_cols] = &str_domain_col;
1822
+		query_vals[n_query_cols].type = DB1_STR;
1823
+		query_vals[n_query_cols].nul = 0;
1824
+		query_vals[n_query_cols].val.str_val = pres->domain;
1825
+		n_query_cols++;
1695 1826
 
1696
-	query_cols[n_query_cols] = &str_event_col;
1697
-	query_vals[n_query_cols].type = DB1_STR;
1698
-	query_vals[n_query_cols].nul = 0;
1699
-	query_vals[n_query_cols].val.str_val = pres->event->name;
1700
-	n_query_cols++;
1827
+		query_cols[n_query_cols] = &str_event_col;
1828
+		query_vals[n_query_cols].type = DB1_STR;
1829
+		query_vals[n_query_cols].nul = 0;
1830
+		query_vals[n_query_cols].val.str_val = pres->event->name;
1831
+		n_query_cols++;
1701 1832
 
1702
-	query_cols[n_query_cols] = &str_etag_col;
1703
-	query_vals[n_query_cols].type = DB1_STR;
1704
-	query_vals[n_query_cols].nul = 0;
1705
-	query_vals[n_query_cols].val.str_val = pres->etag;
1706
-	n_query_cols++;
1833
+		query_cols[n_query_cols] = &str_etag_col;
1834
+		query_vals[n_query_cols].type = DB1_STR;
1835
+		query_vals[n_query_cols].nul = 0;
1836
+		query_vals[n_query_cols].val.str_val = pres->etag;
1837
+		n_query_cols++;
1838
+	} else {
1839
+		query_cols[n_query_cols] = &str_ruid_col;
1840
+		query_vals[n_query_cols].type = DB1_STR;
1841
+		query_vals[n_query_cols].nul = 0;
1842
+		query_vals[n_query_cols].val.str_val = *ruid;
1843
+		n_query_cols++;		
1844
+	}
1707 1845
 
1708 1846
 	if(pa_dbf.delete(pa_db, query_cols, 0, query_vals, n_query_cols) < 0)
1709 1847
 	{
... ...
@@ -32,7 +32,7 @@
32 32
 #include "../../core/str.h"
33 33
 #include "../../core/parser/msg_parser.h" 
34 34
 #include "event_list.h"
35
-//#include "presence.h"
35
+#include "presence.h"
36 36
 
37 37
 extern char prefix;
38 38
 
... ...
@@ -55,7 +55,7 @@ presentity_t* new_presentity( str* domain,str* user,int expires,
55 55
 
56 56
 /* update presentity in database */
57 57
 int update_presentity(struct sip_msg* msg,presentity_t* p,str* body,int t_new,
58
-		int* sent_reply, char* sphere);
58
+		int* sent_reply, char* sphere, str* etag_override, str* ruid);
59 59
 
60 60
 /* free memory */
61 61
 void free_presentity(presentity_t* p);
... ...
@@ -69,8 +69,8 @@ char* extract_sphere(str body);
69 69
 char* get_sphere(str* pres_uri);
70 70
 typedef char* (*pres_get_sphere_t)(str* pres_uri);
71 71
 
72
-int mark_presentity_for_delete(presentity_t *pres);
73
-int delete_presentity(presentity_t *pres);
72
+int mark_presentity_for_delete(presentity_t *pres, str *ruid);
73
+int delete_presentity(presentity_t *pres, str *ruid);
74 74
 int delete_offline_presentities(str *pres_uri, pres_ev_t *event);
75 75
 
76 76
 #endif
... ...
@@ -157,7 +157,7 @@ void msg_presentity_clean(unsigned int ticks,void *param)
157 157
 
158 158
 			if (pres_force_delete == 1)
159 159
 			{
160
-				if (delete_presentity(&pres) < 0)
160
+				if (delete_presentity(&pres, NULL) < 0)
161 161
 				{
162 162
 					LM_ERR("Deleting presentity\n");
163 163
 					goto error;
... ...
@@ -186,7 +186,7 @@ void msg_presentity_clean(unsigned int ticks,void *param)
186 186
 
187 187
 				if (num_watchers > 0)
188 188
 				{
189
-					if (mark_presentity_for_delete(&pres) < 0)
189
+					if (mark_presentity_for_delete(&pres, NULL) < 0)
190 190
 					{
191 191
 						LM_ERR("Marking presentity\n");
192 192
 						if (pa_dbf.abort_transaction)
... ...
@@ -199,7 +199,7 @@ void msg_presentity_clean(unsigned int ticks,void *param)
199 199
 				}
200 200
 				else
201 201
 				{
202
-					if (delete_presentity(&pres) < 0)
202
+					if (delete_presentity(&pres, NULL) < 0)
203 203
 					{
204 204
 						LM_ERR("Deleting presentity\n");
205 205
 						goto error;
... ...
@@ -494,7 +494,7 @@ int ki_handle_publish_uri(struct sip_msg* msg, str* sender_uri)
494 494
 	}
495 495
 
496 496
 	/* querry the database and update or insert */
497
-	if(update_presentity(msg, presentity, &body, etag_gen, &sent_reply, sphere) <0)
497
+	if(update_presentity(msg, presentity, &body, etag_gen, &sent_reply, sphere, NULL, NULL) <0)
498 498
 	{
499 499
 		LM_ERR("when updating presentity\n");
500 500
 		goto error;
... ...
@@ -635,7 +635,7 @@ int update_hard_presentity(str *pres_uri, pres_ev_t *event, str *file_uri, str *
635 635
 		goto done;
636 636
 	}
637 637
 
638
-	if (update_presentity(NULL, pres, pidf_doc, new_t, NULL, sphere) < 0)
638
+	if (update_presentity(NULL, pres, pidf_doc, new_t, NULL, sphere, NULL, NULL) < 0)
639 639
 	{
640 640
 		LM_ERR("updating presentity\n");
641 641
 		goto done;
... ...
@@ -1,5 +1,5 @@
1 1
 METADATA_COLUMNS
2
-id(int) username(str) domain(str) event(str) etag(str) expires(int) received_time(int) body(str) sender(str) priority(int)
2
+id(int) username(str) domain(str) event(str) etag(str) expires(int) received_time(int) body(str) sender(str) priority(int) ruid(str)
3 3
 METADATA_KEY
4 4
 1 2 3 
5 5
 METADATA_READONLY
... ...
@@ -7,4 +7,4 @@ METADATA_READONLY
7 7
 METADATA_LOGFLAGS
8 8
 0
9 9
 METADATA_DEFAULTS
10
-NIL|NIL|NIL|NIL|NIL|NIL|NIL|NIL|NIL|0
10
+NIL|NIL|NIL|NIL|NIL|NIL|NIL|NIL|NIL|0|NIL
... ...
@@ -91,7 +91,7 @@ pdt|1
91 91
 pl_pipes|
92 92
 pl_pipes|1
93 93
 presentity|
94
-presentity|4
94
+presentity|5
95 95
 pua|
96 96
 pua|7
97 97
 purplemap|
... ...
@@ -9,13 +9,15 @@ CREATE TABLE presentity (
9 9
     body BLOB NOT NULL,
10 10
     sender VARCHAR(128) NOT NULL,
11 11
     priority INTEGER DEFAULT 0 NOT NULL,
12
-    CONSTRAINT presentity_presentity_idx UNIQUE (username, domain, event, etag)
12
+    ruid VARCHAR(64),
13
+    CONSTRAINT presentity_presentity_idx UNIQUE (username, domain, event, etag),
14
+    CONSTRAINT presentity_ruid_idx UNIQUE (ruid)
13 15
 );
14 16
 
15 17
 CREATE INDEX presentity_presentity_expires ON presentity (expires);
16 18
 CREATE INDEX presentity_account_idx ON presentity (username, domain, event);
17 19
 
18
-INSERT INTO version (table_name, table_version) values ('presentity','4');
20
+INSERT INTO version (table_name, table_version) values ('presentity','5');
19 21
 
20 22
 CREATE TABLE active_watchers (
21 23
     id INTEGER PRIMARY KEY NOT NULL,
... ...
@@ -1 +1 @@
1
-id(int,auto) username(string) domain(string) event(string) etag(string) expires(int) received_time(int) body(string) sender(string) priority(int) 
1
+id(int,auto) username(string) domain(string) event(string) etag(string) expires(int) received_time(int) body(string) sender(string) priority(int) ruid(string,null) 
... ...
@@ -40,7 +40,7 @@ mtree:1
40 40
 mtrees:2
41 41
 pdt:1
42 42
 pl_pipes:1
43
-presentity:4
43
+presentity:5
44 44
 pua:7
45 45
 purplemap:1
46 46
 re_grp:1
... ...
@@ -1,6 +1,6 @@
1 1
 {
2 2
   "name": "presentity",
3
-  "version": 4,
3
+  "version": 5,
4 4
   "columns": [
5 5
     "id": {
6 6
       "type": "int",
... ...
@@ -51,6 +51,11 @@
51 51
       "type": "int",
52 52
       "default": 0,
53 53
       "null": false
54
+    },
55
+    "ruid": {
56
+      "type": "string",
57
+      "default": null,
58
+      "null": true
54 59
     }
55 60
   ]
56 61
 }
57