Browse code

added send_dmq_message functionality and also used it to send initial notification message

Marius Bucur authored on 22/04/2011 06:56:04
Showing 12 changed files
... ...
@@ -1,5 +1,7 @@
1
+#include "dmq.h"
1 2
 #include "bind_dmq.h"
2 3
 #include "peer.h"
4
+#include "dmq_funcs.h"
3 5
 
4 6
 int bind_dmq(dmq_api_t* api) {
5 7
 	api->register_dmq_peer = register_dmq_peer;
... ...
@@ -60,14 +60,12 @@ static void destroy(void);
60 60
 
61 61
 MODULE_VERSION
62 62
 
63
-/* database connection */
64
-int library_mode = 0;
65
-str server_address = {0, 0};
66 63
 int startup_time = 0;
67 64
 int pid = 0;
68 65
 
69 66
 /* module parameters */
70 67
 int num_workers = DEFAULT_NUM_WORKERS;
68
+str dmq_server_address = {0, 0};
71 69
 
72 70
 /* TM bind */
73 71
 struct tm_binds tmb;
... ...
@@ -75,6 +73,7 @@ struct tm_binds tmb;
75 73
 sl_api_t slb;
76 74
 
77 75
 /** module variables */
76
+str dmq_request_method = {"KDMQ", 4};
78 77
 dmq_worker_t* workers;
79 78
 dmq_peer_list_t* peer_list;
80 79
 /* the list of dmq servers */
... ...
@@ -87,6 +86,7 @@ static int mod_init(void);
87 86
 static int child_init(int);
88 87
 static void destroy(void);
89 88
 static int handle_dmq_fixup(void** param, int param_no);
89
+static int check_dmq_server_address();
90 90
 
91 91
 static cmd_export_t cmds[] = {
92 92
 	{"handle_dmq_message",  (cmd_function)handle_dmq_message, 0, handle_dmq_fixup, 0, 
... ...
@@ -98,6 +98,7 @@ static cmd_export_t cmds[] = {
98 98
 
99 99
 static param_export_t params[] = {
100 100
 	{"num_workers", INT_PARAM, &num_workers},
101
+	{"server_address", STR_PARAM, &dmq_server_address.s},
101 102
 	{0, 0, 0}
102 103
 };
103 104
 
... ...
@@ -131,23 +132,19 @@ static int mod_init(void) {
131 132
 		return -1;
132 133
 	}
133 134
 
134
-	if(library_mode== 1) {
135
-		LM_DBG("dmq module used for API library purpose only\n");
136
-	}
137
-
138 135
 	/* bind the SL API */
139 136
 	if (sl_load_api(&slb)!=0) {
140 137
 		LM_ERR("cannot bind to SL API\n");
141 138
 		return -1;
142 139
 	}
143
-
140
+	
144 141
 	/* load all TM stuff */
145 142
 	if(load_tm_api(&tmb)==-1) {
146 143
 		LM_ERR("can't load tm functions. TM module probably not loaded\n");
147 144
 		return -1;
148 145
 	}
149
-	
150 146
 	/* load peer list - the list containing the module callbacks for dmq */
147
+	
151 148
 	peer_list = init_peer_list();
152 149
 	
153 150
 	/* load the dmq node list - the list containing the dmq servers */
... ...
@@ -155,6 +152,12 @@ static int mod_init(void) {
155 152
 	
156 153
 	/* register worker processes - add one because of the ping process */
157 154
 	register_procs(num_workers);
155
+	/* check server_address not empty and correct */
156
+	
157
+	if(check_dmq_server_address() < 0) {
158
+		LM_ERR("server address invalid\n");
159
+		return -1;
160
+	}
158 161
 	
159 162
 	/* allocate workers array */
160 163
 	workers = shm_malloc(num_workers * sizeof(*workers));
... ...
@@ -214,4 +217,24 @@ static void destroy(void) {
214 217
 
215 218
 static int handle_dmq_fixup(void** param, int param_no) {
216 219
  	return 0;
220
+}
221
+
222
+static int check_dmq_server_address() {
223
+	if(!dmq_server_address.s) {
224
+		return -1;
225
+	}
226
+	dmq_server_address.len = strlen(dmq_server_address.s);
227
+	if(!dmq_server_address.len) {
228
+		LM_ERR("empty server address\n");
229
+		return -1;
230
+	}
231
+	if(strncmp(dmq_server_address.s, "sip:", 4)) {
232
+		LM_ERR("server address must start with sip:\n");
233
+		return -1;
234
+	}
235
+	if(!strchr(dmq_server_address.s + 4, ':')) {
236
+		LM_ERR("server address must be of form sip:host:port\n");
237
+		return -1;
238
+	}
239
+	return 0;
217 240
 }
218 241
\ No newline at end of file
... ...
@@ -4,6 +4,8 @@
4 4
 #include "../../dprint.h"
5 5
 #include "../../error.h"
6 6
 #include "../../sr_module.h"
7
+#include "../../modules/tm/tm_load.h"
8
+#include "../../modules/sl/sl.h"
7 9
 #include "bind_dmq.h"
8 10
 #include "peer.h"
9 11
 #include "worker.h"
... ...
@@ -14,8 +16,14 @@
14 16
 extern int num_workers;
15 17
 extern dmq_worker_t* workers;
16 18
 extern dmq_peer_t dmq_notification_peer;
19
+extern str dmq_server_address;
17 20
 extern dmq_peer_list_t* peer_list;
18 21
 extern dmq_node_list_t* node_list;
22
+extern str dmq_request_method;
23
+
24
+/* sl and tm */
25
+extern struct tm_binds tmb;
26
+extern sl_api_t slb;
19 27
 
20 28
 static inline int dmq_load_api(dmq_api_t* api) {
21 29
 	bind_dmq_f binddmq;
22 30
new file mode 100644
... ...
@@ -0,0 +1,38 @@
1
+#include "dmq_funcs.h"
2
+
3
+int register_dmq_peer(dmq_peer_t* peer) {
4
+	lock_get(&peer_list->lock);
5
+	if(search_peer_list(peer_list, peer)) {
6
+		LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s,
7
+		       peer->description.len, peer->description.s);
8
+		return -1;
9
+	}
10
+	add_peer(peer_list, peer);
11
+	lock_release(&peer_list->lock);
12
+	return 0;
13
+}
14
+
15
+void dmq_tm_callback( struct cell *t, int type, struct tmcb_params *ps) {
16
+	LM_ERR("callback\n");
17
+}
18
+
19
+int send_dmq_message(dmq_peer_t* peer, str* body) {
20
+	uac_req_t uac_r;
21
+	str str_hdr = {0, 0};
22
+	/* TODO - do not hardcode these - just for tesing purposes */
23
+	str from = {"sip:dmqnode@10.0.0.0:5060", 25};
24
+	str req_uri = from;
25
+	void *cb_param = NULL;
26
+	int result;
27
+	set_uac_req(&uac_r, &dmq_request_method, &str_hdr, body, NULL, TMCB_LOCAL_COMPLETED,
28
+			dmq_tm_callback, (void*)cb_param);
29
+	result = tmb.t_request(&uac_r, &req_uri,
30
+			       &from, &from,
31
+			       NULL);
32
+	if(result < 0)
33
+	{
34
+		LM_ERR("error in tmb.t_request_within\n");
35
+		return -1;
36
+	}
37
+	return 0;
38
+}
0 39
\ No newline at end of file
1 40
new file mode 100644
... ...
@@ -0,0 +1,14 @@
1
+#ifndef DMQ_FUNCS_H
2
+#define DMQ_FUNCS_H
3
+
4
+#include "../../modules/tm/dlg.h"
5
+#include "../../modules/tm/tm_load.h"
6
+#include "dmq.h"
7
+#include "peer.h"
8
+#include "worker.h"
9
+#include "../../str.h"
10
+
11
+int register_dmq_peer(dmq_peer_t* peer);
12
+int send_dmq_message(dmq_peer_t* peer, str* body);
13
+
14
+#endif
0 15
\ No newline at end of file
... ...
@@ -1,3 +1,4 @@
1
+#include "../../ut.h"
1 2
 #include "dmqnode.h"
2 3
 #include "dmq.h"
3 4
 
... ...
@@ -6,4 +7,16 @@ dmq_node_list_t* init_dmq_node_list() {
6 7
 	memset(node_list, 0, sizeof(dmq_node_list_t));
7 8
 	lock_init(&node_list->lock);
8 9
 	return node_list;
10
+}
11
+
12
+inline int add_dmq_node(dmq_node_list_t* list, dmq_node_t* newnode) {
13
+	dmq_node_t* copy = shm_malloc(sizeof(dmq_node_t));
14
+	memcpy(copy, newnode, sizeof(dmq_node_t));
15
+	shm_str_dup(&copy->orig_uri, &newnode->orig_uri);
16
+	lock_get(&list->lock);
17
+	copy->next = list->nodes;
18
+	list->nodes = copy;
19
+	list->count++;
20
+	lock_release(&list->lock);
21
+	return 0;
9 22
 }
10 23
\ No newline at end of file
... ...
@@ -9,11 +9,14 @@
9 9
 #include "../../mem/shm_mem.h"
10 10
 #include "../../parser/parse_uri.h"
11 11
 
12
+#define NBODY_LEN 1024
13
+
12 14
 typedef struct dmq_node {
15
+	str orig_uri;
13 16
 	struct sip_uri* uri;
14 17
 	int status;
15 18
 	int last_notification;
16
-	struct dmqnode* next;
19
+	struct dmq_node* next;
17 20
 } dmq_node_t;
18 21
 
19 22
 typedef struct dmq_node_list {
... ...
@@ -24,5 +27,6 @@ typedef struct dmq_node_list {
24 27
 
25 28
 dmq_node_list_t* init_dmq_node_list();
26 29
 int update_node_list(dmq_node_list_t* remote_list);
30
+int add_dmq_node(dmq_node_list_t* list, dmq_node_t* newnode);
27 31
 
28 32
 #endif
29 33
\ No newline at end of file
... ...
@@ -1,28 +1,153 @@
1 1
 #include "notification_peer.h"
2
+#include "dmq_funcs.h"
2 3
 
3 4
 int add_notification_peer() {
4
-	int ret;
5
+	dmq_node_t self_node;
6
+	self_node.orig_uri = dmq_server_address;
7
+	
5 8
 	dmq_notification_peer.callback = dmq_notification_callback;
6 9
 	dmq_notification_peer.description.s = "dmqpeer";
7 10
 	dmq_notification_peer.description.len = 7;
8 11
 	dmq_notification_peer.peer_id.s = "dmqpeer";
9 12
 	dmq_notification_peer.peer_id.len = 7;
10
-	ret = register_dmq_peer(&dmq_notification_peer);
11
-	return ret;
13
+	if(register_dmq_peer(&dmq_notification_peer) < 0) {
14
+		LM_ERR("error in register_dmq_peer\n");
15
+		goto error;
16
+	}
17
+	/* add itself to the node list */
18
+	if(add_dmq_node(node_list, &self_node) < 0) {
19
+		LM_ERR("error adding self node\n");
20
+		goto error;
21
+	}
22
+	if(request_initial_nodelist() < 0) {
23
+		LM_ERR("error in request_initial_notification\n");
24
+		goto error;
25
+	}
26
+	return 0;
27
+error:
28
+	return -1;
29
+}
30
+
31
+int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg) {
32
+	int content_length, total_nodes = 0;
33
+	str body;
34
+	str tmp_uri;
35
+	dmq_node_t *cur = NULL, *prev = NULL, *first = NULL;
36
+	char *tmp, *end, *match;
37
+	if(!msg->content_length) {
38
+		LM_ERR("no content length header found\n");
39
+		return -1;
40
+	}
41
+	content_length = get_content_length(msg);
42
+	if(!content_length) {
43
+		LM_DBG("content length is 0\n");
44
+		return total_nodes;
45
+	}
46
+	body.s = get_body(msg);
47
+	body.len = content_length;
48
+	tmp = body.s;
49
+	end = body.s + body.len;
50
+	
51
+	while(tmp < end) {
52
+		total_nodes++;
53
+		cur = shm_malloc(sizeof(dmq_node_t));
54
+		memset(cur, 0, sizeof(*cur));
55
+		/* keep the list tail in first */
56
+		if(!first) {
57
+			first = cur;
58
+		}
59
+		cur->next = prev;
60
+		
61
+		match = q_memchr(tmp, '\n', end - tmp);
62
+		if (match){
63
+			match++;
64
+		}else {
65
+			/* for the last line - take all of it */
66
+			match = end;
67
+		}
68
+		/* create the orig_uri from the parsed uri line and trim it */
69
+		tmp_uri.s = tmp;
70
+		tmp_uri.len = match - tmp;
71
+		shm_str_dup(&cur->orig_uri, &tmp_uri);
72
+		trim_r(cur->orig_uri);
73
+		
74
+		tmp = match;
75
+		
76
+		prev = cur;
77
+	}
78
+	lock_get(&update_list->lock);
79
+	first->next = update_list->nodes;
80
+	update_list->nodes = cur;
81
+	lock_release(&update_list->lock);
82
+	return total_nodes;
12 83
 }
13 84
 
14
-int dmq_notification_callback(struct sip_msg* msg) {
85
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
86
+	int nodes_recv;
15 87
 	/* received dmqnode list */
16
-	dmq_node_list_t* rlist;
17 88
 	LM_ERR("dmq triggered from dmq_notification_callback\n");
18
-	rlist = extract_node_list(msg);
19
-	if(!rlist) {
20
-		LM_ERR("extract_node_list failed\n");
89
+	nodes_recv = extract_node_list(node_list, msg);
90
+	LM_DBG("received %d nodes\n", nodes_recv);
91
+	return 0;
92
+}
93
+
94
+int build_node_str(dmq_node_t* node, char* buf, int buflen) {
95
+	if(buflen < node->orig_uri.len) {
96
+		LM_ERR("no more space left for node string\n");
21 97
 		return -1;
22 98
 	}
23
-	if(update_node_list(rlist) < 0) {
24
-		LM_ERR("cannot update node_list\n");
25
-		return -1;
99
+	memcpy(buf, node->orig_uri.s, node->orig_uri.len);
100
+	return node->orig_uri.len;
101
+}
102
+
103
+/* builds the body of a notification message from the list of servers 
104
+ * the result will look something like:
105
+ * sip:host1:port1;param1=value1\r\n
106
+ * sip:host2:port2;param2=value2\r\n
107
+ * sip:host3:port3;param3=value3
108
+ */
109
+str* build_notification_body() {
110
+	/* the length of the current line describing the server */
111
+	int slen;
112
+	/* the current length of the body */
113
+	int clen = 0;
114
+	dmq_node_t* cur_node = NULL;
115
+	str* body;
116
+	body = pkg_malloc(sizeof(str));
117
+	memset(body, 0, sizeof(str));
118
+	/* we allocate a chunk of data for the body */
119
+	body->len = NBODY_LEN;
120
+	body->s = pkg_malloc(body->len);
121
+	/* we add each server to the body - each on a different line */
122
+	lock_get(&node_list->lock);
123
+	cur_node = node_list->nodes;
124
+	while(cur_node) {
125
+		LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
126
+		/* body->len - clen - 2 bytes left to write - including the \r\n */
127
+		slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2);
128
+		if(slen < 0) {
129
+			LM_ERR("cannot build_node_string\n");
130
+			goto error;
131
+		}
132
+		clen += slen;
133
+		body->s[clen++] = '\r';
134
+		body->s[clen++] = '\n';
135
+		cur_node = cur_node->next;
26 136
 	}
27
-	return 0;
137
+	lock_release(&node_list->lock);
138
+	body->len = clen;
139
+	return body;
140
+error:
141
+	pkg_free(body->s);
142
+	pkg_free(body);
143
+	return NULL;
144
+}
145
+
146
+int request_initial_nodelist() {
147
+	str* body = build_notification_body();
148
+	int ret;
149
+	ret = send_dmq_message(&dmq_notification_peer, body);
150
+	pkg_free(body->s);
151
+	pkg_free(body);
152
+	return ret;
28 153
 }
29 154
\ No newline at end of file
... ...
@@ -1,6 +1,13 @@
1 1
 #include "dmq.h"
2 2
 #include "dmqnode.h"
3
+#include "peer.h"
4
+#include "../../parser/msg_parser.h"
5
+#include "../../parser/parse_content.h"
6
+#include "../../ut.h"
3 7
 
4 8
 int add_notification_peer();
5
-int dmq_notification_callback(struct sip_msg* msg);
6
-dmq_node_list_t* extract_node_list(struct sip_msg* msg);
7 9
\ No newline at end of file
10
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
11
+int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg);
12
+str* build_notification_body();
13
+int build_node_str(dmq_node_t* node, char* buf, int buflen);
14
+int request_initial_nodelist();
8 15
\ No newline at end of file
... ...
@@ -36,18 +36,6 @@ void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
36 36
 	peer_list->peers = new_peer;
37 37
 }
38 38
 
39
-int register_dmq_peer(dmq_peer_t* peer) {
40
-	lock_get(&peer_list->lock);
41
-	if(search_peer_list(peer_list, peer)) {
42
-		LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s,
43
-		       peer->description.len, peer->description.s);
44
-		return -1;
45
-	}
46
-	add_peer(peer_list, peer);
47
-	lock_release(&peer_list->lock);
48
-	return 0;
49
-}
50
-
51 39
 dmq_peer_t* find_peer(str peer_id) {
52 40
 	dmq_peer_t foo_peer;
53 41
 	foo_peer.peer_id = peer_id;
... ...
@@ -9,7 +9,12 @@
9 9
 #include "../../mem/shm_mem.h"
10 10
 #include "../../parser/msg_parser.h"
11 11
 
12
-typedef int(*peer_callback_t)(struct sip_msg*);
12
+typedef struct peer_response {
13
+	int resp_code;
14
+	str body;
15
+} peer_reponse_t;
16
+
17
+typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp);
13 18
 
14 19
 typedef struct dmq_peer {
15 20
 	str peer_id;
... ...
@@ -30,7 +35,7 @@ dmq_peer_list_t* init_peer_list();
30 35
 dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
31 36
 typedef int (*register_dmq_peer_t)(dmq_peer_t*);
32 37
 
33
-int register_dmq_peer(dmq_peer_t* peer);
38
+void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
34 39
 dmq_peer_t* find_peer(str peer_id);
35 40
 
36 41
 
... ...
@@ -1,9 +1,11 @@
1 1
 #include "dmq.h"
2
+#include "peer.h"
2 3
 #include "worker.h"
3 4
 
4 5
 void worker_loop(int id) {
5 6
 	dmq_worker_t* worker = &workers[id];
6 7
 	dmq_job_t* current_job;
8
+	peer_reponse_t peer_response;
7 9
 	int ret_value;
8 10
 	for(;;) {
9 11
 		LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
... ...
@@ -14,7 +16,7 @@ void worker_loop(int id) {
14 16
 			current_job = job_queue_pop(worker->queue);
15 17
 			/* job_queue_pop might return NULL if queue is empty */
16 18
 			if(current_job) {
17
-				ret_value = current_job->f(current_job->msg);
19
+				ret_value = current_job->f(current_job->msg, &peer_response);
18 20
 				if(ret_value < 0) {
19 21
 					LM_ERR("running job failed\n");
20 22
 				}