Browse code

dmq: fixes for changes in the 678f9c6ad8a8118741a921fcc01f9b23b5702c6e

- init the next field to first notification address str_list_t item
- init local variables to avoid compile warnings on using garbage value
- make the while condition test server_list itself instead of the address of its ->s field
(which is never NULL)
- do not iterate using the global dmq_notification_address_list variable, because doing so
advances the global itself until it becomes NULL

Daniel-Constantin Mierla authored on 05/04/2021 09:37:06
Showing 1 changed files
... ...
@@ -284,7 +284,7 @@ dmq_node_t *add_server_and_notify(str_list_t *server_list)
284 284
 {
285 285
 	char puri_data[MAXDMQHOSTS * (MAXDMQURILEN + 1)];
286 286
 	char *puri_list[MAXDMQHOSTS];
287
-	dmq_node_t *pfirst, *pnode;
287
+	dmq_node_t *pfirst = NULL, *pnode = NULL;
288 288
 	int host_cnt, index;
289 289
 	sip_uri_t puri[1];
290 290
 	str pstr[1];
... ...
@@ -296,7 +296,7 @@ dmq_node_t *add_server_and_notify(str_list_t *server_list)
296 296
 	**********/
297 297
 
298 298
 	if(!dmq_multi_notify) {
299
-		while (&server_list->s != NULL) {
299
+		while (server_list != NULL) {
300 300
 			LM_DBG("adding notification node %.*s\n",
301 301
 				server_list->s.len, server_list->s.s);
302 302
 			pfirst = add_dmq_node(dmq_node_list, &server_list->s);
... ...
@@ -592,6 +592,7 @@ int notification_resp_callback_f(
592 592
 {
593 593
 	int ret;
594 594
 	int nodes_recv;
595
+	str_list_t *slp;
595 596
 
596 597
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
597 598
 	if(code == 200) {
... ...
@@ -605,15 +606,16 @@ int notification_resp_callback_f(
605 606
 		}
606 607
 	} else if(code == 408) {
607 608
 		/* TODO this probably do not work for dmq_multi_notify */
608
-		while (&dmq_notification_address_list->s != NULL) {
609
-			if(STR_EQ(node->orig_uri, dmq_notification_address_list->s)) {
609
+		slp = dmq_notification_address_list;
610
+		while (slp != NULL) {
611
+			if(STR_EQ(node->orig_uri, slp->s)) {
610 612
 				LM_ERR("not deleting notification peer [%.*s]\n",
611
-					STR_FMT(&dmq_notification_address_list->s));
613
+					STR_FMT(&slp->s));
612 614
 				update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);
613 615
 				return 0;
614 616
 			}
615
-			dmq_notification_address_list = dmq_notification_address_list->next;
616
-               }
617
+			slp = slp->next;
618
+		}
617 619
 		if (node->status == DMQ_NODE_DISABLED) {
618 620
 			/* deleting node - the server did not respond */
619 621
 			LM_ERR("deleting server node %.*s because of failed request\n",
Browse code

dmq: add support to specify a notification address multiple times in the cfg

- add support to specify a notification address multiple times in the cfg, e.g.:
- modparam("dmq", "notification_address", "sip:10.0.0.1:5060")
- modparam("dmq", "notification_address", "sip:10.0.0.2:5060") etc..
- this can be used to easily configure multiple notification servers
- it is an alternative to the multi_notify mode and does not work together with it

Henning Westerholt authored on 29/03/2021 14:25:18
Showing 1 changed files
... ...
@@ -278,9 +278,9 @@ int get_dmq_host_list(
278 278
 }
279 279
 
280 280
 /**
281
- * @brief add a server node and notify it
281
+ * @brief add one or more server node(s) and notify it
282 282
  */
283
-dmq_node_t *add_server_and_notify(str *paddr)
283
+dmq_node_t *add_server_and_notify(str_list_t *server_list)
284 284
 {
285 285
 	char puri_data[MAXDMQHOSTS * (MAXDMQURILEN + 1)];
286 286
 	char *puri_list[MAXDMQHOSTS];
... ...
@@ -296,7 +296,12 @@ dmq_node_t *add_server_and_notify(str *paddr)
296 296
 	**********/
297 297
 
298 298
 	if(!dmq_multi_notify) {
299
-		pfirst = add_dmq_node(dmq_node_list, paddr);
299
+		while (&server_list->s != NULL) {
300
+			LM_DBG("adding notification node %.*s\n",
301
+				server_list->s.len, server_list->s.s);
302
+			pfirst = add_dmq_node(dmq_node_list, &server_list->s);
303
+			server_list = server_list->next;
304
+		}
300 305
 	} else {
301 306
 		/**********
302 307
 		* o init data area
... ...
@@ -307,7 +312,7 @@ dmq_node_t *add_server_and_notify(str *paddr)
307 312
 		for(index = 0; index < MAXDMQHOSTS; index++) {
308 313
 			puri_list[index] = &puri_data[index * (MAXDMQURILEN + 1)];
309 314
 		}
310
-		if(parse_uri(paddr->s, paddr->len, puri) < 0) {
315
+		if(parse_uri(server_list->s.s, server_list->s.len, puri) < 0) {
311 316
 			/* this is supposed to be good but just in case... */
312 317
 			LM_ERR("add_server_and_notify address invalid\n");
313 318
 			return 0;
... ...
@@ -599,12 +604,16 @@ int notification_resp_callback_f(
599 604
 			run_init_callbacks();
600 605
 		}
601 606
 	} else if(code == 408) {
602
-		if(STR_EQ(node->orig_uri, dmq_notification_address)) {
603
-			LM_ERR("not deleting notification peer [%.*s]\n",
604
-					STR_FMT(&dmq_notification_address));
605
-			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);
606
-			return 0;
607
-		}
607
+		/* TODO this probably do not work for dmq_multi_notify */
608
+		while (&dmq_notification_address_list->s != NULL) {
609
+			if(STR_EQ(node->orig_uri, dmq_notification_address_list->s)) {
610
+				LM_ERR("not deleting notification peer [%.*s]\n",
611
+					STR_FMT(&dmq_notification_address_list->s));
612
+				update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);
613
+				return 0;
614
+			}
615
+			dmq_notification_address_list = dmq_notification_address_list->next;
616
+               }
608 617
 		if (node->status == DMQ_NODE_DISABLED) {
609 618
 			/* deleting node - the server did not respond */
610 619
 			LM_ERR("deleting server node %.*s because of failed request\n",
Browse code

dmq: log the address of notification peer on failure callback

Daniel-Constantin Mierla authored on 16/03/2021 11:55:46
Showing 1 changed files
... ...
@@ -600,13 +600,14 @@ int notification_resp_callback_f(
600 600
 		}
601 601
 	} else if(code == 408) {
602 602
 		if(STR_EQ(node->orig_uri, dmq_notification_address)) {
603
-			LM_ERR("not deleting notification_peer\n");
604
-			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);	
603
+			LM_ERR("not deleting notification peer [%.*s]\n",
604
+					STR_FMT(&dmq_notification_address));
605
+			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);
605 606
 			return 0;
606 607
 		}
607 608
 		if (node->status == DMQ_NODE_DISABLED) {
608 609
 			/* deleting node - the server did not respond */
609
-			LM_ERR("deleting server %.*s because of failed request\n",
610
+			LM_ERR("deleting server node %.*s because of failed request\n",
610 611
 				STR_FMT(&node->orig_uri));
611 612
 			ret = del_dmq_node(dmq_node_list, node);
612 613
 			LM_DBG("del_dmq_node returned %d\n", ret);
Browse code

dmq: new parameter notification_channel

- allow setting the channel id for peer availability notifications
- default is "notification_peer"

Daniel-Constantin Mierla authored on 04/11/2020 10:29:56
Showing 1 changed files
... ...
@@ -32,6 +32,7 @@ dmq_resp_cback_t dmq_notification_resp_callback = {&notification_resp_callback_f
32 32
 
33 33
 int *dmq_init_callback_done = 0;
34 34
 
35
+extern str dmq_notification_channel;
35 36
 
36 37
 /**
37 38
  * @brief add notification peer
... ...
@@ -43,10 +44,8 @@ int add_notification_peer()
43 44
 	memset(&not_peer, 0, sizeof(dmq_peer_t));
44 45
 	not_peer.callback = dmq_notification_callback_f;
45 46
 	not_peer.init_callback = NULL;
46
-	not_peer.description.s = "notification_peer";
47
-	not_peer.description.len = 17;
48
-	not_peer.peer_id.s = "notification_peer";
49
-	not_peer.peer_id.len = 17;
47
+	not_peer.description = dmq_notification_channel;
48
+	not_peer.peer_id = dmq_notification_channel;
50 49
 	dmq_notification_peer = register_dmq_peer(&not_peer);
51 50
 	if(!dmq_notification_peer) {
52 51
 		LM_ERR("error in register_dmq_peer\n");
Browse code

dmq: use module prefix for global variables

- avoid potential conflicts with other globals, given that this module
is used by other modules to perform replication

Daniel-Constantin Mierla authored on 18/05/2020 10:06:05
Showing 1 changed files
... ...
@@ -27,8 +27,8 @@
27 27
 #define MAXDMQURILEN 255
28 28
 #define MAXDMQHOSTS 30
29 29
 
30
-str notification_content_type = str_init("text/plain");
31
-dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
30
+str dmq_notification_content_type = str_init("text/plain");
31
+dmq_resp_cback_t dmq_notification_resp_callback = {&notification_resp_callback_f, 0};
32 32
 
33 33
 int *dmq_init_callback_done = 0;
34 34
 
... ...
@@ -41,7 +41,7 @@ int add_notification_peer()
41 41
 	dmq_peer_t not_peer;
42 42
 
43 43
 	memset(&not_peer, 0, sizeof(dmq_peer_t));
44
-	not_peer.callback = dmq_notification_callback;
44
+	not_peer.callback = dmq_notification_callback_f;
45 45
 	not_peer.init_callback = NULL;
46 46
 	not_peer.description.s = "notification_peer";
47 47
 	not_peer.description.len = 17;
... ...
@@ -53,14 +53,14 @@ int add_notification_peer()
53 53
 		goto error;
54 54
 	}
55 55
 	/* add itself to the node list */
56
-	self_node = add_dmq_node(node_list, &dmq_server_address);
57
-	if(!self_node) {
56
+	dmq_self_node = add_dmq_node(dmq_node_list, &dmq_server_address);
57
+	if(!dmq_self_node) {
58 58
 		LM_ERR("error adding self node\n");
59 59
 		goto error;
60 60
 	}
61 61
 	/* local node - only for self */
62
-	self_node->local = 1;
63
-	self_node->status = DMQ_NODE_ACTIVE;
62
+	dmq_self_node->local = 1;
63
+	dmq_self_node->status = DMQ_NODE_ACTIVE;
64 64
 	return 0;
65 65
 error:
66 66
 	return -1;
... ...
@@ -296,8 +296,8 @@ dmq_node_t *add_server_and_notify(str *paddr)
296 296
 	* o process list
297 297
 	**********/
298 298
 
299
-	if(!multi_notify) {
300
-		pfirst = add_dmq_node(node_list, paddr);
299
+	if(!dmq_multi_notify) {
300
+		pfirst = add_dmq_node(dmq_node_list, paddr);
301 301
 	} else {
302 302
 		/**********
303 303
 		* o init data area
... ...
@@ -319,8 +319,8 @@ dmq_node_t *add_server_and_notify(str *paddr)
319 319
 		for(index = 0; index < host_cnt; index++) {
320 320
 			pstr->s = puri_list[index];
321 321
 			pstr->len = strlen(puri_list[index]);
322
-			if(!find_dmq_node_uri(node_list, pstr)) { // check for duplicates
323
-				pnode = add_dmq_node(node_list, pstr);
322
+			if(!find_dmq_node_uri(dmq_node_list, pstr)) { // check for duplicates
323
+				pnode = add_dmq_node(dmq_node_list, pstr);
324 324
 				if(pnode && !pfirst) {
325 325
 					pfirst = pnode;
326 326
 				}
... ...
@@ -436,11 +436,11 @@ int run_init_callbacks()
436 436
 {
437 437
 	dmq_peer_t *crt;
438 438
 
439
-	if(peer_list == 0) {
439
+	if(dmq_peer_list == 0) {
440 440
 		LM_WARN("peer list is null\n");
441 441
 		return 0;
442 442
 	}
443
-	crt = peer_list->peers;
443
+	crt = dmq_peer_list->peers;
444 444
 	while(crt) {
445 445
 		if(crt->init_callback) {
446 446
 			crt->init_callback();
... ...
@@ -454,7 +454,7 @@ int run_init_callbacks()
454 454
 /**
455 455
  * @brief dmq notification callback
456 456
  */
457
-int dmq_notification_callback(
457
+int dmq_notification_callback_f(
458 458
 		struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *dmq_node)
459 459
 {
460 460
 	int nodes_recv;
... ...
@@ -474,14 +474,14 @@ int dmq_notification_callback(
474 474
 			maxforwards--;
475 475
 		}
476 476
 	}
477
-	nodes_recv = extract_node_list(node_list, msg);
477
+	nodes_recv = extract_node_list(dmq_node_list, msg);
478 478
 	LM_DBG("received %d new or changed nodes\n", nodes_recv);
479 479
 	response_body = build_notification_body();
480 480
 	if(response_body == NULL) {
481 481
 		LM_ERR("no response body\n");
482 482
 		goto error;
483 483
 	}
484
-	resp->content_type = notification_content_type;
484
+	resp->content_type = dmq_notification_content_type;
485 485
 	resp->reason = dmq_200_rpl;
486 486
 	resp->body = *response_body;
487 487
 	resp->resp_code = 200;
... ...
@@ -490,8 +490,8 @@ int dmq_notification_callback(
490 490
 	if(nodes_recv > 0 && maxforwards > 0) {
491 491
 		/* maxforwards is set to 0 so that the message is will not be in a spiral */
492 492
 		bcast_dmq_message(dmq_notification_peer, response_body, 0,
493
-				&notification_callback, maxforwards,
494
-				&notification_content_type);
493
+				&dmq_notification_resp_callback, maxforwards,
494
+				&dmq_notification_content_type);
495 495
 	}
496 496
 	pkg_free(response_body);
497 497
 	if(dmq_init_callback_done && !*dmq_init_callback_done) {
... ...
@@ -533,8 +533,8 @@ str *build_notification_body()
533 533
 		return NULL;
534 534
 	}
535 535
 	/* we add each server to the body - each on a different line */
536
-	lock_get(&node_list->lock);
537
-	cur_node = node_list->nodes;
536
+	lock_get(&dmq_node_list->lock);
537
+	cur_node = dmq_node_list->nodes;
538 538
 	while(cur_node) {
539 539
 		if (cur_node->local || cur_node->status == DMQ_NODE_ACTIVE) {
540 540
 			LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
... ...
@@ -550,11 +550,11 @@ str *build_notification_body()
550 550
 		}
551 551
 		cur_node = cur_node->next;
552 552
 	}
553
-	lock_release(&node_list->lock);
553
+	lock_release(&dmq_node_list->lock);
554 554
 	body->len = clen;
555 555
 	return body;
556 556
 error:
557
-	lock_release(&node_list->lock);
557
+	lock_release(&dmq_node_list->lock);
558 558
 	pkg_free(body->s);
559 559
 	pkg_free(body);
560 560
 	return NULL;
... ...
@@ -573,7 +573,8 @@ int request_nodelist(dmq_node_t *node, int forward)
573 573
 		return -1;
574 574
 	}
575 575
 	ret = bcast_dmq_message1(dmq_notification_peer, body, NULL,
576
-			&notification_callback, forward, &notification_content_type, 1);
576
+			&dmq_notification_resp_callback, forward,
577
+			&dmq_notification_content_type, 1);
577 578
 	pkg_free(body->s);
578 579
 	pkg_free(body);
579 580
 	return ret;
... ...
@@ -591,8 +592,8 @@ int notification_resp_callback_f(
591 592
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
592 593
 	if(code == 200) {
593 594
 		/* be sure that the node that answered is in active state */
594
-		update_dmq_node_status(node_list, node, DMQ_NODE_ACTIVE);
595
-		nodes_recv = extract_node_list(node_list, msg);
595
+		update_dmq_node_status(dmq_node_list, node, DMQ_NODE_ACTIVE);
596
+		nodes_recv = extract_node_list(dmq_node_list, msg);
596 597
 		LM_DBG("received %d new or changed nodes\n", nodes_recv);
597 598
 		if(dmq_init_callback_done && !*dmq_init_callback_done) {
598 599
 			*dmq_init_callback_done = 1;
... ...
@@ -601,18 +602,18 @@ int notification_resp_callback_f(
601 602
 	} else if(code == 408) {
602 603
 		if(STR_EQ(node->orig_uri, dmq_notification_address)) {
603 604
 			LM_ERR("not deleting notification_peer\n");
604
-			update_dmq_node_status(node_list, node, DMQ_NODE_PENDING);	
605
+			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);	
605 606
 			return 0;
606 607
 		}
607 608
 		if (node->status == DMQ_NODE_DISABLED) {
608 609
 			/* deleting node - the server did not respond */
609 610
 			LM_ERR("deleting server %.*s because of failed request\n",
610 611
 				STR_FMT(&node->orig_uri));
611
-			ret = del_dmq_node(node_list, node);
612
+			ret = del_dmq_node(dmq_node_list, node);
612 613
 			LM_DBG("del_dmq_node returned %d\n", ret);
613 614
 		} else {
614 615
 			/* put the node in disabled state and wait for the next ping before deleting it */
615
-			update_dmq_node_status(node_list, node, DMQ_NODE_DISABLED);
616
+			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_DISABLED);
616 617
 		}
617 618
 	}
618 619
 	return 0;
Browse code

dmq: wait for a 2nd failed ping before deleting a node

Federico Cabiddu authored on 07/02/2019 10:39:27
Showing 1 changed files
... ...
@@ -412,7 +412,9 @@ int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg)
412 412
 			update_list->nodes = cur;
413 413
 			update_list->count++;
414 414
 			total_nodes++;
415
-		} else if(!ret->local && find->uri.params.s && ret->status != find->status) {
415
+		} else if(!ret->local && find->uri.params.s && 
416
+					ret->status != find->status && ret->status != DMQ_NODE_DISABLED) {
417
+			/* don't update the node if it is in ending state */
416 418
 			LM_DBG("updating status on %.*s from %d to %d\n", STR_FMT(&tmp_uri),
417 419
 					ret->status, find->status);
418 420
 			ret->status = find->status;
... ...
@@ -588,6 +590,8 @@ int notification_resp_callback_f(
588 590
 
589 591
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
590 592
 	if(code == 200) {
593
+		/* be sure that the node that answered is in active state */
594
+		update_dmq_node_status(node_list, node, DMQ_NODE_ACTIVE);
591 595
 		nodes_recv = extract_node_list(node_list, msg);
592 596
 		LM_DBG("received %d new or changed nodes\n", nodes_recv);
593 597
 		if(dmq_init_callback_done && !*dmq_init_callback_done) {
... ...
@@ -595,16 +599,21 @@ int notification_resp_callback_f(
595 599
 			run_init_callbacks();
596 600
 		}
597 601
 	} else if(code == 408) {
598
-		/* deleting node - the server did not respond */
599
-		LM_ERR("deleting server %.*s because of failed request\n",
600
-				STR_FMT(&node->orig_uri));
601 602
 		if(STR_EQ(node->orig_uri, dmq_notification_address)) {
602 603
 			LM_ERR("not deleting notification_peer\n");
603
-			update_dmq_node_status(node_list, node, DMQ_NODE_PENDING);
604
+			update_dmq_node_status(node_list, node, DMQ_NODE_PENDING);	
604 605
 			return 0;
605 606
 		}
606
-		ret = del_dmq_node(node_list, node);
607
-		LM_DBG("del_dmq_node returned %d\n", ret);
607
+		if (node->status == DMQ_NODE_DISABLED) {
608
+			/* deleting node - the server did not respond */
609
+			LM_ERR("deleting server %.*s because of failed request\n",
610
+				STR_FMT(&node->orig_uri));
611
+			ret = del_dmq_node(node_list, node);
612
+			LM_DBG("del_dmq_node returned %d\n", ret);
613
+		} else {
614
+			/* put the node in disabled state and wait for the next ping before deleting it */
615
+			update_dmq_node_status(node_list, node, DMQ_NODE_DISABLED);
616
+		}
608 617
 	}
609 618
 	return 0;
610 619
 }
Browse code

dmq: use memcpy() instead of strncpy()

> notification_peer.c: In function 'create_IP_uri':
> notification_peer.c:100:3: warning: 'strncpy' output truncated before terminating nul copying 5 bytes from a string of the same length [-Wstringop-truncation]
> strncpy(plist, "sips:", 5);
> ^~~~~~~~~~~~~~~~~~~~~~~~~~
> notification_peer.c:103:3: warning: 'strncpy' output truncated before terminating nul copying 4 bytes from a string of the same length [-Wstringop-truncation]
> strncpy(plist, "sip:", 4);
> ^~~~~~~~~~~~~~~~~~~~~~~~~

Victor Seva authored on 27/09/2018 11:37:16
Showing 1 changed files
... ...
@@ -97,18 +97,18 @@ int create_IP_uri(char **puri_list, int host_index, char *phost, int hostlen,
97 97
 
98 98
 	plist = puri_list[host_index];
99 99
 	if(puri->type == SIPS_URI_T) {
100
-		strncpy(plist, "sips:", 5);
100
+		memcpy(plist, "sips:", 5);
101 101
 		pos = 5;
102 102
 	} else {
103
-		strncpy(plist, "sip:", 4);
103
+		memcpy(plist, "sip:", 4);
104 104
 		pos = 4;
105 105
 	}
106 106
 	if(puri->user.s) {
107
-		strncpy(&plist[pos], puri->user.s, puri->user.len);
107
+		memcpy(&plist[pos], puri->user.s, puri->user.len);
108 108
 		pos += puri->user.len;
109 109
 		if(puri->passwd.s) {
110 110
 			plist[pos++] = ':';
111
-			strncpy(&plist[pos], puri->passwd.s, puri->passwd.len);
111
+			memcpy(&plist[pos], puri->passwd.s, puri->passwd.len);
112 112
 			pos += puri->passwd.len;
113 113
 		}
114 114
 		plist[pos++] = '@';
... ...
@@ -117,7 +117,7 @@ int create_IP_uri(char **puri_list, int host_index, char *phost, int hostlen,
117 117
 		LM_WARN("%s", perr);
118 118
 		return 0;
119 119
 	}
120
-	strncpy(&plist[pos], phost, hostlen);
120
+	memcpy(&plist[pos], phost, hostlen);
121 121
 	pos += hostlen;
122 122
 	if(puri->port_no) {
123 123
 		if((pos + 6) > MAXDMQURILEN) {
... ...
@@ -133,7 +133,7 @@ int create_IP_uri(char **puri_list, int host_index, char *phost, int hostlen,
133 133
 			return 0;
134 134
 		}
135 135
 		plist[pos++] = ';';
136
-		strncpy(&plist[pos], puri->params.s, puri->params.len);
136
+		memcpy(&plist[pos], puri->params.s, puri->params.len);
137 137
 		pos += puri->params.len;
138 138
 	}
139 139
 	plist[pos] = '\0';
Browse code

dmq: include non-active nodes when requesting initial node list

Charles Chance authored on 25/07/2018 12:58:41
Showing 1 changed files
... ...
@@ -570,8 +570,8 @@ int request_nodelist(dmq_node_t *node, int forward)
570 570
 		LM_ERR("no notification body\n");
571 571
 		return -1;
572 572
 	}
573
-	ret = bcast_dmq_message(dmq_notification_peer, body, NULL,
574
-			&notification_callback, forward, &notification_content_type);
573
+	ret = bcast_dmq_message1(dmq_notification_peer, body, NULL,
574
+			&notification_callback, forward, &notification_content_type, 1);
575 575
 	pkg_free(body->s);
576 576
 	pkg_free(body);
577 577
 	return ret;
Browse code

dmq: improve bus stability and reduce unnecessary state transfer

- prevents split cluster in certain scenarios (e.g. GH issue #1349)
- add 'pending' state for new, locally added nodes, until confirmed
- continue to probe nodes marked as inactive/disabled so that they
are eventually removed (but still exclude from normal replication)

Charles Chance authored on 25/07/2018 12:56:59
Showing 1 changed files
... ...
@@ -60,6 +60,7 @@ int add_notification_peer()
60 60
 	}
61 61
 	/* local node - only for self */
62 62
 	self_node->local = 1;
63
+	self_node->status = DMQ_NODE_ACTIVE;
63 64
 	return 0;
64 65
 error:
65 66
 	return -1;
... ...
@@ -411,7 +412,7 @@ int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg)
411 412
 			update_list->nodes = cur;
412 413
 			update_list->count++;
413 414
 			total_nodes++;
414
-		} else if(find->uri.params.s && ret->status != find->status) {
415
+		} else if(!ret->local && find->uri.params.s && ret->status != find->status) {
415 416
 			LM_DBG("updating status on %.*s from %d to %d\n", STR_FMT(&tmp_uri),
416 417
 					ret->status, find->status);
417 418
 			ret->status = find->status;
... ...
@@ -533,16 +534,18 @@ str *build_notification_body()
533 534
 	lock_get(&node_list->lock);
534 535
 	cur_node = node_list->nodes;
535 536
 	while(cur_node) {
536
-		LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
537
-		/* body->len - clen - 2 bytes left to write - including the \r\n */
538
-		slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2);
539
-		if(slen < 0) {
540
-			LM_ERR("cannot build_node_string\n");
541
-			goto error;
537
+		if (cur_node->local || cur_node->status == DMQ_NODE_ACTIVE) {
538
+			LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
539
+			/* body->len - clen - 2 bytes left to write - including the \r\n */
540
+			slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2);
541
+			if(slen < 0) {
542
+				LM_ERR("cannot build_node_string\n");
543
+				goto error;
544
+			}
545
+			clen += slen;
546
+			body->s[clen++] = '\r';
547
+			body->s[clen++] = '\n';
542 548
 		}
543
-		clen += slen;
544
-		body->s[clen++] = '\r';
545
-		body->s[clen++] = '\n';
546 549
 		cur_node = cur_node->next;
547 550
 	}
548 551
 	lock_release(&node_list->lock);
... ...
@@ -597,6 +600,7 @@ int notification_resp_callback_f(
597 600
 				STR_FMT(&node->orig_uri));
598 601
 		if(STR_EQ(node->orig_uri, dmq_notification_address)) {
599 602
 			LM_ERR("not deleting notification_peer\n");
603
+			update_dmq_node_status(node_list, node, DMQ_NODE_PENDING);
600 604
 			return 0;
601 605
 		}
602 606
 		ret = del_dmq_node(node_list, node);
Browse code

dmq: exported functions to kemi framework

Daniel-Constantin Mierla authored on 05/12/2017 12:19:17
Showing 1 changed files
... ...
@@ -24,8 +24,8 @@
24 24
 
25 25
 #include "notification_peer.h"
26 26
 
27
-#define MAXDMQURILEN	255
28
-#define MAXDMQHOSTS	30
27
+#define MAXDMQURILEN 255
28
+#define MAXDMQHOSTS 30
29 29
 
30 30
 str notification_content_type = str_init("text/plain");
31 31
 dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
... ...
@@ -77,8 +77,8 @@ error:
77 77
 * OUTPUT: 0=unable to create URI
78 78
 **********/
79 79
 
80
-int create_IP_uri (char **puri_list, int host_index, char *phost,
81
-	int hostlen, sip_uri_t *puri)
80
+int create_IP_uri(char **puri_list, int host_index, char *phost, int hostlen,
81
+		sip_uri_t *puri)
82 82
 
83 83
 {
84 84
 	int pos;
... ...
@@ -94,48 +94,48 @@ int create_IP_uri (char **puri_list, int host_index, char *phost,
94 94
 	* o parameters
95 95
 	**********/
96 96
 
97
-	plist = puri_list [host_index];
98
-	if (puri->type == SIPS_URI_T) {
99
-		strncpy (plist, "sips:", 5);
97
+	plist = puri_list[host_index];
98
+	if(puri->type == SIPS_URI_T) {
99
+		strncpy(plist, "sips:", 5);
100 100
 		pos = 5;
101 101
 	} else {
102
-		strncpy (plist, "sip:", 4);
102
+		strncpy(plist, "sip:", 4);
103 103
 		pos = 4;
104 104
 	}
105
-	if (puri->user.s) {
106
-		strncpy (&plist [pos], puri->user.s, puri->user.len);
105
+	if(puri->user.s) {
106
+		strncpy(&plist[pos], puri->user.s, puri->user.len);
107 107
 		pos += puri->user.len;
108
-		if (puri->passwd.s) {
109
-			plist [pos++] = ':';
110
-			strncpy (&plist [pos], puri->passwd.s, puri->passwd.len);
108
+		if(puri->passwd.s) {
109
+			plist[pos++] = ':';
110
+			strncpy(&plist[pos], puri->passwd.s, puri->passwd.len);
111 111
 			pos += puri->passwd.len;
112 112
 		}
113
-		plist [pos++] = '@';
113
+		plist[pos++] = '@';
114 114
 	}
115
-	if ((pos + hostlen) > MAXDMQURILEN) {
116
-		LM_WARN ("%s", perr);
115
+	if((pos + hostlen) > MAXDMQURILEN) {
116
+		LM_WARN("%s", perr);
117 117
 		return 0;
118 118
 	}
119
-	strncpy (&plist [pos], phost, hostlen);
119
+	strncpy(&plist[pos], phost, hostlen);
120 120
 	pos += hostlen;
121
-	if (puri->port_no) {
122
-		if ((pos + 6) > MAXDMQURILEN) {
123
-			LM_WARN ("%s", perr);
121
+	if(puri->port_no) {
122
+		if((pos + 6) > MAXDMQURILEN) {
123
+			LM_WARN("%s", perr);
124 124
 			return 0;
125 125
 		}
126
-		plist [pos++] = ':';
127
-		pos += ushort2sbuf (puri->port_no, &plist [pos], 5);
126
+		plist[pos++] = ':';
127
+		pos += ushort2sbuf(puri->port_no, &plist[pos], 5);
128 128
 	}
129
-	if (puri->params.s) {
130
-		if ((pos + puri->params.len) >= MAXDMQURILEN) {
131
-			LM_WARN ("%s", perr);
129
+	if(puri->params.s) {
130
+		if((pos + puri->params.len) >= MAXDMQURILEN) {
131
+			LM_WARN("%s", perr);
132 132
 			return 0;
133 133
 		}
134
-		plist [pos++] = ';';
135
-		strncpy (&plist [pos], puri->params.s, puri->params.len);
134
+		plist[pos++] = ';';
135
+		strncpy(&plist[pos], puri->params.s, puri->params.len);
136 136
 		pos += puri->params.len;
137 137
 	}
138
-	plist [pos] = '\0';
138
+	plist[pos] = '\0';
139 139
 	return 1;
140 140
 }
141 141
 
... ...
@@ -151,14 +151,14 @@ int create_IP_uri (char **puri_list, int host_index, char *phost,
151 151
 * OUTPUT: number of hosts found
152 152
 **********/
153 153
 
154
-int get_dmq_host_list (char **puri_list, int max_hosts, str *phost,
155
-	sip_uri_t *puri, int bSRV)
154
+int get_dmq_host_list(
155
+		char **puri_list, int max_hosts, str *phost, sip_uri_t *puri, int bSRV)
156 156
 
157 157
 {
158 158
 	int host_cnt, len;
159 159
 	unsigned short origport, port;
160
-	str pstr [1];
161
-	char pname [256], pIP [IP6_MAX_STR_SIZE + 2];
160
+	str pstr[1];
161
+	char pname[256], pIP[IP6_MAX_STR_SIZE + 2];
162 162
 	struct rdata *phead, *prec;
163 163
 	struct srv_rdata *psrv;
164 164
 
... ...
@@ -168,25 +168,25 @@ int get_dmq_host_list (char **puri_list, int max_hosts, str *phost,
168 168
 	* o search SRV?
169 169
 	**********/
170 170
 
171
-	if (str2ip (phost) || str2ip6 (phost)) {
172
-		if (!create_IP_uri (puri_list, 0, phost->s, phost->len, puri)) {
173
-			LM_DBG ("adding DMQ node IP host %.*s=%s\n",
174
-				phost->len, phost->s, puri_list [0]);
171
+	if(str2ip(phost) || str2ip6(phost)) {
172
+		if(!create_IP_uri(puri_list, 0, phost->s, phost->len, puri)) {
173
+			LM_DBG("adding DMQ node IP host %.*s=%s\n", phost->len, phost->s,
174
+					puri_list[0]);
175 175
 			return 0;
176 176
 		}
177 177
 		return 1;
178 178
 	}
179
-	strncpy (pname, phost->s, phost->len);
180
-	pname [phost->len] = '\0';
179
+	strncpy(pname, phost->s, phost->len);
180
+	pname[phost->len] = '\0';
181 181
 	host_cnt = 0;
182
-	if (bSRV) {
182
+	if(bSRV) {
183 183
 		/**********
184 184
 		* get SRV records
185 185
 		**********/
186 186
 
187 187
 		port = puri->port_no;
188
-		phead = get_record (pname, T_SRV, RES_ONLY_TYPE);
189
-		for (prec = phead; prec; prec = prec->next) {
188
+		phead = get_record(pname, T_SRV, RES_ONLY_TYPE);
189
+		for(prec = phead; prec; prec = prec->next) {
190 190
 			/**********
191 191
 			* o matching port?
192 192
 			* o check max
... ...
@@ -195,96 +195,99 @@ int get_dmq_host_list (char **puri_list, int max_hosts, str *phost,
195 195
 			* o restore port
196 196
 			**********/
197 197
 
198
-			psrv = (struct srv_rdata *) prec->rdata;
199
-			if (port && (port != psrv->port))
200
-				{ continue; }
201
-			if (host_cnt == max_hosts) {
202
-				LM_WARN ("notification host count reached max!\n");
203
-				free_rdata_list (phead);
198
+			psrv = (struct srv_rdata *)prec->rdata;
199
+			if(port && (port != psrv->port)) {
200
+				continue;
201
+			}
202
+			if(host_cnt == max_hosts) {
203
+				LM_WARN("notification host count reached max!\n");
204
+				free_rdata_list(phead);
204 205
 				return host_cnt;
205 206
 			}
206 207
 			pstr->s = psrv->name;
207 208
 			pstr->len = psrv->name_len;
208 209
 			origport = puri->port_no;
209 210
 			puri->port_no = psrv->port;
210
-			host_cnt += get_dmq_host_list (&puri_list [host_cnt],
211
-				MAXDMQHOSTS - host_cnt, pstr, puri, 0);
211
+			host_cnt += get_dmq_host_list(&puri_list[host_cnt],
212
+					MAXDMQHOSTS - host_cnt, pstr, puri, 0);
212 213
 			puri->port_no = origport;
213 214
 		}
214
-		if (phead)
215
-			free_rdata_list (phead);
215
+		if(phead)
216
+			free_rdata_list(phead);
216 217
 	}
217 218
 
218 219
 	/**********
219 220
 	* get A records
220 221
 	**********/
221 222
 
222
-	phead = get_record (pname, T_A, RES_ONLY_TYPE);
223
-	for (prec = phead; prec; prec = prec->next) {
223
+	phead = get_record(pname, T_A, RES_ONLY_TYPE);
224
+	for(prec = phead; prec; prec = prec->next) {
224 225
 		/**********
225 226
 		* o check max
226 227
 		* o create URI
227 228
 		**********/
228 229
 
229
-		if (host_cnt == max_hosts) {
230
-			LM_WARN ("notification host count reached max!\n");
231
-			free_rdata_list (phead);
230
+		if(host_cnt == max_hosts) {
231
+			LM_WARN("notification host count reached max!\n");
232
+			free_rdata_list(phead);
232 233
 			return host_cnt;
233 234
 		}
234
-		len = ip4tosbuf (((struct a_rdata *) prec->rdata)->ip,
235
-			pIP, IP4_MAX_STR_SIZE);
236
-		pIP [len] = '\0';
237
-		if (create_IP_uri (puri_list, host_cnt, pIP, len, puri)) {
238
-			LM_DBG ("adding DMQ node A host %s=%s\n", pname, puri_list [host_cnt]);
235
+		len = ip4tosbuf(
236
+				((struct a_rdata *)prec->rdata)->ip, pIP, IP4_MAX_STR_SIZE);
237
+		pIP[len] = '\0';
238
+		if(create_IP_uri(puri_list, host_cnt, pIP, len, puri)) {
239
+			LM_DBG("adding DMQ node A host %s=%s\n", pname,
240
+					puri_list[host_cnt]);
239 241
 			host_cnt++;
240 242
 		}
241 243
 	}
242
-	if (phead)
243
-		free_rdata_list (phead);
244
+	if(phead)
245
+		free_rdata_list(phead);
244 246
 
245 247
 	/**********
246 248
 	* get AAAA records
247 249
 	**********/
248 250
 
249
-	phead = get_record (pname, T_AAAA, RES_ONLY_TYPE);
250
-	for (prec = phead; prec; prec = prec->next) {
251
+	phead = get_record(pname, T_AAAA, RES_ONLY_TYPE);
252
+	for(prec = phead; prec; prec = prec->next) {
251 253
 		/**********
252 254
 		* o check max
253 255
 		* o create URI
254 256
 		**********/
255 257
 
256
-		if (host_cnt == max_hosts) {
257
-			LM_WARN ("notification host count reached max!\n");
258
-			free_rdata_list (phead);
258
+		if(host_cnt == max_hosts) {
259
+			LM_WARN("notification host count reached max!\n");
260
+			free_rdata_list(phead);
259 261
 			return host_cnt;
260 262
 		}
261
-		pIP [0] = '[';
262
-		len = ip6tosbuf (((struct aaaa_rdata *) prec->rdata)->ip6,
263
-			&pIP [1], IP6_MAX_STR_SIZE) + 1;
264
-		pIP [len++] = ']';
265
-		pIP [len] = '\0';
266
-		if (create_IP_uri (puri_list, host_cnt, pIP, len, puri)) {
267
-			LM_DBG
268
-        ("adding DMQ node AAAA host %s=%s\n", pname, puri_list [host_cnt]);
263
+		pIP[0] = '[';
264
+		len = ip6tosbuf(((struct aaaa_rdata *)prec->rdata)->ip6, &pIP[1],
265
+					  IP6_MAX_STR_SIZE)
266
+			  + 1;
267
+		pIP[len++] = ']';
268
+		pIP[len] = '\0';
269
+		if(create_IP_uri(puri_list, host_cnt, pIP, len, puri)) {
270
+			LM_DBG("adding DMQ node AAAA host %s=%s\n", pname,
271
+					puri_list[host_cnt]);
269 272
 			host_cnt++;
270 273
 		}
271 274
 	}
272
-	if (phead)
273
-		free_rdata_list (phead);
275
+	if(phead)
276
+		free_rdata_list(phead);
274 277
 	return host_cnt;
275 278
 }
276 279
 
277 280
 /**
278 281
  * @brief add a server node and notify it
279 282
  */
280
-dmq_node_t* add_server_and_notify(str *paddr)
283
+dmq_node_t *add_server_and_notify(str *paddr)
281 284
 {
282
-	char puri_data [MAXDMQHOSTS * (MAXDMQURILEN + 1)];
283
-	char *puri_list [MAXDMQHOSTS];
285
+	char puri_data[MAXDMQHOSTS * (MAXDMQURILEN + 1)];
286
+	char *puri_list[MAXDMQHOSTS];
284 287
 	dmq_node_t *pfirst, *pnode;
285 288
 	int host_cnt, index;
286
-	sip_uri_t puri [1];
287
-	str pstr [1];
289
+	sip_uri_t puri[1];
290
+	str pstr[1];
288 291
 
289 292
 	/**********
290 293
 	* o init data area
... ...
@@ -292,8 +295,8 @@ dmq_node_t* add_server_and_notify(str *paddr)
292 295
 	* o process list
293 296
 	**********/
294 297
 
295
-	if (!multi_notify) {
296
-		pfirst = add_dmq_node (node_list, paddr);
298
+	if(!multi_notify) {
299
+		pfirst = add_dmq_node(node_list, paddr);
297 300
 	} else {
298 301
 		/**********
299 302
 		* o init data area
... ...
@@ -301,24 +304,25 @@ dmq_node_t* add_server_and_notify(str *paddr)
301 304
 		* o process list
302 305
 		**********/
303 306
 
304
-		for (index = 0; index < MAXDMQHOSTS; index++) {
305
-			puri_list [index] = &puri_data [index * (MAXDMQURILEN + 1)];
307
+		for(index = 0; index < MAXDMQHOSTS; index++) {
308
+			puri_list[index] = &puri_data[index * (MAXDMQURILEN + 1)];
306 309
 		}
307
-		if (parse_uri (paddr->s, paddr->len, puri) < 0) {
310
+		if(parse_uri(paddr->s, paddr->len, puri) < 0) {
308 311
 			/* this is supposed to be good but just in case... */
309
-			LM_ERR ("add_server_and_notify address invalid\n");
312
+			LM_ERR("add_server_and_notify address invalid\n");
310 313
 			return 0;
311 314
 		}
312 315
 		pfirst = NULL;
313 316
 		host_cnt =
314
-			get_dmq_host_list (puri_list, MAXDMQHOSTS, &puri->host, puri, 1);
315
-		for (index = 0; index < host_cnt; index++) {
316
-			pstr->s = puri_list [index];
317
-			pstr->len = strlen (puri_list [index]);
318
-			if (!find_dmq_node_uri(node_list, pstr)) { // check for duplicates
319
-				pnode = add_dmq_node (node_list, pstr);
320
-				if (pnode && !pfirst)
321
-					{ pfirst = pnode; }
317
+				get_dmq_host_list(puri_list, MAXDMQHOSTS, &puri->host, puri, 1);
318
+		for(index = 0; index < host_cnt; index++) {
319
+			pstr->s = puri_list[index];
320
+			pstr->len = strlen(puri_list[index]);
321
+			if(!find_dmq_node_uri(node_list, pstr)) { // check for duplicates
322
+				pnode = add_dmq_node(node_list, pstr);
323
+				if(pnode && !pfirst) {
324
+					pfirst = pnode;
325
+				}
322 326
 			}
323 327
 		}
324 328
 	}
... ...
@@ -328,12 +332,12 @@ dmq_node_t* add_server_and_notify(str *paddr)
328 332
 	* o request node list
329 333
 	**********/
330 334
 
331
-	if (!pfirst) {
332
-		LM_ERR ("error adding notification node\n");
335
+	if(!pfirst) {
336
+		LM_ERR("error adding notification node\n");
333 337
 		return NULL;
334 338
 	}
335
-	if (request_nodelist (pfirst, 2) < 0) {
336
-		LM_ERR ("error requesting initial nodelist\n");
339
+	if(request_nodelist(pfirst, 2) < 0) {
340
+		LM_ERR("error requesting initial nodelist\n");
337 341
 		return NULL;
338 342
 	}
339 343
 	return pfirst;
... ...
@@ -352,7 +356,7 @@ dmq_node_t* add_server_and_notify(str *paddr)
352 356
  * 	sip:host2:port2;param2=value2
353 357
  * 	...
354 358
  */
355
-int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg)
359
+int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg)
356 360
 {
357 361
 	int content_length, total_nodes = 0;
358 362
 	str body;
... ...
@@ -361,7 +365,8 @@ int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg)
361 365
 	dmq_node_t *ret, *find;
362 366
 	char *tmp, *end, *match;
363 367
 
364
-	if(!msg->content_length && (parse_headers(msg,HDR_CONTENTLENGTH_F,0)<0 || !msg->content_length)) {
368
+	if(!msg->content_length && (parse_headers(msg, HDR_CONTENTLENGTH_F, 0) < 0
369
+									   || !msg->content_length)) {
365 370
 		LM_ERR("no content length header found\n");
366 371
 		return -1;
367 372
 	}
... ...
@@ -374,7 +379,7 @@ int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg)
374 379
 	body.len = content_length;
375 380
 	tmp = body.s;
376 381
 	end = body.s + body.len;
377
-	
382
+
378 383
 	/* acquire big list lock */
379 384
 	lock_get(&update_list->lock);
380 385
 	while(tmp < end) {
... ...
@@ -392,10 +397,10 @@ int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg)
392 397
 		/* trim the \r, \n and \0's */
393 398
 		trim_r(tmp_uri);
394 399
 		find = build_dmq_node(&tmp_uri, 0);
395
-		if(find==NULL)
400
+		if(find == NULL)
396 401
 			return -1;
397 402
 		ret = find_dmq_node(update_list, find);
398
-		if (!ret) {
403
+		if(!ret) {
399 404
 			LM_DBG("found new node %.*s\n", STR_FMT(&tmp_uri));
400 405
 			cur = build_dmq_node(&tmp_uri, 1);
401 406
 			if(!cur) {
... ...
@@ -406,9 +411,9 @@ int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg)
406 411
 			update_list->nodes = cur;
407 412
 			update_list->count++;
408 413
 			total_nodes++;
409
-		} else if (find->uri.params.s && ret->status != find->status) {
410
-			LM_DBG("updating status on %.*s from %d to %d\n",
411
-				STR_FMT(&tmp_uri), ret->status, find->status);
414
+		} else if(find->uri.params.s && ret->status != find->status) {
415
+			LM_DBG("updating status on %.*s from %d to %d\n", STR_FMT(&tmp_uri),
416
+					ret->status, find->status);
412 417
 			ret->status = find->status;
413 418
 			total_nodes++;
414 419
 		}
... ...
@@ -424,16 +429,17 @@ error:
424 429
 }
425 430
 
426 431
 
427
-int run_init_callbacks() {
428
-	dmq_peer_t* crt;
432
+int run_init_callbacks()
433
+{
434
+	dmq_peer_t *crt;
429 435
 
430
-	if(peer_list==0) {
436
+	if(peer_list == 0) {
431 437
 		LM_WARN("peer list is null\n");
432 438
 		return 0;
433 439
 	}
434 440
 	crt = peer_list->peers;
435 441
 	while(crt) {
436
-		if (crt->init_callback) {