src/modules/dmq/notification_peer.c
1977645c
 /*
  * dmq module - distributed message queue
  *
  * Copyright (C) 2011 Bucur Marius - Ovidiu
  *
  * This file is part of Kamailio, a free SIP server.
  *
  * Kamailio is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation; either version 2 of the License, or
  * (at your option) any later version
  *
  * Kamailio is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
  * You should have received a copy of the GNU General Public License 
  * along with this program; if not, write to the Free Software 
9e1ff448
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
1977645c
  *
  */
 
 
7c81dea1
 #include "notification_peer.h"
f403eea5
 
2b7ae0f0
 /* maximum length of a generated notification URI, not counting the
  * terminating NUL - every URI slot is MAXDMQURILEN + 1 bytes wide */
 #define MAXDMQURILEN 255
 /* number of URI slots reserved when a notification address is resolved */
 #define MAXDMQHOSTS 30
c55ec2ba
 
d500589d
 /* content type attached to notification requests and replies */
 str dmq_notification_content_type = str_init("text/plain");
 /* response callback descriptor passed to the broadcast functions */
 dmq_resp_cback_t dmq_notification_resp_callback = {&notification_resp_callback_f, 0};
7c81dea1
 
b29a45f0
 /* when non-NULL, points to a flag: while the flag is 0 the first processed
  * notification sets it to 1 and runs the peers' init callbacks */
 int *dmq_init_callback_done = 0;
b1aadf4c
 
 
1977645c
 /**
  * @brief add notification peer
  */
 int add_notification_peer()
 {
f403eea5
 	dmq_peer_t not_peer;
4d52cc05
 
 	memset(&not_peer, 0, sizeof(dmq_peer_t));
d500589d
 	not_peer.callback = dmq_notification_callback_f;
b1aadf4c
 	not_peer.init_callback = NULL;
f403eea5
 	not_peer.description.s = "notification_peer";
 	not_peer.description.len = 17;
 	not_peer.peer_id.s = "notification_peer";
 	not_peer.peer_id.len = 17;
 	dmq_notification_peer = register_dmq_peer(&not_peer);
 	if(!dmq_notification_peer) {
a3918ba7
 		LM_ERR("error in register_dmq_peer\n");
 		goto error;
 	}
 	/* add itself to the node list */
d500589d
 	dmq_self_node = add_dmq_node(dmq_node_list, &dmq_server_address);
 	if(!dmq_self_node) {
a3918ba7
 		LM_ERR("error adding self node\n");
 		goto error;
 	}
f403eea5
 	/* local node - only for self */
d500589d
 	dmq_self_node->local = 1;
 	dmq_self_node->status = DMQ_NODE_ACTIVE;
f403eea5
 	return 0;
 error:
 	return -1;
 }
 
c55ec2ba
 /**********
 * Create IP URI
 *
 * INPUT:
 *   Arg (1) = container for hosts
 *   Arg (2) = host index
 *   Arg (3) = host name pointer
 *   Arg (4) = host name length
 *   Arg (5) = parsed URI pointer
 * OUTPUT: 0=unable to create URI
 **********/
 
2b7ae0f0
 int create_IP_uri(char **puri_list, int host_index, char *phost, int hostlen,
 		sip_uri_t *puri)
c55ec2ba
 
 {
 	int pos;
 	char *plist;
 	char *perr = "notification host count reached max!\n";
 
 	/**********
 	* insert
 	* o scheme
 	* o user name/password
 	* o host
 	* o port
 	* o parameters
 	**********/
 
2b7ae0f0
 	plist = puri_list[host_index];
 	if(puri->type == SIPS_URI_T) {
127c8cb4
 		memcpy(plist, "sips:", 5);
c55ec2ba
 		pos = 5;
 	} else {
127c8cb4
 		memcpy(plist, "sip:", 4);
c55ec2ba
 		pos = 4;
 	}
2b7ae0f0
 	if(puri->user.s) {
127c8cb4
 		memcpy(&plist[pos], puri->user.s, puri->user.len);
c55ec2ba
 		pos += puri->user.len;
2b7ae0f0
 		if(puri->passwd.s) {
 			plist[pos++] = ':';
127c8cb4
 			memcpy(&plist[pos], puri->passwd.s, puri->passwd.len);
c55ec2ba
 			pos += puri->passwd.len;
 		}
2b7ae0f0
 		plist[pos++] = '@';
c55ec2ba
 	}
2b7ae0f0
 	if((pos + hostlen) > MAXDMQURILEN) {
 		LM_WARN("%s", perr);
c55ec2ba
 		return 0;
 	}
127c8cb4
 	memcpy(&plist[pos], phost, hostlen);
c55ec2ba
 	pos += hostlen;
2b7ae0f0
 	if(puri->port_no) {
 		if((pos + 6) > MAXDMQURILEN) {
 			LM_WARN("%s", perr);
c55ec2ba
 			return 0;
 		}
2b7ae0f0
 		plist[pos++] = ':';
 		pos += ushort2sbuf(puri->port_no, &plist[pos], 5);
c55ec2ba
 	}
2b7ae0f0
 	if(puri->params.s) {
 		if((pos + puri->params.len) >= MAXDMQURILEN) {
 			LM_WARN("%s", perr);
c55ec2ba
 			return 0;
 		}
2b7ae0f0
 		plist[pos++] = ';';
127c8cb4
 		memcpy(&plist[pos], puri->params.s, puri->params.len);
c55ec2ba
 		pos += puri->params.len;
 	}
2b7ae0f0
 	plist[pos] = '\0';
c55ec2ba
 	return 1;
 }
 
 /**********
 * Get DMQ Host List
 *
 * INPUT:
 *   Arg (1) = container for hosts
 *   Arg (2) = maximum number of hosts
 *   Arg (3) = host string pointer
 *   Arg (4) = parsed URI pointer
 *   Arg (5) = search SRV flag
 * OUTPUT: number of hosts found
 **********/
 
2b7ae0f0
 int get_dmq_host_list(
 		char **puri_list, int max_hosts, str *phost, sip_uri_t *puri, int bSRV)
c55ec2ba
 
 {
 	int host_cnt, len;
 	unsigned short origport, port;
2b7ae0f0
 	str pstr[1];
 	char pname[256], pIP[IP6_MAX_STR_SIZE + 2];
c55ec2ba
 	struct rdata *phead, *prec;
 	struct srv_rdata *psrv;
 
 	/**********
 	* o IP address?
 	* o make null terminated name
 	* o search SRV?
 	**********/
 
2b7ae0f0
 	if(str2ip(phost) || str2ip6(phost)) {
 		if(!create_IP_uri(puri_list, 0, phost->s, phost->len, puri)) {
 			LM_DBG("adding DMQ node IP host %.*s=%s\n", phost->len, phost->s,
 					puri_list[0]);
c55ec2ba
 			return 0;
 		}
 		return 1;
 	}
2b7ae0f0
 	strncpy(pname, phost->s, phost->len);
 	pname[phost->len] = '\0';
c55ec2ba
 	host_cnt = 0;
2b7ae0f0
 	if(bSRV) {
c55ec2ba
 		/**********
 		* get SRV records
 		**********/
 
 		port = puri->port_no;
2b7ae0f0
 		phead = get_record(pname, T_SRV, RES_ONLY_TYPE);
 		for(prec = phead; prec; prec = prec->next) {
c55ec2ba
 			/**********
 			* o matching port?
 			* o check max
 			* o save original port
 			* o check target
 			* o restore port
 			**********/
 
2b7ae0f0
 			psrv = (struct srv_rdata *)prec->rdata;
 			if(port && (port != psrv->port)) {
 				continue;
 			}
 			if(host_cnt == max_hosts) {
 				LM_WARN("notification host count reached max!\n");
 				free_rdata_list(phead);
c55ec2ba
 				return host_cnt;
 			}
 			pstr->s = psrv->name;
 			pstr->len = psrv->name_len;
 			origport = puri->port_no;
 			puri->port_no = psrv->port;
2b7ae0f0
 			host_cnt += get_dmq_host_list(&puri_list[host_cnt],
 					MAXDMQHOSTS - host_cnt, pstr, puri, 0);
c55ec2ba
 			puri->port_no = origport;
 		}
2b7ae0f0
 		if(phead)
 			free_rdata_list(phead);
c55ec2ba
 	}
 
 	/**********
 	* get A records
 	**********/
 
2b7ae0f0
 	phead = get_record(pname, T_A, RES_ONLY_TYPE);
 	for(prec = phead; prec; prec = prec->next) {
c55ec2ba
 		/**********
 		* o check max
 		* o create URI
 		**********/
 
2b7ae0f0
 		if(host_cnt == max_hosts) {
 			LM_WARN("notification host count reached max!\n");
 			free_rdata_list(phead);
c55ec2ba
 			return host_cnt;
 		}
2b7ae0f0
 		len = ip4tosbuf(
 				((struct a_rdata *)prec->rdata)->ip, pIP, IP4_MAX_STR_SIZE);
 		pIP[len] = '\0';
 		if(create_IP_uri(puri_list, host_cnt, pIP, len, puri)) {
 			LM_DBG("adding DMQ node A host %s=%s\n", pname,
 					puri_list[host_cnt]);
c55ec2ba
 			host_cnt++;
 		}
 	}
2b7ae0f0
 	if(phead)
 		free_rdata_list(phead);
c55ec2ba
 
 	/**********
 	* get AAAA records
 	**********/
 
2b7ae0f0
 	phead = get_record(pname, T_AAAA, RES_ONLY_TYPE);
 	for(prec = phead; prec; prec = prec->next) {
c55ec2ba
 		/**********
 		* o check max
 		* o create URI
 		**********/
 
2b7ae0f0
 		if(host_cnt == max_hosts) {
 			LM_WARN("notification host count reached max!\n");
 			free_rdata_list(phead);
c55ec2ba
 			return host_cnt;
 		}
2b7ae0f0
 		pIP[0] = '[';
 		len = ip6tosbuf(((struct aaaa_rdata *)prec->rdata)->ip6, &pIP[1],
 					  IP6_MAX_STR_SIZE)
 			  + 1;
 		pIP[len++] = ']';
 		pIP[len] = '\0';
 		if(create_IP_uri(puri_list, host_cnt, pIP, len, puri)) {
 			LM_DBG("adding DMQ node AAAA host %s=%s\n", pname,
 					puri_list[host_cnt]);
c55ec2ba
 			host_cnt++;
 		}
 	}
2b7ae0f0
 	if(phead)
 		free_rdata_list(phead);
c55ec2ba
 	return host_cnt;
 }
 
1977645c
 /**
  * @brief add a server node and notify it
  */
2b7ae0f0
 dmq_node_t *add_server_and_notify(str *paddr)
1977645c
 {
2b7ae0f0
 	char puri_data[MAXDMQHOSTS * (MAXDMQURILEN + 1)];
 	char *puri_list[MAXDMQHOSTS];
c55ec2ba
 	dmq_node_t *pfirst, *pnode;
 	int host_cnt, index;
2b7ae0f0
 	sip_uri_t puri[1];
 	str pstr[1];
c55ec2ba
 
 	/**********
 	* o init data area
 	* o get list of hosts
 	* o process list
 	**********/
 
d500589d
 	if(!dmq_multi_notify) {
 		pfirst = add_dmq_node(dmq_node_list, paddr);
c55ec2ba
 	} else {
 		/**********
 		* o init data area
 		* o get list of hosts
 		* o process list
 		**********/
 
2b7ae0f0
 		for(index = 0; index < MAXDMQHOSTS; index++) {
 			puri_list[index] = &puri_data[index * (MAXDMQURILEN + 1)];
c55ec2ba
 		}
2b7ae0f0
 		if(parse_uri(paddr->s, paddr->len, puri) < 0) {
c55ec2ba
 			/* this is supposed to be good but just in case... */
2b7ae0f0
 			LM_ERR("add_server_and_notify address invalid\n");
c55ec2ba
 			return 0;
 		}
 		pfirst = NULL;
 		host_cnt =
2b7ae0f0
 				get_dmq_host_list(puri_list, MAXDMQHOSTS, &puri->host, puri, 1);
 		for(index = 0; index < host_cnt; index++) {
 			pstr->s = puri_list[index];
 			pstr->len = strlen(puri_list[index]);
d500589d
 			if(!find_dmq_node_uri(dmq_node_list, pstr)) { // check for duplicates
 				pnode = add_dmq_node(dmq_node_list, pstr);
2b7ae0f0
 				if(pnode && !pfirst) {
 					pfirst = pnode;
 				}
8e955bad
 			}
c55ec2ba
 		}
c70705c4
 	}
c55ec2ba
 
 	/**********
 	* o found at least one?
 	* o request node list
 	**********/
 
2b7ae0f0
 	if(!pfirst) {
 		LM_ERR("error adding notification node\n");
c55ec2ba
 		return NULL;
a3918ba7
 	}
2b7ae0f0
 	if(request_nodelist(pfirst, 2) < 0) {
 		LM_ERR("error requesting initial nodelist\n");
c55ec2ba
 		return NULL;
 	}
 	return pfirst;
a3918ba7
 }
 
c70705c4
 /**
  * extract the node list from the body of a notification request SIP message
  * the SIP request will look something like:
  * 	KDMQ sip:10.0.0.0:5062
  * 	To: ...
  * 	From: ...
  * 	Max-Forwards: ...
  * 	Content-Length: 22
  * 	
  * 	sip:host1:port1;param1=value1
  * 	sip:host2:port2;param2=value2
  * 	...
  */
2b7ae0f0
 int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg)
1977645c
 {
a3918ba7
 	int content_length, total_nodes = 0;
 	str body;
 	str tmp_uri;
f403eea5
 	dmq_node_t *cur = NULL;
42e653b5
 	dmq_node_t *ret, *find;
a3918ba7
 	char *tmp, *end, *match;
549f96e5
 
2b7ae0f0
 	if(!msg->content_length && (parse_headers(msg, HDR_CONTENTLENGTH_F, 0) < 0
 									   || !msg->content_length)) {
a3918ba7
 		LM_ERR("no content length header found\n");
 		return -1;
 	}
 	content_length = get_content_length(msg);
 	if(!content_length) {
 		LM_DBG("content length is 0\n");
 		return total_nodes;
 	}
 	body.s = get_body(msg);
 	body.len = content_length;
 	tmp = body.s;
 	end = body.s + body.len;
2b7ae0f0
 
f403eea5
 	/* acquire big list lock */
 	lock_get(&update_list->lock);
a3918ba7
 	while(tmp < end) {
 		match = q_memchr(tmp, '\n', end - tmp);
f403eea5
 		if(match) {
a3918ba7
 			match++;
f403eea5
 		} else {
a3918ba7
 			/* for the last line - take all of it */
 			match = end;
 		}
 		/* create the orig_uri from the parsed uri line and trim it */
 		tmp_uri.s = tmp;
f403eea5
 		tmp_uri.len = match - tmp - 1;
 		tmp = match;
 		/* trim the \r, \n and \0's */
841b6c1f
 		trim_r(tmp_uri);
42e653b5
 		find = build_dmq_node(&tmp_uri, 0);
2b7ae0f0
 		if(find == NULL)
42e653b5
 			return -1;
 		ret = find_dmq_node(update_list, find);
2b7ae0f0
 		if(!ret) {
841b6c1f
 			LM_DBG("found new node %.*s\n", STR_FMT(&tmp_uri));
 			cur = build_dmq_node(&tmp_uri, 1);
 			if(!cur) {
 				LM_ERR("error creating new dmq node\n");
 				goto error;
 			}
f403eea5
 			cur->next = update_list->nodes;
 			update_list->nodes = cur;
 			update_list->count++;
 			total_nodes++;
8ffcb5f7
 		} else if(!ret->local && find->uri.params.s && 
 					ret->status != find->status && ret->status != DMQ_NODE_DISABLED) {
 			/* don't update the node if it is in ending state */
2b7ae0f0
 			LM_DBG("updating status on %.*s from %d to %d\n", STR_FMT(&tmp_uri),
 					ret->status, find->status);
42e653b5
 			ret->status = find->status;
 			total_nodes++;
f403eea5
 		}
42e653b5
 		destroy_dmq_node(find, 0);
a3918ba7
 	}
42e653b5
 
f403eea5
 	/* release big list lock */
a3918ba7
 	lock_release(&update_list->lock);
 	return total_nodes;
f403eea5
 error:
 	lock_release(&update_list->lock);
 	return -1;
7c81dea1
 }
 
b1aadf4c
 
2b7ae0f0
 int run_init_callbacks()
 {
 	dmq_peer_t *crt;
b1aadf4c
 
d500589d
 	if(dmq_peer_list == 0) {
cc5f96f9
 		LM_WARN("peer list is null\n");
 		return 0;
 	}
d500589d
 	crt = dmq_peer_list->peers;
b1aadf4c
 	while(crt) {
2b7ae0f0
 		if(crt->init_callback) {
b1aadf4c
 			crt->init_callback();
 		}
 		crt = crt->next;
 	}
 	return 0;
 }
 
 
1977645c
 /**
  * @brief dmq notification callback
  *
  * Merges the node list carried by the request into dmq_node_list, fills
  * resp with a 200 reply whose body is the current node list and, when
  * nodes were added or changed and the decremented Max-Forwards value is
  * still positive, broadcasts that list with the decremented value.
  * On the first notification the peers' init callbacks are run.
  *
  * @param msg      received notification request
  * @param resp     reply description filled in by this function
  * @param dmq_node not referenced by this function
  * @return 0 on success, -1 if the reply body cannot be built
  */
 int dmq_notification_callback_f(
 		struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *dmq_node)
 {
 	int hops_left = 0;
 	int changed;
 	str *reply_body = NULL;
 
 	/* received dmqnode list */
 	LM_DBG("dmq triggered from dmq_notification_callback\n");
 
 	/* extract the maxforwards value, if any */
 	if(msg->maxforwards) {
 		if(msg->maxforwards->parsed > 0) {
 			/* maxfwd module has parsed and decreased the value in the msg buf */
 			/* maxforwards->parsed contains the original value */
 			hops_left = (int)(long)(msg->maxforwards->parsed) - 1;
 		} else {
 			str2sint(&msg->maxforwards->body, &hops_left);
 			hops_left--;
 		}
 	}
 
 	changed = extract_node_list(dmq_node_list, msg);
 	LM_DBG("received %d new or changed nodes\n", changed);
 
 	reply_body = build_notification_body();
 	if(reply_body == NULL) {
 		LM_ERR("no response body\n");
 		return -1;
 	}
 	resp->content_type = dmq_notification_content_type;
 	resp->reason = dmq_200_rpl;
 	resp->body = *reply_body;
 	resp->resp_code = 200;
 
 	/* if we received any new nodes tell about them to the others */
 	if(changed > 0 && hops_left > 0) {
 		/* the decremented maxforwards value is passed on with the broadcast */
 		bcast_dmq_message(dmq_notification_peer, reply_body, 0,
 				&dmq_notification_resp_callback, hops_left,
 				&dmq_notification_content_type);
 	}
 	/* only the str holder is released here; its buffer was copied into resp */
 	pkg_free(reply_body);
 
 	if(dmq_init_callback_done && !*dmq_init_callback_done) {
 		*dmq_init_callback_done = 1;
 		run_init_callbacks();
 	}
 	return 0;
 }
 
c70705c4
 /**
  * builds the body of a notification message from the list of servers 
a3918ba7
  * the result will look something like:
  * sip:host1:port1;param1=value1\r\n
  * sip:host2:port2;param2=value2\r\n
  * sip:host3:port3;param3=value3
  */
2b7ae0f0
 str *build_notification_body()
1977645c
 {
a3918ba7
 	/* the length of the current line describing the server */
 	int slen;
 	/* the current length of the body */
 	int clen = 0;
2b7ae0f0
 	dmq_node_t *cur_node = NULL;
 	str *body;
a3918ba7
 	body = pkg_malloc(sizeof(str));
2b7ae0f0
 	if(body == NULL) {
1977645c
 		LM_ERR("no more pkg\n");
 		return NULL;
 	}
a3918ba7
 	memset(body, 0, sizeof(str));
 	/* we allocate a chunk of data for the body */
 	body->len = NBODY_LEN;
 	body->s = pkg_malloc(body->len);
2b7ae0f0
 	if(body->s == NULL) {
1977645c
 		LM_ERR("no more pkg\n");
 		pkg_free(body);
 		return NULL;
 	}
a3918ba7
 	/* we add each server to the body - each on a different line */
d500589d
 	lock_get(&dmq_node_list->lock);
 	cur_node = dmq_node_list->nodes;
a3918ba7
 	while(cur_node) {
3d2b3868
 		if (cur_node->local || cur_node->status == DMQ_NODE_ACTIVE) {
 			LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
 			/* body->len - clen - 2 bytes left to write - including the \r\n */
 			slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2);
 			if(slen < 0) {
 				LM_ERR("cannot build_node_string\n");
 				goto error;
 			}
 			clen += slen;
 			body->s[clen++] = '\r';
 			body->s[clen++] = '\n';
a3918ba7
 		}
 		cur_node = cur_node->next;
8620313f
 	}
d500589d
 	lock_release(&dmq_node_list->lock);
a3918ba7
 	body->len = clen;
 	return body;
 error:
d500589d
 	lock_release(&dmq_node_list->lock);
a3918ba7
 	pkg_free(body->s);
 	pkg_free(body);
 	return NULL;
 }
 
1977645c
 /**
  * @brief request node list
  */
2b7ae0f0
 int request_nodelist(dmq_node_t *node, int forward)
1977645c
 {
2b7ae0f0
 	str *body;
a3918ba7
 	int ret;
1977645c
 	body = build_notification_body();
2b7ae0f0
 	if(body == NULL) {
1977645c
 		LM_ERR("no notification body\n");
 		return -1;
 	}
bca756f8
 	ret = bcast_dmq_message1(dmq_notification_peer, body, NULL,
d500589d
 			&dmq_notification_resp_callback, forward,
 			&dmq_notification_content_type, 1);
a3918ba7
 	pkg_free(body->s);
 	pkg_free(body);
 	return ret;
f403eea5
 }
 
1977645c
 /**
  * @brief notification response callback
  */
2b7ae0f0
 int notification_resp_callback_f(
 		struct sip_msg *msg, int code, dmq_node_t *node, void *param)
1977645c
 {
f403eea5
 	int ret;
b1aadf4c
 	int nodes_recv;
 
f403eea5
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
b1aadf4c
 	if(code == 200) {
8ffcb5f7
 		/* be sure that the node that answered is in active state */
d500589d
 		update_dmq_node_status(dmq_node_list, node, DMQ_NODE_ACTIVE);
 		nodes_recv = extract_node_list(dmq_node_list, msg);
b1aadf4c
 		LM_DBG("received %d new or changed nodes\n", nodes_recv);
2b7ae0f0
 		if(dmq_init_callback_done && !*dmq_init_callback_done) {
b1aadf4c
 			*dmq_init_callback_done = 1;
 			run_init_callbacks();
 		}
 	} else if(code == 408) {
2b7ae0f0
 		if(STR_EQ(node->orig_uri, dmq_notification_address)) {
69dc8ec9
 			LM_ERR("not deleting notification_peer\n");
d500589d
 			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);	
69dc8ec9
 			return 0;
 		}
8ffcb5f7
 		if (node->status == DMQ_NODE_DISABLED) {
 			/* deleting node - the server did not respond */
 			LM_ERR("deleting server %.*s because of failed request\n",
 				STR_FMT(&node->orig_uri));
d500589d
 			ret = del_dmq_node(dmq_node_list, node);
8ffcb5f7
 			LM_DBG("del_dmq_node returned %d\n", ret);
 		} else {
 			/* put the node in disabled state and wait for the next ping before deleting it */
d500589d
 			update_dmq_node_status(dmq_node_list, node, DMQ_NODE_DISABLED);
8ffcb5f7
 		}
f403eea5
 	}
 	return 0;
50f1a5e0
 }