Browse code

dmq: Add init_callback() to API

The init_callback is called after DMQ has synced with the notification_peer.
This callback can thus be used to send/broadcast messages as early as
possible.

Alex Hermann authored on 13/11/2014 19:45:32 • Charles Chance committed on 13/11/2014 21:57:34
Showing 5 changed files
... ...
@@ -239,7 +239,14 @@ static int mod_init(void)
239 239
 		LM_ERR("error in shm_malloc\n");
240 240
 		return -1;
241 241
 	}
242
-	
242
+
243
+	dmq_init_callback_done = shm_malloc(sizeof(int));
244
+	if (!dmq_init_callback_done) {
245
+		LM_ERR("no more shm\n");
246
+		return -1;
247
+	}
248
+	*dmq_init_callback_done = 0;
249
+
243 250
 	/**
244 251
 	 * add the dmq notification peer.
245 252
 	 * the dmq is a peer itself so that it can receive node notifications
... ...
@@ -326,6 +333,9 @@ static void destroy(void) {
326 333
 	if (dmq_server_socket.s) {
327 334
 		pkg_free(dmq_server_socket.s);
328 335
 	}
336
+	if (dmq_init_callback_done) {
337
+		shm_free(dmq_init_callback_done);
338
+	}
329 339
 }
330 340
 
331 341
 static int handle_dmq_fixup(void** param, int param_no)
... ...
@@ -29,6 +29,9 @@
29 29
 str notification_content_type = str_init("text/plain");
30 30
 dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
31 31
 
32
+int *dmq_init_callback_done;
33
+
34
+
32 35
 /**
33 36
  * @brief add notification peer
34 37
  */
... ...
@@ -36,6 +39,7 @@ int add_notification_peer()
36 39
 {
37 40
 	dmq_peer_t not_peer;
38 41
 	not_peer.callback = dmq_notification_callback;
42
+	not_peer.init_callback = NULL;
39 43
 	not_peer.description.s = "notification_peer";
40 44
 	not_peer.description.len = 17;
41 45
 	not_peer.peer_id.s = "notification_peer";
... ...
@@ -165,6 +169,21 @@ error:
165 169
 	return -1;
166 170
 }
167 171
 
172
+
173
+int run_init_callbacks() {
174
+	dmq_peer_t* crt;
175
+
176
+	crt = peer_list->peers;
177
+	while(crt) {
178
+		if (crt->init_callback) {
179
+			crt->init_callback();
180
+		}
181
+		crt = crt->next;
182
+	}
183
+	return 0;
184
+}
185
+
186
+
168 187
 /**
169 188
  * @brief dmq notification callback
170 189
  */
... ...
@@ -206,6 +225,10 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
206 225
 				&notification_callback, maxforwards, &notification_content_type);
207 226
 	}
208 227
 	pkg_free(response_body);
228
+	if (!*dmq_init_callback_done) {
229
+		*dmq_init_callback_done = 1;
230
+		run_init_callbacks();
231
+	}
209 232
 	return 0;
210 233
 error:
211 234
 	return -1;
... ...
@@ -292,8 +315,17 @@ int notification_resp_callback_f(struct sip_msg* msg, int code,
292 315
 		dmq_node_t* node, void* param)
293 316
 {
294 317
 	int ret;
318
+	int nodes_recv;
319
+
295 320
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
296
-	if(code == 408) {
321
+	if(code == 200) {
322
+		nodes_recv = extract_node_list(node_list, msg);
323
+		LM_DBG("received %d new or changed nodes\n", nodes_recv);
324
+		if (!*dmq_init_callback_done) {
325
+			*dmq_init_callback_done = 1;
326
+			run_init_callbacks();
327
+		}
328
+	} else if(code == 408) {
297 329
 		/* deleting node - the server did not respond */
298 330
 		LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri));
299 331
 		if (STR_EQ(node->orig_uri, dmq_notification_address)) {
... ...
@@ -34,6 +34,7 @@
34 34
 #include "dmq_funcs.h"
35 35
 
36 36
 extern str notification_content_type;
37
+extern int *dmq_init_callback_done;
37 38
 
38 39
 int add_notification_peer();
39 40
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
... ...
@@ -42,11 +42,13 @@ typedef struct peer_response {
42 42
 } peer_reponse_t;
43 43
 
44 44
 typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp);
45
+typedef int(*init_callback_t)();
45 46
 
46 47
 typedef struct dmq_peer {
47 48
 	str peer_id;
48 49
 	str description;
49 50
 	peer_callback_t callback;
51
+	init_callback_t init_callback;
50 52
 	struct dmq_peer* next;
51 53
 } dmq_peer_t;
52 54
 
... ...
@@ -59,6 +59,7 @@ int ht_dmq_initialize()
59 59
         }
60 60
 
61 61
 	not_peer.callback = ht_dmq_handle_msg;
62
+	not_peer.init_callback = NULL;
62 63
 	not_peer.description.s = "htable";
63 64
 	not_peer.description.len = 6;
64 65
 	not_peer.peer_id.s = "htable";