src/modules/htable/ht_dmq.c
6a20bf80
 /**
79ad0e2d
  *
6a20bf80
  * Copyright (C) 2013 Charles Chance (Sipcentric Ltd)
  *
  * This file is part of Kamailio, a free SIP server.
  *
  * Kamailio is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation; either version 2 of the License, or
  * (at your option) any later version
  *
  * Kamailio is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
79ad0e2d
  * You should have received a copy of the GNU General Public License
  * along with this program; if not, write to the Free Software
9e1ff448
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
6a20bf80
  *
  */
 
 
 #include "ht_dmq.h"
 #include "ht_api.h"
 
 /*
  * Flat description of one replicated htable operation.
  * NOTE(review): this type is not referenced in the visible part of this
  * file; the field meanings below are inferred from the JSON field names
  * used by the replicate/replay functions - confirm before relying on them.
  */
 typedef struct _ht_dmq_repdata {
 	int action;   /* presumably an ht_dmq_action_t value */
 	str htname;   /* presumably the hash table name */
 	str cname;    /* presumably the cell (key) name */
 	int type;     /* presumably the value type flags (e.g. AVP_VAL_STR) */
 	int intval;   /* presumably the integer value */
 	str strval;   /* presumably the string value */
 	int expire;   /* presumably the expiry value */
 } ht_dmq_repdata_t;
 
57f87fcb
 /*
  * Accumulator used while syncing: cells are collected into a json array
  * and sent in groups.
  */
 typedef struct _ht_dmq_jdoc_cell_group {
 	int count;             /* cells written since the last flush */
 	int size;              /* running estimate of the serialized size */
 	srjson_doc_t jdoc;     /* json document owning the root object */
 	srjson_t *jdoc_cells;  /* array of pending cells; linked under "cells" only during flush */
 } ht_dmq_jdoc_cell_group_t;
 
 /* content type handed to the DMQ API with every message sent from here */
 static str ht_dmq_content_type = str_init("application/json");
 /* reason phrases set on the DMQ response by ht_dmq_handle_msg() */
 static str dmq_200_rpl  = str_init("OK");
 static str dmq_400_rpl  = str_init("Bad Request");
 static str dmq_500_rpl  = str_init("Server Internal Error");
 /* size estimate of a cell group without any cell in it */
 static int dmq_cell_group_empty_size = 12; // {"cells":[]}
 /* ht_dmq_send_sync() flushes the group once its size estimate reaches this */
 static int dmq_cell_group_max_size = 60000;
 /* the single cell group shared by the ht_dmq_cell_group_*() helpers */
 static ht_dmq_jdoc_cell_group_t ht_dmq_jdoc_cell_group;
02bff604
 /* defined elsewhere; when non-zero, ht_dmq_request_sync is registered as
  * the peer init callback in ht_dmq_initialize() */
 extern int ht_dmq_init_sync;
57f87fcb
 
6a20bf80
 /* DMQ API binding, filled by dmq_load_api() in ht_dmq_initialize() */
 dmq_api_t ht_dmqb;
 /* peer handle returned by register_dmq_peer(); NULL until registered */
 dmq_peer_t* ht_dmq_peer = NULL;
 /* response callback descriptor passed along with every sent message */
 dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0};
 
57f87fcb
 /* forward declarations of functions defined further down in this file */
 int ht_dmq_send(str* body, dmq_node_t* node);
 int ht_dmq_send_sync(dmq_node_t* node);
 int ht_dmq_handle_sync(srjson_doc_t* jdoc);
 
 static int ht_dmq_cell_group_init(void) {
 
 	if (ht_dmq_jdoc_cell_group.jdoc.root)
 		return 0; // already initialised
 
 	ht_dmq_jdoc_cell_group.count = 0;
 	ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
 
 	srjson_InitDoc(&ht_dmq_jdoc_cell_group.jdoc, NULL);
 
 	ht_dmq_jdoc_cell_group.jdoc.root = srjson_CreateObject(&ht_dmq_jdoc_cell_group.jdoc);
 	if (ht_dmq_jdoc_cell_group.jdoc.root==NULL) {
 		LM_ERR("cannot create json root object! \n");
 		return -1;
 	}
 
 	ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc);
 	if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) {
 		LM_ERR("cannot create json cells array! \n");
 		srjson_DestroyDoc(&ht_dmq_jdoc_cell_group.jdoc);
 		return -1;
 	}
 
 	return 0;
 }
 
 /**
  * @brief serialize one cell into the pending cells array
  *
  * Builds a json object with htname, cname, type, strval or intval, and
  * expire, appends it to the group's cells array and updates the group's
  * size estimate and cell counter.
  *
  * @param htname name of the table the cell belongs to
  * @param ptr    cell to serialize
  * @return 0 on success, -1 if the json object could not be created
  */
 static int ht_dmq_cell_group_write(str* htname, ht_cell_t* ptr) {
 	ht_dmq_jdoc_cell_group_t *grp = &ht_dmq_jdoc_cell_group;
 	srjson_doc_t *doc = &grp->jdoc;
 	srjson_t *cells = grp->jdoc_cells;
 	srjson_t *cell;
 	int numlen = 0;
 
 	cell = srjson_CreateObject(doc);
 	if (cell == NULL) {
 		LM_ERR("cannot create cell json root\n");
 		return -1;
 	}
 
 	srjson_AddStrToObject(doc, cell, "htname", htname->s, htname->len);
 	srjson_AddStrToObject(doc, cell, "cname", ptr->name.s, ptr->name.len);
 	grp->size += htname->len;
 	grp->size += ptr->name.len;
 
 	if (ptr->flags & AVP_VAL_STR) {
 		srjson_AddNumberToObject(doc, cell, "type", AVP_VAL_STR);
 		srjson_AddStrToObject(doc, cell, "strval", ptr->value.s.s, ptr->value.s.len);
 		// json overhead: {"htname":"","cname":"","type":,"strval":"","expire":}
 		grp->size += 54;
 		// one digit for the type plus the string payload
 		grp->size += 1;
 		grp->size += ptr->value.s.len;
 	} else {
 		srjson_AddNumberToObject(doc, cell, "type", 0);
 		srjson_AddNumberToObject(doc, cell, "intval", ptr->value.n);
 		// json overhead: {"htname":"","cname":"","type":,"intval":,"expire":}
 		grp->size += 52;
 		// one digit for the type plus the printed length of the number
 		grp->size += 1;
 		sint2str((long)ptr->value.n, &numlen);
 		grp->size += numlen;
 	}
 
 	srjson_AddNumberToObject(doc, cell, "expire", ptr->expire);
 	sint2str((long)ptr->expire, &numlen);
 	grp->size += numlen;
 
 	srjson_AddItemToArray(doc, cells, cell);
 	grp->count++;
 
 	return 0;
 }
 
 /**
  * @brief serialize and send the pending cells, then reset the group
  *
  * The cells array is linked under the root as "cells", the document is
  * printed and handed to ht_dmq_send(). Whether or not that worked, the
  * "cells" member is deleted, counters are reset, the print buffer is
  * released and a new empty cells array is created for the next group.
  *
  * @param node passed through to ht_dmq_send()
  * @return 0 on success, -1 if serializing, sending or re-creating the
  *         cells array failed
  */
 static int ht_dmq_cell_group_flush(dmq_node_t* node) {
 	ht_dmq_jdoc_cell_group_t *grp = &ht_dmq_jdoc_cell_group;
 	srjson_doc_t *doc = &grp->jdoc;
 	int rc = 0;
 
 	srjson_AddItemToObject(doc, doc->root, "cells", grp->jdoc_cells);
 
 	LM_DBG("jdoc size[%d]\n", grp->size);
 	doc->buf.s = srjson_PrintUnformatted(doc, doc->root);
 	if (doc->buf.s == NULL) {
 		LM_ERR("unable to serialize data\n");
 		rc = -1;
 	} else {
 		doc->buf.len = strlen(doc->buf.s);
 		LM_DBG("sending serialized data %.*s\n", doc->buf.len, doc->buf.s);
 		if (ht_dmq_send(&doc->buf, node) != 0) {
 			LM_ERR("unable to send data\n");
 			rc = -1;
 		}
 	}
 
 	/* reset the group regardless of the outcome above */
 	srjson_DeleteItemFromObject(doc, doc->root, "cells");
 	grp->count = 0;
 	grp->size = dmq_cell_group_empty_size;
 
 	if (doc->buf.s != NULL) {
 		doc->free_fn(doc->buf.s);
 		doc->buf.s = NULL;
 	}
 
 	grp->jdoc_cells = srjson_CreateArray(&grp->jdoc);
 	if (grp->jdoc_cells == NULL) {
 		LM_ERR("cannot re-create json cells array! \n");
 		rc = -1;
 	}
 
 	return rc;
 }
 
 /**
  * @brief release everything held by the shared cell group
  *
  * Frees the print buffer (if any), the pending cells array and finally
  * the json document itself.
  */
 static void ht_dmq_cell_group_destroy() {
 
 	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
 
 	if(jdoc->buf.s!=NULL) {
 		jdoc->free_fn(jdoc->buf.s);
 		jdoc->buf.s = NULL;
 	}
 
 	/* The pending cells array is linked under the root only while a flush
 	 * is running, so destroying the document would not reach it. Delete
 	 * it explicitly (together with any cells still queued in it) to avoid
 	 * leaking it on every sync run. */
 	if(ht_dmq_jdoc_cell_group.jdoc_cells!=NULL) {
 		srjson_Delete(jdoc, ht_dmq_jdoc_cell_group.jdoc_cells);
 		ht_dmq_jdoc_cell_group.jdoc_cells = NULL;
 	}
 
 	srjson_DestroyDoc(jdoc);
 
 }
 
6a20bf80
 /**
  * @brief add notification peer
  */
 int ht_dmq_initialize()
 {
 	dmq_peer_t not_peer;
 
79ad0e2d
 	/* load the DMQ API */
 	if (dmq_load_api(&ht_dmqb)!=0) {
 		LM_ERR("cannot load dmq api\n");
 		return -1;
 	} else {
 		LM_DBG("loaded dmq api\n");
 	}
6a20bf80
 
 	not_peer.callback = ht_dmq_handle_msg;
57f87fcb
 	not_peer.init_callback = (ht_dmq_init_sync ? ht_dmq_request_sync : NULL);
6a20bf80
 	not_peer.description.s = "htable";
 	not_peer.description.len = 6;
 	not_peer.peer_id.s = "htable";
 	not_peer.peer_id.len = 6;
 	ht_dmq_peer = ht_dmqb.register_dmq_peer(&not_peer);
 	if(!ht_dmq_peer) {
 		LM_ERR("error in register_dmq_peer\n");
 		goto error;
 	} else {
 		LM_DBG("dmq peer registered\n");
 	}
 	return 0;
 error:
 	return -1;
 }
 
57f87fcb
 /**
  * @brief hand a message body to the DMQ module
  *
  * @param body serialized message body
  * @param node target node; when NULL the body is broadcast instead
  * @return -1 if the htable DMQ peer is not registered, 0 otherwise (the
  *         result of the DMQ send call itself is not inspected)
  */
 int ht_dmq_send(str* body, dmq_node_t* node) {
 	if (ht_dmq_peer == NULL) {
 		LM_ERR("ht_dmq_peer is null!\n");
 		return -1;
 	}
 
 	if (node == NULL) {
 		LM_DBG("sending dmq broadcast...\n");
 		ht_dmqb.bcast_message(ht_dmq_peer, body, 0,
 				&ht_dmq_resp_callback, 1, &ht_dmq_content_type);
 		return 0;
 	}
 
 	LM_DBG("sending dmq message ...\n");
 	ht_dmqb.send_message(ht_dmq_peer, body, node,
 			&ht_dmq_resp_callback, 1, &ht_dmq_content_type);
 	return 0;
 }
 
 /**
  * @brief ht dmq callback
  *
  * Parses the JSON body of a received DMQ message. A document whose first
  * member is named "cells" is passed to ht_dmq_handle_sync(); otherwise the
  * members are read as a single action which is either a sync request
  * (answered with ht_dmq_send_sync()) or an action to replay locally.
  *
  * The body arrives from the network, so the parsed tree is validated
  * before use: the root must have a named first member, htname/cname/strval
  * must be json strings, and the fields required by the action must be
  * present. Violations are answered with 400 instead of dereferencing
  * NULL or passing uninitialised values on.
  *
  * @param msg      received SIP message carrying the JSON body
  * @param resp     response descriptor; reason and resp_code are set here
  * @param dmq_node node handed to ht_dmq_send_sync() for sync requests
  * @return always 0; the outcome is reported via resp (200, 400 or 500)
  */
 int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
 {
 	int content_length;
 	str body;
 	ht_dmq_action_t action = HT_DMQ_NONE;
 	str htname = STR_NULL;
 	str cname = STR_NULL;
 	int type = 0, mode = 0;
 	int has_strval = 0;
 	int is_str_field;
 	int_str val;
 	srjson_doc_t jdoc;
 	srjson_t *it = NULL;
 
 	/* received dmq message */
 	LM_DBG("dmq message received\n");
 
 	memset(&val, 0, sizeof(val));
 	srjson_InitDoc(&jdoc, NULL);
 
 	if(!msg->content_length) {
 		LM_ERR("no content length header found\n");
 		goto invalid;
 	}
 	content_length = get_content_length(msg);
 	if(!content_length) {
 		LM_DBG("content length is 0\n");
 		goto invalid;
 	}
 
 	body.s = get_body(msg);
 	body.len = content_length;
 
 	if (!body.s) {
 		LM_ERR("unable to get body\n");
 		goto error;
 	}
 
 	/* parse body */
 	LM_DBG("body: %.*s\n", body.len, body.s);
 
 	jdoc.buf = body;
 
 	jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
 	if(jdoc.root == NULL)
 	{
 		LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
 		goto invalid;
 	}
 
 	/* an empty object or an array root has no named first member */
 	if (jdoc.root->child == NULL || jdoc.root->child->string == NULL) {
 		LM_ERR("json doc has no named fields\n");
 		goto invalid;
 	}
 
 	if (unlikely(strcmp(jdoc.root->child->string, "cells")==0)) {
 		ht_dmq_handle_sync(&jdoc);
 	} else {
 
 		for(it=jdoc.root->child; it; it = it->next)
 		{
 			if (it->string == NULL) {
 				LM_ERR("unrecognized field in json object\n");
 				goto invalid;
 			}
 			LM_DBG("found field: %s\n", it->string);
 
 			/* string-typed fields must really carry a json string */
 			is_str_field = (strcmp(it->string, "htname")==0
 					|| strcmp(it->string, "cname")==0
 					|| strcmp(it->string, "strval")==0);
 			if (is_str_field && it->valuestring == NULL) {
 				LM_ERR("field %s is not a string\n", it->string);
 				goto invalid;
 			}
 
 			if (strcmp(it->string, "action")==0) {
 				action = SRJSON_GET_INT(it);
 			} else if (strcmp(it->string, "htname")==0) {
 				htname.s = it->valuestring;
 				htname.len = strlen(htname.s);
 			} else if (strcmp(it->string, "cname")==0) {
 				cname.s = it->valuestring;
 				cname.len = strlen(cname.s);
 			} else if (strcmp(it->string, "type")==0) {
 				type = SRJSON_GET_INT(it);
 			} else if (strcmp(it->string, "strval")==0) {
 				val.s.s = it->valuestring;
 				val.s.len = strlen(val.s.s);
 				has_strval = 1;
 			} else if (strcmp(it->string, "intval")==0) {
 				val.n = SRJSON_GET_INT(it);
 			} else if (strcmp(it->string, "mode")==0) {
 				mode = SRJSON_GET_INT(it);
 			} else {
 				LM_ERR("unrecognized field in json object\n");
 				goto invalid;
 			}
 		}
 
 		if (unlikely(action == HT_DMQ_SYNC)) {
 			ht_dmq_send_sync(dmq_node);
 		} else {
 			if (htname.s == NULL) {
 				LM_ERR("missing htname field\n");
 				goto invalid;
 			}
 			if (cname.s == NULL) {
 				if (action==HT_DMQ_SET_CELL || action==HT_DMQ_SET_CELL_EXPIRE
 						|| action==HT_DMQ_DEL_CELL) {
 					LM_ERR("missing cname field\n");
 					goto invalid;
 				}
 				/* cname is optional for the remaining actions; hand on
 				 * an empty string rather than a NULL pointer */
 				cname.s = "";
 				cname.len = 0;
 			}
 			/* these actions read val as a string; without a strval field
 			 * val would hold no (or a bogus) string pointer */
 			if (!has_strval && (action==HT_DMQ_RM_CELL_RE
 						|| action==HT_DMQ_RM_CELL_SW
 						|| (action==HT_DMQ_SET_CELL && (type&AVP_VAL_STR)))) {
 				LM_ERR("missing strval field\n");
 				goto invalid;
 			}
 			if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
 				LM_ERR("failed to replay action\n");
 				goto error;
 			}
 		}
 
 	}
 
 	srjson_DestroyDoc(&jdoc);
 	resp->reason = dmq_200_rpl;
 	resp->resp_code = 200;
 	return 0;
 
 invalid:
 	srjson_DestroyDoc(&jdoc);
 	resp->reason = dmq_400_rpl;
 	resp->resp_code = 400;
 	return 0;
 
 error:
 	srjson_DestroyDoc(&jdoc);
 	resp->reason = dmq_500_rpl;
 	resp->resp_code = 500;
 	return 0;
 }
 
53664ba2
 /**
  * @brief broadcast one htable action to the DMQ peers
  *
  * Serializes action, htname, optional cname, (for set/remove-by-pattern
  * actions) type plus strval or intval, and mode into a json object and
  * broadcasts it through ht_dmq_send().
  *
  * @param action action code to replicate
  * @param htname table name
  * @param cname  cell name, may be NULL (field is then omitted)
  * @param type   value type flags; AVP_VAL_STR selects strval over intval
  * @param val    value to replicate
  * @param mode   mode flag forwarded as is
  * @return 0 on success, -1 on any failure
  */
 int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname,
 		int type, int_str* val, int mode)
 {
 	srjson_doc_t jdoc;
 	int with_value;
 	int rc = -1;
 
 	LM_DBG("replicating action to dmq peers...\n");
 
 	srjson_InitDoc(&jdoc, NULL);
 
 	jdoc.root = srjson_CreateObject(&jdoc);
 	if (!jdoc.root) {
 		LM_ERR("cannot create json root\n");
 		goto done;
 	}
 
 	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action);
 	srjson_AddStrToObject(&jdoc, jdoc.root, "htname", htname->s, htname->len);
 	if (cname) {
 		srjson_AddStrToObject(&jdoc, jdoc.root, "cname", cname->s, cname->len);
 	}
 
 	/* only these actions carry a typed value */
 	with_value = (action==HT_DMQ_SET_CELL
 			|| action==HT_DMQ_SET_CELL_EXPIRE
 			|| action==HT_DMQ_RM_CELL_RE
 			|| action==HT_DMQ_RM_CELL_SW);
 	if (with_value) {
 		srjson_AddNumberToObject(&jdoc, jdoc.root, "type", type);
 		if (type&AVP_VAL_STR) {
 			srjson_AddStrToObject(&jdoc, jdoc.root, "strval", val->s.s, val->s.len);
 		} else {
 			srjson_AddNumberToObject(&jdoc, jdoc.root, "intval", val->n);
 		}
 	}
 
 	srjson_AddNumberToObject(&jdoc, jdoc.root, "mode", mode);
 
 	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
 	if (jdoc.buf.s == NULL) {
 		LM_ERR("unable to serialize data\n");
 		goto done;
 	}
 	jdoc.buf.len = strlen(jdoc.buf.s);
 	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
 	if (ht_dmq_send(&jdoc.buf, 0) == 0) {
 		rc = 0;
 	}
 
 done:
 	/* common exit: release the print buffer (if any) and the document */
 	if (jdoc.buf.s != NULL) {
 		jdoc.free_fn(jdoc.buf.s);
 		jdoc.buf.s = NULL;
 	}
 	srjson_DestroyDoc(&jdoc);
 	return rc;
 }
 
4d06b1e4
 /* Replay DMQ action
 
 Return 0 for non-error. Allt other returns are parsed as error.
 */
53664ba2
 int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname,
 		int type, int_str* val, int mode) {
6a20bf80
 
 	ht_t* ht;
 	ht = ht_get_table(htname);
 	if(ht==NULL) {
 		LM_ERR("unable to get table\n");
 		return -1;
 	}
 
53664ba2
 	LM_DBG("replaying action %d on %.*s=>%.*s...\n", action,
 			htname->len, htname->s, cname->len, cname->s);
6a20bf80
 
 	if (action==HT_DMQ_SET_CELL) {
 		return ht_set_cell(ht, cname, type, val, mode);
 	} else if (action==HT_DMQ_SET_CELL_EXPIRE) {
 		return ht_set_cell_expire(ht, cname, 0, val);
 	} else if (action==HT_DMQ_DEL_CELL) {
 		return ht_del_cell(ht, cname);
 	} else if (action==HT_DMQ_RM_CELL_RE) {
 		return ht_rm_cell_re(&val->s, ht, mode);
53664ba2
 	} else if (action==HT_DMQ_RM_CELL_SW) {
 		return ht_rm_cell_op(&val->s, ht, mode, HT_RM_OP_SW);
6a20bf80
 	} else {
57f87fcb
 		LM_ERR("unrecognized action\n");
 		return -1;
 	}
 }
 
 /**
  * @brief ask the DMQ peers for a full sync
  *
  * Broadcasts a json object containing only the HT_DMQ_SYNC action.
  *
  * @return 0 on success, -1 if building, serializing or sending failed
  */
 int ht_dmq_request_sync() {
 	srjson_doc_t req;
 	int rc = -1;
 
 	LM_DBG("requesting sync from dmq peers\n");
 	srjson_InitDoc(&req, NULL);
 
 	req.root = srjson_CreateObject(&req);
 	if (!req.root) {
 		LM_ERR("cannot create json root\n");
 		goto done;
 	}
 
 	srjson_AddNumberToObject(&req, req.root, "action", HT_DMQ_SYNC);
 	req.buf.s = srjson_PrintUnformatted(&req, req.root);
 	if (!req.buf.s) {
 		LM_ERR("unable to serialize data\n");
 		goto done;
 	}
 	req.buf.len = strlen(req.buf.s);
 	LM_DBG("sending serialized data %.*s\n", req.buf.len, req.buf.s);
 	if (ht_dmq_send(&req.buf, 0) == 0) {
 		rc = 0;
 	}
 
 done:
 	/* common exit: release the print buffer (if any) and the document */
 	if (req.buf.s != NULL) {
 		req.free_fn(req.buf.s);
 		req.buf.s = NULL;
 	}
 	srjson_DestroyDoc(&req);
 	return rc;
 }
 
 /**
  * @brief send the content of all replicated tables to one node
  *
  * Walks every table that has dmqreplicate set, slot by slot under the
  * slot lock, and writes each cell into the shared cell group. Cells whose
  * expire time is not in the future are skipped for tables with
  * htexpire > 0. The group is flushed whenever its size estimate reaches
  * dmq_cell_group_max_size and once more after the walk.
  *
  * @param node target node, passed through to the flush
  * @return 0 on success (also when there are no tables), -1 on failure
  */
 int ht_dmq_send_sync(dmq_node_t* node) {
 	ht_t *tbl;
 	ht_cell_t *cell;
 	time_t now;
 	int slot;
 	int rc = -1;
 
 	tbl = ht_get_root();
 	if (tbl == NULL) {
 		LM_DBG("no htables to sync!\n");
 		return 0;
 	}
 
 	if (ht_dmq_cell_group_init() < 0) {
 		return -1;
 	}
 
 	now = time(NULL);
 
 	for (; tbl != NULL; tbl = tbl->next) {
 		if (!tbl->dmqreplicate) {
 			continue;
 		}
 
 		for (slot = 0; slot < tbl->htsize; slot++) {
 			ht_slot_lock(tbl, slot);
 			for (cell = tbl->entries[slot].first; cell != NULL; cell = cell->next) {
 				if (tbl->htexpire > 0 && cell->expire <= now) {
 					LM_DBG("skipping expired entry\n");
 					continue;
 				}
 
 				if (ht_dmq_cell_group_write(&tbl->name, cell) < 0) {
 					ht_slot_unlock(tbl, slot);
 					goto done;
 				}
 
 				if (ht_dmq_jdoc_cell_group.size < dmq_cell_group_max_size) {
 					continue;
 				}
 
 				/* group is full: send it while still holding the slot lock */
 				LM_DBG("sending group count[%d]size[%d]\n", ht_dmq_jdoc_cell_group.count, ht_dmq_jdoc_cell_group.size);
 				if (ht_dmq_cell_group_flush(node) < 0) {
 					ht_slot_unlock(tbl, slot);
 					goto done;
 				}
 			}
 			ht_slot_unlock(tbl, slot);
 		}
 	}
 
 	/* send whatever is left in the last (possibly empty) group */
 	if (ht_dmq_cell_group_flush(node) >= 0) {
 		rc = 0;
 	}
 
 done:
 	ht_dmq_cell_group_destroy();
 	return rc;
 }
 
 /**
  * @brief apply a received sync document to the local tables
  *
  * The first member of the document root is taken as the cells array.
  * Each cell is read field by field (htname, cname, type, strval, intval,
  * expire) and stored with ht_set_cell_ex(), using expire minus the current
  * time as the expiry argument.
  *
  * The document comes from a remote peer, so cells are validated: unnamed
  * members are ignored, htname/cname/strval are only accepted when they
  * are json strings, and per-cell state is reset for every cell so that a
  * cell with missing fields cannot inherit values from the previous one.
  * Cells that fail validation are skipped with a warning.
  *
  * @param jdoc parsed json document
  * @return 0 when the document was processed, -1 if it has no first member
  */
 int ht_dmq_handle_sync(srjson_doc_t* jdoc) {
 	srjson_t* cells = NULL;
 	srjson_t* cell = NULL;
 	srjson_t* it = NULL;
 	str htname = STR_NULL;
 	str cname = STR_NULL;
 	int type = 0;
 	int_str val;
 	int expire = 0;
 	int has_strval = 0;
 	ht_t* ht = NULL;
 	time_t now = 0;
 
 	LM_DBG("handling sync\n");
 
 	if (jdoc==NULL || jdoc->root==NULL || jdoc->root->child==NULL) {
 		LM_ERR("no cells in sync document\n");
 		return -1;
 	}
 
 	cells = jdoc->root->child;
 	now = time(NULL);
 
 	for (cell = cells->child; cell; cell = cell->next) {
 		/* start every cell from a clean state */
 		htname.s = NULL;
 		htname.len = 0;
 		cname.s = NULL;
 		cname.len = 0;
 		type = 0;
 		expire = 0;
 		has_strval = 0;
 		memset(&val, 0, sizeof(val));
 
 		for(it=cell->child; it; it = it->next) {
 			if (it->string==NULL) {
 				/* array element instead of a named member */
 				LM_WARN("unrecognized field in json object\n");
 			} else if (strcmp(it->string, "htname")==0) {
 				if (it->valuestring!=NULL) {
 					htname.s = it->valuestring;
 					htname.len = strlen(htname.s);
 				}
 			} else if (strcmp(it->string, "cname")==0) {
 				if (it->valuestring!=NULL) {
 					cname.s = it->valuestring;
 					cname.len = strlen(cname.s);
 				}
 			} else if (strcmp(it->string, "type")==0) {
 				type = SRJSON_GET_INT(it);
 			} else if (strcmp(it->string, "strval")==0) {
 				if (it->valuestring!=NULL) {
 					val.s.s = it->valuestring;
 					val.s.len = strlen(val.s.s);
 					has_strval = 1;
 				}
 			} else if (strcmp(it->string, "intval")==0) {
 				val.n = SRJSON_GET_INT(it);
 			} else if (strcmp(it->string, "expire")==0) {
 				expire = SRJSON_GET_INT(it);
 			} else {
 				LM_WARN("unrecognized field in json object\n");
 			}
 		}
 
 		if(htname.s==NULL || htname.len<=0 || cname.s==NULL
 				|| cname.len<=0) {
 			continue;
 		}
 
 		if((type&AVP_VAL_STR) && !has_strval) {
 			/* a string-typed cell without a usable strval would hand an
 			 * invalid string pointer to the table */
 			LM_WARN("missing string value for cell %.*s in table %.*s\n",
 					cname.len, cname.s, htname.len, htname.s);
 			continue;
 		}
 
 		ht = ht_get_table(&htname);
 		if(ht==NULL) {
 			LM_WARN("unable to get table %.*s\n",
 					htname.len, htname.s);
 			continue;
 		}
 
 		if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0) {
 			LM_WARN("unable to set cell %.*s in table %.*s\n",
 					cname.len, cname.s, ht->name.len, ht->name.s);
 		}
 	}
 	return 0;
 }
 
 /**
  * @brief dmq response callback
  *
  * Referenced by ht_dmq_resp_callback, which is passed along with every
  * message sent from this file. It only logs its arguments at debug level.
  *
  * @param msg   logged as a pointer
  * @param code  logged as an integer
  * @param node  not used
  * @param param logged as a pointer
  * @return always 0
  */
 int ht_dmq_resp_callback_f(struct sip_msg* msg, int code,
 		dmq_node_t* node, void* param)
 {
 	LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
 	return 0;
 }