Browse code

dmq_usrloc: always replicate contact updates made locally

- fixes issue reported by shuntongzhang@gmail.com on sr-dev
- we no longer set FL_RPL since it is otherwise meaningless
- dmq replicated contacts are no different now to those from a shared DB

Charles Chance authored on 27/05/2017 10:18:36
Showing 1 changed files
... ...
@@ -37,6 +37,8 @@ static str dmq_200_rpl  = str_init("OK");
37 37
 static str dmq_400_rpl  = str_init("Bad Request");
38 38
 static str dmq_500_rpl  = str_init("Server Internal Error");
39 39
 
40
+static int *usrloc_dmq_recv = 0;
41
+
40 42
 dmq_api_t usrloc_dmqb;
41 43
 dmq_peer_t* usrloc_dmq_peer = NULL;
42 44
 dmq_resp_cback_t usrloc_dmq_resp_callback = {&usrloc_dmq_resp_callback_f, 0};
... ...
@@ -388,7 +390,6 @@ static int usrloc_dmq_execute_action(srjson_t *jdoc_action, dmq_node_t* node) {
388 390
 	ci.callid = &callid;
389 391
 	ci.cseq = cseq;
390 392
 	ci.flags = flags;
391
-	ci.flags |= FL_RPL;
392 393
 	ci.cflags = cflags;
393 394
 	ci.user_agent = &user_agent;
394 395
 	ci.methods = methods;
... ...
@@ -419,6 +420,17 @@ static int usrloc_dmq_execute_action(srjson_t *jdoc_action, dmq_node_t* node) {
419 420
 	return 1;
420 421
 }
421 422
 
423
+static int init_usrloc_dmq_recv() {
424
+	if (!usrloc_dmq_recv) {
425
+		LM_DBG("Initializing usrloc_dmq_recv for pid (%d)\n", my_pid());
426
+		usrloc_dmq_recv = (int*)pkg_malloc(sizeof(int));
427
+		if (!usrloc_dmq_recv) {
428
+			LM_ERR("no more pkg memory\n");
429
+			return -1;
430
+		}
431
+	}
432
+	return 0;
433
+}
422 434
 
423 435
 /**
424 436
  * @brief ht dmq callback
... ...
@@ -429,6 +441,12 @@ int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t*
429 441
 	str body;
430 442
 	srjson_doc_t jdoc;
431 443
 
444
+	if (!usrloc_dmq_recv && init_usrloc_dmq_recv() < 0) {
445
+		return 0;
446
+	}
447
+
448
+	*usrloc_dmq_recv = 1;
449
+
432 450
 	srjson_InitDoc(&jdoc, NULL);
433 451
 	if (parse_from_header(msg)<0) {
434 452
 		LM_ERR("failed to parse from header\n");
... ...
@@ -481,18 +499,21 @@ int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t*
481 499
 		if (!usrloc_dmq_execute_action(jdoc.root->child, node)) goto invalid;
482 500
 	}
483 501
 
502
+	*usrloc_dmq_recv = 0;
484 503
 	srjson_DestroyDoc(&jdoc);
485 504
 	resp->reason = dmq_200_rpl;
486 505
 	resp->resp_code = 200;
487 506
 	return 0;
488 507
 
489 508
 invalid:
509
+	*usrloc_dmq_recv = 0;
490 510
 	srjson_DestroyDoc(&jdoc);
491 511
 	resp->reason = dmq_400_rpl;
492 512
 	resp->resp_code = 400;
493 513
 	return 0;
494 514
 
495 515
 error:
516
+	*usrloc_dmq_recv = 0;
496 517
 	srjson_DestroyDoc(&jdoc);
497 518
 	resp->reason = dmq_500_rpl;
498 519
 	resp->resp_code = 500;
... ...
@@ -619,10 +640,6 @@ int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node
619 640
 	srjson_doc_t *jdoc = &jdoc_contact_group.jdoc;
620 641
 	srjson_t *jdoc_contacts = jdoc_contact_group.jdoc_contacts;
621 642
 
622
-	int flags;
623
-	flags = ptr->flags;
624
-	flags &= ~FL_RPL;
625
-
626 643
 	srjson_t * jdoc_contact = srjson_CreateObject(jdoc);
627 644
 	if(!jdoc_contact) {
628 645
 		LM_ERR("cannot create json root\n");
... ...
@@ -654,8 +671,8 @@ int usrloc_dmq_send_multi_contact(ucontact_t* ptr, str aor, int action, dmq_node
654 671
 	jdoc_contact_group.size += snprintf(NULL,0,"%.0lf",(double)ptr->expires);
655 672
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "cseq", ptr->cseq);
656 673
 	jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->cseq);
657
-	srjson_AddNumberToObject(jdoc, jdoc_contact, "flags", flags);
658
-	jdoc_contact_group.size += snprintf(NULL,0,"%d", flags);
674
+	srjson_AddNumberToObject(jdoc, jdoc_contact, "flags", ptr->flags);
675
+	jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->flags);
659 676
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "cflags", ptr->cflags);
660 677
 	jdoc_contact_group.size += snprintf(NULL,0,"%d", ptr->cflags);
661 678
 	srjson_AddNumberToObject(jdoc, jdoc_contact, "q", ptr->q);
... ...
@@ -688,17 +705,12 @@ int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* no
688 705
 	srjson_doc_t jdoc;
689 706
 	srjson_InitDoc(&jdoc, NULL);
690 707
 
691
-	int flags;
692
-
693 708
 	jdoc.root = srjson_CreateObject(&jdoc);
694 709
 	if(!jdoc.root) {
695 710
 		LM_ERR("cannot create json root\n");
696 711
 		goto error;
697 712
 	}
698 713
 
699
-	flags = ptr->flags;
700
-	flags &= ~FL_RPL;
701
-
702 714
 	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action);
703 715
 
704 716
 	srjson_AddStrToObject(&jdoc, jdoc.root, "aor", aor.s, aor.len);
... ...
@@ -711,7 +723,7 @@ int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* no
711 723
 	srjson_AddStrToObject(&jdoc, jdoc.root, "instance", ptr->instance.s, ptr->instance.len);
712 724
 	srjson_AddNumberToObject(&jdoc, jdoc.root, "expires", ptr->expires);
713 725
 	srjson_AddNumberToObject(&jdoc, jdoc.root, "cseq", ptr->cseq);
714
-	srjson_AddNumberToObject(&jdoc, jdoc.root, "flags", flags);
726
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "flags", ptr->flags);
715 727
 	srjson_AddNumberToObject(&jdoc, jdoc.root, "cflags", ptr->cflags);
716 728
 	srjson_AddNumberToObject(&jdoc, jdoc.root, "q", ptr->q);
717 729
 	srjson_AddNumberToObject(&jdoc, jdoc.root, "last_modified", ptr->last_modified);
... ...
@@ -759,8 +771,12 @@ void dmq_ul_cb_contact(ucontact_t* ptr, int type, void* param)
759 771
 	aor.s = ptr->aor->s;
760 772
 	aor.len = ptr->aor->len;
761 773
 
762
-	if (!(ptr->flags & FL_RPL)) {
774
+	if (!usrloc_dmq_recv && init_usrloc_dmq_recv() < 0) {
775
+		return;
776
+	}
763 777
 
778
+	if (!*usrloc_dmq_recv) {
779
+		LM_DBG("Replicating local update to other nodes...\n");
764 780
 		switch(type){
765 781
 			case UL_CONTACT_INSERT:
766 782
 				usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, 0);