Browse code

dialog: DMQ-sync dialogs with peers on startup

Use DMQ's init_callback() to request the peers to send all dialogs.

Alex Hermann authored on 26/08/2014 14:26:03
Showing 5 changed files
... ...
@@ -35,6 +35,10 @@ dmq_api_t dlg_dmqb;
35 35
 dmq_peer_t* dlg_dmq_peer = NULL;
36 36
 dmq_resp_cback_t dlg_dmq_resp_callback = {&dlg_dmq_resp_callback_f, 0};
37 37
 
38
+int dmq_send_all_dlgs();
39
+int dlg_dmq_request_sync();
40
+
41
+
38 42
 /**
39 43
 * @brief add notification peer
40 44
 */
... ...
@@ -51,7 +55,7 @@ int dlg_dmq_initialize()
51 55
 	}
52 56
 
53 57
 	not_peer.callback = dlg_dmq_handle_msg;
54
-	not_peer.init_callback = NULL;
58
+	not_peer.init_callback = dlg_dmq_request_sync;
55 59
 	not_peer.description.s = "dialog";
56 60
 	not_peer.description.len = 6;
57 61
 	not_peer.peer_id.s = "dialog";
... ...
@@ -288,6 +292,10 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
288 292
 			unref++;
289 293
 			break;
290 294
 
295
+		case DLG_DMQ_SYNC:
296
+			dmq_send_all_dlgs();
297
+			break;
298
+
291 299
 		case DLG_DMQ_NONE:
292 300
 			break;
293 301
 	}
... ...
@@ -314,6 +322,46 @@ error:
314 322
 }
315 323
 
316 324
 
325
+int dlg_dmq_request_sync() {
326
+	srjson_doc_t jdoc;
327
+
328
+	LM_DBG("requesting sync from dmq peers\n");
329
+
330
+	srjson_InitDoc(&jdoc, NULL);
331
+
332
+	jdoc.root = srjson_CreateObject(&jdoc);
333
+	if(jdoc.root==NULL) {
334
+		LM_ERR("cannot create json root\n");
335
+		goto error;
336
+	}
337
+
338
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DLG_DMQ_SYNC);
339
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
340
+	if(jdoc.buf.s==NULL) {
341
+		LM_ERR("unable to serialize data\n");
342
+		goto error;
343
+	}
344
+	jdoc.buf.len = strlen(jdoc.buf.s);
345
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
346
+	if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
347
+		goto error;
348
+	}
349
+
350
+	jdoc.free_fn(jdoc.buf.s);
351
+	jdoc.buf.s = NULL;
352
+	srjson_DestroyDoc(&jdoc);
353
+	return 0;
354
+
355
+error:
356
+	if(jdoc.buf.s!=NULL) {
357
+		jdoc.free_fn(jdoc.buf.s);
358
+		jdoc.buf.s = NULL;
359
+	}
360
+	srjson_DestroyDoc(&jdoc);
361
+	return -1;
362
+}
363
+
364
+
317 365
 int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock) {
318 366
 
319 367
 	srjson_doc_t jdoc, prof_jdoc;
... ...
@@ -391,6 +439,7 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl
391 439
 			break;
392 440
 
393 441
 		case DLG_DMQ_NONE:
442
+		case DLG_DMQ_SYNC:
394 443
 			break;
395 444
 	}
396 445
 	if (needlock)
... ...
@@ -422,6 +471,30 @@ error:
422 471
 }
423 472
 
424 473
 
474
+int dmq_send_all_dlgs() {
475
+	int index;
476
+	dlg_entry_t entry;
477
+	dlg_cell_t *dlg;
478
+
479
+	LM_DBG("sending all dialogs \n");
480
+
481
+	for(index = 0; index< d_table->size; index++){
482
+		/* lock the whole entry */
483
+		entry = (d_table->entries)[index];
484
+		dlg_lock( d_table, &entry);
485
+
486
+		for(dlg = entry.first; dlg != NULL; dlg = dlg->next){
487
+			dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
488
+			dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0);
489
+		}
490
+
491
+		dlg_unlock( d_table, &entry);
492
+	}
493
+
494
+	return 0;
495
+}
496
+
497
+
425 498
 /**
426 499
 * @brief dmq response callback
427 500
 */
... ...
@@ -38,6 +38,7 @@ typedef enum {
38 38
 	DLG_DMQ_UPDATE,
39 39
 	DLG_DMQ_STATE,
40 40
 	DLG_DMQ_RM,
41
+	DLG_DMQ_SYNC,
41 42
 } dlg_dmq_action_t;
42 43
 
43 44
 int dlg_dmq_initialize();
... ...
@@ -229,7 +229,14 @@ static int mod_init(void)
229 229
 		LM_ERR("error in shm_malloc\n");
230 230
 		return -1;
231 231
 	}
232
-	
232
+
233
+	dmq_init_callback_done = shm_malloc(sizeof(int));
234
+	if (!dmq_init_callback_done) {
235
+		LM_ERR("no more shm\n");
236
+		return -1;
237
+	}
238
+	*dmq_init_callback_done = 0;
239
+
233 240
 	/**
234 241
 	 * add the dmq notification peer.
235 242
 	 * the dmq is a peer itself so that it can receive node notifications
... ...
@@ -29,6 +29,9 @@
29 29
 str notification_content_type = str_init("text/plain");
30 30
 dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
31 31
 
32
+int *dmq_init_callback_done;
33
+
34
+
32 35
 /**
33 36
  * @brief add notification peer
34 37
  */
... ...
@@ -186,7 +189,6 @@ int run_init_callbacks() {
186 189
  */
187 190
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
188 191
 {
189
-	static int firstrun = 1;
190 192
 	int nodes_recv;
191 193
 	str* response_body = NULL;
192 194
 	int maxforwards = 0;
... ...
@@ -223,9 +225,9 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
223 225
 				&notification_callback, maxforwards, &notification_content_type);
224 226
 	}
225 227
 	pkg_free(response_body);
226
-	if (firstrun) {
228
+	if (!*dmq_init_callback_done) {
229
+		*dmq_init_callback_done = 1;
227 230
 		run_init_callbacks();
228
-		firstrun = 0;
229 231
 	}
230 232
 	return 0;
231 233
 error:
... ...
@@ -312,8 +314,17 @@ int notification_resp_callback_f(struct sip_msg* msg, int code,
312 314
 		dmq_node_t* node, void* param)
313 315
 {
314 316
 	int ret;
317
+	int nodes_recv;
318
+
315 319
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
316
-	if(code == 408) {
320
+	if(code == 200) {
321
+		nodes_recv = extract_node_list(node_list, msg);
322
+		LM_DBG("received %d new or changed nodes\n", nodes_recv);
323
+		if (!*dmq_init_callback_done) {
324
+			*dmq_init_callback_done = 1;
325
+			run_init_callbacks();
326
+		}
327
+	} else if(code == 408) {
317 328
 		/* deleting node - the server did not respond */
318 329
 		LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri));
319 330
 		if (STR_EQ(node->orig_uri, dmq_notification_address)) {
... ...
@@ -34,6 +34,7 @@
34 34
 #include "dmq_funcs.h"
35 35
 
36 36
 extern str notification_content_type;
37
+extern int *dmq_init_callback_done;
37 38
 
38 39
 int add_notification_peer();
39 40
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);