Browse code

implemented send/bcast functions

Marius Bucur authored on 26/04/2011 16:21:26
Showing 12 changed files
... ...
@@ -48,6 +48,7 @@
48 48
 #include "../../lib/kmi/mi.h"
49 49
 #include "../../lib/kcore/hash_func.h"
50 50
 #include "dmq.h"
51
+#include "dmq_funcs.h"
51 52
 #include "peer.h"
52 53
 #include "bind_dmq.h"
53 54
 #include "worker.h"
... ...
@@ -71,6 +72,7 @@ struct sip_uri dmq_server_uri;
71 72
 
72 73
 str dmq_notification_address = {0, 0};
73 74
 struct sip_uri dmq_notification_uri;
75
+int ping_interval = 4;
74 76
 
75 77
 /* TM bind */
76 78
 struct tm_binds tmb;
... ...
@@ -84,7 +86,7 @@ dmq_peer_list_t* peer_list;
84 86
 /* the list of dmq servers */
85 87
 dmq_node_list_t* node_list;
86 88
 // the dmq module is a peer itself for receiving notifications regarding nodes
87
-dmq_peer_t dmq_notification_peer;
89
+dmq_peer_t* dmq_notification_peer;
88 90
 
89 91
 /** module functions */
90 92
 static int mod_init(void);
... ...
@@ -103,6 +105,7 @@ static cmd_export_t cmds[] = {
103 105
 
104 106
 static param_export_t params[] = {
105 107
 	{"num_workers", INT_PARAM, &num_workers},
108
+	{"ping_interval", INT_PARAM, &ping_interval},
106 109
 	{"server_address", STR_PARAM, &dmq_server_address.s},
107 110
 	{"notification_address", STR_PARAM, &dmq_notification_address.s},
108 111
 	{0, 0, 0}
... ...
@@ -177,9 +180,20 @@ static int mod_init(void) {
177 180
 		return -1;
178 181
 	}
179 182
 	
180
-	/* add first dmq peer - the dmq module itself to receive peer notify messages */
183
+	/**
184
+         * add the dmq notification peer.
185
+	 * the dmq is a peer itself so that it can receive node notifications
186
+	 */
187
+	add_notification_peer();
181 188
 	
182 189
 	startup_time = (int) time(NULL);
190
+	
191
+	/**
192
+	 * add the ping timer
193
+	 * it pings the servers once in a while so that we know which failed
194
+	 */
195
+	register_timer(ping_servers, 0, ping_interval);
196
+	
183 197
 	return 0;
184 198
 }
185 199
 
... ...
@@ -204,11 +218,20 @@ static int child_init(int rank) {
204 218
 				workers[i].pid = newpid;
205 219
 			}
206 220
 		}
207
-		/**
208
-		 * add the dmq notification peer.
209
-		 * the dmq is a peer itself so that it can receive node notifications
221
+		/* notification_node - the node from which the Kamailio instance
222
+		 * gets the server list on startup.
223
+		 * the address is given as a module parameter in dmq_notification_address
224
+		 * the module MUST have this parameter if the Kamailio instance is not
225
+		 * a master in this architecture
210 226
 		 */
211
-		add_notification_peer();
227
+		if(dmq_notification_address.s) {
228
+			notification_node = add_server_and_notify(&dmq_notification_address);
229
+			if(!notification_node) {
230
+				LM_ERR("cannot retrieve initial nodelist from %.*s\n",
231
+				       STR_FMT(&dmq_notification_address));
232
+				return -1;
233
+			}
234
+		}
212 235
 		return 0;
213 236
 	}
214 237
 	if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {
... ...
@@ -234,16 +257,19 @@ static int handle_dmq_fixup(void** param, int param_no) {
234 257
 static int parse_server_address(str* uri, struct sip_uri* parsed_uri) {
235 258
 	if(!uri->s) {
236 259
 		LM_ERR("server address missing\n");
237
-		return -1;
260
+		goto empty;
238 261
 	}
239 262
 	uri->len = strlen(uri->s);
240 263
 	if(!uri->len) {
241 264
 		LM_ERR("empty server address\n");
242
-		return -1;
265
+		goto empty;
243 266
 	}
244 267
 	if(parse_uri(uri->s, uri->len, parsed_uri) < 0) {
245 268
 		LM_ERR("error parsing server address\n");
246 269
 		return -1;
247 270
 	}
248 271
 	return 0;
272
+empty:
273
+	uri->s = NULL;
274
+	return 0;
249 275
 }
250 276
\ No newline at end of file
... ...
@@ -16,7 +16,7 @@
16 16
 
17 17
 extern int num_workers;
18 18
 extern dmq_worker_t* workers;
19
-extern dmq_peer_t dmq_notification_peer;
19
+extern dmq_peer_t* dmq_notification_peer;
20 20
 extern str dmq_server_address;
21 21
 extern dmq_peer_list_t* peer_list;
22 22
 extern dmq_node_list_t* node_list;
... ...
@@ -28,6 +28,11 @@ extern struct sip_uri dmq_notification_uri;
28 28
 extern struct tm_binds tmb;
29 29
 extern sl_api_t slb;
30 30
 
31
+extern str dmq_200_rpl;
32
+extern str dmq_400_rpl;
33
+extern str dmq_500_rpl;
34
+extern str dmq_404_rpl;
35
+
31 36
 static inline int dmq_load_api(dmq_api_t* api) {
32 37
 	bind_dmq_f binddmq;
33 38
 	binddmq = (bind_dmq_f)find_export("bind_dmq", 0, 0);
... ...
@@ -1,22 +1,33 @@
1 1
 #include "dmq_funcs.h"
2
+#include "notification_peer.h"
2 3
 
3
-int register_dmq_peer(dmq_peer_t* peer) {
4
+dmq_peer_t* register_dmq_peer(dmq_peer_t* peer) {
5
+	dmq_peer_t* new_peer;
4 6
 	lock_get(&peer_list->lock);
5 7
 	if(search_peer_list(peer_list, peer)) {
6 8
 		LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s,
7 9
 		       peer->description.len, peer->description.s);
8
-		return -1;
10
+		return NULL;
9 11
 	}
10
-	add_peer(peer_list, peer);
12
+	new_peer = add_peer(peer_list, peer);
11 13
 	lock_release(&peer_list->lock);
12
-	return 0;
14
+	return new_peer;
13 15
 }
14 16
 
15
-void dmq_tm_callback( struct cell *t, int type, struct tmcb_params *ps) {
16
-	LM_ERR("callback\n");
17
+void dmq_tm_callback(struct cell *t, int type, struct tmcb_params *ps) {
18
+	dmq_cback_param_t* cb_param = (dmq_cback_param_t*)(*ps->param);
19
+	LM_DBG("dmq_tm_callback start\n");
20
+	if(cb_param->resp_cback.f) {
21
+		if(cb_param->resp_cback.f(ps->rpl, ps->code, cb_param->node, cb_param->resp_cback.param) < 0) {
22
+			LM_ERR("error in response callback\n");
23
+		}
24
+	}
25
+	LM_DBG("dmq_tm_callback done\n");
26
+	shm_free_node(cb_param->node);
27
+	shm_free(cb_param);
17 28
 }
18 29
 
19
-int build_from_str(str* username, struct sip_uri* uri, str* from) {
30
+int build_uri_str(str* username, struct sip_uri* uri, str* from) {
20 31
 	/* sip:user@host:port */
21 32
 	int from_len = username->len + uri->host.len + uri->port.len + 10;
22 33
 	if(!uri->host.s || !uri->host.len) {
... ...
@@ -51,19 +62,58 @@ int build_from_str(str* username, struct sip_uri* uri, str* from) {
51 62
 	return 0;
52 63
 }
53 64
 
54
-int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node) {
65
+/* broadcast a dmq message
66
+ * peer - the peer structure on behalf of which we are sending
67
+ * body - the body of the message
68
+ * except - we do not send the message to this node
69
+ * resp_cback - a response callback that gets called when the transaction is complete
70
+ */
71
+int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback) {
72
+	dmq_node_t* node;
73
+	
74
+	lock_get(&node_list->lock);
75
+	node = node_list->nodes;
76
+	while(node) {
77
+		if((except && cmp_dmq_node(node, except)) || node->local) {
78
+			node = node->next;
79
+			continue;
80
+		}
81
+		if(send_dmq_message(peer, body, node, resp_cback) < 0) {
82
+			LM_ERR("error sending dmq message\n");
83
+			goto error;
84
+		}
85
+		node = node->next;
86
+	}
87
+	lock_release(&node_list->lock);
88
+	return 0;
89
+error:
90
+	lock_release(&node_list->lock);
91
+	return -1;
92
+}
93
+
94
+/* send a dmq message
95
+ * peer - the peer structure on behalf of which we are sending
96
+ * body - the body of the message
97
+ * node - we send the message to this node
98
+ * resp_cback - a response callback that gets called when the transaction is complete
99
+ */
100
+int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback) {
55 101
 	uac_req_t uac_r;
56 102
 	str str_hdr = {0, 0};
57
-	/* TODO - do not hardcode these - just for tesing purposes */
58 103
 	str from, to, req_uri;
59
-	void *cb_param = NULL;
60
-	int result;
104
+	dmq_cback_param_t* cb_param = NULL;
105
+	int result = 0;
106
+	
107
+	cb_param = shm_malloc(sizeof(*cb_param));
108
+	memset(cb_param, 0, sizeof(*cb_param));
109
+	cb_param->resp_cback = *resp_cback;
110
+	cb_param->node = shm_dup_node(node);
61 111
 	
62
-	if(build_from_str(&peer->peer_id, &dmq_server_uri, &from) < 0) {
63
-		LM_ERR("error building from string\n");
112
+	if(build_uri_str(&peer->peer_id, &dmq_server_uri, &from) < 0) {
113
+		LM_ERR("error building from string [username %.*s]\n", STR_FMT(&peer->peer_id));
64 114
 		return -1;
65 115
 	}
66
-	if(build_from_str(&peer->peer_id, &node->uri, &to) < 0) {
116
+	if(build_uri_str(&peer->peer_id, &node->uri, &to) < 0) {
67 117
 		LM_ERR("error building to string\n");
68 118
 		return -1;
69 119
 	}
... ...
@@ -74,10 +124,28 @@ int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node) {
74 124
 	result = tmb.t_request(&uac_r, &req_uri,
75 125
 			       &to, &from,
76 126
 			       NULL);
77
-	if(result < 0)
78
-	{
127
+	if(result < 0) {
79 128
 		LM_ERR("error in tmb.t_request_within\n");
80 129
 		return -1;
81 130
 	}
82 131
 	return 0;
132
+}
133
+
134
+/* pings the servers in the nodelist
135
+ * if the server does not reply to the ping, it is removed from the list
136
+ * the ping messages are actualy notification requests
137
+ * this way the ping will have two uses:
138
+ *   - checks if the servers in the list are up and running
139
+ *   - updates the list of servers from the other nodes
140
+ */
141
+void ping_servers(unsigned int ticks,void *param) {
142
+	str* body = build_notification_body();
143
+	int ret;
144
+	LM_DBG("ping_servers\n");
145
+	ret = bcast_dmq_message(dmq_notification_peer, body, notification_node, &notification_callback);
146
+	pkg_free(body->s);
147
+	pkg_free(body);
148
+	if(ret < 0) {
149
+		LM_ERR("error broadcasting message\n");
150
+	}
83 151
 }
84 152
\ No newline at end of file
... ...
@@ -1,14 +1,27 @@
1 1
 #ifndef DMQ_FUNCS_H
2 2
 #define DMQ_FUNCS_H
3 3
 
4
+#include "../../str.h"
4 5
 #include "../../modules/tm/dlg.h"
5 6
 #include "../../modules/tm/tm_load.h"
6 7
 #include "dmq.h"
7 8
 #include "peer.h"
8 9
 #include "worker.h"
9
-#include "../../str.h"
10 10
 
11
-int register_dmq_peer(dmq_peer_t* peer);
12
-int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node);
11
+void ping_servers(unsigned int ticks,void *param);
12
+
13
+typedef struct dmq_resp_cback {
14
+	int (*f)(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
15
+	void* param;
16
+} dmq_resp_cback_t;
17
+
18
+typedef struct dmq_cback_param {
19
+	dmq_resp_cback_t resp_cback;
20
+	dmq_node_t* node;
21
+} dmq_cback_param_t;
22
+
23
+dmq_peer_t* register_dmq_peer(dmq_peer_t* peer);
24
+int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback);
25
+int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback);
13 26
 
14 27
 #endif
15 28
\ No newline at end of file
... ...
@@ -12,6 +12,65 @@ dmq_node_list_t* init_dmq_node_list() {
12 12
 	return node_list;
13 13
 }
14 14
 
15
+inline int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode) {
16
+	if(!node || !cmpnode) {
17
+		LM_ERR("cmp_dmq_node - null node received\n");
18
+		return -1;
19
+	}
20
+	return STR_EQ(node->uri.host, cmpnode->uri.host) &&
21
+	       STR_EQ(node->uri.port, cmpnode->uri.port);
22
+}
23
+
24
+inline dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
25
+	dmq_node_t* cur = list->nodes;
26
+	while(cur) {
27
+		if(cmp_dmq_node(cur, node)) {
28
+			return cur;
29
+		}
30
+		cur = cur->next;
31
+	}
32
+	return NULL;
33
+}
34
+
35
+inline dmq_node_t* shm_dup_node(dmq_node_t* node) {
36
+	dmq_node_t* newnode = shm_malloc(sizeof(dmq_node_t));
37
+	memcpy(newnode, node, sizeof(dmq_node_t));
38
+	shm_str_dup(&newnode->orig_uri, &node->orig_uri);
39
+	if(parse_uri(newnode->orig_uri.s, newnode->orig_uri.len, &newnode->uri) < 0) {
40
+		LM_ERR("error in parsing node uri\n");
41
+		goto error;
42
+	}
43
+	return newnode;
44
+error:
45
+	shm_free(newnode->orig_uri.s);
46
+	shm_free(newnode);
47
+	return NULL;
48
+}
49
+
50
+inline void shm_free_node(dmq_node_t* node) {
51
+	shm_free(node->orig_uri.s);
52
+	shm_free(node);
53
+}
54
+
55
+inline int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
56
+	dmq_node_t *cur, **prev;
57
+	lock_get(&list->lock);
58
+	cur = list->nodes;
59
+	prev = &list->nodes;
60
+	while(cur) {
61
+		if(cmp_dmq_node(cur, node)) {
62
+			*prev = cur->next;
63
+			shm_free_node(cur);
64
+			lock_release(&list->lock);
65
+			return 1;
66
+		}
67
+		prev = &cur->next;
68
+		cur = cur->next;
69
+	}
70
+	lock_release(&list->lock);
71
+	return 0;
72
+}
73
+
15 74
 inline dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri) {
16 75
 	dmq_node_t* newnode = shm_malloc(sizeof(dmq_node_t));
17 76
 	memset(newnode, 0, sizeof(dmq_node_t));
... ...
@@ -27,6 +86,7 @@ inline dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri) {
27 86
 	lock_release(&list->lock);
28 87
 	return newnode;
29 88
 error:
89
+	shm_free(newnode->orig_uri.s);
30 90
 	shm_free(newnode);
31
-	return 0;
91
+	return NULL;
32 92
 }
33 93
\ No newline at end of file
... ...
@@ -12,22 +12,28 @@
12 12
 #define NBODY_LEN 1024
13 13
 
14 14
 typedef struct dmq_node {
15
-	str orig_uri;
16
-	struct sip_uri uri;
17
-	int status;
18
-	int last_notification;
19
-	struct dmq_node* next;
15
+	int local; /* local type set means the dmq dmqnode == self */
16
+	str orig_uri; /* original uri string - e.g. sip:127.0.0.1:5060;passive=true */
17
+	struct sip_uri uri; /* parsed uri string */
18
+	int status; /* reserved - maybe something like active,timeout,disabled */
19
+	int last_notification; /* last notificatino receied from the node */
20
+	struct dmq_node* next; /* pointer to the next struct dmq_node */
20 21
 } dmq_node_t;
21 22
 
22 23
 typedef struct dmq_node_list {
23
-	gen_lock_t lock;
24
-	struct dmq_node* nodes;
25
-	int count;
24
+	gen_lock_t lock; /* lock for the list - must acquire before manipulating it */
25
+	struct dmq_node* nodes; /* the nodes in the list */
26
+	int count; /* the number of nodes in the list */
26 27
 } dmq_node_list_t;
27 28
 
28 29
 dmq_node_list_t* init_dmq_node_list();
29 30
 int update_node_list(dmq_node_list_t* remote_list);
30 31
 dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri);
32
+dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node);
33
+int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node);
34
+int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode);
35
+dmq_node_t* shm_dup_node(dmq_node_t* node);
36
+void shm_free_node(dmq_node_t* node);
31 37
 
32 38
 extern dmq_node_t* self_node;
33 39
 extern dmq_node_t* notification_node;	
... ...
@@ -1,5 +1,6 @@
1 1
 #include "../../parser/parse_to.h"
2
-#include "../../parser/parse_uri.h" 
2
+#include "../../parser/parse_uri.h"
3
+#include "../../sip_msg_clone.h"
3 4
 #include "../../parser/parse_content.h"
4 5
 #include "../../parser/parse_from.h"
5 6
 #include "../../ut.h"
... ...
@@ -8,16 +9,18 @@
8 9
 #include "peer.h"
9 10
 #include "message.h"
10 11
 
11
-static str dmq_200_rpl  = str_init("OK");
12
-static str dmq_400_rpl  = str_init("Bad request");
13
-static str dmq_500_rpl  = str_init("Server Internal Error");
14
-static str dmq_404_rpl  = str_init("User Not Found");
12
+str dmq_200_rpl  = str_init("OK");
13
+str dmq_400_rpl  = str_init("Bad request");
14
+str dmq_500_rpl  = str_init("Server Internal Error");
15
+str dmq_404_rpl  = str_init("User Not Found");
15 16
 
16 17
 int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
17 18
 	dmq_peer_t* peer;
19
+	struct sip_msg* cloned_msg = NULL;
20
+	int cloned_msg_len;
18 21
 	if ((parse_sip_msg_uri(msg) < 0) || (!msg->parsed_uri.user.s)) {
19
-			LM_ERR("cannot parse msg URI\n");
20
-			return -1;
22
+			LM_ERR("error parsing msg uri\n");
23
+			goto error;
21 24
 	}
22 25
 	LM_DBG("handle_dmq_message [%.*s %.*s] [%s %s]\n",
23 26
 	       msg->first_line.u.request.method.len, msg->first_line.u.request.method.s,
... ...
@@ -25,7 +28,7 @@ int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
25 28
 	       ZSW(str1), ZSW(str2));
26 29
 	/* the peer id is given as the userinfo part of the request URI */
27 30
 	peer = find_peer(msg->parsed_uri.user);
28
-	if(peer) {
31
+	if(!peer) {
29 32
 		LM_DBG("no peer found for %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
30 33
 		if(slb.freply(msg, 404, &dmq_404_rpl) < 0)
31 34
 		{
... ...
@@ -35,7 +38,12 @@ int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
35 38
 		return 0;
36 39
 	}
37 40
 	LM_DBG("handle_dmq_message peer found: %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
38
-	add_dmq_job(msg, peer);
41
+	cloned_msg = sip_msg_shm_clone(msg, &cloned_msg_len, 1);
42
+	if(!cloned_msg) {
43
+		LM_ERR("error cloning sip message\n");
44
+		goto error;
45
+	}
46
+	add_dmq_job(cloned_msg, peer);
39 47
 	return 0;
40 48
 error:
41 49
 	return -1;
... ...
@@ -1,13 +1,17 @@
1 1
 #include "notification_peer.h"
2
-#include "dmq_funcs.h"
2
+
3
+static str notification_content_type = str_init("text/plain");
4
+dmq_resp_cback_t notification_callback = {&notification_callback_f, 0};
3 5
 
4 6
 int add_notification_peer() {
5
-	dmq_notification_peer.callback = dmq_notification_callback;
6
-	dmq_notification_peer.description.s = "notification_peer";
7
-	dmq_notification_peer.description.len = 17;
8
-	dmq_notification_peer.peer_id.s = "notification_peer";
9
-	dmq_notification_peer.peer_id.len = 17;
10
-	if(register_dmq_peer(&dmq_notification_peer) < 0) {
7
+	dmq_peer_t not_peer;
8
+	not_peer.callback = dmq_notification_callback;
9
+	not_peer.description.s = "notification_peer";
10
+	not_peer.description.len = 17;
11
+	not_peer.peer_id.s = "notification_peer";
12
+	not_peer.peer_id.len = 17;
13
+	dmq_notification_peer = register_dmq_peer(&not_peer);
14
+	if(!dmq_notification_peer) {
11 15
 		LM_ERR("error in register_dmq_peer\n");
12 16
 		goto error;
13 17
 	}
... ...
@@ -17,20 +21,28 @@ int add_notification_peer() {
17 21
 		LM_ERR("error adding self node\n");
18 22
 		goto error;
19 23
 	}
20
-	/* add the notification server to the node list */
21
-	notification_node = add_dmq_node(node_list, &dmq_notification_address);
22
-	if(!notification_node) {
24
+	/* local node - only for self */
25
+	self_node->local = 1;
26
+	return 0;
27
+error:
28
+	return -1;
29
+}
30
+
31
+dmq_node_t* add_server_and_notify(str* server_address) {
32
+	/* add the notification server to the node list - if any */
33
+	dmq_node_t* node = add_dmq_node(node_list, server_address);
34
+	if(!node) {
23 35
 		LM_ERR("error adding notification node\n");
24 36
 		goto error;
25 37
 	}
26
-	/* initial notification request to receive the complete node list */
27
-	if(request_initial_nodelist() < 0) {
28
-		LM_ERR("error in request_initial_notification\n");
38
+	/* request initial list from the notification server */
39
+	if(request_nodelist(node) < 0) {
40
+		LM_ERR("error requesting initial nodelist\n");
29 41
 		goto error;
30 42
 	}
31
-	return 0;
43
+	return node;
32 44
 error:
33
-	return -1;
45
+	return NULL;
34 46
 }
35 47
 
36 48
 /**
... ...
@@ -50,7 +62,7 @@ int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg) {
50 62
 	int content_length, total_nodes = 0;
51 63
 	str body;
52 64
 	str tmp_uri;
53
-	dmq_node_t *cur = NULL, *prev = NULL, *first = NULL;
65
+	dmq_node_t *cur = NULL;
54 66
 	char *tmp, *end, *match;
55 67
 	if(!msg->content_length) {
56 68
 		LM_ERR("no content length header found\n");
... ...
@@ -66,47 +78,78 @@ int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg) {
66 78
 	tmp = body.s;
67 79
 	end = body.s + body.len;
68 80
 	
81
+	/* acquire big list lock */
82
+	lock_get(&update_list->lock);
69 83
 	while(tmp < end) {
70
-		total_nodes++;
71 84
 		cur = shm_malloc(sizeof(dmq_node_t));
72 85
 		memset(cur, 0, sizeof(*cur));
73
-		/* keep the list tail in first */
74
-		if(!first) {
75
-			first = cur;
76
-		}
77
-		cur->next = prev;
78 86
 		
79 87
 		match = q_memchr(tmp, '\n', end - tmp);
80
-		if (match){
88
+		if(match) {
81 89
 			match++;
82
-		}else {
90
+		} else {
83 91
 			/* for the last line - take all of it */
84 92
 			match = end;
85 93
 		}
86 94
 		/* create the orig_uri from the parsed uri line and trim it */
87 95
 		tmp_uri.s = tmp;
88
-		tmp_uri.len = match - tmp;
96
+		tmp_uri.len = match - tmp - 1;
97
+		tmp = match;
89 98
 		shm_str_dup(&cur->orig_uri, &tmp_uri);
99
+		/* trim the \r, \n and \0's */
90 100
 		trim_r(cur->orig_uri);
91
-		
92
-		tmp = match;
93
-		
94
-		prev = cur;
101
+		if(parse_uri(cur->orig_uri.s, cur->orig_uri.len, &cur->uri) < 0) {
102
+			LM_ERR("cannot parse uri\n");
103
+			goto error;
104
+		}
105
+		if(!find_dmq_node(update_list, cur)) {
106
+			LM_DBG("found new node %.*s\n", STR_FMT(&cur->orig_uri));
107
+			cur->next = update_list->nodes;
108
+			update_list->nodes = cur;
109
+			update_list->count++;
110
+			total_nodes++;
111
+		} else {
112
+			shm_free(cur->orig_uri.s);
113
+			shm_free(cur);
114
+		}
95 115
 	}
96
-	lock_get(&update_list->lock);
97
-	first->next = update_list->nodes;
98
-	update_list->nodes = cur;
116
+	/* release big list lock */
99 117
 	lock_release(&update_list->lock);
100 118
 	return total_nodes;
119
+error:
120
+	lock_release(&update_list->lock);
121
+	shm_free(cur->orig_uri.s);
122
+	shm_free(cur);
123
+	return -1;
101 124
 }
102 125
 
103 126
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
104 127
 	int nodes_recv;
128
+	str* response_body = NULL;
105 129
 	/* received dmqnode list */
106
-	LM_ERR("dmq triggered from dmq_notification_callback\n");
130
+	LM_DBG("dmq triggered from dmq_notification_callback\n");
131
+	/* parse the message headers */
132
+	if(parse_headers(msg, HDR_EOH_F, 0) < 0) {
133
+		LM_ERR("error parsing message headers\n");
134
+		goto error;
135
+	}
107 136
 	nodes_recv = extract_node_list(node_list, msg);
108
-	LM_DBG("received %d nodes\n", nodes_recv);
137
+	LM_DBG("received %d new nodes\n", nodes_recv);
138
+	response_body = build_notification_body();
139
+	resp->content_type = notification_content_type;
140
+	resp->reason = dmq_200_rpl;
141
+	resp->body = *response_body;
142
+	resp->resp_code = 200;
143
+	
144
+	/* if we received any new nodes tell about them to the others */
145
+	if(nodes_recv > 0) {
146
+		bcast_dmq_message(dmq_notification_peer, response_body, 0, &notification_callback);
147
+	}
148
+	LM_DBG("broadcasted message\n");
149
+	pkg_free(response_body);
109 150
 	return 0;
151
+error:
152
+	return -1;
110 153
 }
111 154
 
112 155
 int build_node_str(dmq_node_t* node, char* buf, int buflen) {
... ...
@@ -162,11 +205,23 @@ error:
162 205
 	return NULL;
163 206
 }
164 207
 
165
-int request_initial_nodelist() {
208
+int request_nodelist(dmq_node_t* node) {
166 209
 	str* body = build_notification_body();
167 210
 	int ret;
168
-	ret = send_dmq_message(&dmq_notification_peer, body, notification_node);
211
+	ret = send_dmq_message(dmq_notification_peer, body, node, &notification_callback);
169 212
 	pkg_free(body->s);
170 213
 	pkg_free(body);
171 214
 	return ret;
215
+}
216
+
217
+int notification_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param) {
218
+	int ret;
219
+	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
220
+	if(code == 408) {
221
+		/* deleting node - the server did not respond */
222
+		LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri));
223
+		ret = del_dmq_node(node_list, node);
224
+		LM_DBG("del_dmq_node returned %d\n", ret);
225
+	}
226
+	return 0;
172 227
 }
173 228
\ No newline at end of file
... ...
@@ -1,13 +1,19 @@
1
-#include "dmq.h"
2
-#include "dmqnode.h"
3
-#include "peer.h"
4 1
 #include "../../parser/msg_parser.h"
5 2
 #include "../../parser/parse_content.h"
6 3
 #include "../../ut.h"
4
+#include "dmq.h"
5
+#include "dmqnode.h"
6
+#include "peer.h"
7
+#include "dmq_funcs.h"
7 8
 
8 9
 int add_notification_peer();
9 10
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
10 11
 int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg);
11 12
 str* build_notification_body();
12 13
 int build_node_str(dmq_node_t* node, char* buf, int buflen);
13
-int request_initial_nodelist();
14 14
\ No newline at end of file
15
+int request_nodelist(dmq_node_t* node);
16
+dmq_node_t* add_server_and_notify(str* server_address);
17
+
18
+/* helper functions */
19
+extern int notification_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
20
+extern dmq_resp_cback_t notification_callback;
15 21
\ No newline at end of file
... ...
@@ -22,7 +22,7 @@ dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
22 22
 	return 0;
23 23
 }
24 24
 
25
-void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
25
+dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
26 26
 	dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t));
27 27
 	*new_peer = *peer;
28 28
 	
... ...
@@ -34,6 +34,7 @@ void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
34 34
 	
35 35
 	new_peer->next = peer_list->peers;
36 36
 	peer_list->peers = new_peer;
37
+	return new_peer;
37 38
 }
38 39
 
39 40
 dmq_peer_t* find_peer(str peer_id) {
... ...
@@ -11,6 +11,8 @@
11 11
 
12 12
 typedef struct peer_response {
13 13
 	int resp_code;
14
+	str content_type;
15
+	str reason;
14 16
 	str body;
15 17
 } peer_reponse_t;
16 18
 
... ...
@@ -33,9 +35,9 @@ extern dmq_peer_list_t* peer_list;
33 35
 
34 36
 dmq_peer_list_t* init_peer_list();
35 37
 dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
36
-typedef int (*register_dmq_peer_t)(dmq_peer_t*);
38
+typedef dmq_peer_t* (*register_dmq_peer_t)(dmq_peer_t*);
37 39
 
38
-void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
40
+dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
39 41
 dmq_peer_t* find_peer(str peer_id);
40 42
 
41 43
 
... ...
@@ -1,6 +1,45 @@
1 1
 #include "dmq.h"
2 2
 #include "peer.h"
3 3
 #include "worker.h"
4
+#include "../../data_lump_rpl.h"
5
+#include "../../mod_fix.h"
6
+
7
+/* set the body of a response */
8
+static int set_reply_body(struct sip_msg* msg, str* body, str* content_type)
9
+{
10
+	char* buf;
11
+	int len;
12
+	int value_len;
13
+	str nb = *body;
14
+	str nc = *content_type;
15
+
16
+	/* add content-type */
17
+	value_len = nc.len;
18
+	len=sizeof("Content-Type: ") - 1 + value_len + CRLF_LEN;
19
+	buf=pkg_malloc(sizeof(char)*(len));
20
+
21
+	if (buf==0) {
22
+		LM_ERR("out of pkg memory\n");
23
+		return -1;
24
+	}
25
+	memcpy(buf, "Content-Type: ", sizeof("Content-Type: ") - 1);
26
+	memcpy(buf+sizeof("Content-Type: ") - 1, nc.s, value_len);
27
+	memcpy(buf+sizeof("Content-Type: ") - 1 + value_len, CRLF, CRLF_LEN);
28
+	if (add_lump_rpl(msg, buf, len, LUMP_RPL_HDR) == 0) {
29
+		LM_ERR("failed to insert content-type lump\n");
30
+		pkg_free(buf);
31
+		return -1;
32
+	}
33
+	pkg_free(buf);
34
+
35
+	/* add body */
36
+	if (add_lump_rpl(msg, nb.s, nb.len, LUMP_RPL_BODY) == 0) {
37
+		LM_ERR("cannot add body lump\n");
38
+		return -1;
39
+	}
40
+		
41
+	return 1;
42
+}
4 43
 
5 44
 void worker_loop(int id) {
6 45
 	dmq_worker_t* worker = &workers[id];
... ...
@@ -13,6 +52,8 @@ void worker_loop(int id) {
13 52
 		LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
14 53
 		/* multiple lock_release calls might be performed, so remove from queue until empty */
15 54
 		do {
55
+			/* fill the response with 0's */
56
+			memset(&peer_response, 0, sizeof(peer_response));
16 57
 			current_job = job_queue_pop(worker->queue);
17 58
 			/* job_queue_pop might return NULL if queue is empty */
18 59
 			if(current_job) {
... ...
@@ -20,6 +61,26 @@ void worker_loop(int id) {
20 61
 				if(ret_value < 0) {
21 62
 					LM_ERR("running job failed\n");
22 63
 				}
64
+				/* add the body to the reply */
65
+				if(peer_response.body.s) {
66
+					if(set_reply_body(current_job->msg, &peer_response.body, &peer_response.content_type) < 0) {
67
+						LM_ERR("error adding lumps\n");
68
+						continue;
69
+					}
70
+				}
71
+				/* send the reply */
72
+				if(slb.freply(current_job->msg, peer_response.resp_code, &peer_response.reason) < 0)
73
+				{
74
+					LM_ERR("error sending reply\n");
75
+				}
76
+				
77
+				/* if body given, free the lumps and free the body */
78
+				if(peer_response.body.s) {
79
+					del_nonshm_lump_rpl(&current_job->msg->reply_lump);
80
+					pkg_free(peer_response.body.s);
81
+				}
82
+				LM_DBG("sent reply\n");
83
+				shm_free(current_job->msg);
23 84
 				shm_free(current_job);
24 85
 				worker->jobs_processed++;
25 86
 			}
... ...
@@ -35,7 +96,7 @@ int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
35 96
 	new_job.msg = msg;
36 97
 	new_job.orig_peer = peer;
37 98
 	if(!num_workers) {
38
-		LM_ERR("error in add_dmq_job no workers spawned\n");
99
+		LM_ERR("error in add_dmq_job: no workers spawned\n");
39 100
 		return -1;
40 101
 	}
41 102
 	/* initialize the worker with the first one */