Browse code

dialog: Send initial DMQ-sync only to the node which requested it

Do not broadcast it to all nodes.

Alex Hermann authored on 28/08/2014 12:27:34
Showing 4 changed files
... ...
@@ -73,13 +73,18 @@ error:
73 73
 }
74 74
 
75 75
 
76
-int dlg_dmq_broadcast(str* body) {
76
+int dlg_dmq_send(str* body, dmq_node_t* node) {
77 77
 	if (!dlg_dmq_peer) {
78 78
 		LM_ERR("dlg_dmq_peer is null!\n");
79 79
 		return -1;
80 80
 	}
81
-	LM_DBG("sending broadcast...\n");
82
-	dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback, 1, &dlg_dmq_content_type);
81
+	if (node) {
82
+		LM_DBG("sending dmq message ...\n");
83
+		dlg_dmqb.send_message(dlg_dmq_peer, body, node, &dlg_dmq_resp_callback, 1, &dlg_dmq_content_type);
84
+	} else {
85
+		LM_DBG("sending dmq broadcast...\n");
86
+		dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback, 1, &dlg_dmq_content_type);
87
+	}
83 88
 	return 0;
84 89
 }
85 90
 
... ...
@@ -293,7 +298,7 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dm
293 298
 			break;
294 299
 
295 300
 		case DLG_DMQ_SYNC:
296
-			dmq_send_all_dlgs();
301
+			dmq_send_all_dlgs(dmq_node);
297 302
 			break;
298 303
 
299 304
 		case DLG_DMQ_NONE:
... ...
@@ -343,7 +348,7 @@ int dlg_dmq_request_sync() {
343 348
 	}
344 349
 	jdoc.buf.len = strlen(jdoc.buf.s);
345 350
 	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
346
-	if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
351
+	if (dlg_dmq_send(&jdoc.buf, 0)!=0) {
347 352
 		goto error;
348 353
 	}
349 354
 
... ...
@@ -362,7 +367,7 @@ error:
362 367
 }
363 368
 
364 369
 
365
-int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock) {
370
+int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock, dmq_node_t *node ) {
366 371
 
367 372
 	srjson_doc_t jdoc, prof_jdoc;
368 373
 
... ...
@@ -452,7 +457,7 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl
452 457
 	}
453 458
 	jdoc.buf.len = strlen(jdoc.buf.s);
454 459
 	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
455
-	if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
460
+	if (dlg_dmq_send(&jdoc.buf, node)!=0) {
456 461
 		goto error;
457 462
 	}
458 463
 
... ...
@@ -471,7 +476,7 @@ error:
471 476
 }
472 477
 
473 478
 
474
-int dmq_send_all_dlgs() {
479
+int dmq_send_all_dlgs(dmq_node_t* dmq_node) {
475 480
 	int index;
476 481
 	dlg_entry_t entry;
477 482
 	dlg_cell_t *dlg;
... ...
@@ -485,7 +490,7 @@ int dmq_send_all_dlgs() {
485 490
 
486 491
 		for(dlg = entry.first; dlg != NULL; dlg = dlg->next){
487 492
 			dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
488
-			dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0);
493
+			dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0, dmq_node);
489 494
 		}
490 495
 
491 496
 		dlg_unlock( d_table, &entry);
... ...
@@ -43,6 +43,6 @@ typedef enum {
43 43
 
44 44
 int dlg_dmq_initialize();
45 45
 int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node);
46
-int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock);
46
+int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock, dmq_node_t* node);
47 47
 int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
48 48
 #endif
... ...
@@ -571,7 +571,7 @@ static void dlg_onreply(struct cell* t, int type, struct tmcb_params *param)
571 571
 
572 572
 done:
573 573
 	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) {
574
-		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
574
+		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0);
575 575
 	}
576 576
 
577 577
 done_early:
... ...
@@ -730,7 +730,7 @@ static void dlg_on_send(struct cell* t, int type, struct tmcb_params *param)
730 730
 
731 731
 	/* sync over dmq */
732 732
 	if (dlg_enable_dmq) {
733
-		dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 1);
733
+		dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 1, 0);
734 734
 	}
735 735
 
736 736
 	/* unref by 2: 1 set when adding in tm cb, 1 set by dlg_get_by_iuid() */
... ...
@@ -1394,7 +1394,7 @@ void dlg_onroute(struct sip_msg* req, str *route_params, void *param)
1394 1394
 
1395 1395
 done:
1396 1396
 	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) {
1397
-		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
1397
+		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0);
1398 1398
 	}
1399 1399
 
1400 1400
 	dlg_release(dlg);
... ...
@@ -1469,7 +1469,7 @@ void dlg_ontimeout(struct dlg_tl *tl)
1469 1469
 	}
1470 1470
 
1471 1471
 	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) {
1472
-		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
1472
+		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0);
1473 1473
 	}
1474 1474
 
1475 1475
 	return;
... ...
@@ -399,7 +399,7 @@ inline void destroy_dlg(struct dlg_cell *dlg)
399 399
 	run_dlg_callbacks( DLGCB_DESTROY , dlg, NULL, NULL, DLG_DIR_NONE, 0);
400 400
 
401 401
 	if (dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC))
402
-		dlg_dmq_replicate_action(DLG_DMQ_RM, dlg, 0);
402
+		dlg_dmq_replicate_action(DLG_DMQ_RM, dlg, 0, 0);
403 403
 
404 404
 	/* delete the dialog from DB*/
405 405
 	if (dlg_db_mode)