Browse code

core, lib, modules: restructured source code tree

- new folder src/ to hold the source code for main project applications
- main.c is in src/
- all core files and subfolders are in src/core/
- modules are in src/modules/
- libs are in src/lib/
- application Makefiles are in src/
- application binary is built in src/ (src/kamailio)

Daniel-Constantin Mierla authored on 07/12/2016 11:03:51
Showing 1 changed files
1 1
deleted file mode 100644
... ...
@@ -1,329 +0,0 @@
1
-/*
2
- * $Id$
3
- *
4
- * Kazoo module interface
5
- *
6
- * Copyright (C) 2010-2014 2600Hz
7
- *
8
- * This file is part of Kamailio, a free SIP server.
9
- *
10
- * Kamailio is free software; you can redistribute it and/or modify
11
- * it under the terms of the GNU General Public License as published by
12
- * the Free Software Foundation; either version 2 of the License, or
13
- * (at your option) any later version
14
- *
15
- * Kamailio is distributed in the hope that it will be useful,
16
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
17
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
- * GNU General Public License for more details.
19
- *
20
- * You should have received a copy of the GNU General Public License
21
- * along with this program; if not, write to the Free Software
22
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23
- *
24
- * History:
25
- * --------
26
- * 2014-08  first version (2600hz)
27
- */
28
-
29
-#ifndef KZ_AMQP_H_
30
-#define KZ_AMQP_H_
31
-
32
-#include <fcntl.h>
33
-#include <event.h>
34
-#include <sys/timerfd.h>
35
-#include <amqp.h>
36
-
37
-#include "../../sr_module.h"
38
-#include "../../str.h"
39
-
40
-#include "const.h"
41
-#include "defs.h"
42
-#include "../../fmsg.h"
43
-
44
-typedef enum {
45
-	KZ_AMQP_CONNECTION_CLOSED     = 0,
46
-	KZ_AMQP_CONNECTION_OPEN    = 1,
47
-	KZ_AMQP_CONNECTION_FAILURE    = 2
48
-} kz_amqp_connection_state;
49
-
50
-typedef enum {
51
-	KZ_AMQP_CMD_PUBLISH     = 1,
52
-	KZ_AMQP_CMD_CALL    = 2,
53
-	KZ_AMQP_CMD_CONSUME = 3,
54
-	KZ_AMQP_CMD_ACK = 4,
55
-	KZ_AMQP_CMD_TARGETED_CONSUMER = 5,
56
-	KZ_AMQP_CMD_PUBLISH_BROADCAST = 6,
57
-	KZ_AMQP_CMD_COLLECT = 7,
58
-	KZ_AMQP_CMD_ASYNC_CALL    = 8,
59
-	KZ_AMQP_CMD_ASYNC_COLLECT    = 9
60
-} kz_amqp_pipe_cmd_type;
61
-
62
-typedef enum {
63
-	KZ_AMQP_CHANNEL_CLOSED     = 0,
64
-	KZ_AMQP_CHANNEL_FREE     = 1,
65
-	KZ_AMQP_CHANNEL_PUBLISHING    = 2,
66
-	KZ_AMQP_CHANNEL_BINDED = 3,
67
-	KZ_AMQP_CHANNEL_CALLING    = 4,
68
-	KZ_AMQP_CHANNEL_CONSUMING = 5
69
-} kz_amqp_channel_state;
70
-
71
-typedef struct amqp_connection_info kz_amqp_connection_info;
72
-typedef kz_amqp_connection_info *kz_amqp_connection_info_ptr;
73
-
74
-extern int dbk_channels;
75
-extern str dbk_node_hostname;
76
-extern str dbk_consumer_event_key;
77
-extern str dbk_consumer_event_subkey;
78
-extern int dbk_consumer_workers;
79
-
80
-typedef struct kz_amqp_connection_t {
81
-	kz_amqp_connection_info info;
82
-	char* url;
83
-//    struct kz_amqp_connection_t* next;
84
-} kz_amqp_connection, *kz_amqp_connection_ptr;
85
-
86
-/*
87
-typedef struct {
88
-	kz_amqp_connection_ptr current;
89
-	kz_amqp_connection_ptr head;
90
-	kz_amqp_connection_ptr tail;
91
-} kz_amqp_connection_pool, *kz_amqp_connection_pool_ptr;
92
-*/
93
-typedef struct kz_amqp_conn_t {
94
-	struct kz_amqp_server_t* server;
95
-	amqp_connection_state_t conn;
96
-	kz_amqp_connection_state state;
97
-	struct event *ev;
98
-	struct itimerspec *timer;
99
-	amqp_socket_t *socket;
100
-	amqp_channel_t channel_count;
101
-	amqp_channel_t channel_counter;
102
-//    struct kz_amqp_conn_t* next;
103
-} kz_amqp_conn, *kz_amqp_conn_ptr;
104
-
105
-typedef struct {
106
-	kz_amqp_conn_ptr current;
107
-	kz_amqp_conn_ptr head;
108
-	kz_amqp_conn_ptr tail;
109
-} kz_amqp_conn_pool, *kz_amqp_conn_pool_ptr;
110
-
111
-
112
-/*
113
-#define AMQP_KZ_CMD_PUBLISH       1
114
-#define AMQP_KZ_CMD_CALL          2
115
-#define AMQP_KZ_CMD_CONSUME       3
116
-*/
117
-
118
-typedef struct {
119
-    gen_lock_t lock;
120
-	kz_amqp_pipe_cmd_type type;
121
-	char* exchange;
122
-	char* exchange_type;
123
-	char* routing_key;
124
-	char* reply_routing_key;
125
-	char* queue;
126
-	char* payload;
127
-	char* return_payload;
128
-	str* message_id;
129
-	int   return_code;
130
-	int   consumer;
131
-	int   server_id;
132
-	uint64_t delivery_tag;
133
-	amqp_channel_t channel;
134
-	struct timeval timeout;
135
-
136
-	/* timer */
137
-//	struct event *timer_ev;
138
-//	int timerfd;
139
-
140
-	/* async */
141
-	char *cb_route;
142
-	char *err_route;
143
-	unsigned int t_hash;
144
-	unsigned int t_label;
145
-
146
-
147
-} kz_amqp_cmd, *kz_amqp_cmd_ptr;
148
-
149
-typedef struct {
150
-	str* message_id;
151
-
152
-	/* timer */
153
-	struct event *timer_ev;
154
-	int timerfd;
155
-
156
-} kz_amqp_cmd_timeout, *kz_amqp_cmd_timeout_ptr;
157
-
158
-typedef struct kz_amqp_cmd_entry_t {
159
-	kz_amqp_cmd_ptr cmd;
160
-	struct kz_amqp_cmd_entry_t* next;
161
-} kz_amqp_cmd_entry, *kz_amqp_cmd_entry_ptr;
162
-
163
-typedef struct kz_amqp_cmd_table_t {
164
-	kz_amqp_cmd_entry_ptr entries;
165
-	gen_lock_t lock;
166
-} kz_amqp_cmd_table, *kz_amqp_cmd_table_ptr;
167
-
168
-
169
-typedef struct {
170
-	char* payload;
171
-	uint64_t delivery_tag;
172
-	amqp_channel_t channel;
173
-	char* event_key;
174
-	char* event_subkey;
175
-	str* message_id;
176
-	kz_amqp_cmd_ptr cmd;
177
-} kz_amqp_consumer_delivery, *kz_amqp_consumer_delivery_ptr;
178
-
179
-typedef struct {
180
-	amqp_bytes_t exchange;
181
-	amqp_bytes_t exchange_type;
182
-	amqp_bytes_t routing_key;
183
-	amqp_bytes_t queue;
184
-	amqp_bytes_t event_key;
185
-	amqp_bytes_t event_subkey;
186
-	amqp_boolean_t passive;
187
-	amqp_boolean_t durable;
188
-	amqp_boolean_t exclusive;
189
-	amqp_boolean_t auto_delete;
190
-	amqp_boolean_t no_ack;
191
-	amqp_boolean_t wait_for_consumer_ack;
192
-	amqp_boolean_t federate;
193
-} kz_amqp_bind, *kz_amqp_bind_ptr;
194
-
195
-typedef struct {
196
-	kz_amqp_cmd_ptr cmd;
197
-	kz_amqp_bind_ptr targeted;
198
-	kz_amqp_bind_ptr consumer;
199
-	amqp_channel_t channel;
200
-	kz_amqp_channel_state state;
201
-	struct timeval timer;
202
-	gen_lock_t lock;
203
-} kz_amqp_channel, *kz_amqp_channel_ptr;
204
-
205
-typedef struct kz_amqp_binding_t {
206
-	kz_amqp_bind_ptr bind;
207
-    struct kz_amqp_binding_t* next;
208
-} kz_amqp_binding, *kz_amqp_binding_ptr;
209
-
210
-typedef struct {
211
-	kz_amqp_binding_ptr head;
212
-	kz_amqp_binding_ptr tail;
213
-} kz_amqp_bindings, *kz_amqp_bindings_ptr;
214
-
215
-typedef struct kz_amqp_server_t {
216
-	int id;
217
-	int channel_index;
218
-	struct kz_amqp_zone_t* zone;
219
-	kz_amqp_connection_ptr connection;
220
-	kz_amqp_conn_ptr producer;
221
-//	kz_amqp_conn_ptr consumer;
222
-	kz_amqp_channel_ptr channels;
223
-//	kz_amqp_channel_ptr consumer_channels;
224
-    struct kz_amqp_server_t* next;
225
-} kz_amqp_server, *kz_amqp_server_ptr;
226
-
227
-typedef struct kz_amqp_servers_t {
228
-	kz_amqp_server_ptr head;
229
-	kz_amqp_server_ptr tail;
230
-} kz_amqp_servers, *kz_amqp_servers_ptr;
231
-
232
-typedef struct kz_amqp_zone_t {
233
-	char* zone;
234
-	kz_amqp_servers_ptr servers;
235
-    struct kz_amqp_zone_t* next;
236
-} kz_amqp_zone, *kz_amqp_zone_ptr;
237
-
238
-typedef struct kz_amqp_zones_t {
239
-	kz_amqp_zone_ptr head;
240
-	kz_amqp_zone_ptr tail;
241
-} kz_amqp_zones, *kz_amqp_zones_ptr;
242
-
243
-int kz_amqp_init();
244
-void kz_amqp_destroy();
245
-int kz_amqp_add_connection(modparam_t type, void* val);
246
-
247
-int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
248
-int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* dst);
249
-int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, char* payload);
250
-int kz_amqp_subscribe(struct sip_msg* msg, char* payload);
251
-int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue_name, char* routing_key);
252
-int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded);
253
-int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val);
254
-
255
-int kz_amqp_async_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _cb_route, char* _err_route);
256
-
257
-//void kz_amqp_generic_consumer_loop(int child_no);
258
-void kz_amqp_manager_loop(int child_no);
259
-
260
-int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr);
261
-int kz_amqp_publisher_proc(int cmd_pipe);
262
-int kz_amqp_timeout_proc();
263
-int kz_amqp_consumer_worker_proc(int cmd_pipe);
264
-
265
-int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection);
266
-
267
-int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
268
-int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
269
-int kz_pv_get_connection_host(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
270
-
271
-/* callid generator */
272
-int kz_callid_init(void);
273
-int kz_callid_child_init(int rank);
274
-void kz_generate_callid(str* callid);
275
-
276
-kz_amqp_zone_ptr kz_amqp_get_primary_zone();
277
-kz_amqp_zone_ptr kz_amqp_get_zones();
278
-kz_amqp_zone_ptr kz_amqp_get_zone(char* zone);
279
-kz_amqp_zone_ptr kz_amqp_add_zone(char* zone);
280
-
281
-void kz_amqp_fire_connection_event(char *event, char* host);
282
-
283
-void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd);
284
-
285
-static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x)
286
-{
287
-	amqp_connection_close_t *mconn;
288
-	amqp_channel_close_t *mchan;
289
-
290
-	switch (x.reply_type) {
291
-		case AMQP_RESPONSE_NORMAL:
292
-			return 0;
293
-
294
-		case AMQP_RESPONSE_NONE:
295
-			LM_ERR("%s: missing RPC reply type!", context);
296
-			break;
297
-
298
-		case AMQP_RESPONSE_LIBRARY_EXCEPTION:
299
-			LM_ERR("%s: %s\n", context,  "(end-of-stream)");
300
-			break;
301
-
302
-		case AMQP_RESPONSE_SERVER_EXCEPTION:
303
-			switch (x.reply.id) {
304
-				case AMQP_CONNECTION_CLOSE_METHOD:
305
-					mconn = (amqp_connection_close_t *)x.reply.decoded;
306
-					LM_ERR("%s: server connection error %d, message: %.*s",
307
-							context, mconn->reply_code,
308
-							(int)mconn->reply_text.len,
309
-							(char *)mconn->reply_text.bytes);
310
-					break;
311
-				case AMQP_CHANNEL_CLOSE_METHOD:
312
-						mchan = (amqp_channel_close_t *)x.reply.decoded;
313
-					LM_ERR("%s: server channel error %d, message: %.*s",
314
-							context, mchan->reply_code,
315
-							(int)mchan->reply_text.len,
316
-							(char *)mchan->reply_text.bytes);
317
-					break;
318
-				default:
319
-					LM_ERR("%s: unknown server error, method id 0x%08X",
320
-							context, x.reply.id);
321
-					break;
322
-			}
323
-			break;
324
-	}
325
-	return -1;
326
-}
327
-
328
-
329
-#endif /* KZ_AMQP_H_ */
Browse code

modules: faked message api has moved to core

Mikko Lehto authored on 02/12/2016 10:46:37
Showing 1 changed files
... ...
@@ -39,7 +39,7 @@
39 39
 
40 40
 #include "const.h"
41 41
 #include "defs.h"
42
-#include "../../lib/kcore/faked_msg.h"
42
+#include "../../fmsg.h"
43 43
 
44 44
 typedef enum {
45 45
 	KZ_AMQP_CONNECTION_CLOSED     = 0,
Browse code

update headers

lazedo authored on 11/05/2016 22:16:35
Showing 1 changed files
... ...
@@ -1,8 +1,29 @@
1 1
 /*
2
- * kz_amqp.h
2
+ * $Id$
3 3
  *
4
- *  Created on: Jul 29, 2014
5
- *      Author: root
4
+ * Kazoo module interface
5
+ *
6
+ * Copyright (C) 2010-2014 2600Hz
7
+ *
8
+ * This file is part of Kamailio, a free SIP server.
9
+ *
10
+ * Kamailio is free software; you can redistribute it and/or modify
11
+ * it under the terms of the GNU General Public License as published by
12
+ * the Free Software Foundation; either version 2 of the License, or
13
+ * (at your option) any later version
14
+ *
15
+ * Kamailio is distributed in the hope that it will be useful,
16
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
17
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
+ * GNU General Public License for more details.
19
+ *
20
+ * You should have received a copy of the GNU General Public License
21
+ * along with this program; if not, write to the Free Software
22
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23
+ *
24
+ * History:
25
+ * --------
26
+ * 2014-08  first version (2600hz)
6 27
  */
7 28
 
8 29
 #ifndef KZ_AMQP_H_
Browse code

kazoo : first approach to multiple consumers

Luis Azedo authored on 01/07/2015 20:38:01 • Luis Azedo committed on 01/07/2015 20:38:01
Showing 1 changed files
... ...
@@ -54,7 +54,7 @@ extern int dbk_channels;
54 54
 extern str dbk_node_hostname;
55 55
 extern str dbk_consumer_event_key;
56 56
 extern str dbk_consumer_event_subkey;
57
-extern int dbk_consumer_processes;
57
+extern int dbk_consumer_workers;
58 58
 
59 59
 typedef struct kz_amqp_connection_t {
60 60
 	kz_amqp_connection_info info;
... ...
@@ -197,9 +197,9 @@ typedef struct kz_amqp_server_t {
197 197
 	struct kz_amqp_zone_t* zone;
198 198
 	kz_amqp_connection_ptr connection;
199 199
 	kz_amqp_conn_ptr producer;
200
-	kz_amqp_conn_ptr consumer;
200
+//	kz_amqp_conn_ptr consumer;
201 201
 	kz_amqp_channel_ptr channels;
202
-	kz_amqp_channel_ptr consumer_channels;
202
+//	kz_amqp_channel_ptr consumer_channels;
203 203
     struct kz_amqp_server_t* next;
204 204
 } kz_amqp_server, *kz_amqp_server_ptr;
205 205
 
Browse code

kazoo : add async query feature

suspend the transaction on send and continue on return or timeout

Luis Azedo authored on 01/07/2015 11:15:10
Showing 1 changed files
... ...
@@ -112,11 +112,27 @@ typedef struct {
112 112
 	amqp_channel_t channel;
113 113
 	struct timeval timeout;
114 114
 
115
+	/* timer */
116
+//	struct event *timer_ev;
117
+//	int timerfd;
118
+
119
+	/* async */
120
+	char *cb_route;
121
+	char *err_route;
122
+	unsigned int t_hash;
123
+	unsigned int t_label;
124
+
125
+
126
+} kz_amqp_cmd, *kz_amqp_cmd_ptr;
127
+
128
+typedef struct {
129
+	str* message_id;
130
+
115 131
 	/* timer */
116 132
 	struct event *timer_ev;
117 133
 	int timerfd;
118 134
 
119
-} kz_amqp_cmd, *kz_amqp_cmd_ptr;
135
+} kz_amqp_cmd_timeout, *kz_amqp_cmd_timeout_ptr;
120 136
 
121 137
 typedef struct kz_amqp_cmd_entry_t {
122 138
 	kz_amqp_cmd_ptr cmd;
... ...
@@ -135,6 +151,8 @@ typedef struct {
135 151
 	amqp_channel_t channel;
136 152
 	char* event_key;
137 153
 	char* event_subkey;
154
+	str* message_id;
155
+	kz_amqp_cmd_ptr cmd;
138 156
 } kz_amqp_consumer_delivery, *kz_amqp_consumer_delivery_ptr;
139 157
 
140 158
 typedef struct {
... ...
@@ -213,6 +231,8 @@ int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange
213 231
 int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded);
214 232
 int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val);
215 233
 
234
+int kz_amqp_async_query(struct sip_msg* msg, char* exchange, char* routing_key, char* payload, char* _cb_route, char* _err_route);
235
+
216 236
 //void kz_amqp_generic_consumer_loop(int child_no);
217 237
 void kz_amqp_manager_loop(int child_no);
218 238
 
Browse code

kazoo : changes in targeted exchanges

Luis Azedo authored on 26/06/2015 18:35:55
Showing 1 changed files
... ...
@@ -30,7 +30,12 @@ typedef enum {
30 30
 	KZ_AMQP_CMD_PUBLISH     = 1,
31 31
 	KZ_AMQP_CMD_CALL    = 2,
32 32
 	KZ_AMQP_CMD_CONSUME = 3,
33
-	KZ_AMQP_CMD_ACK = 4
33
+	KZ_AMQP_CMD_ACK = 4,
34
+	KZ_AMQP_CMD_TARGETED_CONSUMER = 5,
35
+	KZ_AMQP_CMD_PUBLISH_BROADCAST = 6,
36
+	KZ_AMQP_CMD_COLLECT = 7,
37
+	KZ_AMQP_CMD_ASYNC_CALL    = 8,
38
+	KZ_AMQP_CMD_ASYNC_COLLECT    = 9
34 39
 } kz_amqp_pipe_cmd_type;
35 40
 
36 41
 typedef enum {
... ...
@@ -83,10 +88,11 @@ typedef struct {
83 88
 } kz_amqp_conn_pool, *kz_amqp_conn_pool_ptr;
84 89
 
85 90
 
91
+/*
86 92
 #define AMQP_KZ_CMD_PUBLISH       1
87 93
 #define AMQP_KZ_CMD_CALL          2
88 94
 #define AMQP_KZ_CMD_CONSUME       3
89
-
95
+*/
90 96
 
91 97
 typedef struct {
92 98
     gen_lock_t lock;
... ...
@@ -98,14 +104,31 @@ typedef struct {
98 104
 	char* queue;
99 105
 	char* payload;
100 106
 	char* return_payload;
107
+	str* message_id;
101 108
 	int   return_code;
102 109
 	int   consumer;
103 110
 	int   server_id;
104 111
 	uint64_t delivery_tag;
105 112
 	amqp_channel_t channel;
106 113
 	struct timeval timeout;
114
+
115
+	/* timer */
116
+	struct event *timer_ev;
117
+	int timerfd;
118
+
107 119
 } kz_amqp_cmd, *kz_amqp_cmd_ptr;
108 120
 
121
+typedef struct kz_amqp_cmd_entry_t {
122
+	kz_amqp_cmd_ptr cmd;
123
+	struct kz_amqp_cmd_entry_t* next;
124
+} kz_amqp_cmd_entry, *kz_amqp_cmd_entry_ptr;
125
+
126
+typedef struct kz_amqp_cmd_table_t {
127
+	kz_amqp_cmd_entry_ptr entries;
128
+	gen_lock_t lock;
129
+} kz_amqp_cmd_table, *kz_amqp_cmd_table_ptr;
130
+
131
+
109 132
 typedef struct {
110 133
 	char* payload;
111 134
 	uint64_t delivery_tag;
... ...
@@ -216,6 +239,8 @@ kz_amqp_zone_ptr kz_amqp_add_zone(char* zone);
216 239
 
217 240
 void kz_amqp_fire_connection_event(char *event, char* host);
218 241
 
242
+void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd);
243
+
219 244
 static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x)
220 245
 {
221 246
 	amqp_connection_close_t *mconn;
Browse code

kazoo : allow multiple simultaneous servers

Luis Azedo authored on 23/06/2015 03:57:07
Showing 1 changed files
... ...
@@ -8,6 +8,9 @@
8 8
 #ifndef KZ_AMQP_H_
9 9
 #define KZ_AMQP_H_
10 10
 
11
+#include <fcntl.h>
12
+#include <event.h>
13
+#include <sys/timerfd.h>
11 14
 #include <amqp.h>
12 15
 
13 16
 #include "../../sr_module.h"
... ...
@@ -17,6 +20,28 @@
17 20
 #include "defs.h"
18 21
 #include "../../lib/kcore/faked_msg.h"
19 22
 
23
+typedef enum {
24
+	KZ_AMQP_CONNECTION_CLOSED     = 0,
25
+	KZ_AMQP_CONNECTION_OPEN    = 1,
26
+	KZ_AMQP_CONNECTION_FAILURE    = 2
27
+} kz_amqp_connection_state;
28
+
29
+typedef enum {
30
+	KZ_AMQP_CMD_PUBLISH     = 1,
31
+	KZ_AMQP_CMD_CALL    = 2,
32
+	KZ_AMQP_CMD_CONSUME = 3,
33
+	KZ_AMQP_CMD_ACK = 4
34
+} kz_amqp_pipe_cmd_type;
35
+
36
+typedef enum {
37
+	KZ_AMQP_CHANNEL_CLOSED     = 0,
38
+	KZ_AMQP_CHANNEL_FREE     = 1,
39
+	KZ_AMQP_CHANNEL_PUBLISHING    = 2,
40
+	KZ_AMQP_CHANNEL_BINDED = 3,
41
+	KZ_AMQP_CHANNEL_CALLING    = 4,
42
+	KZ_AMQP_CHANNEL_CONSUMING = 5
43
+} kz_amqp_channel_state;
44
+
20 45
 typedef struct amqp_connection_info kz_amqp_connection_info;
21 46
 typedef kz_amqp_connection_info *kz_amqp_connection_info_ptr;
22 47
 
... ...
@@ -29,22 +54,26 @@ extern int dbk_consumer_processes;
29 54
 typedef struct kz_amqp_connection_t {
30 55
 	kz_amqp_connection_info info;
31 56
 	char* url;
32
-    struct kz_amqp_connection_t* next;
57
+//    struct kz_amqp_connection_t* next;
33 58
 } kz_amqp_connection, *kz_amqp_connection_ptr;
34 59
 
60
+/*
35 61
 typedef struct {
36 62
 	kz_amqp_connection_ptr current;
37 63
 	kz_amqp_connection_ptr head;
38 64
 	kz_amqp_connection_ptr tail;
39 65
 } kz_amqp_connection_pool, *kz_amqp_connection_pool_ptr;
40
-
66
+*/
41 67
 typedef struct kz_amqp_conn_t {
42
-	kz_amqp_connection_ptr info;
68
+	struct kz_amqp_server_t* server;
43 69
 	amqp_connection_state_t conn;
70
+	kz_amqp_connection_state state;
71
+	struct event *ev;
72
+	struct itimerspec *timer;
44 73
 	amqp_socket_t *socket;
45 74
 	amqp_channel_t channel_count;
46 75
 	amqp_channel_t channel_counter;
47
-    struct kz_amqp_conn_t* next;
76
+//    struct kz_amqp_conn_t* next;
48 77
 } kz_amqp_conn, *kz_amqp_conn_ptr;
49 78
 
50 79
 typedef struct {
... ...
@@ -58,21 +87,6 @@ typedef struct {
58 87
 #define AMQP_KZ_CMD_CALL          2
59 88
 #define AMQP_KZ_CMD_CONSUME       3
60 89
 
61
-typedef enum {
62
-	KZ_AMQP_PUBLISH     = 1,
63
-	KZ_AMQP_CALL    = 2,
64
-	KZ_AMQP_CONSUME = 3,
65
-	KZ_AMQP_ACK = 4
66
-} kz_amqp_pipe_cmd_type;
67
-
68
-typedef enum {
69
-	KZ_AMQP_CLOSED     = 0,
70
-	KZ_AMQP_FREE     = 1,
71
-	KZ_AMQP_PUBLISHING    = 2,
72
-	KZ_AMQP_BINDED = 3,
73
-	KZ_AMQP_CALLING    = 4,
74
-	KZ_AMQP_CONSUMING = 5
75
-} kz_amqp_channel_state;
76 90
 
77 91
 typedef struct {
78 92
     gen_lock_t lock;
... ...
@@ -86,6 +100,7 @@ typedef struct {
86 100
 	char* return_payload;
87 101
 	int   return_code;
88 102
 	int   consumer;
103
+	int   server_id;
89 104
 	uint64_t delivery_tag;
90 105
 	amqp_channel_t channel;
91 106
 	struct timeval timeout;
... ...
@@ -135,6 +150,34 @@ typedef struct {
135 150
 	kz_amqp_binding_ptr tail;
136 151
 } kz_amqp_bindings, *kz_amqp_bindings_ptr;
137 152
 
153
+typedef struct kz_amqp_server_t {
154
+	int id;
155
+	int channel_index;
156
+	struct kz_amqp_zone_t* zone;
157
+	kz_amqp_connection_ptr connection;
158
+	kz_amqp_conn_ptr producer;
159
+	kz_amqp_conn_ptr consumer;
160
+	kz_amqp_channel_ptr channels;
161
+	kz_amqp_channel_ptr consumer_channels;
162
+    struct kz_amqp_server_t* next;
163
+} kz_amqp_server, *kz_amqp_server_ptr;
164
+
165
+typedef struct kz_amqp_servers_t {
166
+	kz_amqp_server_ptr head;
167
+	kz_amqp_server_ptr tail;
168
+} kz_amqp_servers, *kz_amqp_servers_ptr;
169
+
170
+typedef struct kz_amqp_zone_t {
171
+	char* zone;
172
+	kz_amqp_servers_ptr servers;
173
+    struct kz_amqp_zone_t* next;
174
+} kz_amqp_zone, *kz_amqp_zone_ptr;
175
+
176
+typedef struct kz_amqp_zones_t {
177
+	kz_amqp_zone_ptr head;
178
+	kz_amqp_zone_ptr tail;
179
+} kz_amqp_zones, *kz_amqp_zones_ptr;
180
+
138 181
 int kz_amqp_init();
139 182
 void kz_amqp_destroy();
140 183
 int kz_amqp_add_connection(modparam_t type, void* val);
... ...
@@ -146,15 +189,16 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload);
146 189
 int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue_name, char* routing_key);
147 190
 int kz_amqp_encode(struct sip_msg* msg, char* unencoded, char* encoded);
148 191
 int kz_amqp_encode_ex(str* unencoded, pv_value_p dst_val);
149
-//void kz_amqp_presence_consumer_loop(int child_no);
150
-void kz_amqp_consumer_loop(int child_no);
151 192
 
152 193
 //void kz_amqp_generic_consumer_loop(int child_no);
153 194
 void kz_amqp_manager_loop(int child_no);
154 195
 
155
-void kz_amqp_consumer_proc(int child_no);
156
-void kz_amqp_publisher_proc(int child_no);
157
-void kz_amqp_timeout_proc(int child_no);
196
+int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr);
197
+int kz_amqp_publisher_proc(int cmd_pipe);
198
+int kz_amqp_timeout_proc();
199
+int kz_amqp_consumer_worker_proc(int cmd_pipe);
200
+
201
+int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection);
158 202
 
159 203
 int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
160 204
 int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
... ...
@@ -165,6 +209,13 @@ int kz_callid_init(void);
165 209
 int kz_callid_child_init(int rank);
166 210
 void kz_generate_callid(str* callid);
167 211
 
212
+kz_amqp_zone_ptr kz_amqp_get_primary_zone();
213
+kz_amqp_zone_ptr kz_amqp_get_zones();
214
+kz_amqp_zone_ptr kz_amqp_get_zone(char* zone);
215
+kz_amqp_zone_ptr kz_amqp_add_zone(char* zone);
216
+
217
+void kz_amqp_fire_connection_event(char *event, char* host);
218
+
168 219
 static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x)
169 220
 {
170 221
 	amqp_connection_close_t *mconn;
Browse code

kazoo : support alternative federated exchanges

Luis Azedo authored on 18/06/2015 22:10:53
Showing 1 changed files
... ...
@@ -112,6 +112,7 @@ typedef struct {
112 112
 	amqp_boolean_t auto_delete;
113 113
 	amqp_boolean_t no_ack;
114 114
 	amqp_boolean_t wait_for_consumer_ack;
115
+	amqp_boolean_t federate;
115 116
 } kz_amqp_bind, *kz_amqp_bind_ptr;
116 117
 
117 118
 typedef struct {
Browse code

kazoo : missing fixes

Luis Azedo authored on 11/06/2015 16:08:45
Showing 1 changed files
... ...
@@ -11,6 +11,7 @@
11 11
 #include <amqp.h>
12 12
 
13 13
 #include "../../sr_module.h"
14
+#include "../../str.h"
14 15
 
15 16
 #include "const.h"
16 17
 #include "defs.h"
... ...
@@ -158,6 +159,11 @@ int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param,	pv_value_t *
158 159
 int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
159 160
 int kz_pv_get_connection_host(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
160 161
 
162
+/* callid generator */
163
+int kz_callid_init(void);
164
+int kz_callid_child_init(int rank);
165
+void kz_generate_callid(str* callid);
166
+
161 167
 static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x)
162 168
 {
163 169
 	amqp_connection_close_t *mconn;
Browse code

kazoo - fix crashing on heavy load

Luis Azedo authored on 24/02/2015 09:33:53
Showing 1 changed files
... ...
@@ -25,10 +25,20 @@ extern str dbk_consumer_event_key;
25 25
 extern str dbk_consumer_event_subkey;
26 26
 extern int dbk_consumer_processes;
27 27
 
28
-
29
-typedef struct kz_amqp_conn_t {
28
+typedef struct kz_amqp_connection_t {
30 29
 	kz_amqp_connection_info info;
31 30
 	char* url;
31
+    struct kz_amqp_connection_t* next;
32
+} kz_amqp_connection, *kz_amqp_connection_ptr;
33
+
34
+typedef struct {
35
+	kz_amqp_connection_ptr current;
36
+	kz_amqp_connection_ptr head;
37
+	kz_amqp_connection_ptr tail;
38
+} kz_amqp_connection_pool, *kz_amqp_connection_pool_ptr;
39
+
40
+typedef struct kz_amqp_conn_t {
41
+	kz_amqp_connection_ptr info;
32 42
 	amqp_connection_state_t conn;
33 43
 	amqp_socket_t *socket;
34 44
 	amqp_channel_t channel_count;
... ...
@@ -110,6 +120,7 @@ typedef struct {
110 120
 	amqp_channel_t channel;
111 121
 	kz_amqp_channel_state state;
112 122
 	struct timeval timer;
123
+	gen_lock_t lock;
113 124
 } kz_amqp_channel, *kz_amqp_channel_ptr;
114 125
 
115 126
 typedef struct kz_amqp_binding_t {
... ...
@@ -122,7 +133,7 @@ typedef struct {
122 133
 	kz_amqp_binding_ptr tail;
123 134
 } kz_amqp_bindings, *kz_amqp_bindings_ptr;
124 135
 
125
-void kz_amqp_init();
136
+int kz_amqp_init();
126 137
 void kz_amqp_destroy();
127 138
 int kz_amqp_add_connection(modparam_t type, void* val);
128 139
 
Browse code

kazoo - fix timeouts

timeout should be ms not sec
set time before state so timeout check doesn't set timeout immediately
separate proc for handling timeouts

Luis Azedo authored on 23/02/2015 20:55:57
Showing 1 changed files
... ...
@@ -139,6 +139,10 @@ void kz_amqp_consumer_loop(int child_no);
139 139
 //void kz_amqp_generic_consumer_loop(int child_no);
140 140
 void kz_amqp_manager_loop(int child_no);
141 141
 
142
+void kz_amqp_consumer_proc(int child_no);
143
+void kz_amqp_publisher_proc(int child_no);
144
+void kz_amqp_timeout_proc(int child_no);
145
+
142 146
 int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
143 147
 int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
144 148
 int kz_pv_get_connection_host(struct sip_msg *msg, pv_param_t *param,	pv_value_t *res);
Browse code

kazoo : event key/subkey & documentation corrections

Luis Azedo authored on 12/09/2014 17:23:35
Showing 1 changed files
... ...
@@ -84,6 +84,8 @@ typedef struct {
84 84
 	char* payload;
85 85
 	uint64_t delivery_tag;
86 86
 	amqp_channel_t channel;
87
+	char* event_key;
88
+	char* event_subkey;
87 89
 } kz_amqp_consumer_delivery, *kz_amqp_consumer_delivery_ptr;
88 90
 
89 91
 typedef struct {
... ...
@@ -91,6 +93,8 @@ typedef struct {
91 93
 	amqp_bytes_t exchange_type;
92 94
 	amqp_bytes_t routing_key;
93 95
 	amqp_bytes_t queue;
96
+	amqp_bytes_t event_key;
97
+	amqp_bytes_t event_subkey;
94 98
 	amqp_boolean_t passive;
95 99
 	amqp_boolean_t durable;
96 100
 	amqp_boolean_t exclusive;
Browse code

kazoo : remove dead code

lazedo authored on 12/09/2014 15:23:20
Showing 1 changed files
... ...
@@ -11,7 +11,6 @@
11 11
 #include <amqp.h>
12 12
 
13 13
 #include "../../sr_module.h"
14
-#include "../tm/tm_load.h"
15 14
 
16 15
 #include "const.h"
17 16
 #include "defs.h"
Browse code

kazoo: remove dependency on tm module

Luis Azedo authored on 11/09/2014 16:32:56
Showing 1 changed files
... ...
@@ -22,7 +22,6 @@ typedef kz_amqp_connection_info *kz_amqp_connection_info_ptr;
22 22
 
23 23
 extern int dbk_channels;
24 24
 extern str dbk_node_hostname;
25
-extern struct tm_binds tmb;
26 25
 extern str dbk_consumer_event_key;
27 26
 extern str dbk_consumer_event_subkey;
28 27
 extern int dbk_consumer_processes;
Browse code

support vhost in connection

lazedo authored on 11/09/2014 13:47:33
Showing 1 changed files
... ...
@@ -30,11 +30,11 @@ extern int dbk_consumer_processes;
30 30
 
31 31
 typedef struct kz_amqp_conn_t {
32 32
 	kz_amqp_connection_info info;
33
+	char* url;
33 34
 	amqp_connection_state_t conn;
34 35
 	amqp_socket_t *socket;
35 36
 	amqp_channel_t channel_count;
36 37
 	amqp_channel_t channel_counter;
37
-//    gen_lock_t lock;
38 38
     struct kz_amqp_conn_t* next;
39 39
 } kz_amqp_conn, *kz_amqp_conn_ptr;
40 40
 
... ...
@@ -42,7 +42,6 @@ typedef struct {
42 42
 	kz_amqp_conn_ptr current;
43 43
 	kz_amqp_conn_ptr head;
44 44
 	kz_amqp_conn_ptr tail;
45
-//    gen_lock_t lock;
46 45
 } kz_amqp_conn_pool, *kz_amqp_conn_pool_ptr;
47 46
 
48 47
 
Browse code

kazoo initial commit

lazedo authored on 10/09/2014 10:04:15
Showing 1 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,189 @@
1
+/*
2
+ * kz_amqp.h
3
+ *
4
+ *  Created on: Jul 29, 2014
5
+ *      Author: root
6
+ */
7
+
8
+#ifndef KZ_AMQP_H_
9
+#define KZ_AMQP_H_
10
+
11
+#include <amqp.h>
12
+
13
+#include "../../sr_module.h"
14
+#include "../tm/tm_load.h"
15
+
16
+#include "const.h"
17
+#include "defs.h"
18
+#include "../../lib/kcore/faked_msg.h"
19
+
20
+typedef struct amqp_connection_info kz_amqp_connection_info;
21
+typedef kz_amqp_connection_info *kz_amqp_connection_info_ptr;
22
+
23
+extern int dbk_channels;
24
+extern str dbk_node_hostname;
25
+extern struct tm_binds tmb;
26
+extern str dbk_consumer_event_key;
27
+extern str dbk_consumer_event_subkey;
28
+extern int dbk_consumer_processes;
29
+
30
+
31
+typedef struct kz_amqp_conn_t {
32
+	kz_amqp_connection_info info;
33
+	amqp_connection_state_t conn;
34
+	amqp_socket_t *socket;
35
+	amqp_channel_t channel_count;
36
+	amqp_channel_t channel_counter;
37
+//    gen_lock_t lock;
38
+    struct kz_amqp_conn_t* next;
39
+} kz_amqp_conn, *kz_amqp_conn_ptr;
40
+
41
+typedef struct {
42
+	kz_amqp_conn_ptr current;
43
+	kz_amqp_conn_ptr head;
44
+	kz_amqp_conn_ptr tail;
45
+//    gen_lock_t lock;
46
+} kz_amqp_conn_pool, *kz_amqp_conn_pool_ptr;
47
+
48
+
49
+#define AMQP_KZ_CMD_PUBLISH       1
50
+#define AMQP_KZ_CMD_CALL          2
51
+#define AMQP_KZ_CMD_CONSUME       3
52
+
53
+typedef enum {
54
+	KZ_AMQP_PUBLISH     = 1,
55
+	KZ_AMQP_CALL    = 2,
56
+	KZ_AMQP_CONSUME = 3,
57
+	KZ_AMQP_ACK = 4
58
+} kz_amqp_pipe_cmd_type;
59
+
60
+typedef enum {
61
+	KZ_AMQP_CLOSED     = 0,
62
+	KZ_AMQP_FREE     = 1,
63
+	KZ_AMQP_PUBLISHING    = 2,
64
+	KZ_AMQP_BINDED = 3,
65
+	KZ_AMQP_CALLING    = 4,
66
+	KZ_AMQP_CONSUMING = 5
67
+} kz_amqp_channel_state;
68
+
69
+typedef struct {
70
+    gen_lock_t lock;
71
+	kz_amqp_pipe_cmd_type type;
72
+	char* exchange;
73
+	char* exchange_type;
74
+	char* routing_key;
75
+	char* reply_routing_key;
76
+	char* queue;
77
+	char* payload;
78
+	char* return_payload;
79
+	int   return_code;
80
+	int   consumer;
81
+	uint64_t delivery_tag;
82
+	amqp_channel_t channel;
83
+	struct timeval timeout;
84
+} kz_amqp_cmd, *kz_amqp_cmd_ptr;
85
+