modules/dmq/notification_peer.c
1977645c
 /*
  * $Id$
  *
  * dmq module - distributed message queue
  *
  * Copyright (C) 2011 Bucur Marius - Ovidiu
  *
  * This file is part of Kamailio, a free SIP server.
  *
  * Kamailio is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation; either version 2 of the License, or
  * (at your option) any later version
  *
  * Kamailio is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
  * You should have received a copy of the GNU General Public License 
  * along with this program; if not, write to the Free Software 
9e1ff448
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
1977645c
  *
  */
 
 
7c81dea1
 #include "notification_peer.h"
f403eea5
 
3f6445f4
 /* content type ("text/plain") set on notification replies and handed to
  * bcast_dmq_message() for the notification requests sent from this file */
 str notification_content_type = str_init("text/plain");
841b6c1f
 /* response callback descriptor passed to bcast_dmq_message(): points at
  * notification_resp_callback_f; the second member is initialised to 0
  * (presumably the opaque callback parameter - confirm in the type definition) */
 dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
7c81dea1
 
b29a45f0
 /* pointer to the "init callbacks already executed" flag; starts as a null
  * pointer and is not assigned anywhere in this file. When it is non-null and
  * the flag is 0, the callbacks below set the flag to 1 and call
  * run_init_callbacks().
  * NOTE(review): presumably points to shared memory allocated at module init - confirm */
 int *dmq_init_callback_done = 0;
b1aadf4c
 
 
1977645c
 /**
  * @brief add notification peer
  */
 int add_notification_peer()
 {
f403eea5
 	dmq_peer_t not_peer;
4d52cc05
 
 	memset(&not_peer, 0, sizeof(dmq_peer_t));
f403eea5
 	not_peer.callback = dmq_notification_callback;
b1aadf4c
 	not_peer.init_callback = NULL;
f403eea5
 	not_peer.description.s = "notification_peer";
 	not_peer.description.len = 17;
 	not_peer.peer_id.s = "notification_peer";
 	not_peer.peer_id.len = 17;
 	dmq_notification_peer = register_dmq_peer(&not_peer);
 	if(!dmq_notification_peer) {
a3918ba7
 		LM_ERR("error in register_dmq_peer\n");
 		goto error;
 	}
 	/* add itself to the node list */
c70705c4
 	self_node = add_dmq_node(node_list, &dmq_server_address);
 	if(!self_node) {
a3918ba7
 		LM_ERR("error adding self node\n");
 		goto error;
 	}
f403eea5
 	/* local node - only for self */
 	self_node->local = 1;
 	return 0;
 error:
 	return -1;
 }
 
1977645c
 /**
  * @brief add a server to the node list and request the node list from it
  *
  * @param server_address address of the notification server to add
  * @return the node created for server_address, or NULL if the node could
  *         not be added or the node list request failed (in the latter case
  *         the node is not removed from node_list by this function)
  */
 dmq_node_t* add_server_and_notify(str* server_address)
 {
 	dmq_node_t *new_node = add_dmq_node(node_list, server_address);
 
 	if(new_node == NULL) {
 		LM_ERR("error adding notification node\n");
 		return NULL;
 	}
 	/* ask for the initial node list, with a forward value of 2 */
 	if(request_nodelist(new_node, 2) < 0) {
 		LM_ERR("error requesting initial nodelist\n");
 		return NULL;
 	}
 	return new_node;
 }
 
c70705c4
 /**
  * extract the node list from the body of a notification request SIP message
  * the SIP request will look something like:
  * 	KDMQ sip:10.0.0.0:5062
  * 	To: ...
  * 	From: ...
  * 	Max-Forwards: ...
  * 	Content-Length: 22
  * 	
  * 	sip:host1:port1;param1=value1
  * 	sip:host2:port2;param2=value2
  * 	...
  */
1977645c
 int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg)
 {
a3918ba7
 	int content_length, total_nodes = 0;
 	str body;
 	str tmp_uri;
f403eea5
 	dmq_node_t *cur = NULL;
42e653b5
 	dmq_node_t *ret, *find;
a3918ba7
 	char *tmp, *end, *match;
549f96e5
 
 	if(!msg->content_length && (parse_headers(msg,HDR_CONTENTLENGTH_F,0)<0 || !msg->content_length)) {
a3918ba7
 		LM_ERR("no content length header found\n");
 		return -1;
 	}
 	content_length = get_content_length(msg);
 	if(!content_length) {
 		LM_DBG("content length is 0\n");
 		return total_nodes;
 	}
 	body.s = get_body(msg);
 	body.len = content_length;
 	tmp = body.s;
 	end = body.s + body.len;
 	
f403eea5
 	/* acquire big list lock */
 	lock_get(&update_list->lock);
a3918ba7
 	while(tmp < end) {
 		match = q_memchr(tmp, '\n', end - tmp);
f403eea5
 		if(match) {
a3918ba7
 			match++;
f403eea5
 		} else {
a3918ba7
 			/* for the last line - take all of it */
 			match = end;
 		}
 		/* create the orig_uri from the parsed uri line and trim it */
 		tmp_uri.s = tmp;
f403eea5
 		tmp_uri.len = match - tmp - 1;
 		tmp = match;
 		/* trim the \r, \n and \0's */
841b6c1f
 		trim_r(tmp_uri);
42e653b5
 		find = build_dmq_node(&tmp_uri, 0);
 		if(find==NULL)
 			return -1;
 		ret = find_dmq_node(update_list, find);
 		if (!ret) {
841b6c1f
 			LM_DBG("found new node %.*s\n", STR_FMT(&tmp_uri));
 			cur = build_dmq_node(&tmp_uri, 1);
 			if(!cur) {
 				LM_ERR("error creating new dmq node\n");
 				goto error;
 			}
f403eea5
 			cur->next = update_list->nodes;
 			update_list->nodes = cur;
 			update_list->count++;
 			total_nodes++;
42e653b5
 		} else if (find->params && ret->status != find->status) {
 			LM_DBG("updating status on %.*s from %d to %d\n",
 				STR_FMT(&tmp_uri), ret->status, find->status);
 			ret->status = find->status;
 			total_nodes++;
f403eea5
 		}
42e653b5
 		destroy_dmq_node(find, 0);
a3918ba7
 	}
42e653b5
 
f403eea5
 	/* release big list lock */
a3918ba7
 	lock_release(&update_list->lock);
 	return total_nodes;
f403eea5
 error:
 	lock_release(&update_list->lock);
 	return -1;
7c81dea1
 }
 
b1aadf4c
 
 int run_init_callbacks() {
 	dmq_peer_t* crt;
 
cc5f96f9
 	if(peer_list==0) {
 		LM_WARN("peer list is null\n");
 		return 0;
 	}
b1aadf4c
 	crt = peer_list->peers;
 	while(crt) {
 		if (crt->init_callback) {
 			crt->init_callback();
 		}
 		crt = crt->next;
 	}
 	return 0;
 }
 
 
1977645c
 /**
  * @brief dmq notification callback - handles a received node list
  *
  * Merges the nodes listed in the request body into node_list, fills resp
  * with a 200 reply whose body is the current node list and, when new or
  * changed nodes were received and the remaining Max-Forwards budget is
  * positive, broadcasts the node list to the other nodes. Finally the peer
  * init callbacks are run once, guarded by dmq_init_callback_done.
  *
  * @param msg received SIP request
  * @param resp reply description filled in by this function
  * @param dmq_node not used by this function
  * @return 0 on success, -1 if the response body could not be built (resp is
  *         left untouched in that case)
  */
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
 {
 	int nodes_recv;
 	str* response_body = NULL;
 	int maxforwards = 0;
 	/* received dmqnode list */
 	LM_DBG("dmq triggered from dmq_notification_callback\n");
 	
 	/* extract the maxforwards value, if any */
 	if(msg->maxforwards) {
 		/* parsed is a pointer-sized slot: test it against NULL instead of
 		 * using an ordered comparison with the integer 0 */
 		if (msg->maxforwards->parsed != NULL) {
 			/* maxfwd module has parsed and decreased the value in the msg buf */
 			/* maxforwards->parsed contains the original value */
 			maxforwards = (int)(long)(msg->maxforwards->parsed) - 1;
 		} else {
 			str2sint(&msg->maxforwards->body, &maxforwards);
 			maxforwards--;
 		}
 	}
 	nodes_recv = extract_node_list(node_list, msg);
 	LM_DBG("received %d new or changed nodes\n", nodes_recv);
 	response_body = build_notification_body();
 	if(response_body==NULL) {
 		LM_ERR("no response body\n");
 		goto error;
 	}
 	resp->content_type = notification_content_type;
 	resp->reason = dmq_200_rpl;
 	/* resp->body now shares the buffer response_body->s */
 	resp->body = *response_body;
 	resp->resp_code = 200;
 	
 	/* if we received any new nodes tell about them to the others */
 	if(nodes_recv > 0 && maxforwards > 0) {
 		/* the decremented maxforwards value is handed on, so the broadcast
 		 * stops once the budget is used up and cannot spiral forever */
 		bcast_dmq_message(dmq_notification_peer, response_body, 0,
 				&notification_callback, maxforwards, &notification_content_type);
 	}
 	/* only the str holder is released here; the buffer it pointed to is
 	 * still referenced by resp->body
 	 * NOTE(review): presumably freed by the caller after the reply - confirm */
 	pkg_free(response_body);
 	if (dmq_init_callback_done && !*dmq_init_callback_done) {
 		*dmq_init_callback_done = 1;
 		run_init_callbacks();
 	}
 	return 0;
 error:
 	return -1;
 }
 
c70705c4
 /**
  * builds the body of a notification message from the list of servers 
a3918ba7
  * the result will look something like:
  * sip:host1:port1;param1=value1\r\n
  * sip:host2:port2;param2=value2\r\n
  * sip:host3:port3;param3=value3
  */
1977645c
 str* build_notification_body()
 {
a3918ba7
 	/* the length of the current line describing the server */
 	int slen;
 	/* the current length of the body */
 	int clen = 0;
 	dmq_node_t* cur_node = NULL;
 	str* body;
 	body = pkg_malloc(sizeof(str));
1977645c
 	if(body==NULL) {
 		LM_ERR("no more pkg\n");
 		return NULL;
 	}
a3918ba7
 	memset(body, 0, sizeof(str));
 	/* we allocate a chunk of data for the body */
 	body->len = NBODY_LEN;
 	body->s = pkg_malloc(body->len);
1977645c
 	if(body->s==NULL) {
 		LM_ERR("no more pkg\n");
 		pkg_free(body);
 		return NULL;
 	}
a3918ba7
 	/* we add each server to the body - each on a different line */
 	lock_get(&node_list->lock);
 	cur_node = node_list->nodes;
 	while(cur_node) {
 		LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
 		/* body->len - clen - 2 bytes left to write - including the \r\n */
 		slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2);
 		if(slen < 0) {
 			LM_ERR("cannot build_node_string\n");
 			goto error;
 		}
 		clen += slen;
 		body->s[clen++] = '\r';
 		body->s[clen++] = '\n';
 		cur_node = cur_node->next;
8620313f
 	}
a3918ba7
 	lock_release(&node_list->lock);
 	body->len = clen;
 	return body;
 error:
5f05db99
 	lock_release(&node_list->lock);
a3918ba7
 	pkg_free(body->s);
 	pkg_free(body);
 	return NULL;
 }
 
1977645c
 /**
  * @brief request the node list by broadcasting the local one
  *
  * Builds the notification body from the local node list and broadcasts it
  * through dmq_notification_peer; replies are handled by
  * notification_callback.
  *
  * @param node not used by this function - the message is broadcast
  * @param forward value handed to bcast_dmq_message() as its forward argument
  * @return the result of bcast_dmq_message(), or -1 if no body could be built
  */
 int request_nodelist(dmq_node_t* node, int forward)
 {
 	int rc;
 	str *payload = build_notification_body();
 
 	if(!payload) {
 		LM_ERR("no notification body\n");
 		return -1;
 	}
 	rc = bcast_dmq_message(dmq_notification_peer, payload, NULL,
 			&notification_callback, forward, &notification_content_type);
 	/* the body is no longer needed once the broadcast call has returned */
 	pkg_free(payload->s);
 	pkg_free(payload);
 	return rc;
 }
 
1977645c
 /**
  * @brief notification response callback
  *
  * On a 200 reply the node list carried in the reply body is merged into
  * node_list and the peer init callbacks are run once (guarded by
  * dmq_init_callback_done). On a 408 the node is removed from node_list,
  * unless its URI equals dmq_notification_address. Any other code is ignored.
  *
  * @param msg the reply; only read for code 200
  * @param code reply status code
  * @param node node the reply belongs to
  * @param param opaque callback parameter, only logged
  * @return always 0
  */
 int notification_resp_callback_f(struct sip_msg* msg, int code,
 		dmq_node_t* node, void* param)
 {
 	int changed;
 	int del_rc;
 
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
 	switch(code) {
 		case 200:
 			changed = extract_node_list(node_list, msg);
 			LM_DBG("received %d new or changed nodes\n", changed);
 			if (dmq_init_callback_done != NULL && *dmq_init_callback_done == 0) {
 				*dmq_init_callback_done = 1;
 				run_init_callbacks();
 			}
 			break;
 		case 408:
 			/* the server did not respond - drop it from the list */
 			LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri));
 			if (STR_EQ(node->orig_uri, dmq_notification_address)) {
 				/* the configured notification address is never removed */
 				LM_ERR("not deleting notification_peer\n");
 				break;
 			}
 			del_rc = del_dmq_node(node_list, node);
 			LM_DBG("del_dmq_node returned %d\n", del_rc);
 			break;
 		default:
 			break;
 	}
 	return 0;
 }