KAZOO Module

2600hz Inc.

   <engineering@2600hz.com>

Edited by

Luis Azedo

   <luis@2600hz.com>

   Copyright © 2010, 2014 2600hz
     __________________________________________________________________

   Table of Contents

   1. Admin Guide

        1. Overview
        2. How it works

              2.1. event routes
              2.2. acknowledge messages

        3. Dependencies

              3.1. Kamailio Modules
              3.2. External Libraries or Applications

        4. Parameters

              4.1. amqp related

                    4.1.1. node_hostname(str)
                    4.1.2. amqp_consumer_processes(int)
                    4.1.3. amqp_consumer_event_key(str)
                    4.1.4. amqp_consumer_event_subkey(str)
                    4.1.5. amqp_max_channels(str)
                    4.1.6. amqp_connection(str)
                    4.1.7. event_callback(str)

              4.2. execution control

                    4.2.1. amqp_consumer_loop_count(int)
                    4.2.2. amqp_internal_loop_count(int)
                    4.2.3. amqp_consumer_ack_loop_count(int)
                    4.2.4. consume_messages_on_reconnect(int)
                    4.2.5. single_consumer_on_reconnect(int)

              4.3. timers

                    4.3.1. amqp_consumer_ack_timeout(str)
                    4.3.2. amqp_interprocess_timeout(str)
                    4.3.3. amqp_waitframe_timeout(str)
                    4.3.4. amqp_query_timeout(str)
                    4.3.5. amqp_query_timeout_avp(str)

              4.4. presence related

                    4.4.1. db_url(str)
                    4.4.2. presentity_table(str)
                    4.4.3. pua_mode(int)

        5. Functions

              5.1. amqp related

                    5.1.1. kazoo_publish(exchange, routing_key,
                            json_payload)

                    5.1.2. kazoo_query(exchange, routing_key, json_payload
                            [, target_var])

                    5.1.3. kazoo_subscribe(exchange, exchange_type, queue,
                            routing_key)

                    5.1.4. kazoo_subscribe(json_description)

              5.2. presence related

                    5.2.1. kazoo_pua_publish(json_payload)

              5.3. other

                    5.3.1. kazoo_encode(to_encode, target_var)
                    5.3.2. kazoo_json(json_payload, field, target_var)

        6. Exported pseudo-variables
        7. Transformations

   List of Examples

   1.1. define the event route
   1.2. Set node_hostname parameter
   1.3. Set amqp_consumer_processes parameter
   1.4. Set amqp_consumer_event_key parameter
   1.5. Set amqp_consumer_event_subkey parameter
   1.6. Set amqp_max_channels parameter
   1.7. Set amqp_connection parameter
   1.8. Set event_callback parameter
   1.9. Set amqp_consumer_loop_count parameter
   1.10. Set amqp_internal_loop_count parameter
   1.11. Set amqp_consumer_ack_loop_count parameter
   1.12. Set consume_messages_on_reconnect parameter
   1.13. Set single_consumer_on_reconnect parameter
   1.14. Set amqp_consumer_ack_timeout parameter
   1.15. Set amqp_interprocess_timeout parameter
   1.16. Set amqp_waitframe_timeout parameter
   1.17. Set amqp_query_timeout parameter
   1.18. >Set amqp_query_timeout_avp parameter
   1.19. Set db_url parameter
   1.20. Set presentity_table parameter
   1.21. Set pua_mode parameter
   1.22. kazoo_publish usage
   1.23. kazoo_query usage
   1.24. kazoo_subscribe usage
   1.25. kazoo_subscribe usage
   1.26. kazoo_pua_publish usage
   1.27. kazoo_encode usage
   1.28. kazoo_json usage
   1.29. kz.json usage
   1.30. kz.encode usage

Chapter 1. Admin Guide

   Table of Contents

   1. Overview
   2. How it works

        2.1. event routes
        2.2. acknowledge messages

   3. Dependencies

        3.1. Kamailio Modules
        3.2. External Libraries or Applications

   4. Parameters

        4.1. amqp related

              4.1.1. node_hostname(str)
              4.1.2. amqp_consumer_processes(int)
              4.1.3. amqp_consumer_event_key(str)
              4.1.4. amqp_consumer_event_subkey(str)
              4.1.5. amqp_max_channels(str)
              4.1.6. amqp_connection(str)
              4.1.7. event_callback(str)

        4.2. execution control

              4.2.1. amqp_consumer_loop_count(int)
              4.2.2. amqp_internal_loop_count(int)
              4.2.3. amqp_consumer_ack_loop_count(int)
              4.2.4. consume_messages_on_reconnect(int)
              4.2.5. single_consumer_on_reconnect(int)

        4.3. timers

              4.3.1. amqp_consumer_ack_timeout(str)
              4.3.2. amqp_interprocess_timeout(str)
              4.3.3. amqp_waitframe_timeout(str)
              4.3.4. amqp_query_timeout(str)
              4.3.5. amqp_query_timeout_avp(str)

        4.4. presence related

              4.4.1. db_url(str)
              4.4.2. presentity_table(str)
              4.4.3. pua_mode(int)

   5. Functions

        5.1. amqp related

              5.1.1. kazoo_publish(exchange, routing_key, json_payload)
              5.1.2. kazoo_query(exchange, routing_key, json_payload [,
                      target_var])

              5.1.3. kazoo_subscribe(exchange, exchange_type, queue,
                      routing_key)

              5.1.4. kazoo_subscribe(json_description)

        5.2. presence related

              5.2.1. kazoo_pua_publish(json_payload)

        5.3. other

              5.3.1. kazoo_encode(to_encode, target_var)
              5.3.2. kazoo_json(json_payload, field, target_var)

   6. Exported pseudo-variables
   7. Transformations

1. Overview

   The Kazoo is a general purpose AMQP connector (tested with
   rabbitmq-server). It exposes publish/consume capabilities into
   Kamailio.

   From a high-level, the purpose of the module might be for things like:
     * Integrate to an AMQP application to make real-time routing
       decisions (instead of using, say, a SQL database)
     * Provide a real-time integration into your program, instead of your
       database, so you can overlay additional logic in your preferred
       language while also utilizing a message bus
     * Utilize messaging to have a distributed messaging layer, such that
       machines processing requests/responses/events can go up/down or
       share the workload and your Kamailio node will still be happy

   supported operations are:
     * publish json payloads to rabbitmq
     * publish json payloads to rabbitmq and wait for correlated response
       message
     * subscribe to an exchange with a routing key

   The Kazoo module also has support to publish updates to presence module
   thru the kazoo_pua_publish function

2. How it works

   2.1. event routes
   2.2. acknowledge messages

   The module works with a main forked process that does the communication
   with rabbitmq for issuing publishes, waiting for replies and consuming
   messages. When it consumes a message it defers the process to a worker
   process so that it doesn't block this main process.

2.1. event routes

   The worker process issues an event-route where we can act on the
   received payload. The name of the event-route is composed by values
   extracted from the payload.

   Kazoo module will try to execute the event route from most significant
   to less significant. define the event route like
   event_route[kazoo:consumer-event[-payload_key_value[-payload_subkey_val
   ue]]]

   we can set the key/subkey pair on a subscription base. check the
   payload on subscribe.

   Example 1.1. define the event route
...
modparam("kazoo", "amqp_consumer_event_key", "Event-Category")
modparam("kazoo", "amqp_consumer_event_subkey", "Event-Name")
...

event_route[kazoo:consumer-event-presence-update]
{
# presence is the value extracted from Event-Category field in json payload
# update is the value extracted from Event-Name field in json payload
xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,
From})");
...
}

event_route[kazoo:consumer-event-presence]
{
# presence is the value extracted from Event-Category field in json payload
xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,
From})");
...
}

event_route[kazoo:consumer-event-event-category-event-name]
{
# event-category is the name of the amqp_consumer_event_key parameter
# event-name is the name of the amqp_consumer_event_subkey parameter
# this event route is executed if we can't find the previous
...
}

event_route[kazoo:consumer-event-event-category]
{
# event-category is the name of the amqp_consumer_event_key parameter
# this event route is executed if we can't find the previous
...
}

event_route[kazoo:consumer-event]
{
# this event route is executed if we can't find the previous
}

2.2. acknowledge messages

   Consumed messages have the option of being acknowledge in two ways:
     * immediately when received
     * after processing by the worker

3. Dependencies

   3.1. Kamailio Modules
   3.2. External Libraries or Applications

3.1. Kamailio Modules

   The following modules must be loaded before this module:
     * none.

3.2. External Libraries or Applications

     * librabbitmq.
     * libjson.
     * libuuid.

4. Parameters

   4.1. amqp related

        4.1.1. node_hostname(str)
        4.1.2. amqp_consumer_processes(int)
        4.1.3. amqp_consumer_event_key(str)
        4.1.4. amqp_consumer_event_subkey(str)
        4.1.5. amqp_max_channels(str)
        4.1.6. amqp_connection(str)
        4.1.7. event_callback(str)

   4.2. execution control

        4.2.1. amqp_consumer_loop_count(int)
        4.2.2. amqp_internal_loop_count(int)
        4.2.3. amqp_consumer_ack_loop_count(int)
        4.2.4. consume_messages_on_reconnect(int)
        4.2.5. single_consumer_on_reconnect(int)

   4.3. timers

        4.3.1. amqp_consumer_ack_timeout(str)
        4.3.2. amqp_interprocess_timeout(str)
        4.3.3. amqp_waitframe_timeout(str)
        4.3.4. amqp_query_timeout(str)
        4.3.5. amqp_query_timeout_avp(str)

   4.4. presence related

        4.4.1. db_url(str)
        4.4.2. presentity_table(str)
        4.4.3. pua_mode(int)

4.1. amqp related

4.1.1. node_hostname(str)

   The name of this host to register in rabbitmq.

   Default value is NULL. you must set this parameter value for the module
   to work

   Example 1.2. Set node_hostname parameter
...
modparam("kazoo", "node_hostname", "sipproxy.mydomain.com")
...

4.1.2. amqp_consumer_processes(int)

   The number of worker processes to handle messages consumption.

   Default value is 4.

   Example 1.3. Set amqp_consumer_processes parameter
...
modparam("kazoo", "amqp_consumer_processes", 10)
...

4.1.3. amqp_consumer_event_key(str)

   The default name of the field in json payload to compose the event name
   1st part

   Default value is “Event-Category”.

   Example 1.4. Set amqp_consumer_event_key parameter
...
modparam("kazoo", "amqp_consumer_event_key", "My-JSON-Field-Name")
...

4.1.4. amqp_consumer_event_subkey(str)

   The default name of the field in json payload to compose the event name
   2nd part

   Default value is “Event-Name”.

   Example 1.5. Set amqp_consumer_event_subkey parameter
...
modparam("kazoo", "amqp_consumer_event_subkey", "My-JSON-SubField-Name")
...

4.1.5. amqp_max_channels(str)

   The number of pre allocated channels for the connection.

   Default value is 50.

   Example 1.6. Set amqp_max_channels parameter
...
modparam("kazoo", "amqp_max_channels", 100)
...

4.1.6. amqp_connection(str)

   The connection url to rabbitmq. can be set multiple times for failover.

   Example 1.7. Set amqp_connection parameter
...
modparam("kazoo", "amqp_connection", "amqp://guest:guest@localhost:5672")
modparam("kazoo", "amqp_connection", "kazoo://guest:guest@otherhost:5672")
...

4.1.7. event_callback(str)

   The name of the function in the kemi configuration file (embedded
   scripting language such as Lua, Python, ...) to be executed instead of
   event_route[...] blocks.

   The function receives a string parameter with the name of the event,
   the values can be: 'kazoo:mod-init', 'kazoo:consumer-event'.

   Example 1.8. Set event_callback parameter
    ...
    modparam("kazoo", "event_callback", "ksr_kazoo_event")
    ...

4.2. execution control

   execution control of main loop can be controlled by changing the
   parameter values in this section.

   The main loop has 3 sub-loops were it listen for actions to execute
   with a timeout. These group of parameters allow to set the maximum
   number of times the sub-loop is executed if it doesn't timeout.

   On busy systems, we may have a condition where a sub-loop never times
   out because it always has data to process. The purpose of these
   parameters is to set a maximum number of times it executes before it
   handles control to the next sub-loop.
...
while(true) // main  loop
while(ACK or timeout)  // acknowledge from worker process
while(SEND or timeout) // anything to send ?
while(CONSUME or timeout) // any data on consumed exchanges ?
...

4.2.1. amqp_consumer_loop_count(int)

   The consumer loop count.

   Default value is 10.

   Example 1.9. Set amqp_consumer_loop_count parameter
...
modparam("kazoo", "amqp_consumer_loop_count", 3)
...

4.2.2. amqp_internal_loop_count(int)

   The internal listen for commands loop count.

   Default value is 5.

   Example 1.10. Set amqp_internal_loop_count parameter
...
modparam("kazoo", "amqp_internal_loop_count", 1)
...

4.2.3. amqp_consumer_ack_loop_count(int)

   The work ack loop count.

   Default value is 20.

   Example 1.11. Set amqp_consumer_ack_loop_count parameter
...
modparam("kazoo", "amqp_consumer_ack_loop_count", 5)
...

4.2.4. consume_messages_on_reconnect(int)

   This parameter indicates if the module ignores the loop counters on
   reconnect and consumes all the pending messages ready to be consumed.

   Default value is 1.

   Example 1.12. Set consume_messages_on_reconnect parameter
...
modparam("kazoo", "consume_messages_on_reconnect", 0)
...

4.2.5. single_consumer_on_reconnect(int)

   When the main loop receives a message from rabbitmq it will defer the
   execution to a worker in a round-robin manner. this parameter allows to
   use the same worker when kazoo reconnects to rabbitmq.

   Default value is 1.

   Example 1.13. Set single_consumer_on_reconnect parameter
...
modparam("kazoo", "single_consumer_on_reconnect", 0)
...

4.3. timers

   each functional parameter related to timers come with 2 reflected
   parameters. name_sec and name_micro

4.3.1. amqp_consumer_ack_timeout(str)

   Timeout when checking for acknowledge from workers.

   Default value is 100000 micro.

   Example 1.14. Set amqp_consumer_ack_timeout parameter
...
modparam("kazoo", "amqp_consumer_ack_timeout_sec", 1)
modparam("kazoo", "amqp_consumer_ack_timeout_micro", 200000)
...

4.3.2. amqp_interprocess_timeout(str)

   Timeout when checking for commands (publish/query) for sending to
   rabbitmq.

   Default value is 100000 micro.

   Example 1.15. Set amqp_interprocess_timeout parameter
...
modparam("kazoo", "amqp_interprocess_timeout_sec", 1)
modparam("kazoo", "amqp_interprocess_timeout_micro", 200000)
...

4.3.3. amqp_waitframe_timeout(str)

   Timeout when checking for messages from rabbitmq.

   Default value is 100000 micro.

   Example 1.16. Set amqp_waitframe_timeout parameter
...
modparam("kazoo", "amqp_waitframe_timeout_sec", 1)
modparam("kazoo", "amqp_waitframe_timeout_micro", 200000)
...

4.3.4. amqp_query_timeout(str)

   Timeout when checking for reply messages from rabbitmq for kazoo_query
   commands.

   Default value is 2 sec.

   Example 1.17. Set amqp_query_timeout parameter
...
modparam("kazoo", "amqp_query_timeout_sec", 1)
modparam("kazoo", "amqp_query_timeout_micro", 200000)
...

4.3.5. amqp_query_timeout_avp(str)

   avp holding the value in seconds for Timeout when checking for reply
   messages from rabbitmq for kazoo_query commands.

   Default value is NULL (no value).

   Example 1.18. >Set amqp_query_timeout_avp parameter
...
modparam("kazoo", "amqp_query_timeout_avp", "$var(kz_timeout)")

route[SOME_ROUTE]
{
    $var(kz_timeout) = 12;
    kazoo_query(exchange, routingkey, payload);
}

...

4.4. presence related

4.4.1. db_url(str)

   The database for the presentity table.

   If set, the kazoo_ppua_publish function will update the presentity
   status in the database.

   Default value is “NULL”.

   Example 1.19. Set db_url parameter
...
modparam("kazoo", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio")
...

4.4.2. presentity_table(str)

   The name of the presentity table in the database.

   Default value is “presentity”.

   Example 1.20. Set presentity_table parameter
...
modparam("kazoo", "presentity_table", "my_presentity_table")
...

4.4.3. pua_mode(int)

   Control if the module has to connect to presence database tables. Set
   it to 0 to not connect to database.

   Default value is “1”.

   Example 1.21. Set pua_mode parameter
...
modparam("kazoo", "pua_mode", 0)
...

5. Functions

   5.1. amqp related

        5.1.1. kazoo_publish(exchange, routing_key, json_payload)
        5.1.2. kazoo_query(exchange, routing_key, json_payload [,
                target_var])

        5.1.3. kazoo_subscribe(exchange, exchange_type, queue,
                routing_key)

        5.1.4. kazoo_subscribe(json_description)

   5.2. presence related

        5.2.1. kazoo_pua_publish(json_payload)

   5.3. other

        5.3.1. kazoo_encode(to_encode, target_var)
        5.3.2. kazoo_json(json_payload, field, target_var)

5.1. amqp related

5.1.1.  kazoo_publish(exchange, routing_key, json_payload)

   The function publishes a json payload to rabbitmq. The routing_key
   parameter should be encoded.

   This function can be used from ANY ROUTE.

   Example 1.22. kazoo_publish usage
...
$var(amqp_payload_request) = $_s({"Event-Category" : "directory", "Event-Name" :
 "reg_success", "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,
})", "Call-ID" : "$ci", "Realm" : "$fd", "Username" : "$fU", "From-User" : "$fU"
, "From-Host" : "$fd", "To-User" : "$tU", "To-Host" : "$td", "User-Agent" : "$(u
a{s.escape.common}{s.replace,\','}{s.replace,$$,})" });
$var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $fU;
kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request));
...

5.1.2.  kazoo_query(exchange, routing_key, json_payload [, target_var])

   The function publishes a json payload to rabbitmq, waits for a
   correlated messageand puts the result in target_var. The routing_key
   parameter should be encoded. target_var is optional as the function
   also puts the result in pseudo-variable $kzR.

   This function can be used from ANY ROUTE.

   Example 1.23. kazoo_query usage
...
$var(amqp_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' :
'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "',
'Active-Only' : false }";
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request), "$
var(amqp_result)")) {
   kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
   if($du != $null) {
       xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
       return;
   }
}
...

5.1.3.  kazoo_subscribe(exchange, exchange_type, queue, routing_key)

   The function subscribes to exchange/type on queue with routing_key. The
   routing_key parameter should be encoded.

   This function must be called from event_route[kazoo:mod-init].

   Example 1.24. kazoo_subscribe usage
...
event_route[kazoo:mod-init]
{
   kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOST
NAME");
}

event_route[kazoo:consumer-event]
{
    xlog("L_INFO","Received json payload : $kzE");
}
...

5.1.4.  kazoo_subscribe(json_description)

   The function takes additional parameters to the subscribe function.

   json payload fields description
     * exchange : str, required
     * type : str, required
     * queue : str, required
     * routing : str, required
     * auto_delete : int, default 1
     * durable : int, default 0
     * no_ack : int, default 1
     * wait_for_consumer_ack : int, default 0
     * event_key : str, no default
     * event_subkey : str, no default

   This function must be called from event_route[kazoo:mod-init].

   Example 1.25. kazoo_subscribe usage
...
event_route[kazoo:mod-init]
{
    $var(payload) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' :
'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'auto_delete' : 0, 'dura
ble' : 1, 'no_ack' : 0, 'wait_for_consumer_ack' : 1 }";
    kazoo_subscribe("$var(payload)");
}

event_route[kazoo:consumer-event]
{
    xlog("L_INFO","Received json payload : $kzE");
}
...

5.2. presence related

5.2.1.  kazoo_pua_publish(json_payload)

   The function build presentity state from json_payload and updates
   presentity table.

   This function can be used from ANY ROUTE.

   Example 1.26. kazoo_pua_publish usage
...
event_route[kazoo:consumer-event-presence-update]
{
    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.j
son,From})");
    kazoo_pua_publish($kzE);
    pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package})
", 1);
}
...

5.3. other

5.3.1.  kazoo_encode(to_encode, target_var)

   The function encodes the 1st parameter for amqp and puts the result in
   the 2nd parameter.

   This function can be used from ANY ROUTE.

   Example 1.27. kazoo_encode usage
...
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
...

5.3.2.  kazoo_json(json_payload, field, target_var)

   The function extracts the value from a json payload and puts the result
   in the 3rd parameter. It can use nested values for the query part.

   This function can be used from ANY ROUTE.

   Example 1.28. kazoo_json usage
...
kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
if($du != $null) {
  xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
  return;
}
...

6. Exported pseudo-variables

     * $kzR Contains the payload result of kazoo_query execution.
     * $kzE Contains the payload of a consumed message

7. Transformations

   The prefix for kazoo transformations is kz.
     * json
       Example 1.29. kz.json usage
...
#kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
$du = $kzR{kz.json,Channels[0].switch_url};
if($du != $null) {
  xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
  return;
}
...
     * encode
       Example 1.30. kz.encode usage
...
#kazoo_encode("$ci", "$var(callid_encoded)");
#$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
$var(amqp_routing_key) = "call.status_req." + $(ci{kz.encode})
...