Browse code

kazoo: add exchange bindings

lazedo authored on 07/04/2017 15:06:32
Showing 4 changed files
... ...
@@ -83,16 +83,40 @@ kz_amqp_zones_ptr kz_zones = NULL;
83 83
 kz_amqp_zone_ptr kz_primary_zone = NULL;
84 84
 
85 85
 
86
-amqp_exchange_declare_ok_t * AMQP_CALL kz_amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, 
87
-								amqp_bytes_t exchange, amqp_bytes_t type, 
88
-								amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments) {
86
+amqp_exchange_declare_ok_t * AMQP_CALL kz_amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, kz_amqp_exchange_ptr exchange, amqp_table_t arguments)
87
+{
88
+	LM_DBG("declare exchange %.*s , %.*s\n",
89
+	        (int)exchange->name.len, (char*)exchange->name.bytes,
90
+	        (int)exchange->type.len, (char*)exchange->type.bytes);
91
+
89 92
 #if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR < 6
90
-	return amqp_exchange_declare(state, channel, exchange, type, passive, durable, arguments);
93
+	return amqp_exchange_declare(state, channel, exchange->name, exchange->type,
94
+	                             exchange->passive, exchange->durable,
95
+	                             arguments);
91 96
 #else
92
-	return amqp_exchange_declare(state, channel, exchange, type, passive, durable, 0, 0, arguments);
97
+	return amqp_exchange_declare(state, channel, exchange->name, exchange->type,
98
+	                             exchange->passive, exchange->durable,
99
+	                             exchange->auto_delete, exchange->internal,
100
+	                             arguments);
93 101
 #endif
94 102
 }
95 103
 
104
+amqp_queue_declare_ok_t * AMQP_CALL kz_amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, kz_amqp_queue_ptr queue, amqp_table_t arguments)
105
+{
106
+	return amqp_queue_declare(state, channel, queue->name, queue->passive, queue->durable, queue->exclusive, queue->auto_delete, arguments);
107
+}
108
+
109
+amqp_queue_bind_ok_t * AMQP_CALL kz_amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, kz_amqp_exchange_ptr exchange, kz_amqp_queue_ptr queue, kz_amqp_routings_ptr queue_bindings, amqp_table_t arguments)
110
+{
111
+	amqp_queue_bind_ok_t *ret = amqp_queue_bind(state, channel, queue->name, exchange->name, queue_bindings->routing, arguments);
112
+
113
+   	if(ret >= 0 && queue_bindings->next) {
114
+   		return kz_amqp_queue_bind(state, channel, exchange, queue, queue_bindings->next, arguments);
115
+   	} else {
116
+   		return ret;
117
+   	}
118
+}
119
+
96 120
 int set_non_blocking(int fd)
97 121
 {
98 122
 	int flags;
... ...
@@ -214,6 +238,22 @@ char *kz_amqp_bytes_dup(amqp_bytes_t bytes)
214 238
 	return res;
215 239
 }
216 240
 
241
+static inline str* kz_str_from_amqp_bytes(amqp_bytes_t src)
242
+{
243
+	char *dst_char = (char*)shm_malloc(sizeof(str)+src.len+1);
244
+	if (!dst_char) {
245
+		LM_ERR("error allocating shared memory for str");
246
+		return NULL;
247
+	}
248
+	str* dst = (str*)dst_char;
249
+	dst->s = dst_char+sizeof(str);
250
+
251
+	memcpy(dst->s, src.bytes, src.len);
252
+	dst->len = src.len;
253
+	dst->s[dst->len] = '\0';
254
+	return dst;
255
+}
256
+
217 257
 char *kz_local_amqp_bytes_dup(amqp_bytes_t bytes)
218 258
 {
219 259
 	char *res;
... ...
@@ -321,6 +361,8 @@ void kz_amqp_free_consumer_delivery(kz_amqp_consumer_delivery_ptr ptr)
321 361
 		shm_free(ptr->event_subkey);
322 362
 	if(ptr->message_id)
323 363
 		shm_free(ptr->message_id);
364
+	if(ptr->routing_key)
365
+		shm_free(ptr->routing_key);
324 366
 	if(ptr->cmd)
325 367
 		kz_amqp_free_pipe_cmd(ptr->cmd);
326 368
 	shm_free(ptr);
... ...
@@ -330,14 +372,14 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
330 372
 {
331 373
 	if(bind == NULL)
332 374
 		return;
333
-	if(bind->exchange.bytes)
334
-		kz_amqp_bytes_free(bind->exchange);
335
-	if(bind->exchange_type.bytes)
336
-		kz_amqp_bytes_free(bind->exchange_type);
337
-	if(bind->queue.bytes)
338
-		kz_amqp_bytes_free(bind->queue);
339
-	if(bind->routing_key.bytes)
340
-		kz_amqp_bytes_free(bind->routing_key);
375
+	if(bind->exchange)
376
+		kz_amqp_exchange_free(bind->exchange);
377
+	if(bind->exchange_bindings)
378
+		kz_amqp_exchange_bindings_free(bind->exchange_bindings);
379
+	if(bind->queue)
380
+		kz_amqp_queue_free(bind->queue);
381
+	if(bind->queue_bindings)
382
+		kz_amqp_routing_free(bind->queue_bindings);
341 383
 	if(bind->event_key.bytes)
342 384
 		kz_amqp_bytes_free(bind->event_key);
343 385
 	if(bind->event_subkey.bytes)
... ...
@@ -373,48 +415,21 @@ kz_amqp_cmd_ptr kz_amqp_alloc_pipe_cmd()
373 415
 	return cmd;
374 416
 }
375 417
 
376
-kz_amqp_bind_ptr kz_amqp_bind_alloc_ex(str* exchange, str* exchange_type, str* queue, str* routing_key, str* event_key, str* event_subkey )
418
+kz_amqp_bind_ptr kz_amqp_bind_alloc(kz_amqp_exchange_ptr exchange, kz_amqp_exchange_binding_ptr exchange_bindings, kz_amqp_queue_ptr queue, kz_amqp_routings_ptr queue_bindings, str* event_key, str* event_subkey )
377 419
 {
378
-    kz_amqp_bind_ptr bind = NULL;
420
+	kz_amqp_bind_ptr bind = NULL;
379 421
 
380
-    bind = (kz_amqp_bind_ptr)shm_malloc(sizeof(kz_amqp_bind));
422
+	bind = (kz_amqp_bind_ptr)shm_malloc(sizeof(kz_amqp_bind));
381 423
 	if(bind == NULL) {
382 424
 		LM_ERR("error allocation memory for bind alloc\n");
383 425
 		goto error;
384 426
 	}
385 427
 	memset(bind, 0, sizeof(kz_amqp_bind));
386 428
 
387
-	if(exchange != NULL) {
388
-		bind->exchange = kz_amqp_bytes_dup_from_str(exchange);
389
-	    if (bind->exchange.bytes == NULL) {
390
-			LM_ERR("Out of memory allocating for exchange\n");
391
-			goto error;
392
-	    }
393
-	}
394
-
395
-	if(exchange_type != NULL) {
396
-		bind->exchange_type = kz_amqp_bytes_dup_from_str(exchange_type);
397
-	    if (bind->exchange_type.bytes == NULL) {
398
-			LM_ERR("Out of memory allocating for exchange type\n");
399
-			goto error;
400
-	    }
401
-	}
402
-
403
-	if(queue != NULL) {
404
-		bind->queue = kz_amqp_bytes_dup_from_str(queue);
405
-	    if (bind->queue.bytes == NULL) {
406
-			LM_ERR("Out of memory allocating for queue\n");
407
-			goto error;
408
-	    }
409
-	}
410
-
411
-	if(routing_key != NULL) {
412
-		bind->routing_key = kz_amqp_bytes_dup_from_str(routing_key);
413
-	    if (bind->routing_key.bytes == NULL) {
414
-			LM_ERR("Out of memory allocating for routing key\n");
415
-			goto error;
416
-	    }
417
-	}
429
+	bind->exchange = exchange;
430
+	bind->queue = queue;
431
+	bind->exchange_bindings = exchange_bindings;
432
+	bind->queue_bindings = queue_bindings;
418 433
 
419 434
 	if(event_key != NULL) {
420 435
 		bind->event_key = kz_amqp_bytes_dup_from_str(event_key);
... ...
@@ -439,11 +454,6 @@ error:
439 454
     return NULL;
440 455
 }
441 456
 
442
-kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queue, str* routing_key )
443
-{
444
-	return kz_amqp_bind_alloc_ex(exchange, exchange_type, queue, routing_key, NULL, NULL );
445
-}
446
-
447 457
 kz_amqp_zone_ptr kz_amqp_get_primary_zone() {
448 458
 	if(kz_primary_zone == NULL) {
449 459
 		kz_primary_zone = (kz_amqp_zone_ptr) shm_malloc(sizeof(kz_amqp_zone));
... ...
@@ -486,6 +496,12 @@ kz_amqp_zone_ptr kz_amqp_add_zone(char* zone) {
486 496
 	return zone_ptr;
487 497
 }
488 498
 
499
+kz_amqp_queue_ptr kz_amqp_targeted_queue(char *name)
500
+{
501
+	str queue = str_init(name);
502
+	return kz_amqp_queue_new(&queue);
503
+}
504
+
489 505
 int kz_amqp_bind_init_targeted_channel(kz_amqp_server_ptr server, int idx )
490 506
 {
491 507
     kz_amqp_bind_ptr bind = NULL;
... ...
@@ -501,16 +517,23 @@ int kz_amqp_bind_init_targeted_channel(kz_amqp_server_ptr server, int idx )
501 517
 	}
502 518
 	memset(bind, 0, sizeof(kz_amqp_bind));
503 519
 
504
-	bind->exchange = kz_amqp_bytes_dup_from_str(&rpl_exch);
505
-	bind->exchange_type = kz_amqp_bytes_dup_from_str(&rpl_exch_type);
520
+	bind->exchange = kz_amqp_exchange_new(&rpl_exch, &rpl_exch_type);
521
+	if(bind->exchange == NULL) {
522
+		LM_ERR("error allocation memory for reply\n");
523
+		goto error;
524
+	}
506 525
 
507 526
     sprintf(serverid, "kamailio@%.*s-<%d-%d>", dbk_node_hostname.len, dbk_node_hostname.s, server->id, idx);
508
-    bind->queue = kz_amqp_bytes_dup_from_string(serverid);
527
+    bind->queue = kz_amqp_targeted_queue(serverid);
528
+	if(bind->queue == NULL) {
529
+		LM_ERR("error allocation memory for reply\n");
530
+		goto error;
531
+	}
509 532
 
510 533
     sprintf(serverid, "kamailio@%.*s-<%d>-targeted-%d", dbk_node_hostname.len, dbk_node_hostname.s, server->id, idx);
511
-    bind->routing_key = kz_amqp_bytes_dup_from_string(serverid);
534
+    bind->queue_bindings = kz_amqp_routing_new(serverid);
512 535
 
513
-    if (bind->exchange.bytes == NULL || bind->routing_key.bytes == NULL || bind->queue.bytes == NULL) {
536
+    if (bind->queue_bindings == NULL) {
514 537
 		LM_ERR("Out of memory allocating for exchange/routing_key\n");
515 538
 		goto error;
516 539
     }
... ...
@@ -1472,41 +1495,363 @@ int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char*
1472 1495
 		return 1;
1473 1496
 };
1474 1497
 
1475
-int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue, char* routing_key)
1498
+void kz_amqp_queue_free(kz_amqp_queue_ptr queue)
1476 1499
 {
1477
-	str exchange_s;
1478
-	str exchange_type_s;
1479
-	str queue_s;
1480
-	str routing_key_s;
1500
+	if(queue->name.bytes)
1501
+		kz_amqp_bytes_free(queue->name);
1481 1502
 
1482
-	if (fixup_get_svalue(msg, (gparam_p)exchange, &exchange_s) != 0) {
1483
-		LM_ERR("cannot get exchange string value\n");
1484
-		return -1;
1503
+	shm_free(queue);
1504
+}
1505
+
1506
+kz_amqp_queue_ptr kz_amqp_queue_new(str *name)
1507
+{
1508
+	kz_amqp_queue_ptr queue = (kz_amqp_queue_ptr) shm_malloc(sizeof(kz_amqp_queue));
1509
+	if(queue == NULL) {
1510
+		LM_ERR("NO MORE SHARED MEMORY!");
1511
+		return NULL;
1485 1512
 	}
1513
+	memset(queue, 0, sizeof(kz_amqp_queue));
1514
+	queue->auto_delete = 1;
1486 1515
 
1487
-	if (fixup_get_svalue(msg, (gparam_p)exchange_type, &exchange_type_s) != 0) {
1488
-		LM_ERR("cannot get exchange type string value\n");
1489
-		return -1;
1516
+    if(name != NULL) {
1517
+    	queue->name = kz_amqp_bytes_dup_from_str(name);
1518
+		if (queue->name.bytes == NULL) {
1519
+			LM_ERR("Out of memory allocating for exchange\n");
1520
+			goto error;
1521
+		}
1522
+    }
1523
+
1524
+    return queue;
1525
+
1526
+error:
1527
+    	kz_amqp_queue_free(queue);
1528
+    	return NULL;
1529
+}
1530
+
1531
+kz_amqp_queue_ptr kz_amqp_queue_from_json(str *name, json_object* json_obj)
1532
+{
1533
+	json_object* tmpObj = NULL;
1534
+	kz_amqp_queue_ptr queue = kz_amqp_queue_new(name);
1535
+
1536
+	if(queue == NULL) {
1537
+		LM_ERR("NO MORE SHARED MEMORY!");
1538
+		return NULL;
1490 1539
 	}
1491 1540
 
1492
-	if (fixup_get_svalue(msg, (gparam_p)queue, &queue_s) != 0) {
1493
-		LM_ERR("cannot get queue string value\n");
1541
+	tmpObj = kz_json_get_object(json_obj, "passive");
1542
+	if(tmpObj != NULL) {
1543
+		queue->passive = json_object_get_int(tmpObj);
1544
+	}
1545
+
1546
+	tmpObj = kz_json_get_object(json_obj, "durable");
1547
+	if(tmpObj != NULL) {
1548
+		queue->durable = json_object_get_int(tmpObj);
1549
+	}
1550
+
1551
+ 	tmpObj = kz_json_get_object(json_obj, "exclusive");
1552
+	if(tmpObj != NULL) {
1553
+		queue->exclusive = json_object_get_int(tmpObj);
1554
+	}
1555
+
1556
+	tmpObj = kz_json_get_object(json_obj, "auto_delete");
1557
+	if(tmpObj != NULL) {
1558
+		queue->auto_delete = json_object_get_int(tmpObj);
1559
+	}
1560
+
1561
+	return queue;
1562
+
1563
+}
1564
+
1565
+void kz_amqp_exchange_free(kz_amqp_exchange_ptr exchange)
1566
+{
1567
+	if(exchange->name.bytes)
1568
+		kz_amqp_bytes_free(exchange->name);
1569
+
1570
+	if(exchange->type.bytes)
1571
+		kz_amqp_bytes_free(exchange->type);
1572
+
1573
+	shm_free(exchange);
1574
+}
1575
+
1576
+kz_amqp_exchange_ptr kz_amqp_exchange_new(str *name, str* type)
1577
+{
1578
+	kz_amqp_exchange_ptr exchange = (kz_amqp_exchange_ptr) shm_malloc(sizeof(kz_amqp_exchange));
1579
+	if(exchange == NULL) {
1580
+		LM_ERR("NO MORE SHARED MEMORY!");
1581
+		return NULL;
1582
+	}
1583
+	memset(exchange, 0, sizeof(kz_amqp_exchange));
1584
+	exchange->name = kz_amqp_bytes_dup_from_str(name);
1585
+	if (exchange->name.bytes == NULL) {
1586
+		LM_ERR("Out of memory allocating for exchange\n");
1587
+		goto error;
1588
+	}
1589
+
1590
+	exchange->type = kz_amqp_bytes_dup_from_str(type);
1591
+	if (exchange->type.bytes == NULL) {
1592
+		LM_ERR("Out of memory allocating for exchange type\n");
1593
+		goto error;
1594
+	}
1595
+
1596
+	LM_DBG("NEW exchange %.*s : %.*s : %.*s : %.*s\n",
1597
+	        (int)name->len, (char*)name->s,
1598
+	        (int)type->len, (char*)type->s,
1599
+		(int)exchange->name.len, (char*)exchange->name.bytes,
1600
+		(int)exchange->type.len, (char*)exchange->type.bytes);
1601
+
1602
+	return exchange;
1603
+
1604
+error:
1605
+
1606
+	kz_amqp_exchange_free(exchange);
1607
+	return NULL;
1608
+}
1609
+
1610
+kz_amqp_exchange_ptr kz_amqp_exchange_from_json(str *name, json_object* json_obj)
1611
+{
1612
+	str type = STR_NULL;
1613
+	kz_amqp_exchange_ptr exchange = NULL;
1614
+	json_object* tmpObj = NULL;
1615
+
1616
+	json_extract_field("type", type);
1617
+
1618
+	LM_DBG("NEW JSON exchange %.*s : %.*s\n",
1619
+	        (int)name->len, (char*)name->s,
1620
+	        (int)type.len, (char*)type.s);
1621
+
1622
+
1623
+	exchange = kz_amqp_exchange_new(name, &type);
1624
+	if(exchange == NULL) {
1625
+		LM_ERR("NO MORE SHARED MEMORY!");
1626
+		return NULL;
1627
+	}
1628
+
1629
+	tmpObj = kz_json_get_object(json_obj, "passive");
1630
+	if(tmpObj != NULL) {
1631
+		exchange->passive = json_object_get_int(tmpObj);
1632
+	}
1633
+
1634
+	tmpObj = kz_json_get_object(json_obj, "durable");
1635
+	if(tmpObj != NULL) {
1636
+		exchange->durable = json_object_get_int(tmpObj);
1637
+	}
1638
+
1639
+	tmpObj = kz_json_get_object(json_obj, "auto_delete");
1640
+	if(tmpObj != NULL) {
1641
+		exchange->auto_delete = json_object_get_int(tmpObj);
1642
+	}
1643
+
1644
+	tmpObj = kz_json_get_object(json_obj, "internal");
1645
+	if(tmpObj != NULL) {
1646
+		exchange->internal = json_object_get_int(tmpObj);
1647
+	}
1648
+
1649
+	return exchange;
1650
+
1651
+}
1652
+
1653
+void kz_amqp_routing_free(kz_amqp_routings_ptr routing)
1654
+{
1655
+	if(routing) {
1656
+		if(routing->next)
1657
+			kz_amqp_routing_free(routing->next);
1658
+		shm_free(routing);
1659
+	}
1660
+}
1661
+
1662
+kz_amqp_routings_ptr kz_amqp_routing_new(char* routing)
1663
+{
1664
+	kz_amqp_routings_ptr ptr = (kz_amqp_routings_ptr) shm_malloc(sizeof(kz_amqp_routings));
1665
+	memset(ptr, 0, sizeof(kz_amqp_routings));
1666
+
1667
+	ptr->routing = kz_amqp_bytes_dup_from_string(routing);
1668
+	return ptr;
1669
+}
1670
+
1671
+kz_amqp_routings_ptr kz_amqp_routing_from_json(json_object* json_obj)
1672
+{
1673
+	kz_amqp_routings_ptr r, r1 = NULL, ret = NULL;
1674
+	char *routing;
1675
+	int len, n;
1676
+
1677
+	if(json_obj == NULL)
1678
+    	return ret;
1679
+
1680
+   	switch(kz_json_get_type(json_obj))
1681
+   	{
1682
+   	case json_type_string:
1683
+   		routing = (char*)json_object_get_string(json_obj);
1684
+		ret = kz_amqp_routing_new(routing);
1685
+		break;
1686
+
1687
+   	case json_type_array:
1688
+		len = json_object_array_length(json_obj);
1689
+		for(n=0; n < len; n++) {
1690
+			routing = (char*)json_object_get_string(json_object_array_get_idx(json_obj, n));
1691
+			r = kz_amqp_routing_new(routing);
1692
+			if(r1 != NULL) {
1693
+				r1->next = r;
1694
+			} else {
1695
+				ret = r;
1696
+			}
1697
+			r1 = r;
1698
+		}
1699
+		break;
1700
+
1701
+   	default:
1702
+   		LM_DBG("type not handled in routing");
1703
+   		break;
1704
+   	}
1705
+   	return ret;
1706
+}
1707
+
1708
+void kz_amqp_exchange_bindings_free(kz_amqp_exchange_binding_ptr binding)
1709
+{
1710
+	if(binding) {
1711
+		if(binding->next)
1712
+			kz_amqp_exchange_bindings_free(binding->next);
1713
+		kz_amqp_exchange_free(binding->from_exchange);
1714
+		kz_amqp_routing_free(binding->routing);
1715
+		shm_free(binding);
1716
+	}
1717
+
1718
+}
1719
+
1720
+kz_amqp_exchange_binding_ptr kz_amqp_exchange_binding_from_json(json_object* JObj)
1721
+{
1722
+//	struct json_object* tmpObj = NULL;
1723
+	struct json_object* routingObj = NULL;
1724
+	kz_amqp_exchange_ptr exchange;
1725
+	kz_amqp_exchange_binding_ptr prv = NULL;
1726
+	kz_amqp_exchange_binding_ptr ret = NULL;
1727
+	if(JObj != NULL) {
1728
+		json_foreach(JObj, k, v) {
1729
+			str name = {k, strlen(k)};
1730
+			LM_DBG("exchange binding1 %s, %i , %s,  %i : %.*s\n", k, (int) strlen(k), name.s, name.len, name.len, name.s);
1731
+			exchange = kz_amqp_exchange_from_json(&name, v);
1732
+			LM_DBG("exchange binding2 %s, %i : %.*s\n", k, (int) strlen(k), name.len, name.s);
1733
+			LM_DBG("exchange binding3 %.*s : %.*s\n",
1734
+					        (int)exchange->name.len, (char*)exchange->name.bytes,
1735
+					        (int)exchange->type.len, (char*)exchange->type.bytes);
1736
+
1737
+		    routingObj = kz_json_get_object(v, "routing");
1738
+		    if(routingObj != NULL) {
1739
+		    	kz_amqp_exchange_binding_ptr binding = (kz_amqp_exchange_binding_ptr) shm_malloc(sizeof(kz_amqp_exchange_binding));
1740
+		    	memset(binding, 0, sizeof(kz_amqp_exchange_binding));
1741
+		    	binding->from_exchange = exchange;
1742
+		    	binding->routing = kz_amqp_routing_from_json(routingObj);
1743
+		    	if(binding->routing == NULL) {
1744
+		    		LM_DBG("invalid routing");
1745
+		    		kz_amqp_exchange_bindings_free(binding);
1746
+		    		binding = NULL;
1747
+		    	} else {
1748
+			    	if(ret == NULL)
1749
+			    		ret = binding;
1750
+			    	if(prv != NULL)
1751
+			    		prv->next = binding;
1752
+		    	}
1753
+		    } else {
1754
+		    	kz_amqp_exchange_free(exchange);
1755
+		    }
1756
+		}
1757
+	}
1758
+
1759
+	return ret;
1760
+}
1761
+
1762
+int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
1763
+{
1764
+	str exchange_s = STR_NULL;
1765
+	str queue_s = STR_NULL;
1766
+	str payload_s = STR_NULL;
1767
+	str key_s = STR_NULL;
1768
+	str subkey_s = STR_NULL;
1769
+	int no_ack = 1;
1770
+	int federate = 0;
1771
+	int consistent_worker = 0;
1772
+	int wait_for_consumer_ack = 1;
1773
+	kz_amqp_queue_ptr queue = NULL;
1774
+	kz_amqp_exchange_ptr exchange = NULL;
1775
+	kz_amqp_exchange_binding_ptr exchange_binding = NULL;
1776
+	kz_amqp_routings_ptr routing = NULL;
1777
+	str* event_key = NULL;
1778
+    str* event_subkey = NULL;
1779
+
1780
+
1781
+
1782
+	json_obj_ptr json_obj = NULL;
1783
+	struct json_object* tmpObj = NULL;
1784
+
1785
+	if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) {
1786
+		LM_ERR("cannot get payload value\n");
1494 1787
 		return -1;
1495 1788
 	}
1496 1789
 
1497
-	if (fixup_get_svalue(msg, (gparam_p)routing_key, &routing_key_s) != 0) {
1498
-		LM_ERR("cannot get routing_key string value\n");
1790
+	json_obj = kz_json_parse(payload_s.s);
1791
+	if (json_obj == NULL)
1499 1792
 		return -1;
1793
+    
1794
+
1795
+	json_extract_field("exchange", exchange_s);
1796
+	json_extract_field("queue", queue_s);
1797
+	json_extract_field("event_key", key_s);
1798
+	json_extract_field("event_subkey", subkey_s);
1799
+
1800
+	if(key_s.len != 0)
1801
+		event_key = &key_s;
1802
+
1803
+	if(subkey_s.len != 0)
1804
+		event_subkey = &subkey_s;
1805
+
1806
+	tmpObj = kz_json_get_object(json_obj, "no_ack");
1807
+	if(tmpObj != NULL) {
1808
+		no_ack = json_object_get_int(tmpObj);
1809
+	}
1810
+
1811
+	tmpObj = kz_json_get_object(json_obj, "wait_for_consumer_ack");
1812
+	if(tmpObj != NULL) {
1813
+		wait_for_consumer_ack = json_object_get_int(tmpObj);
1814
+	}
1815
+
1816
+	tmpObj = kz_json_get_object(json_obj, "federate");
1817
+	if(tmpObj != NULL) {
1818
+		federate = json_object_get_int(tmpObj);
1819
+	}
1820
+
1821
+	tmpObj = kz_json_get_object(json_obj, "consistent-worker");
1822
+	if(tmpObj != NULL) {
1823
+		consistent_worker = json_object_get_int(tmpObj);
1824
+	}
1825
+
1826
+	tmpObj = kz_json_get_object(json_obj, "exchange-bindings");
1827
+	if(tmpObj != NULL) {
1828
+		exchange_binding = kz_amqp_exchange_binding_from_json(tmpObj);
1500 1829
 	}
1501 1830
 
1502
-	kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s);
1831
+	tmpObj = kz_json_get_object(json_obj, "routing");
1832
+	if(tmpObj != NULL) {
1833
+		routing = kz_amqp_routing_from_json(tmpObj);
1834
+	}
1835
+
1836
+	if(routing == NULL) {
1837
+		LM_ERR("invalid routing\n");
1838
+		goto error;
1839
+	}
1840
+
1841
+	exchange = kz_amqp_exchange_from_json(&exchange_s, json_obj);
1842
+	queue = kz_amqp_queue_from_json(&queue_s, json_obj);
1843
+
1844
+	kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(exchange, exchange_binding, queue, routing, event_key, event_subkey);
1503 1845
 	if(bind == NULL) {
1504 1846
 		LM_ERR("Could not allocate bind struct\n");
1505 1847
 		goto error;
1506 1848
 	}
1507 1849
 
1508
-	bind->auto_delete = 1;
1509
-	bind->no_ack = 1;
1850
+	bind->no_ack = no_ack;
1851
+	bind->wait_for_consumer_ack = wait_for_consumer_ack;
1852
+	bind->federate = federate;
1853
+	bind->consistent_worker = consistent_worker;
1854
+
1510 1855
 
1511 1856
 	kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding));
1512 1857
 	if(binding == NULL) {
... ...
@@ -1525,109 +1870,64 @@ int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange
1525 1870
 	binding->bind = bind;
1526 1871
 	bindings_count++;
1527 1872
 
1528
-    return 1;
1873
+	if(json_obj != NULL)
1874
+		json_object_put(json_obj);
1875
+
1876
+	return 1;
1529 1877
 
1530 1878
 error:
1531
-    if(binding != NULL)
1532
-    	shm_free(binding);
1879
+	if(binding != NULL)
1880
+		shm_free(binding);
1881
+
1882
+	if(json_obj != NULL)
1883
+		json_object_put(json_obj);
1533 1884
 
1534 1885
 	return -1;
1535 1886
 
1536 1887
 }
1537 1888
 
1538
-int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
1889
+int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue, char* routing_key)
1539 1890
 {
1540 1891
 	str exchange_s;
1541 1892
 	str exchange_type_s;
1542 1893
 	str queue_s;
1543 1894
 	str routing_key_s;
1544
-	str payload_s;
1545
-	str key_s;
1546
-	str subkey_s;
1547
-	int passive = 0;
1548
-	int durable = 0;
1549
-	int exclusive = 0;
1550
-	int auto_delete = 1;
1551
-	int no_ack = 1;
1552
-	int federate = 0;
1553
-	int wait_for_consumer_ack = 1;
1554
-    int consistent_worker = 0;
1895
+	kz_amqp_exchange_ptr exchange_ptr = NULL;
1896
+	kz_amqp_queue_ptr queue_ptr = NULL;
1897
+	kz_amqp_routings_ptr routing_ptr = NULL;
1555 1898
 
1556
-    json_obj_ptr json_obj = NULL;
1557
-	struct json_object* tmpObj = NULL;
1558 1899
 
1559
-	if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) {
1560
-		LM_ERR("cannot get payload value\n");
1900
+	if (fixup_get_svalue(msg, (gparam_p)exchange, &exchange_s) != 0) {
1901
+		LM_ERR("cannot get exchange string value\n");
1561 1902
 		return -1;
1562 1903
 	}
1563 1904
 
1564
-    json_obj = kz_json_parse(payload_s.s);
1565
-    if (json_obj == NULL)
1566
-	return -1;
1567
-    
1568
-
1569
-    json_extract_field("exchange", exchange_s);
1570
-    json_extract_field("type", exchange_type_s);
1571
-    json_extract_field("queue", queue_s);
1572
-    json_extract_field("routing", routing_key_s);
1573
-    json_extract_field("event_key", key_s);
1574
-    json_extract_field("event_subkey", subkey_s);
1575
-
1576
-    tmpObj = kz_json_get_object(json_obj, "passive");
1577
-    if(tmpObj != NULL) {
1578
-    	passive = json_object_get_int(tmpObj);
1579
-    }
1580
-
1581
-    tmpObj = kz_json_get_object(json_obj, "durable");
1582
-    if(tmpObj != NULL) {
1583
-    	durable = json_object_get_int(tmpObj);
1584
-    }
1585
-
1586
-    tmpObj = kz_json_get_object(json_obj, "exclusive");
1587
-    if(tmpObj != NULL) {
1588
-    	exclusive = json_object_get_int(tmpObj);
1589
-    }
1590
-
1591
-    tmpObj = kz_json_get_object(json_obj, "auto_delete");
1592
-    if(tmpObj != NULL) {
1593
-    	auto_delete = json_object_get_int(tmpObj);
1594
-    }
1905
+	if (fixup_get_svalue(msg, (gparam_p)exchange_type, &exchange_type_s) != 0) {
1906
+		LM_ERR("cannot get exchange type string value\n");
1907
+		return -1;
1908
+	}
1595 1909
 
1596
-    tmpObj = kz_json_get_object(json_obj, "no_ack");
1597
-    if(tmpObj != NULL) {
1598
-    	no_ack = json_object_get_int(tmpObj);
1599
-    }
1910
+	if (fixup_get_svalue(msg, (gparam_p)queue, &queue_s) != 0) {
1911
+		LM_ERR("cannot get queue string value\n");
1912
+		return -1;
1913
+	}
1600 1914
 
1601
-    tmpObj = kz_json_get_object(json_obj, "wait_for_consumer_ack");
1602
-    if(tmpObj != NULL) {
1603
-    	wait_for_consumer_ack = json_object_get_int(tmpObj);
1604
-    }
1915
+	if (fixup_get_svalue(msg, (gparam_p)routing_key, &routing_key_s) != 0) {
1916
+		LM_ERR("cannot get routing_key string value\n");
1917
+		return -1;
1918
+	}
1605 1919
 
1606
-    tmpObj = kz_json_get_object(json_obj, "federate");
1607
-    if(tmpObj != NULL) {
1608
-    	federate = json_object_get_int(tmpObj);
1609
-    }
1920
+	exchange_ptr = kz_amqp_exchange_new(&exchange_s, &exchange_type_s);
1921
+	queue_ptr = kz_amqp_queue_new(&queue_s);
1922
+	routing_ptr = kz_amqp_routing_new(routing_key_s.s);
1610 1923
 
1611
-    tmpObj = kz_json_get_object(json_obj, "consistent-worker");
1612
-    if(tmpObj != NULL) {
1613
-        consistent_worker = json_object_get_int(tmpObj);
1614
-    }
1615
-    
1616
-	kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s);
1924
+	kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(exchange_ptr, NULL, queue_ptr, routing_ptr, NULL, NULL);
1617 1925
 	if(bind == NULL) {
1618 1926
 		LM_ERR("Could not allocate bind struct\n");
1619 1927
 		goto error;
1620 1928
 	}
1621 1929
 
1622
-	bind->durable = durable;
1623
-	bind->passive = passive;
1624
-	bind->exclusive = exclusive;
1625
-	bind->auto_delete = auto_delete;
1626
-	bind->no_ack = no_ack;
1627
-	bind->wait_for_consumer_ack = wait_for_consumer_ack;
1628
-	bind->federate = federate;
1629
-    bind->consistent_worker = consistent_worker;
1630
-
1930
+	bind->no_ack = 1;
1631 1931
 
1632 1932
 	kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding));
1633 1933
 	if(binding == NULL) {
... ...
@@ -1646,24 +1946,18 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload)
1646 1946
 	binding->bind = bind;
1647 1947
 	bindings_count++;
1648 1948
 
1649
-    if(json_obj != NULL)
1650
-       	json_object_put(json_obj);
1651
-
1652 1949
     return 1;
1653 1950
 
1654 1951
 error:
1655 1952
     if(binding != NULL)
1656 1953
     	shm_free(binding);
1657 1954
 
1658
-    if(json_obj != NULL){
1659
-       	json_object_put(json_obj);
1660
-    }
1661
-
1662 1955
 	return -1;
1663 1956
 
1664 1957
 }
1665 1958
 
1666 1959
 
1960
+
1667 1961
 #define KEY_SAFE(C)  ((C >= 'a' && C <= 'z') || \
1668 1962
                       (C >= 'A' && C <= 'Z') || \
1669 1963
                       (C >= '0' && C <= '9') || \
... ...
@@ -1760,27 +2054,27 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx )
1760 2054
     kz_amqp_bind_ptr bind = kz_conn->server->channels[idx].targeted;
1761 2055
     int ret = -1;
1762 2056
 
1763
-	kz_amqp_exchange_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
1764
-    if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
2057
+	kz_amqp_exchange_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, kz_amqp_empty_table);
2058
+    if (kz_amqp_error("Declaring targeted exchange", amqp_get_rpc_reply(kz_conn->conn)))
1765 2059
     {
1766 2060
 		ret = -RET_AMQP_ERROR;
1767 2061
 		goto error;
1768 2062
     }
1769 2063
 
1770
-    amqp_queue_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, 0, 0, 0, 1, kz_amqp_empty_table);
1771
-    if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
2064
+    kz_amqp_queue_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, kz_amqp_empty_table);
2065
+    if (kz_amqp_error("Declaring targeted queue", amqp_get_rpc_reply(kz_conn->conn)))
1772 2066
     {
1773 2067
 		goto error;
1774 2068
     }
1775 2069
 
1776
-    if (amqp_queue_bind(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0
1777
-	    || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
2070
+    if (kz_amqp_queue_bind(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, bind->queue, bind->queue_bindings, kz_amqp_empty_table) < 0
2071
+	    || kz_amqp_error("Binding targeted queue", amqp_get_rpc_reply(kz_conn->conn)))
1778 2072
     {
1779 2073
 		goto error;
1780 2074
     }
1781 2075
 
1782
-    if (amqp_basic_consume(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, 1, 0, kz_amqp_empty_table) < 0
1783
-	    || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
2076
+    if (amqp_basic_consume(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue->name, kz_amqp_empty_bytes, 0, 1, 0, kz_amqp_empty_table) < 0
2077
+	    || kz_amqp_error("Consuming targeted queue", amqp_get_rpc_reply(kz_conn->conn)))
1784 2078
     {
1785 2079
 		goto error;
1786 2080
     }
... ...
@@ -1790,71 +2084,59 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx )
1790 2084
     return ret;
1791 2085
 }
1792 2086
 
1793
-int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan)
2087
+int kz_amqp_bind_exchange(kz_amqp_conn_ptr kz_conn, amqp_channel_t channel, kz_amqp_exchange_ptr exchange, kz_amqp_exchange_binding_ptr bindings)
1794 2088
 {
1795
-    int ret = -1;
1796
-    amqp_bytes_t federated_exchange = {0, 0};
1797
-    amqp_bytes_t federated_routing_key = {0, 0};
1798
-	char _federated[100];
1799
-
1800
-    if(bind->federate == 0
1801
-    		|| dbk_use_federated_exchange == 0
1802
-    		|| kz_conn->server->zone == kz_amqp_get_primary_zone()) {
1803
-		kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
1804
-		if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
1805
-		{
1806
-			ret = -RET_AMQP_ERROR;
1807
-			goto error;
1808
-		}
1809
-    }
2089
+	while(bindings != NULL) {
2090
+		LM_DBG("DECLARE EXH BIND FOR %.*s\n", (int)exchange->name.len, (char*)exchange->name.bytes);
2091
+		LM_DBG("DECLARE EXH BIND TO %.*s\n", (int)bindings->from_exchange->name.len, (char*)bindings->from_exchange->name.bytes);
1810 2092
 
1811
-    if(bind->federate == 1
1812
-    		&& dbk_use_federated_exchange == 1
1813
-			&& kz_conn->server->zone != kz_amqp_get_primary_zone()) {
1814
-    	federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s);
1815
-		kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
1816
-		if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn)))
1817
-		{
1818
-			ret = -RET_AMQP_ERROR;
1819
-			goto error;
2093
+		kz_amqp_exchange_declare(kz_conn->conn, channel, bindings->from_exchange, kz_amqp_empty_table);
2094
+		if (kz_amqp_error("Declaring binded exchange", amqp_get_rpc_reply(kz_conn->conn)))
2095
+			return -RET_AMQP_ERROR;
2096
+
2097
+		kz_amqp_routings_ptr routings = bindings->routing;
2098
+		while(routings) {
2099
+			if (amqp_exchange_bind(kz_conn->conn, channel, exchange->name, bindings->from_exchange->name, routings->routing, kz_amqp_empty_table) < 0
2100
+				|| kz_amqp_error("Binding exchange", amqp_get_rpc_reply(kz_conn->conn)))
2101
+				return -RET_AMQP_ERROR;
2102
+			routings = routings->next;
1820 2103
 		}
1821
-    }
2104
+		bindings = bindings->next;
2105
+	}
2106
+	return 0;
2107
+
2108
+}
1822 2109
 
1823
-	amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, kz_amqp_empty_table);
1824
-	if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
2110
+int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan)
2111
+{
2112
+    int ret = -1;
2113
+
2114
+    LM_DBG("BINDING CONSUMER %i TO %.*s\n", idx,  (int)bind->exchange->name.len,  (char*)bind->exchange->name.bytes);
2115
+	kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, kz_amqp_empty_table);
2116
+	if (kz_amqp_error("Declaring consumer exchange", amqp_get_rpc_reply(kz_conn->conn)))
1825 2117
 	{
1826 2118
 		ret = -RET_AMQP_ERROR;
1827 2119
 		goto error;
1828 2120
 	}
1829 2121
 
1830
-    if(bind->federate == 0
1831
-    		|| dbk_use_federated_exchange == 0
1832
-    		|| kz_conn->server->zone == kz_amqp_get_primary_zone()) {
1833
-		if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0
1834
-			|| kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn)))
1835
-		{
1836
-			ret = -RET_AMQP_ERROR;
1837
-			goto error;
1838
-		}
1839
-    }
2122
+	if((ret =  kz_amqp_bind_exchange(kz_conn, chan[idx].channel, bind->exchange, bind->exchange_bindings)) != 0)
2123
+		goto error;
1840 2124
 
1841
-    if(bind->federate == 1
1842
-    		&& dbk_use_federated_exchange == 1
1843
-			&& kz_conn->server->zone != kz_amqp_get_primary_zone()) {
1844
-    	sprintf(_federated, "%.*s%s%.*s", (int)bind->exchange.len, (char*)bind->exchange.bytes,
1845
-    			(bind->routing_key.len == 0 ? "" : "."),
1846
-				(int)bind->routing_key.len, (char*)bind->routing_key.bytes
1847
-    			);
1848
-    	federated_routing_key = kz_local_amqp_bytes_dup_from_string(_federated);
1849
-		if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, federated_exchange, federated_routing_key, kz_amqp_empty_table) < 0
1850
-			|| kz_amqp_error("Binding queue to federated exchange", amqp_get_rpc_reply(kz_conn->conn)))
1851
-		{
1852
-			ret = -RET_AMQP_ERROR;
1853
-			goto error;
1854
-		}
2125
+	kz_amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_table);
2126
+	if (kz_amqp_error("Declaring consumer queue", amqp_get_rpc_reply(kz_conn->conn)))
2127
+	{
2128
+		ret = -RET_AMQP_ERROR;
2129
+		goto error;
2130
+	}
2131
+
2132
+	if (kz_amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->exchange, bind->queue, bind->queue_bindings, kz_amqp_empty_table) < 0
2133
+		|| kz_amqp_error("Binding consumer queue", amqp_get_rpc_reply(kz_conn->conn)))
2134
+	{
2135
+		ret = -RET_AMQP_ERROR;
2136
+		goto error;
1855 2137
     }
1856 2138
 
1857
-    if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0
2139
+    if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue->name, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0
1858 2140
 	    || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn)))
1859 2141
     {
1860 2142
 		ret = -RET_AMQP_ERROR;
... ...
@@ -1865,8 +2147,6 @@ int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int i
1865 2147
 	chan[idx].consumer = bind;
1866 2148
     ret = idx;
1867 2149
  error:
1868
- 	 kz_local_amqp_bytes_free(federated_exchange);
1869
- 	 kz_local_amqp_bytes_free(federated_routing_key);
1870 2150
      return ret;
1871 2151
 }
1872 2152
 
... ...
@@ -1902,14 +2182,14 @@ int kz_amqp_send_ex(kz_amqp_server_ptr srv, kz_amqp_cmd_ptr cmd, kz_amqp_channel
1902 2182
     }
1903 2183
 
1904 2184
     if(kz_json_get_object(json_obj, BLF_JSON_SERVERID) == NULL) {
1905
-        json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)srv->channels[idx].targeted->routing_key.bytes));
2185
+        json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)srv->channels[idx].targeted->queue_bindings->routing.bytes));
1906 2186
     	amqp_bytes_free(payload);
1907 2187
         payload = amqp_bytes_malloc_dup(amqp_cstring_bytes((char*)json_object_to_json_string(json_obj)));
1908 2188
     }
1909 2189
 
1910 2190
 	int amqpres = amqp_basic_publish(srv->producer->conn, srv->channels[idx].channel, exchange, routing_key, 0, 0, &props, payload);
1911 2191
 	if ( amqpres != AMQP_STATUS_OK ) {
1912
-		LM_ERR("Failed to publish\n");
2192
+		LM_ERR("Failed to publish %i : %s\n", amqpres, amqp_error_string2(amqpres));
1913 2193
 		ret = -1;
1914 2194
 		goto error;
1915 2195
 	}
... ...
@@ -2006,7 +2286,16 @@ void kz_amqp_consumer_event(char *payload, char* event_key, char* event_subkey)
2006 2286
     char* subkey = (event_subkey == NULL ? dbk_consumer_event_subkey.s : event_subkey);
2007 2287
 
2008 2288
     json_extract_field(key, ev_category);
2289
+    if(ev_category.len == 0 && event_key) {
2290
+	    ev_category.s = event_key;
2291
+	    ev_category.len = strlen(event_key);
2292
+    }
2293
+
2009 2294
     json_extract_field(subkey, ev_name);
2295
+    if(ev_name.len == 0 && event_subkey) {
2296
+	    ev_name.s = event_subkey;
2297
+	    ev_name.len = strlen(event_subkey);
2298
+    }
2010 2299
 
2011 2300
     sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
2012 2301
     for (p=buffer ; *p; ++p) *p = tolower(*p);
... ...
@@ -2510,14 +2799,13 @@ void kz_send_targeted_cmd(int server_id, amqp_bytes_t body)
2510 2799
     kz_amqp_cmd_ptr cmd = NULL;
2511 2800
     json_object* JObj = NULL;
2512 2801
 	char* payload = kz_local_amqp_bytes_dup(body);
2513
-	json_obj_ptr json_obj = NULL;
2514 2802
 
2515 2803
 	if(payload == NULL) {
2516 2804
 		LM_ERR("error allocating message payload\n");
2517 2805
 		goto error;
2518 2806
 	}
2519 2807
 
2520
-	json_obj = kz_json_parse(payload );
2808
+	json_obj_ptr json_obj = kz_json_parse(payload );
2521 2809
     if (json_obj == NULL) {
2522 2810
 		LM_ERR("error parsing json payload\n");
2523 2811
 		goto error;
... ...
@@ -2587,6 +2875,9 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e
2587 2875
     	return;
2588 2876
     }
2589 2877
 
2878
+    json_object_object_add(json_obj, BLF_JSON_BROKER_ZONE, json_object_new_string(server_ptr->zone->zone));
2879
+
2880
+
2590 2881
     json_object* JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID);
2591 2882
     if(JObj != NULL) {
2592 2883
         const char* _kz_server_id_str = json_object_get_string(JObj);
... ...
@@ -2622,6 +2913,7 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e
2622 2913
 	ptr->payload = kz_amqp_string_dup((char*)json_object_to_json_string(json_obj));
2623 2914
 	ptr->cmd = cmd;
2624 2915
 	ptr->message_id = message_id;
2916
+	ptr->routing_key = kz_str_from_amqp_bytes(envelope->routing_key);
2625 2917
 
2626 2918
 	if(bind) {
2627 2919
 		ptr->event_key = kz_amqp_bytes_dup(bind->event_key);
... ...
@@ -2912,7 +3204,7 @@ int kz_amqp_timer_create(kz_amqp_timer_ptr* pTimer, int seconds, void (*callback
2912 3204
 
2913 3205
 	return 0;
2914 3206
 
2915
-error: 
3207
+error:
2916 3208
 
2917 3209
 	if (timer_ev)
2918 3210
 		pkg_free(timer_ev);
... ...
@@ -166,20 +166,53 @@ typedef struct {
166 166
 	char* event_key;
167 167
 	char* event_subkey;
168 168
 	str* message_id;
169
+	str* routing_key;
169 170
 	kz_amqp_cmd_ptr cmd;
170 171
 } kz_amqp_consumer_delivery, *kz_amqp_consumer_delivery_ptr;
171 172
 
172 173
 typedef struct {
173
-	amqp_bytes_t exchange;
174
-	amqp_bytes_t exchange_type;
175
-	amqp_bytes_t routing_key;
176
-	amqp_bytes_t queue;
177
-	amqp_bytes_t event_key;
178
-	amqp_bytes_t event_subkey;
174
+	amqp_bytes_t name;
175
+	amqp_bytes_t type;
176
+	amqp_boolean_t passive;
177
+	amqp_boolean_t durable;
178
+	amqp_boolean_t auto_delete;
179
+	amqp_boolean_t internal;
180
+} kz_amqp_exchange, *kz_amqp_exchange_ptr;
181
+
182
+typedef struct {
183
+	amqp_bytes_t name;
179 184
 	amqp_boolean_t passive;
180 185
 	amqp_boolean_t durable;
181 186
 	amqp_boolean_t exclusive;
182 187
 	amqp_boolean_t auto_delete;
188
+} kz_amqp_queue, *kz_amqp_queue_ptr;
189
+
190
+typedef struct kz_amqp_routings_t {
191
+	amqp_bytes_t routing;
192
+	struct kz_amqp_routings_t* next;
193
+} kz_amqp_routings, *kz_amqp_routings_ptr;
194
+
195
+typedef struct kz_amqp_exchange_binding_t {
196
+	kz_amqp_exchange_ptr from_exchange;
197
+	kz_amqp_routings_ptr routing;
198
+	struct kz_amqp_exchange_binding_t* next;
199
+} kz_amqp_exchange_binding, *kz_amqp_exchange_binding_ptr;
200
+
201
+typedef struct {
202
+//	amqp_bytes_t exchange;
203
+//	amqp_bytes_t exchange_type;
204
+	kz_amqp_exchange_ptr exchange;
205
+	kz_amqp_exchange_binding_ptr exchange_bindings;
206
+	kz_amqp_queue_ptr queue;
207
+	kz_amqp_routings_ptr queue_bindings;
208
+//	amqp_bytes_t routing_key;
209
+//	amqp_bytes_t queue;
210
+	amqp_bytes_t event_key;
211
+	amqp_bytes_t event_subkey;
212
+//	amqp_boolean_t passive;
213
+//	amqp_boolean_t durable;
214
+//	amqp_boolean_t exclusive;
215
+//	amqp_boolean_t auto_delete;
183 216
 	amqp_boolean_t no_ack;
184 217
 	amqp_boolean_t wait_for_consumer_ack;
185 218
 	amqp_boolean_t federate;
... ...
@@ -280,6 +313,14 @@ void kz_amqp_timer_destroy(kz_amqp_timer_ptr* pTimer);
280 313
 int kz_amqp_timer_create(kz_amqp_timer_ptr* pTimer, int seconds, void (*callback)(int, short, void *), void *data);
281 314
 void kz_amqp_heartbeat_proc(int fd, short event, void *arg);
282 315
 
316
+void kz_amqp_queue_free(kz_amqp_queue_ptr exchange);
317
+void kz_amqp_exchange_free(kz_amqp_exchange_ptr exchange);
318
+void kz_amqp_exchange_bindings_free(kz_amqp_exchange_binding_ptr binding);
319
+void kz_amqp_routing_free(kz_amqp_routings_ptr routing);
320
+kz_amqp_queue_ptr kz_amqp_queue_new(str *name);
321
+kz_amqp_exchange_ptr kz_amqp_exchange_new(str *name, str* type);
322
+kz_amqp_routings_ptr kz_amqp_routing_new(char* routing);
323
+
283 324
 static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x)
284 325
 {
285 326
 	amqp_connection_close_t *mconn;
... ...
@@ -325,3 +366,4 @@ static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x)
325 366
 
326 367
 
327 368
 #endif /* KZ_AMQP_H_ */
369
+
... ...
@@ -37,20 +37,15 @@
37 37
 #include "../../core/pvar.h"
38 38
 #include "../../core/usr_avp.h"
39 39
 
40
-# define json_foreach_key(obj,key) \
41
-	char *key;\
42
-	struct lh_entry *entry ## key; \
43
-	struct lh_entry *entry_next ## key = NULL; \
44
-	for(entry ## key = json_object_get_object(obj)->head; \
45
-		(entry ## key ? ( \
46
-			key = (char*)entry ## key->k, \
47
-			entry_next ## key = entry ## key->next, \
48
-			entry ## key) : 0); \
49
-		entry ## key = entry_next ## key)
50 40
 
51 41
 
52 42
 static str kz_pv_str_empty = {"", 0};
53 43
 
44
+enum json_type kz_json_get_type(struct json_object *jso)
45
+{
46
+  return json_object_get_type(jso);
47
+}
48
+
54 49
 char** str_split(char* a_str, const char a_delim)
55 50
 {
56 51
     char** result    = 0;
... ...
@@ -322,3 +317,4 @@ int kz_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst)
322 317
 
323 318
 	return 1;
324 319
 }
320
+
... ...
@@ -36,8 +36,61 @@
36 36
 int kz_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst);
37 37
 int kz_json_get_field_ex(str* json, str* field, pv_value_p dst_val);
38 38
 int kz_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst);
39
+enum json_type kz_json_get_type(struct json_object *jso);
39 40
 
40 41
 struct json_object* kz_json_parse(const char *str);
41 42
 struct json_object* kz_json_get_object(struct json_object* jso, const char *key);
42 43
 
44
+#if defined(__GNUC__) && !defined(__STRICT_ANSI__) && __STDC_VERSION__ >= 199901L
45
+
46
+# define json_foreach(obj,key,val) \
47
+	char *key; \
48
+	struct json_object *val __attribute__((__unused__)); \
49
+	for(struct lh_entry *entry ## key = json_object_get_object(obj)->head, *entry_next ## key = NULL; \
50
+		({ if(entry ## key) { \
51
+			key = (char*)entry ## key->k; \
52
+			val = (struct json_object*)entry ## key->v; \
53
+			entry_next ## key = entry ## key->next; \
54
+		} ; entry ## key; }); \
55
+		entry ## key = entry_next ## key )
56
+
57
+# define json_foreach_key(obj,key) \
58
+	char *key; \
59
+	for(struct lh_entry *entry ## key = json_object_get_object(obj)->head, *entry_next ## key = NULL; \
60
+		({ if(entry ## key) { \
61
+			key = (char*)entry ## key->k; \
62
+			entry_next ## key = entry ## key->next; \
63
+		} ; entry ## key; }); \
64
+		entry ## key = entry_next ## key )
65
+
66
+#else /* ANSI C or MSC */
67
+
68
+# define json_foreach(obj,key,val) \
69
+	char *key;\
70
+	struct json_object *val; \
71
+	struct lh_entry *entry ## key; \
72
+	struct lh_entry *entry_next ## key = NULL; \
73
+	for(entry ## key = json_object_get_object(obj)->head; \
74
+		(entry ## key ? ( \
75
+			key = (char*)entry ## key->k, \
76
+			val = (struct json_object*)entry ## key->v, \
77
+			entry_next ## key = entry ## key->next, \
78
+			entry ## key) : 0); \
79
+		entry ## key = entry_next ## key)
80
+
81
+# define json_foreach_key(obj,key) \
82
+	char *key;\
83
+	struct lh_entry *entry ## key; \
84
+	struct lh_entry *entry_next ## key = NULL; \
85
+	for(entry ## key = json_object_get_object(obj)->head; \
86
+		(entry ## key ? ( \
87
+			key = (char*)entry ## key->k, \
88
+			entry_next ## key = entry ## key->next, \
89
+			entry ## key) : 0); \
90
+		entry ## key = entry_next ## key)
91
+
92
+#endif /* defined(__GNUC__) && !defined(__STRICT_ANSI__) && __STDC_VERSION__ >= 199901L */
93
+
94
+
95
+
43 96
 #endif /* KZ_JSON_H_ */