/**
 * dispatcher module - load balancing
 *
 * Copyright (C) 2004-2005 FhG Fokus
 * Copyright (C) 2006 Voice Sistem SRL
 * Copyright (C) 2015 Daniel-Constantin Mierla (asipto.com)
 *
 * This file is part of Kamailio, a free SIP server.
 *
 * Kamailio is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version
 *
 * Kamailio is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */

/*! \file
 * \ingroup dispatcher
 * \brief Dispatcher :: Dispatch
 */

/*! \defgroup dispatcher Dispatcher :: Load balancing and failover module
 * 	The dispatcher module implements a set of functions for distributing SIP requests on a
 *	set of servers, but also grouping of server resources.
 *
 *	- The module has an internal API exposed to other modules.
 *	- The module implements a couple of MI functions for managing the list of server resources
 */

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

#include "../../lib/kmi/mi.h"
#include "../../sr_module.h"
#include "../../dprint.h"
#include "../../error.h"
#include "../../ut.h"
#include "../../route.h"
#include "../../mem/mem.h"
#include "../../mod_fix.h"
#include "../../rpc.h"
#include "../../rpc_lookup.h"

#include "ds_ht.h"
#include "dispatch.h"
#include "config.h"
#include "api.h"

MODULE_VERSION

/* Default names of the database columns and table. They only seed the
 * ds_*_col / ds_table_name str variables defined further down, which are
 * the values exported as module parameters. */
#define DS_SET_ID_COL			"setid"
#define DS_DEST_URI_COL			"destination"
#define DS_DEST_FLAGS_COL		"flags"
#define DS_DEST_PRIORITY_COL	"priority"
#define DS_DEST_ATTRS_COL		"attrs"
#define DS_TABLE_NAME			"dispatcher"

/** parameters */
/* path of the destination list file; loaded by mod_init() and by the reload
 * commands when no db_url is configured */
char *dslistfile = CFG_DIR"dispatcher.list";
/* values of the "force_dst", "flags" and "use_default" module parameters */
int  ds_force_dst   = 1;
int  ds_flags       = 0;
int  ds_use_default = 0;
/* raw text of the "*_avp" module parameters; mod_init() resolves each of
 * them into the matching *_avp_name / *_avp_type pair below */
static str dst_avp_param = {NULL, 0};
static str grp_avp_param = {NULL, 0};
static str cnt_avp_param = {NULL, 0};
static str dstid_avp_param = {NULL, 0};
static str attrs_avp_param = {NULL, 0};
static str sock_avp_param = {NULL, 0};
/* raw text of the "hash_pvar" module parameter; parsed into
 * hash_param_model by mod_init() */
str hash_pvar_param = {NULL, 0};

/* AVP names and types resolved from the parameters above; name.n and type
 * are set to 0 by mod_init() when the corresponding parameter is not set */
int_str dst_avp_name;
unsigned short dst_avp_type;
int_str grp_avp_name;
unsigned short grp_avp_type;
int_str cnt_avp_name;
unsigned short cnt_avp_type;
int_str dstid_avp_name;
unsigned short dstid_avp_type;
int_str attrs_avp_name;
unsigned short attrs_avp_type;
int_str sock_avp_name;
unsigned short sock_avp_type;

/* parsed form of hash_pvar_param, NULL when the parameter is not set */
pv_elem_t * hash_param_model = NULL;

int probing_threshold = 1; /* number of failed requests before a destination
							   is put into probing state */
int inactive_threshold = 1; /* number of replied requests before a destination
							   is taken back into active state */
/* values of the "ds_ping_method" and "ds_ping_from" module parameters */
str ds_ping_method = str_init("OPTIONS");
str ds_ping_from   = str_init("sip:dispatcher@localhost");
/* interval handed to register_timer() for ds_check_timer; the probing timer
 * (and the TM API) is only set up by mod_init() when this is > 0 */
static int ds_ping_interval = 0;
int ds_probing_mode  = DS_PROBE_NONE;

/* raw text of the "ds_ping_reply_codes" module parameter */
static str ds_ping_reply_codes_str= {NULL, 0};
/* cell allocated with shm_malloc() in mod_init() that holds the pointer to
 * the current array of accepted reply codes (replaced by
 * ds_parse_reply_codes()) */
static int** ds_ping_reply_codes = NULL;
/* cell allocated with shm_malloc() in mod_init() that holds the number of
 * entries in the array above */
static int* ds_ping_reply_codes_cnt;

/* "ds_default_socket" module parameter and the local socket it was resolved
 * to by mod_init() (stays NULL when the parameter is not set) */
str ds_default_socket       = {NULL, 0};
struct socket_info * ds_default_sockinfo = NULL;

/* call-load hash table settings; mod_init() creates the table with
 * 1<<ds_hash_size as size argument and requires ds_hash_size > 0 whenever
 * dstid_avp is set */
int ds_hash_size = 0;
int ds_hash_expire = 7200;
int ds_hash_initexpire = 7200;
/* interval handed to register_timer() for ds_ht_timer */
int ds_hash_check_interval = 30;

/* value of the "outbound_proxy" module parameter */
str ds_outbound_proxy = {0, 0};

/* tm */
/* TM API bindings; only loaded by mod_init() when ds_ping_interval > 0 */
struct tm_binds tmb;

/*db */
/* when ds_db_url.s is set the destinations are loaded from the database,
 * otherwise from dslistfile */
str ds_db_url            = {NULL, 0};
str ds_set_id_col        = str_init(DS_SET_ID_COL);
str ds_dest_uri_col      = str_init(DS_DEST_URI_COL);
str ds_dest_flags_col    = str_init(DS_DEST_FLAGS_COL);
str ds_dest_priority_col = str_init(DS_DEST_PRIORITY_COL);
str ds_dest_attrs_col    = str_init(DS_DEST_ATTRS_COL);
str ds_table_name        = str_init(DS_TABLE_NAME);

/* names ("setid_pvname", "attrs_pvname" parameters) and parsed specs of two
 * pseudo-variables; mod_init() rejects them unless they are writable */
str ds_setid_pvname   = {NULL, 0};
pv_spec_t ds_setid_pv;
str ds_attrs_pvname   = {NULL, 0};
pv_spec_t ds_attrs_pv;

/** module functions */
/* module life-cycle callbacks referenced from the exports structure */
static int mod_init(void);
static int child_init(int);

/* helpers used by mod_init() */
static int ds_parse_reply_codes();
static int ds_init_rpc(void);

/* wrappers behind the script functions listed in cmds[] */
static int w_ds_select_dst(struct sip_msg*, char*, char*);
static int w_ds_select_dst_limit(struct sip_msg*, char*, char*, char*);
static int w_ds_select_domain(struct sip_msg*, char*, char*);
static int w_ds_select_domain_limit(struct sip_msg*, char*, char*, char*);
static int w_ds_next_dst(struct sip_msg*, char*, char*);
static int w_ds_next_domain(struct sip_msg*, char*, char*);
static int w_ds_mark_dst0(struct sip_msg*, char*, char*);
static int w_ds_mark_dst1(struct sip_msg*, char*, char*);
static int w_ds_load_unset(struct sip_msg*, char*, char*);
static int w_ds_load_update(struct sip_msg*, char*, char*);

/* ds_is_from_list() variants, one per number of script parameters */
static int w_ds_is_from_list0(struct sip_msg*, char*, char*);
static int w_ds_is_from_list1(struct sip_msg*, char*, char*);
static int w_ds_is_from_list2(struct sip_msg*, char*, char*);
static int w_ds_is_from_list3(struct sip_msg*, char*, char*, char*);
static int w_ds_list_exist(struct sip_msg*, char*);

/* parameter fixups for the functions above */
static int fixup_ds_is_from_list(void** param, int param_no);
static int fixup_ds_list_exist(void** param,int param_no);

static void destroy(void);

/* fixup that only logs when the failover AVP parameters are missing */
static int ds_warn_fixup(void** param, int param_no);

/* MI command handlers listed in mi_cmds[] */
static struct mi_root* ds_mi_set(struct mi_root* cmd, void* param);
static struct mi_root* ds_mi_list(struct mi_root* cmd, void* param);
static struct mi_root* ds_mi_reload(struct mi_root* cmd_tree, void* param);
static int mi_child_init(void);

/* Functions exported to the configuration script. Each entry gives the
 * script name, the C wrapper, the number of script parameters, the fixup
 * run on those parameters and a mask of route blocks (presumably the
 * standard cmd_export_t layout -- confirm against sr_module.h). The same
 * script name appears several times, once per supported parameter count.
 *
 * NOTE(review): the 3-parameter ds_select_dst/ds_select_domain entries use
 * fixup_igp_null directly, while fixup_ds_is_from_list() in this file always
 * calls fixup_igp_null with param_no 1 -- confirm that fixup_igp_null
 * accepts param_no 2 and 3. */
static cmd_export_t cmds[]={
	{"ds_select_dst",    (cmd_function)w_ds_select_dst,    2,
		fixup_igp_igp, 0, REQUEST_ROUTE|FAILURE_ROUTE},
	{"ds_select_dst",    (cmd_function)w_ds_select_dst_limit,    3,
		fixup_igp_null, 0, REQUEST_ROUTE|FAILURE_ROUTE},
	{"ds_select_domain", (cmd_function)w_ds_select_domain, 2,
		fixup_igp_igp, 0, REQUEST_ROUTE|FAILURE_ROUTE},
	{"ds_select_domain", (cmd_function)w_ds_select_domain_limit, 3,
		fixup_igp_null, 0, REQUEST_ROUTE|FAILURE_ROUTE},
	{"ds_next_dst",      (cmd_function)w_ds_next_dst,      0,
		ds_warn_fixup, 0, REQUEST_ROUTE|FAILURE_ROUTE},
	{"ds_next_domain",   (cmd_function)w_ds_next_domain,   0,
		ds_warn_fixup, 0, REQUEST_ROUTE|FAILURE_ROUTE},
	{"ds_mark_dst",      (cmd_function)w_ds_mark_dst0,     0,
		ds_warn_fixup, 0, REQUEST_ROUTE|FAILURE_ROUTE|ONREPLY_ROUTE},
	{"ds_mark_dst",      (cmd_function)w_ds_mark_dst1,     1,
		ds_warn_fixup, 0, REQUEST_ROUTE|FAILURE_ROUTE|ONREPLY_ROUTE},
	{"ds_is_from_list",  (cmd_function)w_ds_is_from_list0, 0,
		0, 0, REQUEST_ROUTE|FAILURE_ROUTE|ONREPLY_ROUTE|BRANCH_ROUTE},
	{"ds_is_from_list",  (cmd_function)w_ds_is_from_list1, 1,
		fixup_igp_null, 0, ANY_ROUTE},
	{"ds_is_from_list",  (cmd_function)w_ds_is_from_list2, 2,
		fixup_ds_is_from_list, 0, ANY_ROUTE},
	{"ds_is_from_list",  (cmd_function)w_ds_is_from_list3, 3,
		fixup_ds_is_from_list, 0, ANY_ROUTE},
	{"ds_list_exist",  (cmd_function)w_ds_list_exist, 1,
		fixup_ds_list_exist, 0, ANY_ROUTE},
	{"ds_load_unset",    (cmd_function)w_ds_load_unset,   0,
		0, 0, ANY_ROUTE},
	{"ds_load_update",   (cmd_function)w_ds_load_update,  0,
		0, 0, ANY_ROUTE},
	/* internal API binding, exported with an empty route mask */
	{"bind_dispatcher",   (cmd_function)bind_dispatcher,  0,
		0, 0, 0},
	{0,0,0,0,0,0}
};


/* Module parameters: configuration name, value type and the variable that
 * receives the value. Note that several configuration names carry a "ds_"
 * prefix that the backing variable does not have (e.g. ds_probing_threshold
 * -> probing_threshold). */
static param_export_t params[]={
	{"list_file",       PARAM_STRING, &dslistfile},
	{"db_url",		    PARAM_STR, &ds_db_url},
	{"table_name", 	    PARAM_STR, &ds_table_name},
	{"setid_col",       PARAM_STR, &ds_set_id_col},
	{"destination_col", PARAM_STR, &ds_dest_uri_col},
	{"flags_col",       PARAM_STR, &ds_dest_flags_col},
	{"priority_col",    PARAM_STR, &ds_dest_priority_col},
	{"attrs_col",       PARAM_STR, &ds_dest_attrs_col},
	{"force_dst",       INT_PARAM, &ds_force_dst},
	{"flags",           INT_PARAM, &ds_flags},
	{"use_default",     INT_PARAM, &ds_use_default},
	{"dst_avp",         PARAM_STR, &dst_avp_param},
	{"grp_avp",         PARAM_STR, &grp_avp_param},
	{"cnt_avp",         PARAM_STR, &cnt_avp_param},
	{"dstid_avp",       PARAM_STR, &dstid_avp_param},
	{"attrs_avp",       PARAM_STR, &attrs_avp_param},
	{"sock_avp",        PARAM_STR, &sock_avp_param},
	{"hash_pvar",       PARAM_STR, &hash_pvar_param},
	{"setid_pvname",    PARAM_STR, &ds_setid_pvname},
	{"attrs_pvname",    PARAM_STR, &ds_attrs_pvname},
	{"ds_probing_threshold", INT_PARAM, &probing_threshold},
	{"ds_inactive_threshold", INT_PARAM, &inactive_threshold},
	{"ds_ping_method",     PARAM_STR, &ds_ping_method},
	{"ds_ping_from",       PARAM_STR, &ds_ping_from},
	{"ds_ping_interval",   INT_PARAM, &ds_ping_interval},
	{"ds_ping_reply_codes", PARAM_STR, &ds_ping_reply_codes_str},
	{"ds_probing_mode",    INT_PARAM, &ds_probing_mode},
	{"ds_hash_size",       INT_PARAM, &ds_hash_size},
	{"ds_hash_expire",     INT_PARAM, &ds_hash_expire},
	{"ds_hash_initexpire", INT_PARAM, &ds_hash_initexpire},
	{"ds_hash_check_interval", INT_PARAM, &ds_hash_check_interval},
	{"outbound_proxy",  PARAM_STR, &ds_outbound_proxy},
	{"ds_default_socket",  PARAM_STR, &ds_default_socket},
	{0,0,0}
};


/* MI commands registered by mod_init() through register_mi_mod(). Only
 * ds_reload carries an init function (mi_child_init), which opens the
 * database connection when db_url is set. */
static mi_export_t mi_cmds[] = {
	{ "ds_set_state",   ds_mi_set,     0,                 0,  0            },
	{ "ds_list",        ds_mi_list,    MI_NO_INPUT_FLAG,  0,  0            },
	{ "ds_reload",      ds_mi_reload,  0,                 0,  mi_child_init},
	{ 0, 0, 0, 0, 0}
};


/** module exports */
/* Descriptor tying together the tables and callbacks defined in this file.
 * exports.name is also passed to register_mi_mod() in mod_init(). */
struct module_exports exports= {
	"dispatcher",
	DEFAULT_DLFLAGS, /* dlopen flags */
	cmds,
	params,
	0,          /* exported statistics */
	mi_cmds,    /* exported MI functions */
	0,          /* exported pseudo-variables */
	0,          /* extra processes */
	mod_init,   /* module initialization function */
	0,          /* NOTE(review): presumably the reply handler slot -- confirm */
	(destroy_function) destroy,
	child_init  /* per-child init function */
};

/**
 * init module function
 */
static int mod_init(void)
{
	pv_spec_t avp_spec;
	str host;
	int port, proto;

	if(register_mi_mod(exports.name, mi_cmds)!=0)
	{
		LM_ERR("failed to register MI commands\n");
		return -1;
	}
	if(ds_init_rpc()<0)
	{
		LM_ERR("failed to register RPC commands\n");
		return -1;
	}

	if(cfg_declare("dispatcher", dispatcher_cfg_def,
				&default_dispatcher_cfg, cfg_sizeof(dispatcher),
				&dispatcher_cfg)){
		LM_ERR("Fail to declare the configuration\n");
		return -1;
	}

	/* Initialize the counter */
	ds_ping_reply_codes = (int**)shm_malloc(sizeof(unsigned int*));
	*ds_ping_reply_codes = 0;
	ds_ping_reply_codes_cnt = (int*)shm_malloc(sizeof(int));
	*ds_ping_reply_codes_cnt = 0;
	if(ds_ping_reply_codes_str.s) {
		cfg_get(dispatcher, dispatcher_cfg, ds_ping_reply_codes_str)
			= ds_ping_reply_codes_str;
		if(ds_parse_reply_codes()< 0)
		{
			return -1;
		}
	}
	/* copy threshholds to config */
	cfg_get(dispatcher, dispatcher_cfg, probing_threshold)
		= probing_threshold;
	cfg_get(dispatcher, dispatcher_cfg, inactive_threshold)
		= inactive_threshold;

	if (ds_default_socket.s && ds_default_socket.len > 0) {
		if (parse_phostport( ds_default_socket.s, &host.s, &host.len,
				&port, &proto)!=0) {
			LM_ERR("bad socket <%.*s>\n", ds_default_socket.len, ds_default_socket.s);
			return -1;
		}
		ds_default_sockinfo = grep_sock_info( &host, (unsigned short)port, proto);
		if (ds_default_sockinfo==0) {
			LM_WARN("non-local socket <%.*s>\n", ds_default_socket.len, ds_default_socket.s);
			return -1;
		}
		LM_INFO("default dispatcher socket set to <%.*s>\n", ds_default_socket.len, ds_default_socket.s);
	}

	if(init_data()!= 0)
		return -1;

	if(ds_db_url.s)
	{
		if(init_ds_db()!= 0)
		{
			LM_ERR("could not initiate a connect to the database\n");
			return -1;
		}
	} else {
		if(ds_load_list(dslistfile)!=0) {
			LM_ERR("no dispatching list loaded from file\n");
			return -1;
		} else {
			LM_DBG("loaded dispatching list\n");
		}
	}

	if (dst_avp_param.s && dst_avp_param.len > 0)
	{
		if (pv_parse_spec(&dst_avp_param, &avp_spec)==0
				|| avp_spec.type!=PVT_AVP)
		{
			LM_ERR("malformed or non AVP %.*s AVP definition\n",
					dst_avp_param.len, dst_avp_param.s);
			return -1;
		}

		if(pv_get_avp_name(0, &(avp_spec.pvp), &dst_avp_name, &dst_avp_type)!=0)
		{
			LM_ERR("[%.*s]- invalid AVP definition\n", dst_avp_param.len,
					dst_avp_param.s);
			return -1;
		}
	} else {
		dst_avp_name.n = 0;
		dst_avp_type = 0;
	}
	if (grp_avp_param.s && grp_avp_param.len > 0)
	{
		if (pv_parse_spec(&grp_avp_param, &avp_spec)==0
				|| avp_spec.type!=PVT_AVP)
		{
			LM_ERR("malformed or non AVP %.*s AVP definition\n",
					grp_avp_param.len, grp_avp_param.s);
			return -1;
		}

		if(pv_get_avp_name(0, &(avp_spec.pvp), &grp_avp_name, &grp_avp_type)!=0)
		{
			LM_ERR("[%.*s]- invalid AVP definition\n", grp_avp_param.len,
					grp_avp_param.s);
			return -1;
		}
	} else {
		grp_avp_name.n = 0;
		grp_avp_type = 0;
	}
	if (cnt_avp_param.s && cnt_avp_param.len > 0)
	{
		if (pv_parse_spec(&cnt_avp_param, &avp_spec)==0
				|| avp_spec.type!=PVT_AVP)
		{
			LM_ERR("malformed or non AVP %.*s AVP definition\n",
					cnt_avp_param.len, cnt_avp_param.s);
			return -1;
		}

		if(pv_get_avp_name(0, &(avp_spec.pvp), &cnt_avp_name, &cnt_avp_type)!=0)
		{
			LM_ERR("[%.*s]- invalid AVP definition\n", cnt_avp_param.len,
					cnt_avp_param.s);
			return -1;
		}
	} else {
		cnt_avp_name.n = 0;
		cnt_avp_type = 0;
	}
	if (dstid_avp_param.s && dstid_avp_param.len > 0)
	{
		if (pv_parse_spec(&dstid_avp_param, &avp_spec)==0
				|| avp_spec.type!=PVT_AVP)
		{
			LM_ERR("malformed or non AVP %.*s AVP definition\n",
					dstid_avp_param.len, dstid_avp_param.s);
			return -1;
		}

		if(pv_get_avp_name(0, &(avp_spec.pvp), &dstid_avp_name,
					&dstid_avp_type)!=0)
		{
			LM_ERR("[%.*s]- invalid AVP definition\n", dstid_avp_param.len,
					dstid_avp_param.s);
			return -1;
		}
	} else {
		dstid_avp_name.n = 0;
		dstid_avp_type = 0;
	}

	if (attrs_avp_param.s && attrs_avp_param.len > 0)
	{
		if (pv_parse_spec(&attrs_avp_param, &avp_spec)==0
				|| avp_spec.type!=PVT_AVP)
		{
			LM_ERR("malformed or non AVP %.*s AVP definition\n",
					attrs_avp_param.len, attrs_avp_param.s);
			return -1;
		}

		if(pv_get_avp_name(0, &(avp_spec.pvp), &attrs_avp_name,
					&attrs_avp_type)!=0)
		{
			LM_ERR("[%.*s]- invalid AVP definition\n", attrs_avp_param.len,
					attrs_avp_param.s);
			return -1;
		}
	} else {
		attrs_avp_name.n = 0;
		attrs_avp_type = 0;
	}

	if (sock_avp_param.s && sock_avp_param.len > 0)
	{
		if (pv_parse_spec(&sock_avp_param, &avp_spec)==0
				|| avp_spec.type!=PVT_AVP)
		{
			LM_ERR("malformed or non AVP %.*s AVP definition\n",
					sock_avp_param.len, sock_avp_param.s);
			return -1;
		}

		if(pv_get_avp_name(0, &(avp_spec.pvp), &sock_avp_name,
					&sock_avp_type)!=0)
		{
			LM_ERR("[%.*s]- invalid AVP definition\n", sock_avp_param.len,
					sock_avp_param.s);
			return -1;
		}
	} else {
		sock_avp_name.n = 0;
		sock_avp_type = 0;
	}

	if (hash_pvar_param.s && *hash_pvar_param.s) {
		if(pv_parse_format(&hash_pvar_param, &hash_param_model) < 0
				|| hash_param_model==NULL) {
			LM_ERR("malformed PV string: %s\n", hash_pvar_param.s);
			return -1;
		}
	} else {
		hash_param_model = NULL;
	}

	if(ds_setid_pvname.s!=0)
	{
		if(pv_parse_spec(&ds_setid_pvname, &ds_setid_pv)==NULL
				|| !pv_is_w(&ds_setid_pv))
		{
			LM_ERR("[%s]- invalid setid_pvname\n", ds_setid_pvname.s);
			return -1;
		}
	}

	if(ds_attrs_pvname.s!=0)
	{
		if(pv_parse_spec(&ds_attrs_pvname, &ds_attrs_pv)==NULL
				|| !pv_is_w(&ds_attrs_pv))
		{
			LM_ERR("[%s]- invalid attrs_pvname\n", ds_attrs_pvname.s);
			return -1;
		}
	}

	if (dstid_avp_param.s && dstid_avp_param.len > 0)
	{
		if(ds_hash_size>0)
		{
			if(ds_hash_load_init(1<<ds_hash_size, ds_hash_expire,
						ds_hash_initexpire)<0)
				return -1;
			register_timer(ds_ht_timer, NULL, ds_hash_check_interval);
		} else {
			LM_ERR("call load dispatching DSTID_AVP set but no size"
					" for hash table\n");
			return -1;
		}
	}
	/* Only, if the Probing-Timer is enabled the TM-API needs to be loaded: */
	if (ds_ping_interval > 0)
	{
		/*****************************************************
		 * TM-Bindings
		 *****************************************************/
		if (load_tm_api( &tmb ) == -1)
		{
			LM_ERR("could not load the TM-functions - disable DS ping\n");
			return -1;
		}
		/*****************************************************
		 * Register the PING-Timer
		 *****************************************************/
		register_timer(ds_check_timer, NULL, ds_ping_interval);
	}

	return 0;
}

/*! \brief
 * Per-process initialisation: seeds the C library random generator with a
 * value derived from the process rank and pid, so that every process uses
 * its own seed.
 *
 * \param rank rank of the process being initialised
 * \return always 0
 */
static int child_init(int rank)
{
	unsigned int seed;

	seed = (11+rank)*getpid()*7;
	srand(seed);

	return 0;
}

static int mi_child_init(void)
{

	if(ds_db_url.s)
		return ds_connect_db();
	return 0;

}

/*! \brief
 * destroy function
 */
static void destroy(void)
{
	ds_destroy_list();
	if(ds_db_url.s)
		ds_disconnect_db();
	ds_hash_load_destroy();
	if(ds_ping_reply_codes)
		shm_free(ds_ping_reply_codes);

}

/*
 * Evaluate the fixed-up script parameter `param` with get_is_fparam(),
 * storing the result in i_value / s_value and the result kind in
 * value_flags.
 *
 * Caveats for callers:
 *  - the macro uses a variable named `msg` from the enclosing scope;
 *  - on failure it logs "no <param_name> value" and executes `return -1`
 *    in the enclosing function.
 */
#define GET_VALUE(param_name,param,i_value,s_value,value_flags) do{ \
	if(get_is_fparam(&(i_value), &(s_value), msg, (fparam_t*)(param), &(value_flags))!=0) { \
		LM_ERR("no %s value\n", (param_name)); \
		return -1; \
	} \
}while(0)

/*! \brief
 * Translate a string of state letters into a dispatcher state bit mask.
 *
 * Letters are handled left to right and are case-insensitive:
 *  - a: clear all the DS_STATES_ALL bits collected so far
 *  - i: add DS_INACTIVE_DST
 *  - d: add DS_DISABLED_DST
 *  - t: add DS_TRYING_DST
 *  - p: add DS_PROBING_DST
 *
 * \param flag_str  letters to parse (need not be zero-terminated)
 * \param flag_len  number of characters to read from flag_str
 * \return the resulting mask (0 for an empty string), or -1 as soon as an
 *         unknown character is met
 */
int ds_parse_flags( char* flag_str, int flag_len )
{
	int state = 0;
	int pos;

	for (pos = 0; pos < flag_len; pos++)
	{
		switch (flag_str[pos]) {
			case 'a':
			case 'A':
				state &= ~(DS_STATES_ALL);
				break;
			case 'i':
			case 'I':
				state |= DS_INACTIVE_DST;
				break;
			case 'd':
			case 'D':
				state |= DS_DISABLED_DST;
				break;
			case 't':
			case 'T':
				state |= DS_TRYING_DST;
				break;
			case 'p':
			case 'P':
				state |= DS_PROBING_DST;
				break;
			default:
				/* unknown letter: the whole string is rejected */
				return -1;
		}
	}

	return state;
}

/**
 * Shared implementation of the ds_select_dst() / ds_select_domain() script
 * functions.
 *
 * Evaluates the set, algorithm and (optional) limit parameters, requires
 * each of them to resolve to an integer and forwards them to
 * ds_select_dst_limit().
 *
 * @param msg    SIP message being processed
 * @param set    fixed-up destination set parameter
 * @param alg    fixed-up algorithm parameter
 * @param limit  fixed-up limit parameter, or NULL for "no limit"
 * @param mode   passed through to ds_select_dst_limit()
 * @return -1 on parameter errors, otherwise the ds_select_dst_limit() result
 */
static int w_ds_select(struct sip_msg* msg, char* set, char* alg, char* limit, int mode)
{
	unsigned int set_kind, alg_kind, limit_kind;
	str set_txt = STR_NULL;
	str alg_txt = STR_NULL;
	str limit_txt = STR_NULL;
	int set_id, alg_id, max_dst;

	if(msg==NULL)
		return -1;

	GET_VALUE("destination set", set, set_id, set_txt, set_kind);
	if ((set_kind&PARAM_INT)==0) {
		if (set_kind&PARAM_STR) {
			LM_ERR("unable to get destination set from [%.*s]\n", set_txt.len, set_txt.s);
		} else {
			LM_ERR("unable to get destination set\n");
		}
		return -1;
	}

	GET_VALUE("algorithm", alg, alg_id, alg_txt, alg_kind);
	if ((alg_kind&PARAM_INT)==0) {
		if (alg_kind&PARAM_STR) {
			LM_ERR("unable to get algorithm from [%.*s]\n", alg_txt.len, alg_txt.s);
		} else {
			LM_ERR("unable to get algorithm\n");
		}
		return -1;
	}

	if (limit==NULL) {
		/* -1 turns into a very large unsigned value in the cast below */
		max_dst = -1;
	} else {
		GET_VALUE("limit", limit, max_dst, limit_txt, limit_kind);
		if ((limit_kind&PARAM_INT)==0) {
			if (limit_kind&PARAM_STR) {
				LM_ERR("unable to get dst number limit from [%.*s]\n", limit_txt.len, limit_txt.s);
			} else {
				LM_ERR("unable to get dst number limit\n");
			}
			return -1;
		}
	}

	return ds_select_dst_limit(msg, set_id, alg_id, (unsigned int)max_dst, mode);
}
/**
 * Script function ds_select_dst(set, alg): select a destination without a
 * limit on the number of destinations, using mode 0 (set dst uri).
 */
static int w_ds_select_dst(struct sip_msg* msg, char* set, char* alg)
{
	char *no_limit = 0;   /* do not limit the number of destinations */
	const int mode = 0;   /* set dst uri */

	return w_ds_select(msg, set, alg, no_limit, mode);
}

/**
 * Script function ds_select_dst(set, alg, limit): select at most `limit`
 * destinations, using mode 0 (set dst uri).
 */
static int w_ds_select_dst_limit(struct sip_msg* msg, char* set, char* alg, char* limit)
{
	const int mode = 0;   /* set dst uri */

	return w_ds_select(msg, set, alg, limit, mode);
}

/**
 * Script function ds_select_domain(set, alg): select a destination without
 * a limit on the number of destinations, using mode 1 (set host port).
 */
static int w_ds_select_domain(struct sip_msg* msg, char* set, char* alg)
{
	char *no_limit = 0;   /* do not limit the number of destinations */
	const int mode = 1;   /* set host port */

	return w_ds_select(msg, set, alg, no_limit, mode);
}

/**
 * Script function ds_select_domain(set, alg, limit): select at most `limit`
 * destinations, using mode 1 (set host port).
 */
static int w_ds_select_domain_limit(struct sip_msg* msg, char* set, char* alg, char* limit)
{
	const int mode = 1;   /* set host port */

	return w_ds_select(msg, set, alg, limit, mode);
}

/**
 * Script function ds_next_dst(): calls ds_next_dst() with mode 0 (set dst
 * uri). Both string parameters are unused.
 */
static int w_ds_next_dst(struct sip_msg *msg, char *str1, char *str2)
{
	const int mode = 0;   /* set dst uri */

	return ds_next_dst(msg, mode);
}

/**
 * Script function ds_next_domain(): calls ds_next_dst() with mode 1 (set
 * host port). Both string parameters are unused.
 */
static int w_ds_next_domain(struct sip_msg *msg, char *str1, char *str2)
{
	const int mode = 1;   /* set host port */

	return ds_next_dst(msg, mode);
}

/**
 * Script function ds_mark_dst() without parameters: marks the destination
 * as inactive, and additionally as probing when ds_probing_mode is
 * DS_PROBE_ALL. Both string parameters are unused.
 */
static int w_ds_mark_dst0(struct sip_msg *msg, char *str1, char *str2)
{
	if (ds_probing_mode==DS_PROBE_ALL)
		return ds_mark_dst(msg, DS_INACTIVE_DST|DS_PROBING_DST);

	return ds_mark_dst(msg, DS_INACTIVE_DST);
}

/**
 * Script function ds_mark_dst(flags): marks the destination with the state
 * described by a string of state letters (see ds_parse_flags()).
 *
 * @param msg   SIP message being processed
 * @param str1  zero-terminated state letters; NULL falls back to the
 *              behaviour of ds_mark_dst() without parameters
 * @param str2  unused
 * @return -1 when the letters cannot be parsed, otherwise the result of
 *         ds_mark_dst()
 */
static int w_ds_mark_dst1(struct sip_msg *msg, char *str1, char *str2)
{
	int state;

	if(str1==NULL)
		return w_ds_mark_dst0(msg, NULL, NULL);

	state = ds_parse_flags( str1, strlen(str1) );

	if ( state < 0 )
	{
		/* the message was missing its line terminator */
		LM_WARN("Failed to parse flag: %s\n", str1 );
		return -1;
	}

	return ds_mark_dst(msg, state);
}


/**
 * Script function ds_load_unset(): maps the result of ds_load_unset() to a
 * script return code, -1 on failure and 1 otherwise. Both string
 * parameters are unused.
 */
static int w_ds_load_unset(struct sip_msg *msg, char *str1, char *str2)
{
	return (ds_load_unset(msg)<0) ? -1 : 1;
}

/**
 * Script function ds_load_update(): maps the result of ds_load_update() to
 * a script return code, -1 on failure and 1 otherwise. Both string
 * parameters are unused.
 */
static int w_ds_load_update(struct sip_msg *msg, char *str1, char *str2)
{
	return (ds_load_update(msg)<0) ? -1 : 1;
}

/**
 * Fixup attached to the failover script functions. It leaves the parameter
 * untouched and only logs an error when one of the dst_avp, grp_avp,
 * cnt_avp or sock_avp module parameters is not set.
 *
 * @return always 0, so the configuration still loads
 */
static int ds_warn_fixup(void** param, int param_no)
{
	int avps_set;

	avps_set = dst_avp_param.s && grp_avp_param.s
				&& cnt_avp_param.s && sock_avp_param.s;
	if(avps_set)
		return 0;

	LM_ERR("failover functions used, but required AVP parameters"
			" are NULL -- feature disabled\n");
	return 0;
}

/************************** MI STUFF ************************/

/**
 * MI command ds_set_state: change the state of one destination.
 *
 * The command tree must carry three values, in this order:
 *  1. the state letters (parsed with ds_parse_flags()),
 *  2. the destination set id,
 *  3. the destination address.
 *
 * @param cmd_tree  MI command tree with the three values
 * @param param     unused
 * @return a reply tree: 200 on success, 400 when a value is missing,
 *         404 when ds_reinit_state() reports a failure, 500 for invalid
 *         values
 */
static struct mi_root* ds_mi_set(struct mi_root* cmd_tree, void* param)
{
	str sp;
	int ret;
	unsigned int group;
	int state;
	struct mi_node* node;

	/* 1st value: state */
	node = cmd_tree->node.kids;
	if(node == NULL)
		return init_mi_tree(400, MI_MISSING_PARM_S, MI_MISSING_PARM_LEN);
	sp = node->value;
	if(sp.len<=0 || !sp.s)
	{
		LM_ERR("bad state value\n");
		return init_mi_tree(500, "bad state value", 15);
	}

	state = ds_parse_flags(sp.s, sp.len);
	if( state < 0 )
	{
		LM_ERR("unknow state value\n");
		return init_mi_tree(500, "unknown state value", 19);
	}

	/* 2nd value: destination set id */
	node = node->next;
	if(node == NULL)
		return init_mi_tree(400, MI_MISSING_PARM_S, MI_MISSING_PARM_LEN);
	sp = node->value;
	if(sp.s == NULL)
	{
		return init_mi_tree(500, "group not found", 15);
	}

	if(str2int(&sp, &group))
	{
		LM_ERR("bad group value\n");
		/* reason length must not count the terminating zero */
		return init_mi_tree( 500, "bad group value", 15);
	}

	/* 3rd value: destination address */
	node= node->next;
	if(node == NULL)
		return init_mi_tree( 400, MI_MISSING_PARM_S, MI_MISSING_PARM_LEN);

	sp = node->value;
	if(sp.s == NULL)
	{
		/* reason length must not count the terminating zero */
		return init_mi_tree(500,"address not found", 17 );
	}

	ret = ds_reinit_state(group, &sp, state);

	if(ret!=0)
	{
		return init_mi_tree(404, "destination not found", 21);
	}

	return init_mi_tree( 200, MI_OK_S, MI_OK_LEN);
}




/**
 * MI command ds_list: build a 200 reply tree and let ds_print_mi_list()
 * fill it in.
 *
 * @param cmd_tree  unused
 * @param param     unused
 * @return the reply tree, or 0 when it cannot be created or filled
 */
static struct mi_root* ds_mi_list(struct mi_root* cmd_tree, void* param)
{
	struct mi_root* reply = init_mi_tree(200, MI_OK_S, MI_OK_LEN);

	if (reply==NULL)
		return 0;

	if( ds_print_mi_list(&reply->node) >= 0 )
		return reply;

	/* a partially filled tree is of no use to the caller */
	LM_ERR("failed to add node\n");
	free_mi_tree(reply);
	return 0;
}

/* Reason phrases for MI replies, each paired with its length (without the
 * terminating zero). Within this file only MI_ERR_RELOAD is used, by
 * ds_mi_reload(). */
#define MI_ERR_RELOAD 			"ERROR Reloading data"
#define MI_ERR_RELOAD_LEN 		(sizeof(MI_ERR_RELOAD)-1)
#define MI_NOT_SUPPORTED		"DB mode not configured"
#define MI_NOT_SUPPORTED_LEN 	(sizeof(MI_NOT_SUPPORTED)-1)
#define MI_ERR_DSLOAD			"No reload support for call load dispatching"
#define MI_ERR_DSLOAD_LEN		(sizeof(MI_ERR_DSLOAD)-1)

/**
 * MI command ds_reload: reload the destination sets from the database when
 * db_url is set, from the list file otherwise.
 *
 * @param cmd_tree  unused
 * @param param     unused
 * @return a 200 reply tree on success, a 500 one when the reload failed
 */
static struct mi_root* ds_mi_reload(struct mi_root* cmd_tree, void* param)
{
	int failed;

	if(ds_db_url.s)
		failed = (ds_reload_db()<0);
	else
		failed = (ds_load_list(dslistfile)!=0);

	if(failed)
		return init_mi_tree(500, MI_ERR_RELOAD, MI_ERR_RELOAD_LEN);

	return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
}


/**
 * Script function ds_is_from_list() without parameters: calls
 * ds_is_from_list() with -1 as set id (presumably meaning "any set" --
 * confirm in dispatch.c). Both string parameters are unused.
 */
static int w_ds_is_from_list0(struct sip_msg *msg, char *str1, char *str2)
{
	const int set_id = -1;

	return ds_is_from_list(msg, set_id);
}


/**
 * Script function ds_is_from_list(set): evaluates the set id parameter and
 * calls ds_is_from_list() with it.
 *
 * @return -1 when the set id cannot be evaluated, otherwise the result of
 *         ds_is_from_list()
 */
static int w_ds_is_from_list1(struct sip_msg *msg, char *set, char *str2)
{
	int set_id;

	if(fixup_get_ivalue(msg, (gparam_p)set, &set_id)==0)
		return ds_is_from_list(msg, set_id);

	LM_ERR("cannot get set id value\n");
	return -1;
}

/**
 * Script function ds_is_from_list(set, mode): evaluates both integer
 * parameters and calls ds_is_addr_from_list() without an explicit URI.
 *
 * @return -1 when a parameter cannot be evaluated, otherwise the result of
 *         ds_is_addr_from_list()
 */
static int w_ds_is_from_list2(struct sip_msg *msg, char *set, char *mode)
{
	int set_id;
	int match_mode;

	if(fixup_get_ivalue(msg, (gparam_t*)set, &set_id)!=0) {
		LM_ERR("cannot get set id value\n");
		return -1;
	}

	if(fixup_get_ivalue(msg, (gparam_t*)mode, &match_mode)!=0) {
		LM_ERR("cannot get mode value\n");
		return -1;
	}

	return ds_is_addr_from_list(msg, set_id, NULL, match_mode);
}

/**
 * Script function ds_is_from_list(set, mode, uri): evaluates the two
 * integer parameters and the URI string, then calls
 * ds_is_addr_from_list() with them.
 *
 * @return -1 when a parameter cannot be evaluated, otherwise the result of
 *         ds_is_addr_from_list()
 */
static int w_ds_is_from_list3(struct sip_msg *msg, char *set, char *mode, char *uri)
{
	int set_id;
	int match_mode;
	str uri_val;

	if(fixup_get_ivalue(msg, (gparam_t*)set, &set_id)!=0) {
		LM_ERR("cannot get set id value\n");
		return -1;
	}

	if(fixup_get_ivalue(msg, (gparam_t*)mode, &match_mode)!=0) {
		LM_ERR("cannot get mode value\n");
		return -1;
	}

	if(fixup_get_svalue(msg, (gparam_t*)uri, &uri_val)!=0) {
		LM_ERR("cannot get uri value\n");
		return -1;
	}

	return ds_is_addr_from_list(msg, set_id, &uri_val, match_mode);
}


/**
 * Fixup for ds_is_from_list() with two or three parameters: parameters 1
 * and 2 go through fixup_igp_null(), parameter 3 through
 * fixup_spve_null(). Both helpers are always called with position 1,
 * whatever the real position of the parameter is.
 *
 * @return the result of the helper, or 0 for any other parameter number
 */
static int fixup_ds_is_from_list(void** param, int param_no)
{
	switch(param_no) {
		case 1:
		case 2:
			return fixup_igp_null(param, 1);
		case 3:
			return fixup_spve_null(param, 1);
		default:
			return 0;
	}
}

/* Script function ds_list_exist(set): evaluates the set id parameter and
 * asks ds_list_exist() about it. Returns -1 when the parameter cannot be
 * evaluated, otherwise the result of ds_list_exist(). */
static int w_ds_list_exist(struct sip_msg *msg, char *param)
{
	int set_id;

	if(fixup_get_ivalue(msg, (gparam_p)param, &set_id)!=0) {
		LM_ERR("cannot get set id param value\n");
		return -1;
	}

	LM_DBG("--- Looking for dispatcher set %d\n", set_id);

	return ds_list_exist(set_id);
}

/**
 * Fixup for ds_list_exist(): hands the parameter to fixup_igp_null().
 *
 * @return the result of fixup_igp_null()
 */
static int fixup_ds_list_exist(void** param, int param_no)
{
	return fixup_igp_null(param, param_no);
}

/**
 * Work out which reply codes a single entry of the ds_ping_reply_codes
 * list enables.
 *
 * An entry is either "code=NNN" with NNN in 100..699 or "class=N" with N
 * in 1..6; names are compared case-insensitively. The conversion result is
 * checked, so an entry with a missing or non-numeric value is ignored
 * instead of reusing the value of a previous entry.
 *
 * @param pit    parsed entry (name and body)
 * @param first  receives the first enabled reply code when the result is > 0
 * @return number of consecutive reply codes enabled by the entry: 1 for a
 *         valid "code", 100 for a valid "class", 0 when it is ignored
 */
static int ds_reply_codes_of_param(param_t *pit, int *first)
{
	int val = 0;

	if (pit->name.len==4
			&& strncasecmp(pit->name.s, "code", 4)==0) {
		if (str2sint(&pit->body, &val)==0 && (val >= 100) && (val < 700)) {
			*first = val;
			return 1;
		}
	} else if (pit->name.len==5
			&& strncasecmp(pit->name.s, "class", 5)==0) {
		if (str2sint(&pit->body, &val)==0 && (val >= 1) && (val < 7)) {
			/* a class covers every code of its hundred, e.g. 100 to 199 */
			*first = val*100;
			return 100;
		}
	}
	return 0;
}

/**
 * Rebuild the list of ping reply codes accepted as valid from the
 * ds_ping_reply_codes_str value of the dispatcher configuration group.
 *
 * The list is parsed twice: once to size the new array, once to fill it.
 * Both passes use ds_reply_codes_of_param(), so they always agree on the
 * number of entries. The new array then replaces the one published through
 * ds_ping_reply_codes / ds_ping_reply_codes_cnt and the old one is freed.
 *
 * @return 0 on success (also when the string is empty, in which case
 *         nothing is changed), -1 on parse or memory errors
 */
static int ds_parse_reply_codes(void) {
	param_t* params_list = NULL;
	param_t *pit=NULL;
	int list_size = 0;
	int i = 0;
	int pos = 0;
	int first = 0;
	int cnt = 0;
	str input = {0, 0};
	int* ds_ping_reply_codes_new = NULL;
	int* ds_ping_reply_codes_old = NULL;

	/* Validate String: */
	if (cfg_get(dispatcher, dispatcher_cfg, ds_ping_reply_codes_str).s == 0
			|| cfg_get(dispatcher, dispatcher_cfg, ds_ping_reply_codes_str).len<=0)
		return 0;

	/* parse_params will modify the string pointer of .s, so we need to make a copy. */
	input.s = cfg_get(dispatcher, dispatcher_cfg, ds_ping_reply_codes_str).s;
	input.len = cfg_get(dispatcher, dispatcher_cfg, ds_ping_reply_codes_str).len;

	/* Parse the parameters: */
	if (parse_params(&input, CLASS_ANY, 0, &params_list)<0)
		return -1;

	/* Get the number of entries in the list */
	for (pit = params_list; pit; pit=pit->next)
		list_size += ds_reply_codes_of_param(pit, &first);
	LM_DBG("Should be %d Destinations.\n", list_size);

	if (list_size > 0) {
		/* Allocate Memory for the new list: */
		ds_ping_reply_codes_new = (int*)shm_malloc(list_size * sizeof(int));
		if(ds_ping_reply_codes_new== NULL)
		{
			free_params(params_list);
			LM_ERR("no more memory\n");
			return -1;
		}

		/* Now create the list of valid reply-codes: */
		for (pit = params_list; pit; pit=pit->next)
		{
			cnt = ds_reply_codes_of_param(pit, &first);
			for (i = 0; i < cnt; i++)
				ds_ping_reply_codes_new[pos++] = first + i;
		}
	} else {
		ds_ping_reply_codes_new = 0;
	}
	free_params(params_list);

	/* More reply-codes? Change Pointer and then set number of codes. */
	if (list_size > *ds_ping_reply_codes_cnt) {
		// Copy Pointer
		ds_ping_reply_codes_old = *ds_ping_reply_codes;
		*ds_ping_reply_codes = ds_ping_reply_codes_new;
		// Done: Set new Number of entries:
		*ds_ping_reply_codes_cnt = list_size;
		// Free the old memory area:
		if(ds_ping_reply_codes_old)
			shm_free(ds_ping_reply_codes_old);
		/* Less or equal? Set the number of codes first. */
	} else {
		// Done:
		*ds_ping_reply_codes_cnt = list_size;
		// Copy Pointer
		ds_ping_reply_codes_old = *ds_ping_reply_codes;
		*ds_ping_reply_codes = ds_ping_reply_codes_new;
		// Free the old memory area:
		if(ds_ping_reply_codes_old)
			shm_free(ds_ping_reply_codes_old);
	}
	/* Print the list at debug level: */
	for (i =0; i< *ds_ping_reply_codes_cnt; i++)
	{
		LM_DBG("Dispatcher: Now accepting Reply-Code %d (%d/%d) as valid\n",
				(*ds_ping_reply_codes)[i], (i+1), *ds_ping_reply_codes_cnt);
	}
	return 0;
}

/**
 * Tell whether a reply code is in the list of ping reply codes accepted as
 * valid.
 *
 * @param code  reply code to look up
 * @return 1 when the code is in the list, 0 otherwise
 */
int ds_ping_check_rplcode(int code)
{
	int idx = 0;

	while (idx < *ds_ping_reply_codes_cnt) {
		if ((*ds_ping_reply_codes)[idx] == code)
			return 1;
		idx++;
	}

	return 0;
}

/**
 * Re-parse the accepted ping reply codes from the current configuration
 * value (presumably registered as change callback of the configuration
 * variable -- confirm in config.c).
 *
 * The function cannot report a failure to its caller, so it is logged.
 *
 * @param gname  unused
 * @param name   unused
 */
void ds_ping_reply_codes_update(str* gname, str* name){
	if(ds_parse_reply_codes()<0)
		LM_ERR("failed to update the list of valid ping reply codes\n");
}

/*** RPC implementation ***/

/* help text of the dispatcher.reload RPC command, terminated by a 0 entry */
static const char* dispatcher_rpc_reload_doc[2] = {
	"Reload dispatcher destination sets",
	0
};


/*
 * RPC command dispatcher.reload: reload the destination sets from the
 * database when db_url is set, from the list file otherwise. A 500 fault
 * is sent when the reload fails; nothing is added to the reply on success.
 */
static void dispatcher_rpc_reload(rpc_t* rpc, void* ctx)
{
	int failed;

	if(ds_db_url.s)
		failed = (ds_reload_db()<0);
	else
		failed = (ds_load_list(dslistfile)!=0);

	if(failed)
		rpc->fault(ctx, 500, "Reload Failed");
}



/* help text of the dispatcher.list RPC command, terminated by a 0 entry */
static const char* dispatcher_rpc_list_doc[2] = {
	"Return the content of dispatcher sets",
	0
};


/*
 * RPC command dispatcher.list: print the destination sets.
 *
 * Reply layout: a root structure with NRSETS and a RECORDS array; every
 * record is a SET structure with its ID and a TARGETS array; every target
 * is a DEST structure with URI, FLAGS and PRIORITY, plus an ATTRS
 * structure (BODY, DUID, MAXLOAD, WEIGHT, SOCKET) when the destination has
 * an attributes body.
 *
 * FLAGS is a two letter string: the first letter is I, D, T or A (tested
 * in that order: inactive, disabled, trying, otherwise active), the second
 * is P when the probing flag is set and X otherwise.
 *
 * A 500 fault is sent when there is no set or a reply node cannot be added.
 */
static void dispatcher_rpc_list(rpc_t* rpc, void* ctx)
{
	void* root_h;
	void* records_h;
	void* targets_h;
	void* set_h;
	void* dest_h;
	void* attrs_h;
	int j;
	char state[3];
	str empty = {"", 0};
	ds_set_t *ds_list;
	int ds_list_nr;
	ds_set_t *set;

	ds_list = ds_get_list();
	ds_list_nr = ds_get_list_nr();

	if(ds_list==NULL || ds_list_nr<=0)
	{
		LM_ERR("no destination sets\n");
		rpc->fault(ctx, 500, "No Destination Sets");
		return;
	}

	/* add entry node */
	if (rpc->add(ctx, "{", &root_h) < 0)
	{
		rpc->fault(ctx, 500, "Internal error root reply");
		return;
	}
	if(rpc->struct_add(root_h, "d[",
				"NRSETS", ds_list_nr,
				"RECORDS",  &records_h)<0)
	{
		rpc->fault(ctx, 500, "Internal error sets structure");
		return;
	}

	for(set = ds_list; set; set = set->next)
	{
		if (rpc->struct_add(records_h, "{", "SET", &set_h) < 0)
		{
			rpc->fault(ctx, 500, "Internal error set structure");
			return;
		}
		if(rpc->struct_add(set_h, "d[",
					"ID", set->id,
					"TARGETS", &targets_h)<0)
		{
			rpc->fault(ctx, 500, "Internal error creating set id");
			return;
		}

		for(j=0; j<set->nr; j++)
		{
			if(rpc->struct_add(targets_h, "{",
						"DEST", &dest_h)<0)
			{
				rpc->fault(ctx, 500, "Internal error creating dest");
				return;
			}

			/* first letter: main state, in order of precedence */
			state[0] = (set->dlist[j].flags & DS_INACTIVE_DST) ? 'I'
					: (set->dlist[j].flags & DS_DISABLED_DST) ? 'D'
					: (set->dlist[j].flags & DS_TRYING_DST) ? 'T'
					: 'A';
			/* second letter: probing or not */
			state[1] = (set->dlist[j].flags & DS_PROBING_DST) ? 'P' : 'X';
			state[2] = '\0';

			if (set->dlist[j].attrs.body.s==NULL)
			{
				/* no attributes: short form of the destination */
				if(rpc->struct_add(dest_h, "Ssd",
							"URI", &set->dlist[j].uri,
							"FLAGS", state,
							"PRIORITY", set->dlist[j].priority)<0)
				{
					rpc->fault(ctx, 500, "Internal error creating dest struct");
					return;
				}
				continue;
			}

			if(rpc->struct_add(dest_h, "Ssd{",
						"URI", &set->dlist[j].uri,
						"FLAGS", state,
						"PRIORITY", set->dlist[j].priority,
						"ATTRS", &attrs_h)<0)
			{
				rpc->fault(ctx, 500, "Internal error creating dest struct");
				return;
			}
			/* unset duid/socket are reported as empty strings */
			if(rpc->struct_add(attrs_h, "SSddS",
						"BODY", &(set->dlist[j].attrs.body),
						"DUID", (set->dlist[j].attrs.duid.s)?
						&(set->dlist[j].attrs.duid):&empty,
						"MAXLOAD", set->dlist[j].attrs.maxload,
						"WEIGHT", set->dlist[j].attrs.weight,
						"SOCKET", (set->dlist[j].attrs.socket.s)?
						&(set->dlist[j].attrs.socket):&empty)<0)
			{
				rpc->fault(ctx, 500, "Internal error creating attrs struct");
				return;
			}
		}
	}
}


/* help text of the dispatcher.set_state RPC command, terminated by a 0
 * entry */
static const char* dispatcher_rpc_set_state_doc[2] = {
	"Set the state of a destination address",
	0
};


/*
 * RPC command dispatcher.set_state: set the state of a destination address.
 *
 * Parameters read from the request: state (string), set id (integer) and
 * destination address (string). The first character of the state selects
 * the main state:
 *   0, I, i - inactive      1, A, a - active
 *   2, D, d - disabled      3, T, t - trying
 * A second character P or p adds the probing flag, except for the disabled
 * state where it is ignored.
 *
 * A 500 fault is sent for invalid parameters or when ds_reinit_state()
 * reports a failure.
 */
static void dispatcher_rpc_set_state(rpc_t* rpc, void* ctx)
{
	int group;
	str dest;
	str state;
	int stval;
	int probing;

	if(rpc->scan(ctx, ".SdS", &state, &group, &dest)<3)
	{
		rpc->fault(ctx, 500, "Invalid Parameters");
		return;
	}
	if(state.len<=0 || state.s==NULL)
	{
		LM_ERR("bad state value\n");
		rpc->fault(ctx, 500, "Invalid State Parameter");
		return;
	}

	/* optional probing suffix */
	probing = (state.len>1) && (state.s[1]=='P' || state.s[1]=='p');

	switch(state.s[0]) {
		case '0':
		case 'I':
		case 'i':
			/* set inactive */
			stval = DS_INACTIVE_DST;
			if(probing)
				stval |= DS_PROBING_DST;
			break;
		case '1':
		case 'A':
		case 'a':
			/* set active */
			stval = 0;
			if(probing)
				stval |= DS_PROBING_DST;
			break;
		case '2':
		case 'D':
		case 'd':
			/* set disabled */
			stval = DS_DISABLED_DST;
			break;
		case '3':
		case 'T':
		case 't':
			/* set trying */
			stval = DS_TRYING_DST;
			if(probing)
				stval |= DS_PROBING_DST;
			break;
		default:
			LM_ERR("unknow state value\n");
			rpc->fault(ctx, 500, "Unknown State Value");
			return;
	}

	if(ds_reinit_state(group, &dest, stval)<0)
		rpc->fault(ctx, 500, "State Update Failed");
}


/* RPC commands registered by ds_init_rpc(): command name, handler and help
 * text for each entry; the list ends with an all-zero entry. */
rpc_export_t dispatcher_rpc_cmds[] = {
	{"dispatcher.reload", dispatcher_rpc_reload,
		dispatcher_rpc_reload_doc, 0},
	{"dispatcher.list",   dispatcher_rpc_list,
		dispatcher_rpc_list_doc,   0},
	{"dispatcher.set_state",   dispatcher_rpc_set_state,
		dispatcher_rpc_set_state_doc,   0},
	{0, 0, 0, 0}
};

/**
 * Register the RPC commands listed in dispatcher_rpc_cmds.
 *
 * @return 0 on success, -1 (after logging an error) on failure
 */
static int ds_init_rpc(void)
{
	if (rpc_register_array(dispatcher_rpc_cmds)==0)
		return 0;

	LM_ERR("failed to register RPC commands\n");
	return -1;
}