Browse code

dialog: Add syncing via DMQ

Primary purpose is to sync profiles. This implements just that.
Enough of the state is synced to maintain identical profiles counts on
all kamailio instances.

Changed profiles are only synced as the request is sent out, adding
multiple profiles will result in just 1 sync message at the moment
the request is being sent.

It is not possible for the 'other' instances to send in-dialog requests.

Alex Hermann authored on 07/08/2014 15:13:17
Showing 8 changed files
... ...
@@ -86,6 +86,7 @@
86 86
 #include "dlg_profile.h"
87 87
 #include "dlg_var.h"
88 88
 #include "dlg_transfer.h"
89
+#include "dlg_dmq.h"
89 90
 
90 91
 MODULE_VERSION
91 92
 
... ...
@@ -110,6 +111,7 @@ static int db_fetch_rows = 200;
110 111
 int initial_cbs_inscript = 1;
111 112
 int dlg_wait_ack = 1;
112 113
 static int dlg_timer_procs = 0;
114
+int dlg_enable_dmq = 0;
113 115
 
114 116
 int dlg_event_rt[DLG_EVENTRT_MAX];
115 117
 
... ...
@@ -294,6 +296,7 @@ static param_export_t mod_params[]={
294 296
 	{ "ka_interval",           INT_PARAM, &dlg_ka_interval          },
295 297
 	{ "timeout_noreset",       INT_PARAM, &dlg_timeout_noreset      },
296 298
 	{ "timer_procs",           PARAM_INT, &dlg_timer_procs          },
299
+	{ "enable_dmq",            INT_PARAM, &dlg_enable_dmq           },
297 300
 	{ 0,0,0 }
298 301
 };
299 302
 
... ...
@@ -696,6 +699,12 @@ static int mod_init(void)
696 699
 	/* timer process to clean old unconfirmed dialogs */
697 700
 	register_sync_timers(1);
698 701
 
702
+	if (dlg_enable_dmq>0 && dlg_dmq_initialize()!=0) {
703
+		LM_ERR("failed to initialize dmq integration\n");
704
+		return -1;
705
+	}
706
+
707
+
699 708
 	return 0;
700 709
 }
701 710
 
702 711
new file mode 100644
... ...
@@ -0,0 +1,425 @@
1
+/**
2
+* 
3
+* Copyright (C) 2014 Alex Hermann (SpeakUp BV)
4
+* Based on ht_dmq.c Copyright (C) 2013 Charles Chance (Sipcentric Ltd)
5
+*
6
+* This file is part of Kamailio, a free SIP server.
7
+*
8
+* Kamailio is free software; you can redistribute it and/or modify
9
+* it under the terms of the GNU General Public License as published by
10
+* the Free Software Foundation; either version 2 of the License, or
11
+* (at your option) any later version
12
+*
13
+* Kamailio is distributed in the hope that it will be useful,
14
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
15
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
+* GNU General Public License for more details.
17
+*
18
+* You should have received a copy of the GNU General Public License 
19
+* along with this program; if not, write to the Free Software 
20
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21
+*
22
+*/
23
+
24
+
25
+#include "dlg_dmq.h"
26
+#include "dlg_hash.h"
27
+#include "dlg_profile.h"
28
+
29
+static str dlg_dmq_content_type = str_init("application/json");
30
+static str dmq_200_rpl  = str_init("OK");
31
+static str dmq_400_rpl  = str_init("Bad Request");
32
+static str dmq_500_rpl  = str_init("Server Internal Error");
33
+
34
+dmq_api_t dlg_dmqb;
35
+dmq_peer_t* dlg_dmq_peer = NULL;
36
+dmq_resp_cback_t dlg_dmq_resp_callback = {&dlg_dmq_resp_callback_f, 0};
37
+
38
+/**
39
+* @brief add notification peer
40
+*/
41
+int dlg_dmq_initialize()
42
+{
43
+	dmq_peer_t not_peer;
44
+
45
+	/* load the DMQ API */
46
+	if (dmq_load_api(&dlg_dmqb)!=0) {
47
+		LM_ERR("cannot load dmq api\n");
48
+		return -1;
49
+	} else {
50
+		LM_DBG("loaded dmq api\n");
51
+	}
52
+
53
+	not_peer.callback = dlg_dmq_handle_msg;
54
+	not_peer.description.s = "dialog";
55
+	not_peer.description.len = 6;
56
+	not_peer.peer_id.s = "dialog";
57
+	not_peer.peer_id.len = 6;
58
+	dlg_dmq_peer = dlg_dmqb.register_dmq_peer(&not_peer);
59
+	if(!dlg_dmq_peer) {
60
+		LM_ERR("error in register_dmq_peer\n");
61
+		goto error;
62
+	} else {
63
+		LM_DBG("dmq peer registered\n");
64
+	}
65
+	return 0;
66
+error:
67
+	return -1;
68
+}
69
+
70
+
71
+int dlg_dmq_broadcast(str* body) {
72
+	if (!dlg_dmq_peer) {
73
+		LM_ERR("dlg_dmq_peer is null!\n");
74
+		return -1;
75
+	}
76
+	LM_DBG("sending broadcast...\n");
77
+	dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback, 1, &dlg_dmq_content_type);
78
+	return 0;
79
+}
80
+
81
+
82
+/**
83
+* @brief ht dmq callback
84
+*/
85
+int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
86
+{
87
+	int content_length;
88
+	str body;
89
+	dlg_cell_t *dlg;
90
+	int unref = 0;
91
+	int ret;
92
+	srjson_doc_t jdoc, prof_jdoc;
93
+	srjson_t *it = NULL;
94
+
95
+	dlg_dmq_action_t action = DLG_DMQ_NONE;
96
+	dlg_iuid_t iuid;
97
+	str profiles = {0, 0}, callid = {0, 0};
98
+	str dummy = {0, 0};
99
+	unsigned int init_ts = 0, start_ts = 0, lifetime = 0;
100
+	unsigned int state = 0;
101
+
102
+	/* received dmq message */
103
+	LM_DBG("dmq message received\n");
104
+	
105
+	if(!msg->content_length) {
106
+		LM_ERR("no content length header found\n");
107
+		goto invalid2;
108
+	}
109
+	content_length = get_content_length(msg);
110
+	if(!content_length) {
111
+		LM_DBG("content length is 0\n");
112
+		goto invalid2;
113
+	}
114
+
115
+	body.s = get_body(msg);
116
+	body.len = content_length;
117
+
118
+	if (!body.s) {
119
+		LM_ERR("unable to get body\n");
120
+		goto error;
121
+	}
122
+
123
+	/* parse body */
124
+	LM_DBG("body: %.*s\n", body.len, body.s);	
125
+
126
+	srjson_InitDoc(&jdoc, NULL);
127
+	jdoc.buf = body;
128
+
129
+	if(jdoc.root == NULL) {
130
+		jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
131
+		if(jdoc.root == NULL)
132
+		{
133
+			LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
134
+			goto invalid;
135
+		}
136
+	}
137
+
138
+	for(it=jdoc.root->child; it; it = it->next)
139
+	{
140
+		LM_DBG("found field: %s\n", it->string);
141
+		if (strcmp(it->string, "action")==0) {
142
+			action = it->valueint;
143
+		} else if (strcmp(it->string, "h_entry")==0) {
144
+			iuid.h_entry = it->valueint;
145
+		} else if (strcmp(it->string, "h_id")==0) {
146
+			iuid.h_id = it->valueint;
147
+		} else if (strcmp(it->string, "init_ts")==0) {
148
+			init_ts = it->valueint;
149
+		} else if (strcmp(it->string, "start_ts")==0) {
150
+			start_ts = it->valueint;
151
+		} else if (strcmp(it->string, "state")==0) {
152
+			state = it->valueint;
153
+		} else if (strcmp(it->string, "lifetime")==0) {
154
+			lifetime = it->valueint;
155
+		} else if (strcmp(it->string, "callid")==0) {
156
+			callid.s = it->valuestring;
157
+			callid.len = strlen(callid.s);
158
+		} else if (strcmp(it->string, "profiles")==0) {
159
+			profiles.s = it->valuestring;
160
+			profiles.len = strlen(profiles.s);
161
+		} else {
162
+			LM_ERR("unrecognized field in json object\n");
163
+		}
164
+	}
165
+
166
+	dlg = dlg_get_by_iuid(&iuid);
167
+	if (dlg) {
168
+		LM_DBG("found dialog [%u:%u] at %p\n", iuid.h_entry, iuid.h_id, dlg);
169
+		unref++;
170
+	}
171
+
172
+	switch(action) {
173
+		case DLG_DMQ_UPDATE: 
174
+			LM_DBG("Updating dlg [%u:%u] with callid [%.*s]\n", iuid.h_entry, iuid.h_id,
175
+					callid.len, callid.s);
176
+			if (!dlg) {
177
+				dlg = build_new_dlg(&callid, &dummy, &dummy, &dummy, &dummy);
178
+				if (!dlg) {
179
+					LM_ERR("failed to build new dialog\n");
180
+					goto error;
181
+				}
182
+
183
+				if(dlg->h_entry != iuid.h_entry){
184
+					LM_ERR("inconsistent hash data from peer: "
185
+						"make sure all Kamailio's use the same hash size\n");
186
+					shm_free(dlg);
187
+					goto error;
188
+				}
189
+
190
+				/* link the dialog */
191
+				link_dlg(dlg, 0, 0);
192
+
193
+				/* override generated h_id */
194
+				dlg->h_id = iuid.h_id;
195
+				/* prevent DB sync */
196
+				dlg->dflags &= ~(DLG_FLAG_NEW|DLG_FLAG_CHANGED);
197
+				dlg->init_ts = init_ts;
198
+				dlg->start_ts = start_ts;
199
+			} else {
200
+				/* remove existing profiles */
201
+				if (dlg->profile_links!=NULL) {
202
+					destroy_linkers(dlg->profile_links);
203
+					dlg->profile_links = NULL;
204
+				}
205
+			}
206
+
207
+			/* add profiles */
208
+			if(profiles.s!=NULL) {
209
+				srjson_InitDoc(&prof_jdoc, NULL);
210
+				prof_jdoc.buf = profiles;
211
+				dlg_json_to_profiles(dlg, &prof_jdoc);
212
+				srjson_DestroyDoc(&prof_jdoc);
213
+			}
214
+			if (dlg->state == state) {
215
+				break;
216
+			}
217
+			/* intentional fallthrough */
218
+
219
+		case DLG_DMQ_STATE:
220
+			if (!dlg) {
221
+				LM_ERR("dialog [%u:%u] not found\n", iuid.h_entry, iuid.h_id);
222
+				goto error;
223
+			}
224
+			LM_DBG("State update dlg [%u:%u] with callid [%.*s] from state [%u] to state [%u]\n", iuid.h_entry, iuid.h_id,
225
+					dlg->callid.len, dlg->callid.s, dlg->state, state);
226
+			switch (state) {
227
+				case DLG_STATE_CONFIRMED:
228
+					dlg->start_ts = start_ts;
229
+					dlg->lifetime = lifetime;
230
+					if (insert_dlg_timer( &dlg->tl, dlg->lifetime ) != 0) {
231
+						LM_CRIT("Unable to insert dlg timer %p [%u:%u]\n",
232
+							dlg, dlg->h_entry, dlg->h_id);
233
+					} else {
234
+						/* dialog pointer inserted in timer list */
235
+						dlg_ref(dlg, 1);
236
+					}
237
+					break;
238
+				case DLG_STATE_DELETED:
239
+					if (dlg->state == DLG_STATE_CONFIRMED) {
240
+						ret = remove_dialog_timer(&dlg->tl);
241
+						if (ret == 0) {
242
+							/* one extra unref due to removal from timer list */
243
+							unref++;
244
+						} else if (ret < 0) {
245
+							LM_CRIT("unable to unlink the timer on dlg %p [%u:%u]\n",
246
+								dlg, dlg->h_entry, dlg->h_id);
247
+						}
248
+					}
249
+					/* prevent DB sync */
250
+					dlg->dflags |= DLG_FLAG_NEW;
251
+					unref++;
252
+					break;
253
+				default:
254
+					LM_ERR("unhandled state update to state %u\n", state);
255
+					dlg_unref(dlg, unref);
256
+					goto error;
257
+			}
258
+			dlg->state = state;
259
+			break;
260
+
261
+		case DLG_DMQ_RM:
262
+			if (!dlg) {
263
+				LM_DBG("dialog [%u:%u] not found\n", iuid.h_entry, iuid.h_id);
264
+				goto error;
265
+			}
266
+			LM_DBG("Removed dlg [%u:%u] with callid [%.*s] int state [%u]\n", iuid.h_entry, iuid.h_id,
267
+					dlg->callid.len, dlg->callid.s, dlg->state);
268
+			if (dlg->state == DLG_STATE_CONFIRMED) {
269
+				ret = remove_dialog_timer(&dlg->tl);
270
+				if (ret == 0) {
271
+					/* one extra unref due to removal from timer list */
272
+					unref++;
273
+				} else if (ret < 0) {
274
+					LM_CRIT("unable to unlink the timer on dlg %p [%u:%u]\n",
275
+						dlg, dlg->h_entry, dlg->h_id);
276
+				}
277
+			}
278
+			/* prevent DB sync */
279
+			dlg->dflags |= DLG_FLAG_NEW;
280
+			unref++;
281
+			break;
282
+
283
+		case DLG_DMQ_NONE:
284
+			break;
285
+	}
286
+	if (dlg && unref)
287
+		dlg_unref(dlg, unref);
288
+
289
+	srjson_DestroyDoc(&jdoc);
290
+	resp->reason = dmq_200_rpl;
291
+	resp->resp_code = 200;
292
+	return 0;
293
+
294
+invalid:
295
+	srjson_DestroyDoc(&jdoc);
296
+invalid2:
297
+	resp->reason = dmq_400_rpl;
298
+	resp->resp_code = 400;
299
+	return 0;
300
+
301
+error:
302
+	srjson_DestroyDoc(&jdoc);
303
+	resp->reason = dmq_500_rpl;
304
+	resp->resp_code = 500;
305
+	return 0;
306
+}
307
+
308
+
309
+int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock) {
310
+
311
+	srjson_doc_t jdoc, prof_jdoc;
312
+
313
+	LM_DBG("replicating action [%d] on [%u:%u] to dmq peers\n", action, dlg->h_entry, dlg->h_id);
314
+
315
+	if (action == DLG_DMQ_UPDATE) {
316
+		if ((dlg->iflags & DLG_IFLAG_DMQ_SYNC) && ((dlg->dflags & DLG_FLAG_CHANGED_PROF) == 0)) {
317
+			LM_DBG("dlg not changed, no sync\n");
318
+			return 1;
319
+		}
320
+	} else if ((dlg->iflags & DLG_IFLAG_DMQ_SYNC) == 0) {
321
+		LM_DBG("dlg not synced, no sync\n");
322
+		return 1;
323
+	}
324
+	if (action == DLG_DMQ_STATE && (dlg->state != DLG_STATE_CONFIRMED && dlg->state != DLG_STATE_DELETED)){
325
+		LM_DBG("not syncing state %u\n", dlg->state);
326
+		return 1;
327
+	}
328
+
329
+	srjson_InitDoc(&jdoc, NULL);
330
+
331
+	jdoc.root = srjson_CreateObject(&jdoc);
332
+	if(jdoc.root==NULL) {
333
+		LM_ERR("cannot create json root\n");
334
+		goto error;
335
+	}
336
+
337
+	if (needlock)
338
+		dlg_lock(d_table, &(d_table->entries[dlg->h_entry]));
339
+
340
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action);
341
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "h_entry", dlg->h_entry);
342
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "h_id", dlg->h_id);
343
+
344
+	switch(action) {
345
+		case DLG_DMQ_UPDATE:
346
+			dlg->iflags |= DLG_IFLAG_DMQ_SYNC;
347
+			dlg->dflags &= ~DLG_FLAG_CHANGED_PROF;
348
+			srjson_AddNumberToObject(&jdoc, jdoc.root, "init_ts", dlg->init_ts);
349
+			srjson_AddStrToObject(&jdoc, jdoc.root, "callid", dlg->callid.s, dlg->callid.len);
350
+
351
+			if (dlg->profile_links) {
352
+				srjson_InitDoc(&prof_jdoc, NULL);
353
+				dlg_profiles_to_json(dlg, &prof_jdoc);
354
+				if(prof_jdoc.buf.s!=NULL) {
355
+					LM_DBG("adding profiles: [%.*s]\n", prof_jdoc.buf.len, prof_jdoc.buf.s);
356
+					srjson_AddStrToObject(&jdoc, jdoc.root, "profiles",
357
+							prof_jdoc.buf.s, prof_jdoc.buf.len);
358
+					prof_jdoc.free_fn(prof_jdoc.buf.s);
359
+					prof_jdoc.buf.s = NULL;
360
+				}
361
+				srjson_DestroyDoc(&prof_jdoc);
362
+			}
363
+			/* intentional fallthrough */
364
+
365
+		case DLG_DMQ_STATE:
366
+			srjson_AddNumberToObject(&jdoc, jdoc.root, "state", dlg->state);
367
+			switch (dlg->state) {
368
+				case DLG_STATE_CONFIRMED:
369
+					srjson_AddNumberToObject(&jdoc, jdoc.root, "start_ts", dlg->start_ts);
370
+					srjson_AddNumberToObject(&jdoc, jdoc.root, "lifetime", dlg->lifetime);
371
+					break;
372
+				case DLG_STATE_DELETED:
373
+					dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
374
+					break;
375
+				default:
376
+					LM_DBG("not syncing state %u\n", dlg->state);
377
+			}
378
+			break;
379
+
380
+		case DLG_DMQ_RM:
381
+			srjson_AddNumberToObject(&jdoc, jdoc.root, "state", dlg->state);
382
+			dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
383
+			break;
384
+
385
+		case DLG_DMQ_NONE:
386
+			break;
387
+	}
388
+	if (needlock)
389
+		dlg_unlock(d_table, &(d_table->entries[dlg->h_entry]));
390
+
391
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
392
+	if(jdoc.buf.s==NULL) {
393
+		LM_ERR("unable to serialize data\n");
394
+		goto error;
395
+	}
396
+	jdoc.buf.len = strlen(jdoc.buf.s);
397
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
398
+	if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
399
+		goto error;
400
+	}
401
+
402
+	jdoc.free_fn(jdoc.buf.s);
403
+	jdoc.buf.s = NULL;
404
+	srjson_DestroyDoc(&jdoc);
405
+	return 0;
406
+
407
+error:
408
+	if(jdoc.buf.s!=NULL) {
409
+		jdoc.free_fn(jdoc.buf.s);
410
+		jdoc.buf.s = NULL;
411
+	}
412
+	srjson_DestroyDoc(&jdoc);
413
+	return -1;
414
+}
415
+
416
+
417
+/**
418
+* @brief dmq response callback
419
+*/
420
+int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code,
421
+                            dmq_node_t* node, void* param)
422
+{
423
+	LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
424
+	return 0;
425
+}
0 426
new file mode 100644
... ...
@@ -0,0 +1,47 @@
1
+/**
2
+ *
3
+ * Copyright (C) 2014 Alex Hermann (SpeakUp BV)
4
+ * Based on ht_dmq.c Copyright (C) 2013 Charles Chance (Sipcentric Ltd)
5
+ *
6
+ * This file is part of Kamailio, a free SIP server.
7
+ *
8
+ * Kamailio is free software; you can redistribute it and/or modify
9
+ * it under the terms of the GNU General Public License as published by
10
+ * the Free Software Foundation; either version 2 of the License, or
11
+ * (at your option) any later version
12
+ *
13
+ * Kamailio is distributed in the hope that it will be useful,
14
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
+ * GNU General Public License for more details.
17
+ *
18
+ * You should have received a copy of the GNU General Public License 
19
+ * along with this program; if not, write to the Free Software 
20
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21
+ */
22
+
23
+#ifndef _DLG_DMQ_H_
24
+#define _DLG_DMQ_H_
25
+
26
+#include "dlg_hash.h"
27
+#include "../dmq/bind_dmq.h"
28
+#include "../../lib/srutils/srjson.h"
29
+#include "../../parser/msg_parser.h"
30
+#include "../../parser/parse_content.h"
31
+
32
+extern dmq_api_t dlg_dmqb;
33
+extern dmq_peer_t* dlg_dmq_peer;
34
+extern dmq_resp_cback_t dlg_dmq_resp_callback;
35
+
36
+typedef enum {
37
+	DLG_DMQ_NONE,
38
+	DLG_DMQ_UPDATE,
39
+	DLG_DMQ_STATE,
40
+	DLG_DMQ_RM,
41
+} dlg_dmq_action_t;
42
+
43
+int dlg_dmq_initialize();
44
+int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp);
45
+int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock);
46
+int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
47
+#endif
... ...
@@ -75,6 +75,7 @@
75 75
 #include "dlg_db_handler.h"
76 76
 #include "dlg_profile.h"
77 77
 #include "dlg_var.h"
78
+#include "dlg_dmq.h"
78 79
 
79 80
 static str       rr_param;		/*!< record-route parameter for matching */
80 81
 static int       dlg_flag;		/*!< flag for dialog tracking */
... ...
@@ -88,6 +89,7 @@ extern int       initial_cbs_inscript;
88 89
 extern int       dlg_send_bye;
89 90
 extern int       dlg_event_rt[DLG_EVENTRT_MAX];
90 91
 extern int       dlg_wait_ack;
92
+extern int       dlg_enable_dmq;
91 93
 int              spiral_detected = -1;
92 94
 
93 95
 extern struct rr_binds d_rrb;		/*!< binding to record-routing module */
... ...
@@ -457,13 +459,13 @@ static void dlg_onreply(struct cell* t, int type, struct tmcb_params *param)
457 459
 		/* Set the dialog context so it is available in onreply_route and failure_route*/
458 460
 		set_current_dialog(req, dlg);
459 461
 		dlg_set_ctx_iuid(dlg);
460
-		goto done;
462
+		goto done_early;
461 463
 	}
462 464
 
463 465
 	if (type==TMCB_RESPONSE_FWDED) {
464 466
 		/* The state does not change, but the msg is mutable in this callback*/
465 467
 		run_dlg_callbacks(DLGCB_RESPONSE_FWDED, dlg, req, rpl, DLG_DIR_UPSTREAM, 0);
466
-		goto done;
468
+		goto done_early;
467 469
 	}
468 470
 
469 471
 	if (type & (TMCB_DESTROY|TMCB_ACK_NEG_IN))
... ...
@@ -568,6 +570,11 @@ static void dlg_onreply(struct cell* t, int type, struct tmcb_params *param)
568 570
 	if (unref) dlg_unref(dlg, unref);
569 571
 
570 572
 done:
573
+	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state!=old_state) {
574
+		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
575
+	}
576
+
577
+done_early:
571 578
 	/* unref due to dlg_get_by_iuid() */
572 579
 	dlg_release(dlg);
573 580
 	return;
... ...
@@ -701,6 +708,36 @@ static inline int pre_match_parse( struct sip_msg *req, str *callid,
701 708
 }
702 709
 
703 710
 
711
+/*!
712
+ * \brief Sync dialog from tm callback (another wrapper)
713
+ * \param t transaction, unused
714
+ * \param type type of the entered callback
715
+ * \param param saved dialog structure in the callback
716
+ */
717
+static void dlg_on_send(struct cell* t, int type, struct tmcb_params *param)
718
+{
719
+	dlg_cell_t *dlg = NULL;
720
+	dlg_iuid_t *iuid = NULL;
721
+
722
+	LM_DBG("dialog_on_send CB\n");
723
+	iuid = (dlg_iuid_t*)(*param->param);
724
+	if (iuid==NULL)
725
+		return;
726
+
727
+	dlg = dlg_get_by_iuid(iuid);
728
+	if(dlg==NULL)
729
+		return;
730
+
731
+	/* sync over dmq */
732
+	if (dlg_enable_dmq) {
733
+		dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 1);
734
+	}
735
+
736
+	/* unref by 2: 1 set when adding in tm cb, 1 set by dlg_get_by_iuid() */
737
+	dlg_unref(dlg, 1);
738
+}
739
+
740
+
704 741
 /*!
705 742
  * \brief Function that is registered as TM callback and called on requests
706 743
  * \see dlg_new_dialog
... ...
@@ -712,6 +749,7 @@ void dlg_onreq(struct cell* t, int type, struct tmcb_params *param)
712 749
 {
713 750
 	sip_msg_t *req = param->req;
714 751
 	dlg_cell_t *dlg = NULL;
752
+	dlg_iuid_t *iuid = NULL;
715 753
 
716 754
 	if(req->first_line.u.request.method_value != METHOD_INVITE)
717 755
 		return;
... ...
@@ -740,6 +778,20 @@ void dlg_onreq(struct cell* t, int type, struct tmcb_params *param)
740 778
 		_dlg_ctx.t = 1;
741 779
 		dlg_release(dlg);
742 780
 	}
781
+
782
+	iuid = dlg_get_iuid_shm_clone(dlg);
783
+	if(iuid==NULL)
784
+	{
785
+		LM_ERR("failed to create dialog unique id clone\n");
786
+	} else {
787
+		/* register callback for when the request is sent */
788
+		if ( d_tmb.register_tmcb(req, t, TMCB_REQUEST_FWDED,
789
+				dlg_on_send,
790
+				(void*)iuid, dlg_iuid_sfree)<0 ) {
791
+			LM_ERR("failed to register TMCB_REQUEST_FWDED\n");
792
+			shm_free(iuid);
793
+		}
794
+	}
743 795
 }
744 796
 
745 797
 
... ...
@@ -1341,6 +1393,10 @@ void dlg_onroute(struct sip_msg* req, str *route_params, void *param)
1341 1393
 	}
1342 1394
 
1343 1395
 done:
1396
+	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state!=old_state) {
1397
+		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
1398
+	}
1399
+
1344 1400
 	dlg_release(dlg);
1345 1401
 	return;
1346 1402
 }
... ...
@@ -1412,6 +1468,10 @@ void dlg_ontimeout(struct dlg_tl *tl)
1412 1468
 		dlg_unref(dlg, 1);
1413 1469
 	}
1414 1470
 
1471
+	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state!=old_state) {
1472
+		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
1473
+	}
1474
+
1415 1475
 	return;
1416 1476
 }
1417 1477
 
... ...
@@ -62,11 +62,13 @@
62 62
 #include "dlg_profile.h"
63 63
 #include "dlg_req_within.h"
64 64
 #include "dlg_db_handler.h"
65
+#include "dlg_dmq.h"
65 66
 
66 67
 #define MAX_LDG_LOCKS  2048
67 68
 #define MIN_LDG_LOCKS  2
68 69
 
69 70
 extern int dlg_ka_interval;
71
+extern int dlg_enable_dmq;
70 72
 
71 73
 /*! global dialog table */
72 74
 struct dlg_table *d_table = 0;
... ...
@@ -373,6 +375,8 @@ inline void destroy_dlg(struct dlg_cell *dlg)
373 375
 
374 376
 	run_dlg_callbacks( DLGCB_DESTROY , dlg, NULL, NULL, DLG_DIR_NONE, 0);
375 377
 
378
+	if (dlg_enable_dmq)
379
+		dlg_dmq_replicate_action(DLG_DMQ_RM, dlg, 0);
376 380
 
377 381
 	/* delete the dialog from DB*/
378 382
 	if (dlg_db_mode)
... ...
@@ -1052,18 +1056,11 @@ void next_state_dlg(dlg_cell_t *dlg, int event,
1052 1056
 	}
1053 1057
 	*new_state = dlg->state;
1054 1058
 
1055
-	/* remove the dialog from profiles when is not no longer active */
1056
-	if(*new_state==DLG_STATE_DELETED && dlg->profile_links!=NULL
1057
-				&& *old_state!=*new_state) {
1058
-		destroy_linkers(dlg->profile_links);
1059
-		dlg->profile_links = NULL;
1060
-	}
1061
-
1062
-	dlg_unlock( d_table, d_entry);
1063
-
1064 1059
 	LM_DBG("dialog %p changed from state %d to "
1065 1060
 		"state %d, due event %d (ref %d)\n", dlg, *old_state, *new_state, event,
1066 1061
 		dlg->ref);
1062
+
1063
+	dlg_unlock( d_table, d_entry);
1067 1064
 }
1068 1065
 
1069 1066
 /**
... ...
@@ -80,12 +80,14 @@
80 80
 #define DLG_FLAG_DEL           (1<<8) /*!< delete this var */
81 81
 
82 82
 #define DLG_FLAG_TM            (1<<9) /*!< dialog is set in transaction */
83
+#define DLG_FLAG_CHANGED_PROF  (1<<10) /*!< dialog-profiles changed */
83 84
 
84 85
 /* internal flags stored in db */
85 86
 #define DLG_IFLAG_TIMEOUTBYE        (1<<0) /*!< send bye on time-out */
86 87
 #define DLG_IFLAG_KA_SRC            (1<<1) /*!< send keep alive to src */
87 88
 #define DLG_IFLAG_KA_DST            (1<<2) /*!< send keep alive to dst */
88 89
 #define DLG_IFLAG_TIMER_NORESET     (1<<3) /*!< don't reset dialog timers on in-dialog messages reception */
90
+#define DLG_IFLAG_DMQ_SYNC          (1<<4) /*!< sync this dialog via dmq */
89 91
 
90 92
 #define DLG_CALLER_LEG         0 /*!< attribute that belongs to a caller leg */
91 93
 #define DLG_CALLEE_LEG         1 /*!< attribute that belongs to a callee leg */
... ...
@@ -497,6 +497,7 @@ static void link_dlg_profile(struct dlg_profile_link *linker, struct dlg_cell *d
497 497
 		linker->hash_linker.dlg = dlg;
498 498
 	}
499 499
 
500
+	atomic_or_int((volatile int*)&dlg->dflags, DLG_FLAG_CHANGED_PROF);
500 501
 	link_profile(linker, &dlg->callid);
501 502
 }
502 503
 
... ...
@@ -709,6 +710,7 @@ int unset_dlg_profile(sip_msg_t *msg, str *value,
709 710
 			 */
710 711
 		}
711 712
 	}
713
+	atomic_or_int((volatile int*)&dlg->dflags, DLG_FLAG_CHANGED_PROF);
712 714
 	dlg_unlock( d_table, d_entry);
713 715
 	dlg_release(dlg);
714 716
 	return -1;
... ...
@@ -1278,6 +1278,29 @@ modparam("dialog", "timer_procs", 1)
1278 1278
 		</example>
1279 1279
 	</section>
1280 1280
 
1281
+	<section>
1282
+		<title><varname>enable_dmq</varname> (int)</title>
1283
+		<para>
1284
+			If set to 1, the dialog will be synced via dmq.
1285
+			For now, only very basic dialog info is shared, just enough to have synced
1286
+			profiles. Notably, it is not possible to send in-dialog requests on any
1287
+			but the original proxy instance.
1288
+		</para>
1289
+		<para>
1290
+		<emphasis>
1291
+			Default value is <quote>0</quote>.
1292
+		</emphasis>
1293
+		</para>
1294
+		<example>
1295
+		<title>Set <varname>enable_dmq</varname> parameter</title>
1296
+		<programlisting format="linespecific">
1297
+...
1298
+modparam("dialog", "enable_dmq", 1)
1299
+...
1300
+</programlisting>
1301
+		</example>
1302
+	</section>
1303
+
1281 1304
 	</section>
1282 1305
 
1283 1306