name mode size
..
doc 040000
Makefile 100644 1.1kB
README 100644 21.45kB
const.c 100644 1.28kB
const.h 100644 952B
defs.h 100644 4.56kB
kazoo.c 100644 11.69kB
kz_amqp.c 100644 57.77kB
kz_amqp.h 100644 5.54kB
kz_fixup.c 100644 2.55kB
kz_fixup.h 100644 558B
kz_json.c 100644 5.56kB
kz_json.h 100644 478B
kz_pua.c 100644 13.76kB
kz_pua.h 100644 125B
kz_trans.c 100644 9.41kB
kz_trans.h 100644 1.17kB
README
KAZOO Module 2600hz Inc. <engineering@2600hz.com> Edited by Luis Azedo <luis@2600hz.com> Copyright © 2010, 2014 2600hz __________________________________________________________________ Table of Contents 1. Admin Guide 1. Overview 2. How it works 2.1. event routes 2.2. aknowledge messages 3. Dependencies 3.1. Kamailio Modules 3.2. External Libraries or Applications 4. Parameters 4.1. amqp related 4.1.1. node_hostname(str) 4.1.2. amqp_consumer_processes(int) 4.1.3. amqp_consumer_event_key(str) 4.1.4. amqp_consumer_event_subkey(str) 4.1.5. amqp_max_channels(str) 4.1.6. amqp_connection(str) 4.2. execution control 4.2.1. amqp_consumer_loop_count(int) 4.2.2. amqp_internal_loop_count(int) 4.2.3. amqp_consumer_ack_loop_count(int) 4.2.4. consume_messages_on_reconnect(int) 4.2.5. single_consumer_on_reconnect(int) 4.3. timers 4.3.1. amqp_consumer_ack_timeout(str) 4.3.2. amqp_interprocess_timeout(str) 4.3.3. amqp_waitframe_timout(str) 4.3.4. amqp_query_timout(str) 4.4. presence related 4.4.1. db_url(str) 4.4.2. presentity_table(str) 5. Functions 5.1. amqp related 5.1.1. kazoo_publish(exchange, routing_key, json_payload) 5.1.2. kazoo_query(exchange, routing_key, json_payload [, target_var]) 5.1.3. kazoo_subscribe(exchange, exchange_type, queue, routing_key) 5.1.4. kazoo_subscribe(json_description) 5.2. presence related 5.2.1. kazoo_pua_publish(json_payload) 5.3. other 5.3.1. kazoo_encode(to_encode, target_var) 5.3.2. kazoo_json(json_payload, field, target_var) 6. Exported pseudo-variables 7. Transformations List of Examples 1.1. define the event route 1.2. Set node_hostname parameter 1.3. Set amqp_consumer_processes parameter 1.4. Set amqp_consumer_event_key parameter 1.5. Set amqp_consumer_event_subkey parameter 1.6. Set amqp_max_channels parameter 1.7. Set amqp_connection parameter 1.8. Set amqp_consumer_loop_count parameter 1.9. Set amqp_internal_loop_count parameter 1.10. Set amqp_consumer_ack_loop_count parameter 1.11. Set consume_messages_on_reconnect parameter 1.12. Set single_consumer_on_reconnect parameter 1.13. Set amqp_consumer_ack_timeout parameter 1.14. Set amqp_interprocess_timeout parameter 1.15. Set amqp_waitframe_timout parameter 1.16. Set amqp_query_timout parameter 1.17. Set db_url parameter 1.18. Set presentity_table parameter 1.19. kazoo_publish usage 1.20. kazoo_query usage 1.21. kazoo_subscribe usage 1.22. kazoo_subscribe usage 1.23. kazoo_pua_publish usage 1.24. kazoo_encode usage 1.25. kazoo_json usage 1.26. kz.json usage 1.27. kz.encode usage Chapter 1. Admin Guide Table of Contents 1. Overview 2. How it works 2.1. event routes 2.2. aknowledge messages 3. Dependencies 3.1. Kamailio Modules 3.2. External Libraries or Applications 4. Parameters 4.1. amqp related 4.1.1. node_hostname(str) 4.1.2. amqp_consumer_processes(int) 4.1.3. amqp_consumer_event_key(str) 4.1.4. amqp_consumer_event_subkey(str) 4.1.5. amqp_max_channels(str) 4.1.6. amqp_connection(str) 4.2. execution control 4.2.1. amqp_consumer_loop_count(int) 4.2.2. amqp_internal_loop_count(int) 4.2.3. amqp_consumer_ack_loop_count(int) 4.2.4. consume_messages_on_reconnect(int) 4.2.5. single_consumer_on_reconnect(int) 4.3. timers 4.3.1. amqp_consumer_ack_timeout(str) 4.3.2. amqp_interprocess_timeout(str) 4.3.3. amqp_waitframe_timout(str) 4.3.4. amqp_query_timout(str) 4.4. presence related 4.4.1. db_url(str) 4.4.2. presentity_table(str) 5. Functions 5.1. amqp related 5.1.1. kazoo_publish(exchange, routing_key, json_payload) 5.1.2. kazoo_query(exchange, routing_key, json_payload [, target_var]) 5.1.3. kazoo_subscribe(exchange, exchange_type, queue, routing_key) 5.1.4. kazoo_subscribe(json_description) 5.2. presence related 5.2.1. kazoo_pua_publish(json_payload) 5.3. other 5.3.1. kazoo_encode(to_encode, target_var) 5.3.2. kazoo_json(json_payload, field, target_var) 6. Exported pseudo-variables 7. Transformations 1. Overview The Kazoo is a general purpose AMQP connector (tested with rabbitmq-server). It exposes publish/consume capabilities into Kamailio. From a high-level, the purpose of the module might be for things like: * Integrate to an AMQP application to make real-time routing decisions (instead of using, say, a SQL database) * Provide a real-time integration into your program, instead of your database, so you can overlay additional logic in your preferred language while also utilizing a message bus * Utilize messaging to have a distributed messaging layer, such that machines processing requests/responses/events can go up/down or share the workload and your Kamailio node will still be happy supported operations are: * publish json payloads to rabbitmq * publish json payloads to rabbitmq and wait for correlated response message * subscribe to an exchange with a routing key The Kazoo module also has support to publish updates to presence module thru the kazoo_pua_publish function 2. How it works 2.1. event routes 2.2. aknowledge messages The module works with a main forked process that does the communication with rabbitmq for issuing publishes, waiting for replies and consuming messages. When it consumes a message it defers the process to a worker process so that it doesn't block this main process. 2.1. event routes The worker process issues an event-route where we can act on the received payload. The name of the event-route is composed by values extracted from the payload. Kazoo module will try to execute the event route from most significant to less significant. define the event route like event_route[kazoo:consumer-event[-payload_key_value[-payload_subkey_val ue]]] we can set the key/subkey pair on a subscription base. check the payload on subscribe. Example 1.1. define the event route ... modparam("kazoo", "amqp_consumer_event_key", "Event-Category") modparam("kazoo", "amqp_consumer_event_subkey", "Event-Name") ... event_route[kazoo:consumer-event-presence-update] { # presence is the value extracted from Event-Category field in json payload # update is the value extracted from Event-Name field in json payload xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json ,From})"); ... } event_route[kazoo:consumer-event-presence] { # presence is the value extracted from Event-Category field in json payload xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json ,From})"); ... } event_route[kazoo:consumer-event-event-category-event-name] { # event-category is the name of the amqp_consumer_event_key parameter # event-name is the name of the amqp_consumer_event_subkey parameter # this event route is executed if we can't find the previous ... } event_route[kazoo:consumer-event-event-category] { # event-category is the name of the amqp_consumer_event_key parameter # this event route is executed if we can't find the previous ... } event_route[kazoo:consumer-event] { # this event route is executed if we can't find the previous } 2.2. aknowledge messages Consumed messages have the option of being acknowledge in two ways: * immediately when received * after processing by the worker 3. Dependencies 3.1. Kamailio Modules 3.2. External Libraries or Applications 3.1. Kamailio Modules The following modules must be loaded before this module: * none. 3.2. External Libraries or Applications * librabbitmq. * libjson. * libuuid. 4. Parameters 4.1. amqp related 4.1.1. node_hostname(str) 4.1.2. amqp_consumer_processes(int) 4.1.3. amqp_consumer_event_key(str) 4.1.4. amqp_consumer_event_subkey(str) 4.1.5. amqp_max_channels(str) 4.1.6. amqp_connection(str) 4.2. execution control 4.2.1. amqp_consumer_loop_count(int) 4.2.2. amqp_internal_loop_count(int) 4.2.3. amqp_consumer_ack_loop_count(int) 4.2.4. consume_messages_on_reconnect(int) 4.2.5. single_consumer_on_reconnect(int) 4.3. timers 4.3.1. amqp_consumer_ack_timeout(str) 4.3.2. amqp_interprocess_timeout(str) 4.3.3. amqp_waitframe_timout(str) 4.3.4. amqp_query_timout(str) 4.4. presence related 4.4.1. db_url(str) 4.4.2. presentity_table(str) 4.1. amqp related 4.1.1. node_hostname(str) The name of this host to register in rabbitmq. Default value is NULL. you must set this parameter value for the module to work Example 1.2. Set node_hostname parameter ... modparam("kazoo", "node_hostname", "sipproxy.mydomain.com") ... 4.1.2. amqp_consumer_processes(int) The number of worker processes to handle messages consumption. Default value is 4. Example 1.3. Set amqp_consumer_processes parameter ... modparam("kazoo", "amqp_consumer_processes", 10) ... 4.1.3. amqp_consumer_event_key(str) The default name of the field in json payload to compose the event name 1st part Default value is “Event-Category”. Example 1.4. Set amqp_consumer_event_key parameter ... modparam("kazoo", "amqp_consumer_event_key", "My-JSON-Field-Name") ... 4.1.4. amqp_consumer_event_subkey(str) The default name of the field in json payload to compose the event name 2nd part Default value is “Event-Name”. Example 1.5. Set amqp_consumer_event_subkey parameter ... modparam("kazoo", "amqp_consumer_event_subkey", "My-JSON-SubField-Name") ... 4.1.5. amqp_max_channels(str) The number of pre allocated channels for the connection. Default value is 50. Example 1.6. Set amqp_max_channels parameter ... modparam("kazoo", "amqp_max_channels", 100) ... 4.1.6. amqp_connection(str) The connection url to rabbitmq. can be set multiple times for failover. Example 1.7. Set amqp_connection parameter ... modparam("kazoo", "amqp_connection", "amqp://guest:guest@localhost:5672") modparam("kazoo", "amqp_connection", "kazoo://guest:guest@otherhost:5672") ... 4.2. execution control execution control of main loop can be controlled by changing the parameter values in this section. The main loop has 3 sub-loops were it listen for actions to execute with a timeout. These group of parameters allow to set the maximum number of times the sub-loop is executed if it doesn't timeout. On busy systems, we may have a condition where a sub-loop never times out because it always has data to process. The purpose of these parameters is to set a maximum number of times it executes before it handles control to the next sub-loop. ... while(true) // main loop while(ACK or timeout) // acknowledge from worker process while(SEND or timeout) // anything to send ? while(CONSUME or timeout) // any data on consumed exchanges ? ... 4.2.1. amqp_consumer_loop_count(int) The consumer loop count. Default value is 10. Example 1.8. Set amqp_consumer_loop_count parameter ... modparam("kazoo", "amqp_consumer_loop_count", 3) ... 4.2.2. amqp_internal_loop_count(int) The internal listen for commands loop count. Default value is 5. Example 1.9. Set amqp_internal_loop_count parameter ... modparam("kazoo", "amqp_internal_loop_count", 1) ... 4.2.3. amqp_consumer_ack_loop_count(int) The work ack loop count. Default value is 20. Example 1.10. Set amqp_consumer_ack_loop_count parameter ... modparam("kazoo", "amqp_consumer_ack_loop_count", 5) ... 4.2.4. consume_messages_on_reconnect(int) This parameter indicates if the module ignores the loop counters on reconnect and consumes all the pending messages ready to be consumed. Default value is 1. Example 1.11. Set consume_messages_on_reconnect parameter ... modparam("kazoo", "consume_messages_on_reconnect", 0) ... 4.2.5. single_consumer_on_reconnect(int) When the main loop receives a message from rabbitmq it will defer the execution to a worker in a round-robin manner. this parameter allows to use the same worker when kazoo reconnects to rabbitmq. Default value is 1. Example 1.12. Set single_consumer_on_reconnect parameter ... modparam("kazoo", "single_consumer_on_reconnect", 0) ... 4.3. timers each functional parameter related to timers come with 2 reflected parameters. name_sec and name_micro 4.3.1. amqp_consumer_ack_timeout(str) Timeout when checking for aknowledge from workers. Default value is 100000 micro. Example 1.13. Set amqp_consumer_ack_timeout parameter ... modparam("kazoo", "amqp_consumer_ack_timeout_sec", 1) modparam("kazoo", "amqp_consumer_ack_timeout_micro", 200000) ... 4.3.2. amqp_interprocess_timeout(str) Timeout when checking for commands (publish/query) for sending to rabbitmq. Default value is 100000 micro. Example 1.14. Set amqp_interprocess_timeout parameter ... modparam("kazoo", "amqp_interprocess_timeout_sec", 1) modparam("kazoo", "amqp_interprocess_timeout_micro", 200000) ... 4.3.3. amqp_waitframe_timout(str) Timeout when checking for messages from rabbitmq. Default value is 100000 micro. Example 1.15. Set amqp_waitframe_timout parameter ... modparam("kazoo", "amqp_waitframe_timout_sec", 1) modparam("kazoo", "amqp_waitframe_timout_micro", 200000) ... 4.3.4. amqp_query_timout(str) Timeout when checking for reply messages from rabbitmq for kazoo_query commands. Default value is 2 sec. Example 1.16. Set amqp_query_timout parameter ... modparam("kazoo", "amqp_query_timout_sec", 1) modparam("kazoo", "amqp_query_timout_micro", 200000) ... 4.4. presence related 4.4.1. db_url(str) The database for the presentity table. If set, the kazoo_ppua_publish function will update the presentity status in the database. Default value is “NULL”. Example 1.17. Set db_url parameter ... modparam("kazoo", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio") ... 4.4.2. presentity_table(str) The name of the presentity table in the database. Default value is “presentity”. Example 1.18. Set presentity_table parameter ... modparam("kazoo", "presentity_table", "my_presentity_table") ... 5. Functions 5.1. amqp related 5.1.1. kazoo_publish(exchange, routing_key, json_payload) 5.1.2. kazoo_query(exchange, routing_key, json_payload [, target_var]) 5.1.3. kazoo_subscribe(exchange, exchange_type, queue, routing_key) 5.1.4. kazoo_subscribe(json_description) 5.2. presence related 5.2.1. kazoo_pua_publish(json_payload) 5.3. other 5.3.1. kazoo_encode(to_encode, target_var) 5.3.2. kazoo_json(json_payload, field, target_var) 5.1. amqp related 5.1.1. kazoo_publish(exchange, routing_key, json_payload) The function publishes a json payload to rabbitmq. The routing_key parameter should be encoded. This function can be used from ANY ROUTE. Example 1.19. kazoo_publish usage ... $var(amqp_payload_request) = "{'Event-Category' : 'directory', 'Event-Name' : ' reg_success', 'Contact' : '" + $var(fs_contact) + "', 'Call-ID' : '" + $ci + "' , 'Realm' : '" + $fd +"', 'Username' : '" + $fU + "', 'From-User' : '" + $fU + "', 'From-Host' : '" + $fd + "', 'To-User' : '" + $tU +"', 'To-Host' : '" + $td + "', 'User-Agent' : '" + $ua +"' ," + $var(register_contants)+ " }"; $var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $fU ; kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request)); ... 5.1.2. kazoo_query(exchange, routing_key, json_payload [, target_var]) The function publishes a json payload to rabbitmq, waits for a correlated messageand puts the result in target_var. The routing_key parameter should be encoded. target_var is optional as the function also puts the result in pseudo-variable $kzR. This function can be used from ANY ROUTE. Example 1.20. kazoo_query usage ... $var(amqp_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' : 'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "' , 'Active-Only' : false }"; kazoo_encode("$ci", "$var(callid_encoded)"); $var(amqp_routing_key) = "call.status_req.$var(callid_encoded)"; if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request), " $var(amqp_result)")) { kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du"); if($du != $null) { xlog("L_INFO", "$ci|log|user channels found redirecting call to $du"); return; } } ... 5.1.3. kazoo_subscribe(exchange, exchange_type, queue, routing_key) The function subscribes to exchange/type on queue with routing_key. The routing_key parameter should be encoded. This function must be called from event_route[kazoo:mod-init]. Example 1.21. kazoo_subscribe usage ... event_route[kazoo:mod-init] { kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOS TNAME"); } event_route[kazoo:consumer-event] { xlog("L_INFO","Received json payload : $kzE"); } ... 5.1.4. kazoo_subscribe(json_description) The function takes additional parameters to the subscribe function. json payload fields description * exchange : str, required * type : str, required * queue : str, required * routing : str, required * auto_delete : int, default 1 * durable : int, default 0 * no_ack : int, default 1 * wait_for_consumer_ack : int, default 0 * event_key : str, no default * event_subkey : str, no default This function must be called from event_route[kazoo:mod-init]. Example 1.22. kazoo_subscribe usage ... event_route[kazoo:mod-init] { $var(payload) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' : 'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'auto_delete' : 0, 'du rable' : 1, 'no_ack' : 0, 'wait_for_consumer_ack' : 1 }"; kazoo_subscribe("$var(payload)"); } event_route[kazoo:consumer-event] { xlog("L_INFO","Received json payload : $kzE"); } ... 5.2. presence related 5.2.1. kazoo_pua_publish(json_payload) The function build presentity state from json_payload and updates presentity table. This function can be used from ANY ROUTE. Example 1.23. kazoo_pua_publish usage ... event_route[kazoo:consumer-event-presence-update] { xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz. json,From})"); kazoo_pua_publish($kzE); pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package} )", 1); } ... 5.3. other 5.3.1. kazoo_encode(to_encode, target_var) The function encodes the 1st parameter for amqp and puts the result in the 2nd parameter. This function can be used from ANY ROUTE. Example 1.24. kazoo_encode usage ... kazoo_encode("$ci", "$var(callid_encoded)"); $var(amqp_routing_key) = "call.status_req.$var(callid_encoded)"; ... 5.3.2. kazoo_json(json_payload, field, target_var) The function extracts the value from a json payload and puts the result in the 3rd parameter. It can use nested values for the query part. This function can be used from ANY ROUTE. Example 1.25. kazoo_json usage ... kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du"); if($du != $null) { xlog("L_INFO", "$ci|log|user channels found redirecting call to $du"); return; } ... 6. Exported pseudo-variables * $kzR Contains the payload result of kazoo_query execution. * $kzE Contains the payload of a consumed message 7. Transformations The prefix for kazoo transformations is kz. * json Example 1.26. kz.json usage ... #kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du"); $du = $kzR{kz.json,Channels[0].switch_url}; if($du != $null) { xlog("L_INFO", "$ci|log|user channels found redirecting call to $du"); return; } ... * encode Example 1.27. kz.encode usage ... #kazoo_encode("$ci", "$var(callid_encoded)"); #$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)"; $var(amqp_routing_key) = "call.status_req." + $(ci{kz.encode}) ...