Browse code

improvements regarding the notification system

Marius Bucur authored on 22/04/2011 16:49:20
Showing 8 changed files
... ...
@@ -42,6 +42,7 @@
42 42
 #include "../../mem/shm_mem.h"
43 43
 #include "../../usr_avp.h"
44 44
 #include "../../modules/tm/tm_load.h"
45
+#include "../../parser/parse_uri.h"
45 46
 #include "../../modules/sl/sl.h"
46 47
 #include "../../pt.h"
47 48
 #include "../../lib/kmi/mi.h"
... ...
@@ -66,6 +67,10 @@ int pid = 0;
66 67
 /* module parameters */
67 68
 int num_workers = DEFAULT_NUM_WORKERS;
68 69
 str dmq_server_address = {0, 0};
70
+struct sip_uri dmq_server_uri;
71
+
72
+str dmq_notification_address = {0, 0};
73
+struct sip_uri dmq_notification_uri;
69 74
 
70 75
 /* TM bind */
71 76
 struct tm_binds tmb;
... ...
@@ -86,7 +91,7 @@ static int mod_init(void);
86 91
 static int child_init(int);
87 92
 static void destroy(void);
88 93
 static int handle_dmq_fixup(void** param, int param_no);
89
-static int check_dmq_server_address();
94
+static int parse_server_address(str* uri, struct sip_uri* parsed_uri);
90 95
 
91 96
 static cmd_export_t cmds[] = {
92 97
 	{"handle_dmq_message",  (cmd_function)handle_dmq_message, 0, handle_dmq_fixup, 0, 
... ...
@@ -99,6 +104,7 @@ static cmd_export_t cmds[] = {
99 104
 static param_export_t params[] = {
100 105
 	{"num_workers", INT_PARAM, &num_workers},
101 106
 	{"server_address", STR_PARAM, &dmq_server_address.s},
107
+	{"notification_address", STR_PARAM, &dmq_notification_address.s},
102 108
 	{0, 0, 0}
103 109
 };
104 110
 
... ...
@@ -152,13 +158,18 @@ static int mod_init(void) {
152 158
 	
153 159
 	/* register worker processes - add one because of the ping process */
154 160
 	register_procs(num_workers);
155
-	/* check server_address not empty and correct */
156 161
 	
157
-	if(check_dmq_server_address() < 0) {
162
+	/* check server_address and notification_address are not empty and correct */
163
+	if(parse_server_address(&dmq_server_address, &dmq_server_uri) < 0) {
158 164
 		LM_ERR("server address invalid\n");
159 165
 		return -1;
160 166
 	}
161 167
 	
168
+	if(parse_server_address(&dmq_notification_address, &dmq_notification_uri) < 0) {
169
+		LM_ERR("notification address invalid\n");
170
+		return -1;
171
+	}
172
+	
162 173
 	/* allocate workers array */
163 174
 	workers = shm_malloc(num_workers * sizeof(*workers));
164 175
 	if(workers == NULL) {
... ...
@@ -213,27 +224,25 @@ static int child_init(int rank) {
213 224
  * destroy function
214 225
  */
215 226
 static void destroy(void) {
227
+	/* TODO unregister dmq node, free resources */
216 228
 }
217 229
 
218 230
 static int handle_dmq_fixup(void** param, int param_no) {
219 231
  	return 0;
220 232
 }
221 233
 
222
-static int check_dmq_server_address() {
223
-	if(!dmq_server_address.s) {
234
+static int parse_server_address(str* uri, struct sip_uri* parsed_uri) {
235
+	if(!uri->s) {
236
+		LM_ERR("server address missing\n");
224 237
 		return -1;
225 238
 	}
226
-	dmq_server_address.len = strlen(dmq_server_address.s);
227
-	if(!dmq_server_address.len) {
239
+	uri->len = strlen(uri->s);
240
+	if(!uri->len) {
228 241
 		LM_ERR("empty server address\n");
229 242
 		return -1;
230 243
 	}
231
-	if(strncmp(dmq_server_address.s, "sip:", 4)) {
232
-		LM_ERR("server address must start with sip:\n");
233
-		return -1;
234
-	}
235
-	if(!strchr(dmq_server_address.s + 4, ':')) {
236
-		LM_ERR("server address must be of form sip:host:port\n");
244
+	if(parse_uri(uri->s, uri->len, parsed_uri) < 0) {
245
+		LM_ERR("error parsing server address\n");
237 246
 		return -1;
238 247
 	}
239 248
 	return 0;
... ...
@@ -5,6 +5,7 @@
5 5
 #include "../../error.h"
6 6
 #include "../../sr_module.h"
7 7
 #include "../../modules/tm/tm_load.h"
8
+#include "../../parser/parse_uri.h"
8 9
 #include "../../modules/sl/sl.h"
9 10
 #include "bind_dmq.h"
10 11
 #include "peer.h"
... ...
@@ -20,7 +21,9 @@ extern str dmq_server_address;
20 21
 extern dmq_peer_list_t* peer_list;
21 22
 extern dmq_node_list_t* node_list;
22 23
 extern str dmq_request_method;
23
-
24
+extern struct sip_uri dmq_server_uri;
25
+extern str dmq_notification_address;
26
+extern struct sip_uri dmq_notification_uri;
24 27
 /* sl and tm */
25 28
 extern struct tm_binds tmb;
26 29
 extern sl_api_t slb;
... ...
@@ -16,18 +16,63 @@ void dmq_tm_callback( struct cell *t, int type, struct tmcb_params *ps) {
16 16
 	LM_ERR("callback\n");
17 17
 }
18 18
 
19
-int send_dmq_message(dmq_peer_t* peer, str* body) {
19
+int build_from_str(str* username, struct sip_uri* uri, str* from) {
20
+	/* sip:user@host:port */
21
+	int from_len = username->len + uri->host.len + uri->port.len + 10;
22
+	if(!uri->host.s || !uri->host.len) {
23
+		LM_ERR("no host in uri\n");
24
+		return -1;
25
+	}
26
+	if(!username->s || !username->len) {
27
+		LM_ERR("no username given\n");
28
+		return -1;
29
+	}
30
+	from->s = pkg_malloc(from_len);
31
+	from->len = 0;
32
+	
33
+	memcpy(from->s, "sip:", 4);
34
+	from->len += 4;
35
+	
36
+	memcpy(from->s + from->len, username->s, username->len);
37
+	from->len += username->len;
38
+	
39
+	memcpy(from->s + from->len, "@", 1);
40
+	from->len += 1;
41
+	
42
+	memcpy(from->s + from->len, uri->host.s, uri->host.len);
43
+	from->len += uri->host.len;
44
+	
45
+	if(uri->port.s && uri->port.len) {
46
+		memcpy(from->s + from->len, ":", 1);
47
+		from->len += 1;
48
+		memcpy(from->s + from->len, uri->port.s, uri->port.len);
49
+		from->len += uri->port.len;
50
+	}
51
+	return 0;
52
+}
53
+
54
+int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node) {
20 55
 	uac_req_t uac_r;
21 56
 	str str_hdr = {0, 0};
22 57
 	/* TODO - do not hardcode these - just for tesing purposes */
23
-	str from = {"sip:dmqnode@10.0.0.0:5060", 25};
24
-	str req_uri = from;
58
+	str from, to, req_uri;
25 59
 	void *cb_param = NULL;
26 60
 	int result;
61
+	
62
+	if(build_from_str(&peer->peer_id, &dmq_server_uri, &from) < 0) {
63
+		LM_ERR("error building from string\n");
64
+		return -1;
65
+	}
66
+	if(build_from_str(&peer->peer_id, &node->uri, &to) < 0) {
67
+		LM_ERR("error building to string\n");
68
+		return -1;
69
+	}
70
+	req_uri = to;
71
+	
27 72
 	set_uac_req(&uac_r, &dmq_request_method, &str_hdr, body, NULL, TMCB_LOCAL_COMPLETED,
28 73
 			dmq_tm_callback, (void*)cb_param);
29 74
 	result = tmb.t_request(&uac_r, &req_uri,
30
-			       &from, &from,
75
+			       &to, &from,
31 76
 			       NULL);
32 77
 	if(result < 0)
33 78
 	{
... ...
@@ -9,6 +9,6 @@
9 9
 #include "../../str.h"
10 10
 
11 11
 int register_dmq_peer(dmq_peer_t* peer);
12
-int send_dmq_message(dmq_peer_t* peer, str* body);
12
+int send_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* node);
13 13
 
14 14
 #endif
15 15
\ No newline at end of file
... ...
@@ -2,6 +2,9 @@
2 2
 #include "dmqnode.h"
3 3
 #include "dmq.h"
4 4
 
5
+dmq_node_t* self_node;
6
+dmq_node_t* notification_node;	
7
+
5 8
 dmq_node_list_t* init_dmq_node_list() {
6 9
 	dmq_node_list_t* node_list = shm_malloc(sizeof(dmq_node_list_t));
7 10
 	memset(node_list, 0, sizeof(dmq_node_list_t));
... ...
@@ -9,14 +12,21 @@ dmq_node_list_t* init_dmq_node_list() {
9 12
 	return node_list;
10 13
 }
11 14
 
12
-inline int add_dmq_node(dmq_node_list_t* list, dmq_node_t* newnode) {
13
-	dmq_node_t* copy = shm_malloc(sizeof(dmq_node_t));
14
-	memcpy(copy, newnode, sizeof(dmq_node_t));
15
-	shm_str_dup(&copy->orig_uri, &newnode->orig_uri);
15
+inline dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri) {
16
+	dmq_node_t* newnode = shm_malloc(sizeof(dmq_node_t));
17
+	memset(newnode, 0, sizeof(dmq_node_t));
18
+	shm_str_dup(&newnode->orig_uri, uri);
19
+	if(parse_uri(newnode->orig_uri.s, newnode->orig_uri.len, &newnode->uri) < 0) {
20
+		LM_ERR("error in parsing node uri\n");
21
+		goto error;
22
+	}
16 23
 	lock_get(&list->lock);
17
-	copy->next = list->nodes;
18
-	list->nodes = copy;
24
+	newnode->next = list->nodes;
25
+	list->nodes = newnode;
19 26
 	list->count++;
20 27
 	lock_release(&list->lock);
28
+	return newnode;
29
+error:
30
+	shm_free(newnode);
21 31
 	return 0;
22 32
 }
23 33
\ No newline at end of file
... ...
@@ -13,7 +13,7 @@
13 13
 
14 14
 typedef struct dmq_node {
15 15
 	str orig_uri;
16
-	struct sip_uri* uri;
16
+	struct sip_uri uri;
17 17
 	int status;
18 18
 	int last_notification;
19 19
 	struct dmq_node* next;
... ...
@@ -27,6 +27,9 @@ typedef struct dmq_node_list {
27 27
 
28 28
 dmq_node_list_t* init_dmq_node_list();
29 29
 int update_node_list(dmq_node_list_t* remote_list);
30
-int add_dmq_node(dmq_node_list_t* list, dmq_node_t* newnode);
30
+dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri);
31
+
32
+extern dmq_node_t* self_node;
33
+extern dmq_node_t* notification_node;	
31 34
 
32 35
 #endif
33 36
\ No newline at end of file
... ...
@@ -3,10 +3,16 @@
3 3
 #include "../../parser/parse_content.h"
4 4
 #include "../../parser/parse_from.h"
5 5
 #include "../../ut.h"
6
+#include "dmq.h"
6 7
 #include "worker.h"
7 8
 #include "peer.h"
8 9
 #include "message.h"
9 10
 
11
+static str dmq_200_rpl  = str_init("OK");
12
+static str dmq_400_rpl  = str_init("Bad request");
13
+static str dmq_500_rpl  = str_init("Server Internal Error");
14
+static str dmq_404_rpl  = str_init("User Not Found");
15
+
10 16
 int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
11 17
 	dmq_peer_t* peer;
12 18
 	if ((parse_sip_msg_uri(msg) < 0) || (!msg->parsed_uri.user.s)) {
... ...
@@ -19,11 +25,18 @@ int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
19 25
 	       ZSW(str1), ZSW(str2));
20 26
 	/* the peer id is given as the userinfo part of the request URI */
21 27
 	peer = find_peer(msg->parsed_uri.user);
22
-	if(!peer) {
28
+	if(peer) {
23 29
 		LM_DBG("no peer found for %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
30
+		if(slb.freply(msg, 404, &dmq_404_rpl) < 0)
31
+		{
32
+			LM_ERR("sending reply\n");
33
+			goto error;
34
+		}
24 35
 		return 0;
25 36
 	}
26 37
 	LM_DBG("handle_dmq_message peer found: %.*s\n", msg->parsed_uri.user.len, msg->parsed_uri.user.s);
27 38
 	add_dmq_job(msg, peer);
28 39
 	return 0;
40
+error:
41
+	return -1;
29 42
 }
30 43
\ No newline at end of file
... ...
@@ -2,23 +2,28 @@
2 2
 #include "dmq_funcs.h"
3 3
 
4 4
 int add_notification_peer() {
5
-	dmq_node_t self_node;
6
-	self_node.orig_uri = dmq_server_address;
7
-	
8 5
 	dmq_notification_peer.callback = dmq_notification_callback;
9
-	dmq_notification_peer.description.s = "dmqpeer";
10
-	dmq_notification_peer.description.len = 7;
11
-	dmq_notification_peer.peer_id.s = "dmqpeer";
12
-	dmq_notification_peer.peer_id.len = 7;
6
+	dmq_notification_peer.description.s = "notification_peer";
7
+	dmq_notification_peer.description.len = 17;
8
+	dmq_notification_peer.peer_id.s = "notification_peer";
9
+	dmq_notification_peer.peer_id.len = 17;
13 10
 	if(register_dmq_peer(&dmq_notification_peer) < 0) {
14 11
 		LM_ERR("error in register_dmq_peer\n");
15 12
 		goto error;
16 13
 	}
17 14
 	/* add itself to the node list */
18
-	if(add_dmq_node(node_list, &self_node) < 0) {
15
+	self_node = add_dmq_node(node_list, &dmq_server_address);
16
+	if(!self_node) {
19 17
 		LM_ERR("error adding self node\n");
20 18
 		goto error;
21 19
 	}
20
+	/* add the notification server to the node list */
21
+	notification_node = add_dmq_node(node_list, &dmq_notification_address);
22
+	if(!notification_node) {
23
+		LM_ERR("error adding notification node\n");
24
+		goto error;
25
+	}
26
+	/* initial notification request to receive the complete node list */
22 27
 	if(request_initial_nodelist() < 0) {
23 28
 		LM_ERR("error in request_initial_notification\n");
24 29
 		goto error;
... ...
@@ -28,6 +33,19 @@ error:
28 33
 	return -1;
29 34
 }
30 35
 
36
+/**
37
+ * extract the node list from the body of a notification request SIP message
38
+ * the SIP request will look something like:
39
+ * 	KDMQ sip:10.0.0.0:5062
40
+ * 	To: ...
41
+ * 	From: ...
42
+ * 	Max-Forwards: ...
43
+ * 	Content-Length: 22
44
+ * 	
45
+ * 	sip:host1:port1;param1=value1
46
+ * 	sip:host2:port2;param2=value2
47
+ * 	...
48
+ */
31 49
 int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg) {
32 50
 	int content_length, total_nodes = 0;
33 51
 	str body;
... ...
@@ -100,7 +118,8 @@ int build_node_str(dmq_node_t* node, char* buf, int buflen) {
100 118
 	return node->orig_uri.len;
101 119
 }
102 120
 
103
-/* builds the body of a notification message from the list of servers 
121
+/**
122
+ * builds the body of a notification message from the list of servers 
104 123
  * the result will look something like:
105 124
  * sip:host1:port1;param1=value1\r\n
106 125
  * sip:host2:port2;param2=value2\r\n
... ...
@@ -146,7 +165,7 @@ error:
146 165
 int request_initial_nodelist() {
147 166
 	str* body = build_notification_body();
148 167
 	int ret;
149
-	ret = send_dmq_message(&dmq_notification_peer, body);
168
+	ret = send_dmq_message(&dmq_notification_peer, body, notification_node);
150 169
 	pkg_free(body->s);
151 170
 	pkg_free(body);
152 171
 	return ret;