Browse code

temporarily changed the presence module to test dmq functionality

Marius Bucur authored on 06/04/2011 17:12:00
Showing 5 changed files
... ...
@@ -0,0 +1,14 @@
1
+#ifndef BIND_DMQ_H
2
+#define BIND_DMQ_H
3
+
4
+#include "peer.h"
5
+
6
+typedef struct dmq_api {
7
+	register_dmq_peer_t register_dmq_peer;
8
+} dmq_api_t;
9
+
10
+typedef int (*bind_dmq_f)(dmq_api_t* api);
11
+
12
+int bind_dmq(dmq_api_t* api);
13
+
14
+#endif
0 15
\ No newline at end of file
... ...
@@ -47,11 +47,11 @@
47 47
 #include "../../lib/kmi/mi.h"
48 48
 #include "../../lib/kcore/hash_func.h"
49 49
 #include "dmq.h"
50
+#include "peer.h"
50 51
 #include "bind_dmq.h"
51
-#include "dmq_worker.h"
52
+#include "worker.h"
52 53
 #include "../../mod_fix.h"
53 54
 
54
-static int mi_child_init(void);
55 55
 static int mod_init(void);
56 56
 static int child_init(int);
57 57
 static void destroy(void);
... ...
@@ -74,15 +74,18 @@ sl_api_t slb;
74 74
 
75 75
 /** module variables */
76 76
 dmq_worker_t* workers;
77
+dmq_peer_list_t* peer_list;
77 78
 
78 79
 /** module functions */
79 80
 static int mod_init(void);
80 81
 static int child_init(int);
81 82
 static void destroy(void);
82
-static int fixup_dmq(void** param, int param_no);
83
+static int handle_dmq_fixup(void** param, int param_no);
83 84
 
84 85
 static cmd_export_t cmds[] = {
85
-	{"handle_dmq_message",  (cmd_function)handle_dmq_message, 0, fixup_dmq, 0, 
86
+	{"handle_dmq_message",  (cmd_function)handle_dmq_message, 0, handle_dmq_fixup, 0, 
87
+		REQUEST_ROUTE},
88
+	{"bind_dmq",        (cmd_function)bind_dmq, 0, 0, 0,
86 89
 		REQUEST_ROUTE},
87 90
 	{0, 0, 0, 0, 0, 0}
88 91
 };
... ...
@@ -93,7 +96,6 @@ static param_export_t params[] = {
93 96
 };
94 97
 
95 98
 static mi_export_t mi_cmds[] = {
96
-	{"cleanup", 0, 0, 0, mi_child_init},
97 99
 	{0, 0, 0, 0, 0}
98 100
 };
99 101
 
... ...
@@ -117,7 +119,6 @@ struct module_exports exports = {
117 119
  * init module function
118 120
  */
119 121
 static int mod_init(void) {
120
-	int i = 0;
121 122
 	
122 123
 	if(register_mi_mod(exports.name, mi_cmds)!=0) {
123 124
 		LM_ERR("failed to register MI commands\n");
... ...
@@ -136,23 +137,21 @@ static int mod_init(void) {
136 137
 
137 138
 	/* load all TM stuff */
138 139
 	if(load_tm_api(&tmb)==-1) {
139
-		LM_ERR("Can't load tm functions. Module TM not loaded?\n");
140
+		LM_ERR("can't load tm functions. TM module probably not loaded\n");
140 141
 		return -1;
141 142
 	}
142 143
 	
143
-	/* fork worker processes */
144
+	/* load peer list - the list containing the module callbacks for dmq */
145
+	peer_list = init_peer_list();
146
+	
147
+	/* register worker processes */
148
+	register_procs(num_workers);
149
+	
150
+	/* allocate workers array */
144 151
 	workers = shm_malloc(num_workers * sizeof(*workers));
145
-	for(i = 0; i < num_workers; i++) {
146
-		int newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0);
147
-		if(newpid < 0) {
148
-			LM_ERR("failed to form process\n");
149
-			return -1;
150
-		} else if(newpid == 0) {
151
-			/* child */
152
-			// worker loop
153
-		} else {
154
-			workers[i].pid = newpid;
155
-		}
152
+	if(workers == NULL) {
153
+		LM_ERR("error in shm_malloc\n");
154
+		return -1;
156 155
 	}
157 156
 	
158 157
 	startup_time = (int) time(NULL);
... ...
@@ -163,26 +162,40 @@ static int mod_init(void) {
163 162
  * Initialize children
164 163
  */
165 164
 static int child_init(int rank) {
166
-	if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN) {
165
+  	int i, newpid;
166
+	if (rank == PROC_MAIN) {
167
+		/* fork worker processes */
168
+		for(i = 0; i < num_workers; i++) {
169
+			init_worker(&workers[i]);
170
+			LM_DBG("starting worker process %d\n", i);
171
+			newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0);
172
+			if(newpid < 0) {
173
+				LM_ERR("failed to form process\n");
174
+				return -1;
175
+			} else if(newpid == 0) {
176
+				// child - this will loop forever
177
+				worker_loop(i);
178
+			} else {
179
+				workers[i].pid = newpid;
180
+			}
181
+		}
182
+		return 0;
183
+	}
184
+	if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {
167 185
 		/* do nothing for the main process */
168 186
 		return 0;
169 187
 	}
170 188
 
171 189
 	pid = my_pid();
172
-	
173
-	if(library_mode)
174
-		return 0;
175
-
176
-	return 0;
177
-}
178
-
179
-static int mi_child_init(void) {
180 190
 	return 0;
181 191
 }
182 192
 
183
-
184 193
 /*
185 194
  * destroy function
186 195
  */
187 196
 static void destroy(void) {
188 197
 }
198
+
199
+static int handle_dmq_fixup(void** param, int param_no) {
200
+ 	return 0;
201
+}
189 202
\ No newline at end of file
... ...
@@ -1,3 +1,33 @@
1
+#ifndef DMQ_H
2
+#define DMQ_H
3
+
4
+#include "../../dprint.h"
5
+#include "../../error.h"
6
+#include "../../sr_module.h"
7
+#include "bind_dmq.h"
8
+#include "peer.h"
9
+#include "worker.h"
1 10
 
2 11
 #define DEFAULT_NUM_WORKERS	2
12
+
13
+extern int num_workers;
14
+extern dmq_worker_t* workers;
15
+
16
+static inline int dmq_load_api(dmq_api_t* api) {
17
+	bind_dmq_f binddmq;
18
+	binddmq = (bind_dmq_f)find_export("bind_dmq", 0, 0);
19
+	if ( binddmq == 0) {
20
+		LM_ERR("cannot find bind_dmq\n");
21
+		return -1;
22
+	}
23
+	if (binddmq(api) < 0)
24
+	{
25
+		LM_ERR("cannot bind dmq api\n");
26
+		return -1;
27
+	}
28
+	return 0;
29
+}
30
+
3 31
 int handle_dmq_message(struct sip_msg* msg, char* str1 ,char* str2);
32
+
33
+#endif
4 34
\ No newline at end of file
5 35
deleted file mode 100644
... ...
@@ -1,8 +0,0 @@
1
-
2
-
3
-struct dmq_worker {
4
-	void* queue;
5
-	int pid;
6
-};
7
-
8
-typedef struct dmq_worker dmq_worker_t;
9 0
\ No newline at end of file
... ...
@@ -69,6 +69,7 @@
69 69
 #include "../../lib/kmi/mi.h"
70 70
 #include "../../lib/kcore/hash_func.h"
71 71
 #include "../pua/hash.h"
72
+#include "../dmq/dmq.h"
72 73
 #include "presence.h"
73 74
 #include "publish.h"
74 75
 #include "subscribe.h"
... ...
@@ -105,6 +106,9 @@ char* to_tag_pref = "10";
105 106
 struct tm_binds tmb;
106 107
 /* SL API structure */
107 108
 sl_api_t slb;
109
+/* dmq API structure */
110
+dmq_api_t dmq;
111
+register_dmq_peer_t register_dmq;
108 112
 
109 113
 /** module functions */
110 114
 
... ...
@@ -206,6 +210,22 @@ struct module_exports exports= {
206 210
 	child_init                  	/* per-child init function */
207 211
 };
208 212
 
213
+int dmq_callback(struct sip_msg* msg) {
214
+	LM_ERR("it worked - dmq module triggered the presence callback [%ld %d]\n", time(0), my_pid());
215
+	sleep(4);
216
+	return 0;
217
+}
218
+
219
+static void add_dmq_peer() {
220
+	dmq_peer_t presence_peer;
221
+	presence_peer.peer_id.s = "presence";
222
+	presence_peer.peer_id.len = 8;
223
+	presence_peer.description.s = "presence";
224
+	presence_peer.description.len = 8;
225
+	presence_peer.callback = dmq_callback;
226
+	register_dmq(&presence_peer);
227
+}
228
+
209 229
 /**
210 230
  * init module function
211 231
  */
... ...
@@ -268,6 +288,15 @@ static int mod_init(void)
268 288
 		return -1;
269 289
 	}
270 290
 	
291
+	if(dmq_load_api(&dmq) < 0) {
292
+		LM_ERR("cannot load dmq api\n");
293
+		return -1;
294
+	} else {
295
+		register_dmq = dmq.register_dmq_peer;
296
+		add_dmq_peer();
297
+		LM_DBG("presence-dmq loaded\n");
298
+	}
299
+	
271 300
 	if(db_url.s== NULL)
272 301
 	{
273 302
 		LM_ERR("database url not set!\n");