/** * * Copyright (C) 2013 Charles Chance (Sipcentric Ltd) * * This file is part of Kamailio, a free SIP server. * * Kamailio is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version * * Kamailio is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * */ #include "ht_dmq.h" #include "ht_api.h" typedef struct _ht_dmq_repdata { int action; str htname; str cname; int type; int intval; str strval; int expire; } ht_dmq_repdata_t; typedef struct _ht_dmq_jdoc_cell_group { int count; int size; srjson_doc_t jdoc; srjson_t *jdoc_cells; } ht_dmq_jdoc_cell_group_t; static str ht_dmq_content_type = str_init("application/json"); static str dmq_200_rpl = str_init("OK"); static str dmq_400_rpl = str_init("Bad Request"); static str dmq_500_rpl = str_init("Server Internal Error"); static int dmq_cell_group_empty_size = 12; // {"cells":[]} static int dmq_cell_group_max_size = 60000; static ht_dmq_jdoc_cell_group_t ht_dmq_jdoc_cell_group; extern int ht_dmq_init_sync; dmq_api_t ht_dmqb; dmq_peer_t* ht_dmq_peer = NULL; dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0}; int ht_dmq_send(str* body, dmq_node_t* node); int ht_dmq_send_sync(dmq_node_t* node); int ht_dmq_handle_sync(srjson_doc_t* jdoc); static int ht_dmq_cell_group_init(void) { if (ht_dmq_jdoc_cell_group.jdoc.root) return 0; // already initialised ht_dmq_jdoc_cell_group.count = 0; ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size; srjson_InitDoc(&ht_dmq_jdoc_cell_group.jdoc, NULL); ht_dmq_jdoc_cell_group.jdoc.root = srjson_CreateObject(&ht_dmq_jdoc_cell_group.jdoc); if (ht_dmq_jdoc_cell_group.jdoc.root==NULL) { LM_ERR("cannot create json root object! \n"); return -1; } ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc); if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) { LM_ERR("cannot create json cells array! \n"); srjson_DestroyDoc(&ht_dmq_jdoc_cell_group.jdoc); return -1; } return 0; } static int ht_dmq_cell_group_write(str* htname, ht_cell_t* ptr) { // jsonify cell and add to array str tmp; srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc; srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells; srjson_t * jdoc_cell = srjson_CreateObject(jdoc); if(!jdoc_cell) { LM_ERR("cannot create cell json root\n"); return -1; } // add json overhead if(ptr->flags&AVP_VAL_STR) { ht_dmq_jdoc_cell_group.size += 54; // {"htname":"","cname":"","type":,"strval":"","expire":} } else { ht_dmq_jdoc_cell_group.size += 52; // {"htname":"","cname":"","type":,"intval":,"expire":} } srjson_AddStrToObject(jdoc, jdoc_cell, "htname", htname->s, htname->len); ht_dmq_jdoc_cell_group.size += htname->len; srjson_AddStrToObject(jdoc, jdoc_cell, "cname", ptr->name.s, ptr->name.len); ht_dmq_jdoc_cell_group.size += ptr->name.len; if (ptr->flags&AVP_VAL_STR) { srjson_AddNumberToObject(jdoc, jdoc_cell, "type", AVP_VAL_STR); ht_dmq_jdoc_cell_group.size += 1; srjson_AddStrToObject(jdoc, jdoc_cell, "strval", ptr->value.s.s, ptr->value.s.len); ht_dmq_jdoc_cell_group.size += ptr->value.s.len; } else { srjson_AddNumberToObject(jdoc, jdoc_cell, "type", 0); ht_dmq_jdoc_cell_group.size += 1; srjson_AddNumberToObject(jdoc, jdoc_cell, "intval", ptr->value.n); tmp.s = sint2str((long)ptr->value.n, &tmp.len); ht_dmq_jdoc_cell_group.size += tmp.len; } srjson_AddNumberToObject(jdoc, jdoc_cell, "expire", ptr->expire); tmp.s = sint2str((long)ptr->expire, &tmp.len); ht_dmq_jdoc_cell_group.size += tmp.len; srjson_AddItemToArray(jdoc, jdoc_cells, jdoc_cell); ht_dmq_jdoc_cell_group.count++; return 0; } static int ht_dmq_cell_group_flush(dmq_node_t* node) { srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc; srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells; int ret = 0; srjson_AddItemToObject(jdoc, jdoc->root, "cells", jdoc_cells); LM_DBG("jdoc size[%d]\n", ht_dmq_jdoc_cell_group.size); jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root); if(jdoc->buf.s==NULL) { LM_ERR("unable to serialize data\n"); ret = -1; goto cleanup; } jdoc->buf.len = strlen(jdoc->buf.s); LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s); if (ht_dmq_send(&jdoc->buf, node)!=0) { LM_ERR("unable to send data\n"); ret = -1; } cleanup: srjson_DeleteItemFromObject(jdoc, jdoc->root, "cells"); ht_dmq_jdoc_cell_group.count = 0; ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size; if(jdoc->buf.s!=NULL) { jdoc->free_fn(jdoc->buf.s); jdoc->buf.s = NULL; } ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc); if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) { LM_ERR("cannot re-create json cells array! \n"); ret = -1; } return ret; } static void ht_dmq_cell_group_destroy() { srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc; if(jdoc->buf.s!=NULL) { jdoc->free_fn(jdoc->buf.s); jdoc->buf.s = NULL; } srjson_DestroyDoc(jdoc); } /** * @brief add notification peer */ int ht_dmq_initialize() { dmq_peer_t not_peer; /* load the DMQ API */ if (dmq_load_api(&ht_dmqb)!=0) { LM_ERR("cannot load dmq api\n"); return -1; } else { LM_DBG("loaded dmq api\n"); } not_peer.callback = ht_dmq_handle_msg; not_peer.init_callback = (ht_dmq_init_sync ? ht_dmq_request_sync : NULL); not_peer.description.s = "htable"; not_peer.description.len = 6; not_peer.peer_id.s = "htable"; not_peer.peer_id.len = 6; ht_dmq_peer = ht_dmqb.register_dmq_peer(¬_peer); if(!ht_dmq_peer) { LM_ERR("error in register_dmq_peer\n"); goto error; } else { LM_DBG("dmq peer registered\n"); } return 0; error: return -1; } int ht_dmq_send(str* body, dmq_node_t* node) { if (!ht_dmq_peer) { LM_ERR("ht_dmq_peer is null!\n"); return -1; } if (node) { LM_DBG("sending dmq message ...\n"); ht_dmqb.send_message(ht_dmq_peer, body, node, &ht_dmq_resp_callback, 1, &ht_dmq_content_type); } else { LM_DBG("sending dmq broadcast...\n"); ht_dmqb.bcast_message(ht_dmq_peer, body, 0, &ht_dmq_resp_callback, 1, &ht_dmq_content_type); } return 0; } /** * @brief ht dmq callback */ int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node) { int content_length; str body; ht_dmq_action_t action = HT_DMQ_NONE; str htname, cname; int type = 0, mode = 0; int_str val; srjson_doc_t jdoc; srjson_t *it = NULL; /* received dmq message */ LM_DBG("dmq message received\n"); srjson_InitDoc(&jdoc, NULL); if(!msg->content_length) { LM_ERR("no content length header found\n"); goto invalid; } content_length = get_content_length(msg); if(!content_length) { LM_DBG("content length is 0\n"); goto invalid; } body.s = get_body(msg); body.len = content_length; if (!body.s) { LM_ERR("unable to get body\n"); goto error; } /* parse body */ LM_DBG("body: %.*s\n", body.len, body.s); jdoc.buf = body; if(jdoc.root == NULL) { jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s); if(jdoc.root == NULL) { LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s); goto invalid; } } if (unlikely(strcmp(jdoc.root->child->string, "cells")==0)) { ht_dmq_handle_sync(&jdoc); } else { for(it=jdoc.root->child; it; it = it->next) { LM_DBG("found field: %s\n", it->string); if (strcmp(it->string, "action")==0) { action = SRJSON_GET_INT(it); } else if (strcmp(it->string, "htname")==0) { htname.s = it->valuestring; htname.len = strlen(htname.s); } else if (strcmp(it->string, "cname")==0) { cname.s = it->valuestring; cname.len = strlen(cname.s); } else if (strcmp(it->string, "type")==0) { type = SRJSON_GET_INT(it); } else if (strcmp(it->string, "strval")==0) { val.s.s = it->valuestring; val.s.len = strlen(val.s.s); } else if (strcmp(it->string, "intval")==0) { val.n = SRJSON_GET_INT(it); } else if (strcmp(it->string, "mode")==0) { mode = SRJSON_GET_INT(it); } else { LM_ERR("unrecognized field in json object\n"); goto invalid; } } if (unlikely(action == HT_DMQ_SYNC)) { ht_dmq_send_sync(dmq_node); } else { if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) { LM_ERR("failed to replay action\n"); goto error; } } } srjson_DestroyDoc(&jdoc); resp->reason = dmq_200_rpl; resp->resp_code = 200; return 0; invalid: srjson_DestroyDoc(&jdoc); resp->reason = dmq_400_rpl; resp->resp_code = 400; return 0; error: srjson_DestroyDoc(&jdoc); resp->reason = dmq_500_rpl; resp->resp_code = 500; return 0; } int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode) { srjson_doc_t jdoc; LM_DBG("replicating action to dmq peers...\n"); srjson_InitDoc(&jdoc, NULL); jdoc.root = srjson_CreateObject(&jdoc); if(jdoc.root==NULL) { LM_ERR("cannot create json root\n"); goto error; } srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action); srjson_AddStrToObject(&jdoc, jdoc.root, "htname", htname->s, htname->len); if (cname!=NULL) { srjson_AddStrToObject(&jdoc, jdoc.root, "cname", cname->s, cname->len); } if (action==HT_DMQ_SET_CELL || action==HT_DMQ_SET_CELL_EXPIRE || action==HT_DMQ_RM_CELL_RE || action==HT_DMQ_RM_CELL_SW) { srjson_AddNumberToObject(&jdoc, jdoc.root, "type", type); if (type&AVP_VAL_STR) { srjson_AddStrToObject(&jdoc, jdoc.root, "strval", val->s.s, val->s.len); } else { srjson_AddNumberToObject(&jdoc, jdoc.root, "intval", val->n); } } srjson_AddNumberToObject(&jdoc, jdoc.root, "mode", mode); jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root); if(jdoc.buf.s!=NULL) { jdoc.buf.len = strlen(jdoc.buf.s); LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s); if (ht_dmq_send(&jdoc.buf, 0)!=0) { goto error; } jdoc.free_fn(jdoc.buf.s); jdoc.buf.s = NULL; } else { LM_ERR("unable to serialize data\n"); goto error; } srjson_DestroyDoc(&jdoc); return 0; error: if(jdoc.buf.s!=NULL) { jdoc.free_fn(jdoc.buf.s); jdoc.buf.s = NULL; } srjson_DestroyDoc(&jdoc); return -1; } /* Replay DMQ action Return 0 for non-error. Allt other returns are parsed as error. */ int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode) { ht_t* ht; ht = ht_get_table(htname); if(ht==NULL) { LM_ERR("unable to get table\n"); return -1; } LM_DBG("replaying action %d on %.*s=>%.*s...\n", action, htname->len, htname->s, cname->len, cname->s); if (action==HT_DMQ_SET_CELL) { return ht_set_cell(ht, cname, type, val, mode); } else if (action==HT_DMQ_SET_CELL_EXPIRE) { return ht_set_cell_expire(ht, cname, 0, val); } else if (action==HT_DMQ_DEL_CELL) { return ht_del_cell(ht, cname); } else if (action==HT_DMQ_RM_CELL_RE) { return ht_rm_cell_re(&val->s, ht, mode); } else if (action==HT_DMQ_RM_CELL_SW) { return ht_rm_cell_op(&val->s, ht, mode, HT_RM_OP_SW); } else { LM_ERR("unrecognized action\n"); return -1; } } int ht_dmq_request_sync() { srjson_doc_t jdoc; LM_DBG("requesting sync from dmq peers\n"); srjson_InitDoc(&jdoc, NULL); jdoc.root = srjson_CreateObject(&jdoc); if(jdoc.root==NULL) { LM_ERR("cannot create json root\n"); goto error; } srjson_AddNumberToObject(&jdoc, jdoc.root, "action", HT_DMQ_SYNC); jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root); if(jdoc.buf.s==NULL) { LM_ERR("unable to serialize data\n"); goto error; } jdoc.buf.len = strlen(jdoc.buf.s); LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s); if (ht_dmq_send(&jdoc.buf, 0)!=0) { goto error; } jdoc.free_fn(jdoc.buf.s); jdoc.buf.s = NULL; srjson_DestroyDoc(&jdoc); return 0; error: if(jdoc.buf.s!=NULL) { jdoc.free_fn(jdoc.buf.s); jdoc.buf.s = NULL; } srjson_DestroyDoc(&jdoc); return -1; } int ht_dmq_send_sync(dmq_node_t* node) { ht_t *ht; ht_cell_t *it; time_t now; int i; ht = ht_get_root(); if(ht==NULL) { LM_DBG("no htables to sync!\n"); return 0; } if (ht_dmq_cell_group_init() < 0) return -1; now = time(NULL); while (ht != NULL) { if (!ht->dmqreplicate) goto skip; for(i=0; i<ht->htsize; i++) { ht_slot_lock(ht, i); it = ht->entries[i].first; while(it) { if(ht->htexpire > 0) { if (it->expire <= now) { LM_DBG("skipping expired entry\n"); it = it->next; continue; } } if (ht_dmq_cell_group_write(&ht->name, it) < 0) { ht_slot_unlock(ht, i); goto error; } if (ht_dmq_jdoc_cell_group.size >= dmq_cell_group_max_size) { LM_DBG("sending group count[%d]size[%d]\n", ht_dmq_jdoc_cell_group.count, ht_dmq_jdoc_cell_group.size); if (ht_dmq_cell_group_flush(node) < 0) { ht_slot_unlock(ht, i); goto error; } } it = it->next; } ht_slot_unlock(ht, i); } skip: ht = ht->next; } if (ht_dmq_cell_group_flush(node) < 0) goto error; ht_dmq_cell_group_destroy(); return 0; error: ht_dmq_cell_group_destroy(); return -1; } int ht_dmq_handle_sync(srjson_doc_t* jdoc) { LM_DBG("handling sync\n"); srjson_t* cells = NULL; srjson_t* cell = NULL; srjson_t* it = NULL; str htname = STR_NULL; str cname = STR_NULL; int type = 0; int_str val = {0}; int expire = 0; ht_t* ht = NULL; time_t now = 0; cells = jdoc->root->child; cell = cells->child; now = time(NULL); while (cell) { for(it=cell->child; it; it = it->next) { if (strcmp(it->string, "htname")==0) { htname.s = it->valuestring; htname.len = strlen(htname.s); } else if (strcmp(it->string, "cname")==0) { cname.s = it->valuestring; cname.len = strlen(cname.s); } else if (strcmp(it->string, "type")==0) { type = SRJSON_GET_INT(it); } else if (strcmp(it->string, "strval")==0) { val.s.s = it->valuestring; val.s.len = strlen(val.s.s); } else if (strcmp(it->string, "intval")==0) { val.n = SRJSON_GET_INT(it); } else if (strcmp(it->string, "expire")==0) { expire = SRJSON_GET_INT(it); } else { LM_WARN("unrecognized field in json object\n"); } } if(htname.s!=NULL && htname.len>0 && cname.s!=NULL && cname.len>0) { ht = ht_get_table(&htname); if(ht==NULL) { LM_WARN("unable to get table %.*s\n", htname.len, (htname.s)?htname.s:""); } else { if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0) { LM_WARN("unable to set cell %.*s in table %.*s\n", cname.len, cname.s, ht->name.len, ht->name.s); } } } cell = cell->next; } return 0; } /** * @brief dmq response callback */ int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param) { LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param); return 0; }