Browse code

kazoo - fix crashing on heavy load

Luis Azedo authored on 24/02/2015 09:33:53
Showing 3 changed files
... ...
@@ -41,7 +41,7 @@
41 41
 #include "kz_trans.h"
42 42
 #include "kz_pua.h"
43 43
 
44
-#define DBK_DEFAULT_NO_CONSUMERS 4
44
+#define DBK_DEFAULT_NO_CONSUMERS 8
45 45
 
46 46
 static int mod_init(void);
47 47
 static int  mod_child_init(int rank);
... ...
@@ -232,8 +232,10 @@ static int mod_init(void) {
232 232
    		return -1;
233 233
    	}
234 234
 
235
-    kz_amqp_init();
236
-
235
+    if(!kz_amqp_init()) {
236
+   		return -1;
237
+    }
238
+    
237 239
     if(dbk_pua_mode == 1) {
238 240
 		kz_db_url.len = kz_db_url.s ? strlen(kz_db_url.s) : 0;
239 241
 		LM_DBG("db_url=%s/%d/%p\n", ZSW(kz_db_url.s), kz_db_url.len,kz_db_url.s);
... ...
@@ -19,9 +19,9 @@
19 19
 
20 20
 #define RET_AMQP_ERROR 2
21 21
 
22
-
23
-kz_amqp_conn_pool_ptr kz_pool = NULL;
22
+kz_amqp_connection_pool_ptr kz_pool = NULL;
24 23
 kz_amqp_bindings_ptr kz_bindings = NULL;
24
+int bindings_count = 0;
25 25
 
26 26
 static unsigned long rpl_query_routing_key_count = 0;
27 27
 
... ...
@@ -49,7 +49,6 @@ extern pv_spec_t kz_query_timeout_spec;
49 49
 const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL };
50 50
 const amqp_table_t kz_amqp_empty_table = { 0, NULL };
51 51
 
52
-
53 52
 static char *kz_amqp_str_dup(str *src)
54 53
 {
55 54
 	char *res;
... ...
@@ -153,7 +152,7 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
153 152
 	shm_free(bind);
154 153
 }
155 154
 
156
-void kz_amqp_free_connection(kz_amqp_conn_ptr conn)
155
+void kz_amqp_free_connection(kz_amqp_connection_ptr conn)
157 156
 {
158 157
 	if(!conn)
159 158
 		return;
... ...
@@ -278,12 +277,12 @@ kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queu
278 277
 
279 278
 void kz_amqp_init_connection_pool() {
280 279
 	if(kz_pool == NULL) {
281
-		kz_pool = (kz_amqp_conn_pool_ptr) shm_malloc(sizeof(kz_amqp_conn_pool));
282
-		memset(kz_pool, 0, sizeof(kz_amqp_conn_pool));
280
+		kz_pool = (kz_amqp_connection_pool_ptr) shm_malloc(sizeof(kz_amqp_connection_pool));
281
+		memset(kz_pool, 0, sizeof(kz_amqp_connection_pool));
283 282
 	}
284 283
 }
285 284
 
286
-void kz_amqp_init() {
285
+int kz_amqp_init() {
287 286
 	int i;
288 287
 	kz_amqp_init_connection_pool();
289 288
 	if(kz_bindings == NULL) {
... ...
@@ -294,9 +293,14 @@ void kz_amqp_init() {
294 293
 		channels = shm_malloc(dbk_channels * sizeof(kz_amqp_channel));
295 294
 		memset(channels, 0, dbk_channels * sizeof(kz_amqp_channel));
296 295
 		for(i=0; i < dbk_channels; i++) {
296
+			if(lock_init(&channels[i].lock)==NULL) {
297
+				LM_ERR("could not initialize locks for channels\n");
298
+				return 0;
299
+			}
297 300
 			channels[i].channel = i+1;
298 301
 		}
299 302
 	}
303
+	return 1;
300 304
 }
301 305
 
302 306
 void kz_amqp_destroy() {
... ...
@@ -325,9 +329,9 @@ void kz_amqp_destroy() {
325 329
 	}
326 330
 
327 331
 	if(kz_pool != NULL) {
328
-		kz_amqp_conn_ptr conn = kz_pool->head;
332
+		kz_amqp_connection_ptr conn = kz_pool->head;
329 333
 		while(conn != NULL) {
330
-			kz_amqp_conn_ptr tofree = conn;
334
+			kz_amqp_connection_ptr tofree = conn;
331 335
 			conn = conn->next;
332 336
 			kz_amqp_free_connection(tofree);
333 337
 		}
... ...
@@ -342,7 +346,7 @@ static char* KZ_URL_ROOT = "/";
342 346
 
343 347
 int kz_amqp_add_connection(modparam_t type, void* val)
344 348
 {
345
-	kz_amqp_init_connection_pool(); // find a better way
349
+	kz_amqp_init_connection_pool();
346 350
 
347 351
 	char* url = (char*) val;
348 352
 	int len = strlen(url);
... ...
@@ -351,8 +355,8 @@ int kz_amqp_add_connection(modparam_t type, void* val)
351 355
 		return -1;
352 356
 	}
353 357
 
354
-	kz_amqp_conn_ptr newConn = shm_malloc(sizeof(kz_amqp_conn));
355
-	memset(newConn, 0, sizeof(kz_amqp_conn));
358
+	kz_amqp_connection_ptr newConn = shm_malloc(sizeof(kz_amqp_connection));
359
+	memset(newConn, 0, sizeof(kz_amqp_connection));
356 360
 
357 361
 	newConn->url = shm_malloc( (KZ_URL_MAX_SIZE + 1) * sizeof(char) );
358 362
 	memset(newConn->url, 0, (KZ_URL_MAX_SIZE + 1) * sizeof(char));
... ...
@@ -432,19 +436,19 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) {
432 436
     	goto error;
433 437
     }
434 438
 
435
-    if (amqp_socket_open(rmq->socket, rmq->info.host, rmq->info.port)) {
439
+    if (amqp_socket_open(rmq->socket, rmq->info->info.host, rmq->info->info.port)) {
436 440
     	LM_DBG("Failed to open TCP socket to AMQP broker\n");
437 441
     	goto error;
438 442
     }
439 443
 
440 444
     if (kz_amqp_error("Logging in", amqp_login(rmq->conn,
441
-					   rmq->info.vhost,
445
+					   rmq->info->info.vhost,
442 446
 					   0,
443 447
 					   131072,
444 448
 					   0,
445 449
 					   AMQP_SASL_METHOD_PLAIN,
446
-					   rmq->info.user,
447
-					   rmq->info.password))) {
450
+					   rmq->info->info.user,
451
+					   rmq->info->info.password))) {
448 452
 
449 453
     	LM_ERR("Login to AMQP broker failed!\n");
450 454
     	goto error;
... ...
@@ -473,6 +477,9 @@ int kz_amqp_channel_open(kz_amqp_conn_ptr rmq, amqp_channel_t channel) {
473 477
 }
474 478
 
475 479
 kz_amqp_conn_ptr kz_amqp_get_connection() {
480
+	return NULL;
481
+
482
+	/*
476 483
 	kz_amqp_conn_ptr ptr = NULL;
477 484
 	if(kz_pool == NULL) {
478 485
 		return NULL;
... ...
@@ -499,9 +506,12 @@ kz_amqp_conn_ptr kz_amqp_get_connection() {
499 506
 //	lock_release(&kz_pool->lock);
500 507
 
501 508
    	return ptr;
509
+   	*/
502 510
 }
503 511
 
504 512
 kz_amqp_conn_ptr kz_amqp_get_next_connection() {
513
+	return NULL;
514
+	/*
505 515
 	kz_amqp_conn_ptr ptr = NULL;
506 516
 	if(kz_pool == NULL) {
507 517
 		return NULL;
... ...
@@ -525,8 +535,44 @@ kz_amqp_conn_ptr kz_amqp_get_next_connection() {
525 535
 
526 536
 
527 537
    	return ptr;
538
+   	*/
528 539
 }
529 540
 
541
+int kz_amqp_open_next_connection(kz_amqp_conn_ptr ptr) {
542
+	if(ptr == NULL) {
543
+		return -1;
544
+	}
545
+
546
+	if(kz_pool == NULL) {
547
+		return -2;
548
+	}
549
+
550
+	if(ptr->info == NULL) {
551
+		ptr->info = kz_pool->head;
552
+	} else {
553
+		ptr->info = ptr->info->next;
554
+		if(ptr->info == NULL) {
555
+			ptr->info = kz_pool->head;
556
+		}
557
+	}
558
+
559
+	while(ptr->conn == NULL) {
560
+		if(kz_amqp_connection_open(ptr) == 0) {
561
+			break;
562
+		}
563
+		ptr->info = ptr->info->next;
564
+		if(ptr->info == NULL) {
565
+			LM_INFO("all connections tried, restarting from head\n");
566
+			sleep(3);
567
+			ptr->info = kz_pool->head;
568
+		}
569
+	}
570
+
571
+
572
+   	return 0;
573
+}
574
+
575
+
530 576
 int kz_amqp_consume_error(amqp_connection_state_t conn)
531 577
 {
532 578
 	amqp_frame_t frame;
... ...
@@ -851,7 +897,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha
851 897
 				if((pv_val.flags & PV_VAL_INT) && pv_val.ri != 0 ) {
852 898
 					kz_timeout.tv_usec = (pv_val.ri % 1000) * 1000;
853 899
 					kz_timeout.tv_sec = pv_val.ri / 1000;
854
-					LM_DBG("SET TIMEOUT TO %i\n", (int) kz_timeout.tv_sec);
900
+					LM_DBG("setting timeout to %i,%i\n", (int) kz_timeout.tv_sec, (int) kz_timeout.tv_usec);
855 901
 				}
856 902
 			}
857 903
 		}
... ...
@@ -951,6 +997,7 @@ int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange
951 997
 
952 998
 	kz_bindings->tail = binding;
953 999
 	binding->bind = bind;
1000
+	bindings_count++;
954 1001
 
955 1002
     return 1;
956 1003
 
... ...
@@ -1058,6 +1105,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
1058 1105
 
1059 1106
 	kz_bindings->tail = binding;
1060 1107
 	binding->bind = bind;
1108
+	bindings_count++;
1061 1109
 
1062 1110
     if(json_obj != NULL)
1063 1111
        	json_object_put(json_obj);
... ...
@@ -1160,7 +1208,7 @@ int get_channel_index() {
1160 1208
 			return n;
1161 1209
 		}
1162 1210
 	if(channel_index == 0) {
1163
-		LM_ERR("max channels (%d) reached. please exit kazoo and change db_kazoo amqp_max_channels param", dbk_channels);
1211
+		LM_ERR("max channels (%d) reached. please exit kamailio and change kazoo amqp_max_channels param", dbk_channels);
1164 1212
 		return -1;
1165 1213
 	}
1166 1214
 	channel_index = 0;
... ...
@@ -1241,6 +1289,48 @@ int kz_amqp_bind_targeted_channels(kz_amqp_conn_ptr kz_conn , int loopcount)
1241 1289
 	return 0;
1242 1290
 }
1243 1291
 
1292
+int kz_amqp_bind_consumer_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan)
1293
+{
1294
+    int ret = -1;
1295
+
1296
+    amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, kz_amqp_empty_table);
1297
+    if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
1298
+    {
1299
+		ret = -RET_AMQP_ERROR;
1300
+		goto error;
1301
+    }
1302
+
1303
+	amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
1304
+    if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
1305
+    {
1306
+		ret = -RET_AMQP_ERROR;
1307
+		goto error;
1308
+    }
1309
+
1310
+    LM_DBG("QUEUE BIND\n");
1311
+    if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0
1312
+	    || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
1313
+    {
1314
+		ret = -RET_AMQP_ERROR;
1315
+		goto error;
1316
+    }
1317
+
1318
+    LM_DBG("BASIC CONSUME\n");
1319
+    if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0
1320
+	    || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
1321
+    {
1322
+		ret = -RET_AMQP_ERROR;
1323
+		goto error;
1324
+    }
1325
+
1326
+    chan[idx].state = KZ_AMQP_CONSUMING;
1327
+	chan[idx].consumer = bind;
1328
+    ret = idx;
1329
+ error:
1330
+
1331
+    return ret;
1332
+}
1333
+
1244 1334
 
1245 1335
 int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind)
1246 1336
 {
... ...
@@ -1286,7 +1376,6 @@ int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind)
1286 1376
     return ret;
1287 1377
 }
1288 1378
 
1289
-
1290 1379
 int kz_amqp_send_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, kz_amqp_channel_state state, int idx)
1291 1380
 {
1292 1381
 	amqp_bytes_t exchange;
... ...
@@ -1314,7 +1403,7 @@ int kz_amqp_send_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, kz_amqp_chann
1314 1403
 
1315 1404
     json_obj = kz_json_parse(cmd->payload);
1316 1405
     if (json_obj == NULL)
1317
-	goto error;
1406
+    	goto error;
1318 1407
 
1319 1408
     if(kz_json_get_object(json_obj, BLF_JSON_SERVERID) == NULL) {
1320 1409
         json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)channels[idx].targeted->routing_key.bytes));
... ...
@@ -1322,11 +1411,16 @@ int kz_amqp_send_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, kz_amqp_chann
1322 1411
         payload = amqp_bytes_malloc_dup(amqp_cstring_bytes((char*)json_object_to_json_string(json_obj)));
1323 1412
     }
1324 1413
 
1325
-
1326
-	amqp_basic_publish(kz_conn->conn, channels[idx].channel, exchange, routing_key, 0, 0, &props, payload);
1414
+	int amqpres = amqp_basic_publish(kz_conn->conn, channels[idx].channel, exchange, routing_key, 0, 0, &props, payload);
1415
+	if ( amqpres != AMQP_STATUS_OK ) {
1416
+		LM_ERR("Failed to publish\n");
1417
+		ret = -1;
1418
+		goto error;
1419
+	}
1327 1420
 
1328 1421
 	if ( kz_amqp_error("Publishing",  amqp_get_rpc_reply(kz_conn->conn)) ) {
1329 1422
 		LM_ERR("Failed to publish\n");
1423
+		ret = -1;
1330 1424
 		goto error;
1331 1425
 	}
1332 1426
 	gettimeofday(&channels[idx].timer, NULL);
... ...
@@ -1458,6 +1552,7 @@ void kz_amqp_consumer_loop(int child_no)
1458 1552
 	LM_DBG("starting consumer %d\n", child_no);
1459 1553
 	close(kz_pipe_fds[child_no*2+1]);
1460 1554
 	int data_pipe = kz_pipe_fds[child_no*2];
1555
+//	int back_idx = (dbk_consumer_processes+1)*2+1;
1461 1556
 
1462 1557
 	fd_set fdset;
1463 1558
     int selret;
... ...
@@ -1477,6 +1572,17 @@ void kz_amqp_consumer_loop(int child_no)
1477 1572
 				if(read(data_pipe, &ptr, sizeof(ptr)) == sizeof(ptr)) {
1478 1573
 					LM_DBG("consumer %d received payload %s\n", child_no, ptr->payload);
1479 1574
 					kz_amqp_consumer_event(child_no, ptr->payload, ptr->event_key, ptr->event_subkey);
1575
+					/*
1576
+					if(ptr->channel > 0 && ptr->delivery_tag > 0) {
1577
+						kz_amqp_cmd_ptr cmd = kz_amqp_alloc_pipe_cmd();
1578
+						cmd->type = KZ_AMQP_ACK;
1579
+						cmd->channel = ptr->channel;
1580
+						cmd->delivery_tag = ptr->delivery_tag;
1581
+						if (write(kz_pipe_fds[back_idx], &cmd, sizeof(cmd)) != sizeof(cmd)) {
1582
+							LM_ERR("failed to send ack to AMQP Manager in process %d, write to command pipe: %s\n", getpid(), strerror(errno));
1583
+						}
1584
+					}
1585
+					*/
1480 1586
 					kz_amqp_free_consumer_delivery(ptr);
1481 1587
 				}
1482 1588
 			}
... ...
@@ -1577,7 +1683,7 @@ void kz_amqp_manager_loop(int child_no)
1577 1683
     		sleep(3);
1578 1684
     	}
1579 1685
 
1580
-    	kz_amqp_fire_connection_event("open", kzconn->info.host);
1686
+    	kz_amqp_fire_connection_event("open", kzconn->info->info.host);
1581 1687
 
1582 1688
     	loopcount++;
1583 1689
     	for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) {
... ...
@@ -1670,7 +1776,7 @@ void kz_amqp_manager_loop(int child_no)
1670 1776
 								} else {
1671 1777
 									cmd->return_code = -1;
1672 1778
 									OK = INTERNAL_READ = CONSUME = 0;
1673
-									LM_ERR("ERROR SENDING PUBLISH\n");
1779
+									LM_ERR("ERROR SENDING PUBLISH");
1674 1780
 								}
1675 1781
 								channels[idx].state = KZ_AMQP_FREE;
1676 1782
 								channels[idx].cmd = NULL;
... ...
@@ -1683,8 +1789,10 @@ void kz_amqp_manager_loop(int child_no)
1683 1789
 									channels[idx].cmd = NULL;
1684 1790
 									cmd->return_code = -1;
1685 1791
 									lock_release(&cmd->lock);
1686
-									LM_ERR("ERROR SENDING QUERY\n");
1792
+									LM_ERR("ERROR SENDING QUERY");
1687 1793
 									OK = INTERNAL_READ = CONSUME = 0;
1794
+								} else {
1795
+									gettimeofday(&channels[idx].timer, NULL);
1688 1796
 								}
1689 1797
 								break;
1690 1798
 							default:
... ...
@@ -1782,17 +1890,18 @@ void kz_amqp_manager_loop(int child_no)
1782 1890
 						cmd->return_code = -1;
1783 1891
 						lock_release(&cmd->lock);
1784 1892
 						// rebind ??
1785
-						LM_ERR("QUERY TIMEOUT\n");
1893
+						LM_ERR("QUERY TIMEOUT");
1786 1894
 					}
1787 1895
 				}
1788 1896
 			}
1789 1897
 			firstLoop = 0;
1790 1898
     	}
1791 1899
     	kz_amqp_connection_close(kzconn);
1792
-    	kz_amqp_fire_connection_event("closed", kzconn->info.host);
1900
+    	kz_amqp_fire_connection_event("closed", kzconn->info->info.host);
1793 1901
     }
1794 1902
 }
1795 1903
 
1904
+
1796 1905
 /* check timeouts */
1797 1906
 void kz_amqp_timeout_proc(int child_no)
1798 1907
 {
... ...
@@ -1800,17 +1909,22 @@ void kz_amqp_timeout_proc(int child_no)
1800 1909
 	int i;
1801 1910
     while(1) {
1802 1911
 		struct timeval now;
1803
-		gettimeofday(&now, NULL);
1804 1912
 		for(i=0; i < dbk_channels; i++) {
1913
+			gettimeofday(&now, NULL);
1805 1914
 			if(channels[i].state == KZ_AMQP_CALLING
1806 1915
 					&& channels[i].cmd != NULL
1807 1916
 					&& check_timeout(&now, &channels[i].timer, &channels[i].cmd->timeout)) {
1808
-				cmd = channels[i].cmd;
1809
-				LM_DBG("Kazoo Query timeout - %s\n", cmd->payload);
1810
-				channels[i].state = KZ_AMQP_FREE;
1811
-				channels[i].cmd = NULL;
1812
-				cmd->return_code = -1;
1813
-				lock_release(&cmd->lock);
1917
+				lock_get(&channels[i].lock);
1918
+				if(channels[i].cmd != NULL)
1919
+				{
1920
+					cmd = channels[i].cmd;
1921
+					LM_DBG("Kazoo Query timeout - %s\n", cmd->payload);
1922
+					cmd->return_code = -1;
1923
+					lock_release(&cmd->lock);
1924
+					channels[i].cmd = NULL;
1925
+					channels[i].state = KZ_AMQP_FREE;
1926
+				}
1927
+				lock_release(&channels[i].lock);
1814 1928
 			}
1815 1929
 		}
1816 1930
 	}
... ...
@@ -1829,22 +1943,23 @@ void kz_amqp_publisher_proc(int child_no)
1829 1943
 	kz_amqp_cmd_ptr cmd;
1830 1944
 	int channel_res;
1831 1945
 
1946
+    kzconn = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn));
1947
+    memset(kzconn, 0, sizeof(kz_amqp_conn));
1832 1948
 
1833 1949
     while(1) {
1834 1950
     	OK = 1;
1835
-    	while(1) {
1836
-    		kzconn = kz_amqp_get_next_connection();
1837
-    		if(kzconn != NULL)
1838
-    			break;
1839
-    		LM_DBG("Connection failed : all servers down?\n");
1840
-    		sleep(3);
1841
-    	}
1842
-
1843
-    	kz_amqp_fire_connection_event("open", kzconn->info.host);
1951
+   		kz_amqp_open_next_connection(kzconn);
1952
+    	kz_amqp_fire_connection_event("open", kzconn->info->info.host);
1844 1953
 
1845 1954
     	for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) {
1846 1955
 			/* start cleanup */
1847 1956
 			channels[i].state = KZ_AMQP_CLOSED;
1957
+			cmd = channels[i].cmd;
1958
+			if(cmd != NULL) {
1959
+				channels[i].cmd = NULL;
1960
+				cmd->return_code = -1;
1961
+				lock_release(&cmd->lock);
1962
+			}
1848 1963
 			/* end cleanup */
1849 1964
 
1850 1965
 			/* bind targeted channels */
... ...
@@ -1873,7 +1988,7 @@ void kz_amqp_publisher_proc(int child_no)
1873 1988
 							} else {
1874 1989
 								cmd->return_code = -1;
1875 1990
 								OK = 0;
1876
-								LM_ERR("ERROR SENDING PUBLISH\n");
1991
+								LM_ERR("ERROR SENDING PUBLISH");
1877 1992
 							}
1878 1993
 							channels[idx].state = KZ_AMQP_FREE;
1879 1994
 							channels[idx].cmd = NULL;
... ...
@@ -1886,7 +2001,7 @@ void kz_amqp_publisher_proc(int child_no)
1886 2001
 								channels[idx].cmd = NULL;
1887 2002
 								cmd->return_code = -1;
1888 2003
 								lock_release(&cmd->lock);
1889
-								LM_ERR("ERROR SENDING QUERY\n");
2004
+								LM_ERR("ERROR SENDING QUERY");
1890 2005
 								OK = 0;
1891 2006
 							}
1892 2007
 							break;
... ...
@@ -1898,7 +2013,7 @@ void kz_amqp_publisher_proc(int child_no)
1898 2013
         	}
1899 2014
     	}
1900 2015
     	kz_amqp_connection_close(kzconn);
1901
-    	kz_amqp_fire_connection_event("closed", kzconn->info.host);
2016
+    	kz_amqp_fire_connection_event("closed", kzconn->info->info.host);
1902 2017
     }
1903 2018
 }
1904 2019
 
... ...
@@ -1911,52 +2026,55 @@ void kz_amqp_consumer_proc(int child_no)
1911 2026
 	char* payload;
1912 2027
 	int channel_res;
1913 2028
     kz_amqp_conn_ptr kzconn;
1914
-	kz_amqp_cmd_ptr cmd;
2029
+    kz_amqp_channel_ptr consumer_channels = NULL;
2030
+
2031
+    kzconn = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn));
2032
+    memset(kzconn, 0, sizeof(kz_amqp_conn));
2033
+
2034
+    consumer_channels = (kz_amqp_channel_ptr)pkg_malloc(sizeof(kz_amqp_channel)*bindings_count);
2035
+	for(i=0; i < bindings_count; i++)
2036
+		consumer_channels[i].channel = dbk_channels + i + 1;
1915 2037
 
1916 2038
     while(1) {
1917 2039
     	OK = 1;
1918
-    	while(1) {
1919
-    		kzconn = kz_amqp_get_next_connection();
1920
-    		if(kzconn != NULL)
1921
-    			break;
1922
-    		LM_DBG("Connection failed : all servers down?\n");
1923
-    		sleep(3);
1924
-    	}
1925
-
1926
-    	kz_amqp_fire_connection_event("open", kzconn->info.host);
2040
+   		kz_amqp_open_next_connection(kzconn);
2041
+    	kz_amqp_fire_connection_event("open", kzconn->info->info.host);
1927 2042
 
1928 2043
     	/* reset channels */
1929 2044
 
1930 2045
     	for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) {
1931 2046
 			/* start cleanup */
1932
-			channels[i].state = KZ_AMQP_CLOSED;
1933 2047
 			channels[i].consumer = NULL;
1934 2048
 			if(channels[i].targeted != NULL) {
1935 2049
 				kz_amqp_free_bind(channels[i].targeted);
1936 2050
 				channels[i].targeted = NULL;
1937 2051
 			}
1938
-			cmd = channels[i].cmd;
1939
-			if(cmd != NULL) {
1940
-				channels[i].cmd = NULL;
1941
-				cmd->return_code = -1;
1942
-				lock_release(&cmd->lock);
1943
-			}
1944 2052
 			/* end cleanup */
1945 2053
 
1946 2054
 			/* bind targeted channels */
1947 2055
 			channel_res = kz_amqp_channel_open(kzconn, channels[i].channel);
1948 2056
 			if(channel_res == 0) {
1949 2057
 				kz_amqp_bind_targeted_channel(kzconn, 0, i);
1950
-				channels[i].state = KZ_AMQP_FREE;
1951 2058
 			}
1952 2059
     	}
1953
-		channel_index = 0;
2060
+
2061
+    	for(i=0,channel_res=0; i < bindings_count && channel_res == 0; i++) {
2062
+			/* start cleanup */
2063
+    		consumer_channels[i].consumer = NULL;
2064
+			/* end cleanup */
2065
+
2066
+			/* bind targeted channels */
2067
+			channel_res = kz_amqp_channel_open(kzconn, consumer_channels[i].channel);
2068
+    	}
2069
+
2070
+    	i = 0;
1954 2071
 		/* bind consumers */
1955 2072
 		if(kz_bindings != NULL) {
1956 2073
 			kz_amqp_binding_ptr binding = kz_bindings->head;
1957 2074
 			while(binding != NULL) {
1958
-				kz_amqp_bind_consumer(kzconn, binding->bind);
2075
+				kz_amqp_bind_consumer_ex(kzconn, binding->bind, i, consumer_channels);
1959 2076
 				binding = binding->next;
2077
+				i++;
1960 2078
 			}
1961 2079
 		}
1962 2080
 
... ...
@@ -1986,30 +2104,35 @@ void kz_amqp_consumer_proc(int child_no)
1986 2104
 
1987 2105
 			case AMQP_RESPONSE_NORMAL:
1988 2106
 				idx = envelope.channel-1;
1989
-				switch(channels[idx].state) {
1990
-				case KZ_AMQP_CALLING:
1991
-					channels[idx].cmd->return_payload = kz_amqp_bytes_dup(envelope.message.body);
1992
-					channels[idx].cmd->return_code = AMQP_RESPONSE_NORMAL;
1993
-					lock_release(&channels[idx].cmd->lock);
1994
-					channels[idx].state = KZ_AMQP_FREE;
1995
-					channels[idx].cmd = NULL;
1996
-					break;
1997
-				case KZ_AMQP_CONSUMING:
2107
+				if(idx < dbk_channels) {
2108
+					switch(channels[idx].state) {
2109
+						case KZ_AMQP_CALLING:
2110
+							lock_get(&channels[idx].lock);
2111
+							if(channels[idx].cmd != NULL) {
2112
+								channels[idx].cmd->return_payload = kz_amqp_bytes_dup(envelope.message.body);
2113
+								channels[idx].cmd->return_code = AMQP_RESPONSE_NORMAL;
2114
+								lock_release(&channels[idx].cmd->lock);
2115
+								channels[idx].cmd = NULL;
2116
+								channels[idx].state = KZ_AMQP_FREE;
2117
+							}
2118
+							lock_release(&channels[idx].lock);
2119
+							break;
2120
+						default:
2121
+							LM_DBG("ignoring received payload on consumer - %.*s\n", (int) envelope.message.body.len, (char*)envelope.message.body.bytes);
2122
+							break;
2123
+					}
2124
+				} else {
2125
+					idx = idx - dbk_channels;
1998 2126
 					kz_amqp_send_consumer_event_ex(kz_amqp_bytes_dup(envelope.message.body),
1999
-							kz_amqp_bytes_dup(channels[idx].consumer->event_key),
2000
-							kz_amqp_bytes_dup(channels[idx].consumer->event_subkey),
2001
-							channels[idx].consumer->no_ack ? 0 : envelope.channel,
2002
-							channels[idx].consumer->no_ack ? 0 : envelope.delivery_tag,1);
2003
-					if(!channels[idx].consumer->no_ack ) {
2127
+						kz_amqp_bytes_dup(consumer_channels[idx].consumer->event_key),
2128
+						kz_amqp_bytes_dup(consumer_channels[idx].consumer->event_subkey),
2129
+						0, 0, 1);
2130
+					if(!consumer_channels[idx].consumer->no_ack ) {
2004 2131
 						if(amqp_basic_ack(kzconn->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) {
2005 2132
 							LM_ERR("AMQP ERROR TRYING TO ACK A MSG\n");
2006 2133
 							OK = 0;
2007 2134
 						}
2008 2135
 					}
2009
-					break;
2010
-				default:
2011
-					LM_DBG("ignoring received payload on consumer - %.*s\n", (int) envelope.message.body.len, (char*)envelope.message.body.bytes);
2012
-					break;
2013 2136
 				}
2014 2137
 				break;
2015 2138
 			case AMQP_RESPONSE_SERVER_EXCEPTION:
... ...
@@ -2026,8 +2149,7 @@ void kz_amqp_consumer_proc(int child_no)
2026 2149
 		}
2027 2150
 
2028 2151
     	kz_amqp_connection_close(kzconn);
2029
-    	kz_amqp_fire_connection_event("closed", kzconn->info.host);
2152
+    	kz_amqp_fire_connection_event("closed", kzconn->info->info.host);
2030 2153
 
2031 2154
     }
2032 2155
 }
2033
-
... ...
@@ -25,10 +25,20 @@ extern str dbk_consumer_event_key;
25 25
 extern str dbk_consumer_event_subkey;
26 26
 extern int dbk_consumer_processes;
27 27
 
28
-
29
-typedef struct kz_amqp_conn_t {
28
+typedef struct kz_amqp_connection_t {
30 29
 	kz_amqp_connection_info info;
31 30
 	char* url;
31
+    struct kz_amqp_connection_t* next;
32
+} kz_amqp_connection, *kz_amqp_connection_ptr;
33
+
34
+typedef struct {
35
+	kz_amqp_connection_ptr current;
36
+	kz_amqp_connection_ptr head;
37
+	kz_amqp_connection_ptr tail;
38
+} kz_amqp_connection_pool, *kz_amqp_connection_pool_ptr;
39
+
40
+typedef struct kz_amqp_conn_t {
41
+	kz_amqp_connection_ptr info;
32 42
 	amqp_connection_state_t conn;
33 43
 	amqp_socket_t *socket;
34 44
 	amqp_channel_t channel_count;
... ...
@@ -110,6 +120,7 @@ typedef struct {
110 120
 	amqp_channel_t channel;
111 121
 	kz_amqp_channel_state state;
112 122
 	struct timeval timer;
123
+	gen_lock_t lock;
113 124
 } kz_amqp_channel, *kz_amqp_channel_ptr;
114 125
 
115 126
 typedef struct kz_amqp_binding_t {
... ...
@@ -122,7 +133,7 @@ typedef struct {
122 133
 	kz_amqp_binding_ptr tail;
123 134
 } kz_amqp_bindings, *kz_amqp_bindings_ptr;
124 135
 
125
-void kz_amqp_init();
136
+int kz_amqp_init();
126 137
 void kz_amqp_destroy();
127 138
 int kz_amqp_add_connection(modparam_t type, void* val);
128 139