Browse code

dmq: add support to specify a notification address multiple times in the cfg

- add support to specify a notification address multiple times in the cfg, e.g.:
- modparam("dmq", "notification_address", "sip:10.0.0.1:5060")
- modparam("dmq", "notification_address", "sip:10.0.0.2:5060") etc..
- this can be used to easily configure multiple notification server
- it is an alternative to the multi_notify mode and do not work together with it

Henning Westerholt authored on 29/03/2021 14:25:18
Showing 6 changed files
... ...
@@ -60,10 +60,11 @@ str dmq_server_address = {0, 0};
60 60
 str dmq_server_socket = {0, 0};
61 61
 sip_uri_t dmq_server_uri = {0};
62 62
 
63
-str dmq_notification_address = {0, 0};
63
+str_list_t *dmq_notification_address_list = NULL;
64
+static str_list_t *dmq_tmp_list = NULL;
64 65
 str dmq_notification_channel = str_init("notification_peer");
65 66
 int dmq_multi_notify = 0;
66
-sip_uri_t dmq_notification_uri = {0};
67
+static sip_uri_t dmq_notification_uri = {0};
67 68
 int dmq_ping_interval = 60;
68 69
 
69 70
 /* TM bind */
... ...
@@ -79,6 +80,9 @@ dmq_peer_list_t *dmq_peer_list = 0;
79 80
 dmq_node_list_t *dmq_node_list = NULL;
80 81
 /* dmq module is a peer itself for receiving notifications regarding nodes */
81 82
 dmq_peer_t *dmq_notification_peer = NULL;
83
+/* add notification servers */
84
+static int dmq_add_notification_address(modparam_t type, void * val);
85
+
82 86
 
83 87
 /** module functions */
84 88
 static int mod_init(void);
... ...
@@ -110,7 +114,7 @@ static param_export_t params[] = {
110 114
 	{"num_workers", INT_PARAM, &dmq_num_workers},
111 115
 	{"ping_interval", INT_PARAM, &dmq_ping_interval},
112 116
 	{"server_address", PARAM_STR, &dmq_server_address},
113
-	{"notification_address", PARAM_STR, &dmq_notification_address},
117
+	{"notification_address", PARAM_STR|USE_FUNC_PARAM, dmq_add_notification_address},
114 118
 	{"notification_channel", PARAM_STR, &dmq_notification_channel},
115 119
 	{"multi_notify", INT_PARAM, &dmq_multi_notify},
116 120
 	{"worker_usleep", INT_PARAM, &dmq_worker_usleep},
... ...
@@ -221,13 +225,6 @@ static int mod_init(void)
221 225
 		return -1;
222 226
 	}
223 227
 
224
-	if(parse_uri(dmq_notification_address.s, dmq_notification_address.len,
225
-			   &dmq_notification_uri)
226
-			< 0) {
227
-		LM_ERR("notification address invalid\n");
228
-		return -1;
229
-	}
230
-
231 228
 	/* create socket string out of the server_uri */
232 229
 	if(make_socket_str_from_uri(&dmq_server_uri, &dmq_server_socket) < 0) {
233 230
 		LM_ERR("failed to create socket out of server_uri\n");
... ...
@@ -327,12 +324,13 @@ static int child_init(int rank)
327 324
 		 * the module MUST have this parameter if the Kamailio instance is not
328 325
 		 * a master in this architecture
329 326
 		 */
330
-		if(dmq_notification_address.s) {
327
+		if(dmq_notification_address_list != NULL) {
331 328
 			dmq_notification_node =
332
-					add_server_and_notify(&dmq_notification_address);
329
+					add_server_and_notify(dmq_notification_address_list);
333 330
 			if(!dmq_notification_node) {
334
-				LM_WARN("cannot retrieve initial nodelist from %.*s\n",
335
-						STR_FMT(&dmq_notification_address));
331
+				LM_WARN("cannot retrieve initial nodelist, first list entry %.*s\n",
332
+						STR_FMT(&dmq_notification_address_list->s));
333
+
336 334
 			}
337 335
 		}
338 336
 	}
... ...
@@ -347,7 +345,7 @@ static int child_init(int rank)
347 345
 static void destroy(void)
348 346
 {
349 347
 	/* TODO unregister dmq node, free resources */
350
-	if(dmq_notification_address.s && dmq_notification_node && dmq_self_node) {
348
+	if(dmq_notification_address_list && dmq_notification_node && dmq_self_node) {
351 349
 		LM_DBG("unregistering node %.*s\n", STR_FMT(&dmq_self_node->orig_uri));
352 350
 		dmq_self_node->status = DMQ_NODE_DISABLED;
353 351
 		request_nodelist(dmq_notification_node, 1);
... ...
@@ -360,6 +358,44 @@ static void destroy(void)
360 358
 	}
361 359
 }
362 360
 
361
+static int dmq_add_notification_address(modparam_t type, void * val)
362
+{
363
+	str tmp_str;
364
+	tmp_str.s = ((str*) val)->s;
365
+	tmp_str.len = ((str*) val)->len;
366
+	int total_list = 0; /* not used */
367
+
368
+	if(val==NULL) {
369
+		LM_ERR("invalid notification address parameter value\n");
370
+		return -1;
371
+	}
372
+	if(parse_uri(tmp_str.s,  tmp_str.len, &dmq_notification_uri) < 0) {
373
+		LM_ERR("could not parse notification address\n");
374
+		return -1;
375
+	}
376
+
377
+	/* initial allocation */
378
+	if (dmq_notification_address_list == 0) {
379
+		dmq_notification_address_list = pkg_malloc(sizeof(str_list_t));
380
+		if (dmq_notification_address_list == NULL) {
381
+			PKG_MEM_ERROR;
382
+			return -1;
383
+		}
384
+		dmq_tmp_list = dmq_notification_address_list;
385
+		dmq_tmp_list->s = tmp_str;
386
+	} else {
387
+		dmq_tmp_list = append_str_list(tmp_str.s, tmp_str.len, &dmq_tmp_list, &total_list);
388
+		if (dmq_tmp_list == NULL) {
389
+			LM_ERR("could not append to list\n");
390
+			return -1;
391
+		}
392
+		LM_DBG("added new notification address to the list %.*s\n",
393
+			dmq_tmp_list->s.len, dmq_tmp_list->s.s);
394
+	}
395
+	return 0;
396
+}
397
+
398
+
363 399
 static void dmq_rpc_list_nodes(rpc_t *rpc, void *c)
364 400
 {
365 401
 	void *h;
... ...
@@ -27,6 +27,7 @@
27 27
 #include "../../core/dprint.h"
28 28
 #include "../../core/error.h"
29 29
 #include "../../core/sr_module.h"
30
+#include "../../core/str_list.h"
30 31
 #include "../../modules/tm/tm_load.h"
31 32
 #include "../../core/parser/parse_uri.h"
32 33
 #include "../../modules/sl/sl.h"
... ...
@@ -45,9 +46,8 @@ extern dmq_peer_list_t *dmq_peer_list;
45 46
 extern str dmq_request_method;
46 47
 extern str dmq_server_socket;
47 48
 extern sip_uri_t dmq_server_uri;
48
-extern str dmq_notification_address;
49
+extern str_list_t *dmq_notification_address_list;
49 50
 extern int dmq_multi_notify;
50
-extern sip_uri_t dmq_notification_uri;
51 51
 /* sl and tm */
52 52
 extern struct tm_binds tmb;
53 53
 extern sl_api_t slb;
... ...
@@ -526,12 +526,12 @@ void ping_servers(unsigned int ticks, void *param)
526 526
 		LM_DBG("node list is empty - attempt to rebuild from notification "
527 527
 			   "address\n");
528 528
 		*dmq_init_callback_done = 0;
529
-		if(dmq_notification_address.s) {
529
+		if(dmq_notification_address_list) {
530 530
 			dmq_notification_node =
531
-					add_server_and_notify(&dmq_notification_address);
531
+					add_server_and_notify(dmq_notification_address_list);
532 532
 			if(!dmq_notification_node) {
533
-				LM_ERR("cannot retrieve initial nodelist from %.*s\n",
534
-						STR_FMT(&dmq_notification_address));
533
+				LM_ERR("cannot retrieve initial nodelist, first list entry%.*s\n",
534
+						STR_FMT(&dmq_notification_address_list->s));
535 535
 			}
536 536
 		} else {
537 537
 			LM_ERR("no notification address");
... ...
@@ -141,6 +141,9 @@ modparam("dmq", "server_address", "sip:10.0.0.20:5061;transport=tls")
141 141
 		<title><varname>notification_address</varname>(str)</title>
142 142
 		<para>
143 143
 		The address of another DMQ node from which the local node should retrieve initial information about all other nodes.
144
+		This parameter can be specified multiple times in the configuration, to configure multiple notification servers.
145
+		If you configure multiple notification servers, the <emphasis>multi_notify</emphasis> parameter needs to be
146
+		disabled.
144 147
 		</para>
145 148
 		<para>
146 149
 		<emphasis>Default value is <quote>NULL</quote>.</emphasis>
... ...
@@ -177,7 +180,8 @@ modparam("dmq", "notification_channel", "peers")
177 180
 		<title><varname>multi_notify</varname>(int)</title>
178 181
 		<para>
179 182
 		Enables the ability to resolve multiple IPv4/IPv6 addresses for
180
-		a single notification address.
183
+		a single notification address. Please note that this mode is not
184
+		supported if you specify multiple notification address parameter.
181 185
 		</para>
182 186
 		<para>
183 187
 		A value of zero resolves to the first IP address found.
... ...
@@ -278,9 +278,9 @@ int get_dmq_host_list(
278 278
 }
279 279
 
280 280
 /**
281
- * @brief add a server node and notify it
281
+ * @brief add one or more server node(s) and notify it
282 282
  */
283
-dmq_node_t *add_server_and_notify(str *paddr)
283
+dmq_node_t *add_server_and_notify(str_list_t *server_list)
284 284
 {
285 285
 	char puri_data[MAXDMQHOSTS * (MAXDMQURILEN + 1)];
286 286
 	char *puri_list[MAXDMQHOSTS];
... ...
@@ -296,7 +296,12 @@ dmq_node_t *add_server_and_notify(str *paddr)
296 296
 	**********/
297 297
 
298 298
 	if(!dmq_multi_notify) {
299
-		pfirst = add_dmq_node(dmq_node_list, paddr);
299
+		while (&server_list->s != NULL) {
300
+			LM_DBG("adding notification node %.*s\n",
301
+				server_list->s.len, server_list->s.s);
302
+			pfirst = add_dmq_node(dmq_node_list, &server_list->s);
303
+			server_list = server_list->next;
304
+		}
300 305
 	} else {
301 306
 		/**********
302 307
 		* o init data area
... ...
@@ -307,7 +312,7 @@ dmq_node_t *add_server_and_notify(str *paddr)
307 312
 		for(index = 0; index < MAXDMQHOSTS; index++) {
308 313
 			puri_list[index] = &puri_data[index * (MAXDMQURILEN + 1)];
309 314
 		}
310
-		if(parse_uri(paddr->s, paddr->len, puri) < 0) {
315
+		if(parse_uri(server_list->s.s, server_list->s.len, puri) < 0) {
311 316
 			/* this is supposed to be good but just in case... */
312 317
 			LM_ERR("add_server_and_notify address invalid\n");
313 318
 			return 0;
... ...
@@ -599,12 +604,16 @@ int notification_resp_callback_f(
599 604
 			run_init_callbacks();
600 605
 		}
601 606
 	} else if(code == 408) {
602
-		if(STR_EQ(node->orig_uri, dmq_notification_address)) {
603
-			LM_ERR("not deleting notification peer [%.*s]\n",
604
-					STR_FMT(&dmq_notification_address));
605
-			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);
606
-			return 0;
607
-		}
607
+		/* TODO this probably do not work for dmq_multi_notify */
608
+		while (&dmq_notification_address_list->s != NULL) {
609
+			if(STR_EQ(node->orig_uri, dmq_notification_address_list->s)) {
610
+				LM_ERR("not deleting notification peer [%.*s]\n",
611
+					STR_FMT(&dmq_notification_address_list->s));
612
+				update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);
613
+				return 0;
614
+			}
615
+			dmq_notification_address_list = dmq_notification_address_list->next;
616
+               }
608 617
 		if (node->status == DMQ_NODE_DISABLED) {
609 618
 			/* deleting node - the server did not respond */
610 619
 			LM_ERR("deleting server node %.*s because of failed request\n",
... ...
@@ -48,7 +48,7 @@ int build_node_str(dmq_node_t *node, char *buf, int buflen);
48 48
  *           forward the request to its own list
49 49
  */
50 50
 int request_nodelist(dmq_node_t *node, int forward);
51
-dmq_node_t *add_server_and_notify(str *server_address);
51
+dmq_node_t *add_server_and_notify(str_list_t *server_list);
52 52
 
53 53
 /* helper functions */
54 54
 extern int notification_resp_callback_f(