modules_k/seas/statistics.c
aaf285a6
 /* $Id$
  *
  * Copyright (C) 2006-2007 VozTelecom Sistemas S.L
  *
  * This file is part of openser, a free SIP server.
  *
  * openser is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation; either version 2 of the License, or
  * (at your option) any later version
  *
  * openser is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
  * You should have received a copy of the GNU General Public License 
  * along with this program; if not, write to the Free Software 
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  */
 
d38650d3
 #include <stdlib.h>
 #include <time.h>
 #include <netdb.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/time.h>
 #include <netinet/in.h>
302bdd35
 #include <netinet/in_systm.h>
d38650d3
 #include <netinet/ip.h> /* superset of previous */
 #include <string.h>
 #include <unistd.h>
 #include <stdio.h>
 #include <signal.h>
 
 #include "statistics.h"
b26faedd
 #include "seas.h" /*SLOG*/
d38650d3
 #include "../../mem/shm_mem.h"
 #include "../../resolve.h"
 #include "../../ut.h"
 #include "../../dprint.h"
 #include "../../locking.h"
 /* value stored in totag_elem.acked to mark an element whose tag.s carries a statscell payload */
 #define STATS_PAY 101
 
 /* statistics table; allocated in shared memory by init_seas_stats_table() */
 struct statstable*  seas_stats_table;
 /* descriptor of the listening socket opened by start_stats_server() */
 int stats_fd;
 /* set to 1 by start_stats_server() once the statistics process has been forked, 0 otherwise */
 char use_stats=0;
 /* return value of fork() in start_stats_server(): in the parent, the pid of the statistics process */
 pid_t pid;
 
 static void sig_handler(int signo)
 {
    switch(signo){
       case SIGTERM:
b96cb23e
 	 LOG(L_ERR,"Stats process caught SIGTERM, shutting down..\n");
d38650d3
 	 close(stats_fd);
 	 destroy_seas_stats_table();
 	 exit(0);
       default:
b96cb23e
 	 LOG(L_DBG,"caught signal %d\n",signo);
d38650d3
    }
b96cb23e
    LOG(L_WARN,"Statistics process:caught signal (%d)\n",signo);
d38650d3
 }
 
 struct statstable* init_seas_stats_table()
 {
    /*allocs the table*/
    seas_stats_table= (struct statstable*)shm_malloc( sizeof( struct statstable ) );
    if (!seas_stats_table) {
b96cb23e
       LOG(L_ERR, "no shmem for stats table (%d bytes)\n",(int)sizeof(struct statstable));
d38650d3
       return 0;
    }
    memset(seas_stats_table, 0, sizeof(struct statstable) );
    if(0==(seas_stats_table->mutex=lock_alloc())){
b96cb23e
       LOG(L_ERR,"couldn't alloc mutex (get_lock_t)\n");
d38650d3
       shm_free(seas_stats_table);
       return 0;
    }
    lock_init(seas_stats_table->mutex);
    return seas_stats_table;
 }
 
fcb8aeaf
 inline void destroy_seas_stats_table()
d38650d3
 {
    /*deallocs the table*/
    if(seas_stats_table){
       lock_destroy(seas_stats_table->mutex);
       shm_free(seas_stats_table);
       seas_stats_table=(struct statstable *)0;
    }
 }
 
 /** This will be called from within w_as_relay()
  *
  * TODO handle locking ?
  */
 inline void as_relay_stat(struct cell *t)
 {
    struct statscell *s;
    struct totag_elem *to;
    if(t==0)
       return;
    if(t->fwded_totags != 0){
b96cb23e
       LOG(L_DBG,"seas:as_relay_stat() unable to put a payload in fwded_totags because it is being used !!\n");
d38650d3
       return;
    }
    if(!(s=shm_malloc(sizeof(struct statscell)))){
       return;
    }
    if(!(to=shm_malloc(sizeof(struct totag_elem)))){
       shm_free(s);
       return;
    }
    memset(s,0,sizeof(struct statscell));
    gettimeofday(&(s->u.uas.as_relay),NULL);
    s->type=UAS_T;
    to->tag.len=0;
    to->tag.s=(char *)s;
    to->next=0;
    to->acked=STATS_PAY;
    t->fwded_totags=to;
    lock_get(seas_stats_table->mutex);
    (seas_stats_table->started_transactions)++;
    lock_release(seas_stats_table->mutex);
 }
 
 /** this will be called from the SEAS event dispatcher
  * when it writes the event to the socket
  *
  * Parameters: a cell OR its hash_index and its label
  *
  * TODO handle locking/mutexing ?
  */
 inline void event_stat(struct cell *t)
 {
    struct statscell *s;
    struct totag_elem *to;
    if(t==0){
       /*seas_f.tmb.t_lookup_ident(&t,hash_index,label); BAD bcos it refcounts, and there's no way to simply unrefcount from outside TM*/
       return;
    }
    if(t->fwded_totags == 0){
b96cb23e
       LOG(L_DBG,"seas:event_stat() unabe to set the event_stat timeval: no payload found at cell!! (fwded_totags=0)\n");
d38650d3
       return;
    }
    /*esto da un CORE DUMP cuando hay mucha carga.. warning*/
    to=t->fwded_totags;
    while(to){
       if(to->acked==STATS_PAY){
 	 s=(struct statscell *)to->tag.s;
 	 gettimeofday(&(s->u.uas.event_sent),NULL);
 	 return;
       }else
 	 to=to->next;
    }
    return;
 }
 
 /**
  * Maps a delay to a histogram bucket index: one bucket per 100 ms
  * (0-99 -> 0, 100-199 -> 1, ...), with index 14 collecting everything
  * from 1400 ms upwards.
  *
  * param i is in milliseconds. Zero and negative values (e.g. a delay
  * computed from a timestamp that was never set) map to bucket 0, so the
  * result is always a valid index in the range 0..14.
  */
 static inline int assignIndex(int i)
 {
    int slot;
 
    if(i<=0)
       return 0;
    slot=i/100;
    return slot>14?14:slot;
 }
 
 /** this will be called from the SEAS action dispatcher
  * when it receives the action from the socket
  */
 inline void action_stat(struct cell *t)
 {
    unsigned int seas_dispatch,as_delay;
    struct timeval *t1,*t2,*t3;
    struct statscell *s;
    struct totag_elem *to;
    if(t==0)
       return;
    if(t->fwded_totags == 0){
b96cb23e
       LOG(L_DBG,"seas:event_stat() unable to set the event_stat timeval: no payload found at cell!! (fwded_totags=0)\n");
d38650d3
       return;
    }
    to=t->fwded_totags;
    while(to){
       if(to->acked==STATS_PAY){
 	 s=(struct statscell *)to->tag.s;
 	 gettimeofday(&(s->u.uas.action_recvd),NULL);
 	 break;
       }else
 	 to=to->next;
    }
    /**no statistics found**/
    if(to==0)
       return;
    t1=&(s->u.uas.as_relay);
    t2=&(s->u.uas.event_sent);
    t3=&(s->u.uas.action_recvd);
    seas_dispatch = (t2->tv_sec - t1->tv_sec)*1000 + (t2->tv_usec-t1->tv_usec)/1000;
    as_delay = (t3->tv_sec - t2->tv_sec)*1000 + (t3->tv_usec-t2->tv_usec)/1000;
 
    lock_get(seas_stats_table->mutex);
    {
       seas_stats_table->dispatch[assignIndex(seas_dispatch)]++;
       seas_stats_table->event[assignIndex(seas_dispatch)]++;
       (seas_stats_table->finished_transactions)++;
    }
    lock_release(seas_stats_table->mutex);
 }
 
 
 /**
  * stats socket sould be an IP_address:port or unix://path/to_file
  * TODO handling unix sockets and IPv6 !!
  *
  * returns 
  * 0 if no stats
  * 1 if stats properly started
  * -1 if error
  */
 int start_stats_server(char *stats_socket)
 {
    char *p,*port;
    unsigned short stats_port;
    struct hostent *he;
    /*use sockaddr_storage ??*/
    struct sockaddr_in su;
    int optval;
 
    use_stats=0;
    port=(char *)0;
    he=(struct hostent *)0;
    stats_fd=-1;
    p=stats_socket;
 
    if(p==0 || *p==0)
       return 0;
 
    if(!init_seas_stats_table()){
b96cb23e
       LOG(L_ERR,"unable to init stats table, disabling statistics\n");
d38650d3
       return -1;
    }
    while(*p){
       if(*p == ':'){
 	 *p=0;
 	 port=p+1;
 	 break;
       }
    }
    if(!(he=resolvehost(stats_socket,0)))
       goto error;
    if(port==(char*)0 || *port==0)
       stats_port=5088;
    else if(!(stats_port=str2s(port,strlen(port),0))){
b96cb23e
 	 LOG(L_ERR,"invalid port %s\n",port);
d38650d3
 	 goto error;
       }
    if((stats_fd=socket(he->h_addrtype, SOCK_STREAM, 0))==-1){
b96cb23e
       LOG(L_ERR,"trying to open server socket (%s)\n",strerror(errno));
d38650d3
       goto error;
    }
    optval=1;
    if (setsockopt(stats_fd, SOL_SOCKET, SO_REUSEADDR, (void*)&optval, sizeof(optval))==-1) {
b96cb23e
       LOG(L_ERR,"setsockopt (%s)\n",strerror(errno));
d38650d3
       goto error;
    }
    su.sin_family = he->h_addrtype;
    su.sin_port=htons(stats_port);
    memcpy(&su.sin_addr,he->h_addr_list[0],4);
    if((bind(stats_fd,(struct sockaddr*)&su,sizeof(struct sockaddr_in)))==-1){
b96cb23e
       LOG(L_ERR, "bind (%s)\n",strerror(errno));
d38650d3
       goto error;
    }
    if(listen(stats_fd, 10)==-1){
b96cb23e
       LOG(L_ERR, "listen (%s)\n",strerror(errno));
d38650d3
       goto error;
    }
    if(!(pid=fork())){/*child*/
       signal(SIGTERM,sig_handler);
       serve_stats(stats_fd);
       printf("Statistics Server Process exits !!\n");
       exit(0);
    }else if(pid>0){/*parent*/
       close(stats_fd);
    }else{/*error*/
b96cb23e
       LOG(L_ERR,"Unable to create stats server process\n");
d38650d3
       goto error;
    }
    use_stats=1;
    return 1;
 error:
    if(stats_fd!=-1)
       close(stats_fd);
    destroy_seas_stats_table();
    return -1;
 }
 
 /**
  * stats socket sould be an IP_address:port or unix://path/to_file
  * TODO handling unix sockets and IPv6 !!
  *
  * returns 
  * 0 if no stats
  * 1 if stats properly started
  * -1 if error
  */
fcb8aeaf
 inline int stop_stats_server()
d38650d3
 {
    if(pid)
       kill(SIGTERM,pid);
    return 0;
 }
 
 void serve_stats(int fd)
 {
    union sockaddr_union su;
    int sock,i,retrn;
    socklen_t su_len;
    char f;
    /* we install our signal handler..*/
    signal(SIGTERM,sig_handler);
    signal(SIGHUP,sig_handler);
    signal(SIGPIPE,sig_handler);
    signal(SIGQUIT,sig_handler);
    signal(SIGINT,sig_handler);
    signal(SIGCHLD,sig_handler);
 
    while(1){
       su_len = sizeof(union sockaddr_union);
       sock=-1;
       sock=accept(fd, &su.s, &su_len);
       if(sock==-1){
 	 if(errno==EINTR){
 	    continue;
 	 }else{
b96cb23e
 	    LOG(L_ERR,  "error while accepting connection: %s\n", strerror(errno));
d38650d3
 	    return ;
 	 }
       }
       while(0!=(i=read(sock,&f,1))){
 	 if(i==-1){
 	    if(errno==EINTR){
 	       continue;
 	    }else{
b96cb23e
 	       LOG(L_ERR,"unknown error reading from socket\n"); 
d38650d3
 	       close(sock);
 	       /** and continue accept()'ing*/
 	       break;
 	    }
 	 }
 	 retrn=print_stats_info(f,sock);
 	 if(retrn==-1){
 	    /**simple error happened, dont worry*/
b96cb23e
 	       LOG(L_ERR,"printing statistics \n");
d38650d3
 	       continue;
 	 }else if(retrn==-2){
 	    /**let's go to the outer loop, and receive more Statistics clients*/
b96cb23e
 	    LOG(L_ERR,"Statistics client left\n");
d38650d3
 	    close(sock);
 	    break;
 	 }
       }
    }
 }
 
 /**
  * Formats the contents of the statistics table as text and writes it
  * to sock. The parameter f (the byte received from the client) is not
  * used.
  *
  * The table is read under its mutex; the mutex is released before
  * writing to the socket. If the text does not fit the buffer, the part
  * that fits is sent.
  *
  * returns
  * the number of bytes written on success
  * -1 if formatting the text failed
  * -2 if writing to the socket failed (the client is considered gone)
  *
  * (from snprintf manual)
  * "The  functions  snprintf()  and vsnprintf() do not write more than size bytes (including the trailing '\0').  If the output was truncated due to
  * this limit then the return value is the number of characters (not including the trailing '\0') which would have been written to the final string
  * if  enough space had been available. Thus, a return value of size or more means that the output was truncated."
  */
 inline int print_stats_info(int f,int sock)
 {
    /* roomy enough for the four blocks of text even with 10-digit counters */
 #define STATS_BUF_SIZE  1024
    int j,k,writen;
    char buf[STATS_BUF_SIZE];
 
    writen=0;
    j=0;
    /* taken before the first snprintf, so that the "send" and "error"
     * labels are always reached with the mutex held */
    lock_get(seas_stats_table->mutex);
    k=snprintf(buf,STATS_BUF_SIZE,      "Timings:      0-1   1-2   2-3   3-4   4-5   5-6   6-7   7-8   8-9   9-10  10-11 11-12 12-13 13-14 14+\n");
    if(k<0)
       goto error;
    if(k>=STATS_BUF_SIZE){
       /* truncated: the last byte of buf is the '\0', do not send it */
       j=STATS_BUF_SIZE-1;
       goto send;
    }
    j=k;
    k=snprintf(&buf[j],STATS_BUF_SIZE-j,"UAS:dispatch: %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d\n",
          seas_stats_table->dispatch[0],seas_stats_table->dispatch[1],seas_stats_table->dispatch[2],seas_stats_table->dispatch[3],seas_stats_table->dispatch[4],
          seas_stats_table->dispatch[5],seas_stats_table->dispatch[6],seas_stats_table->dispatch[7],seas_stats_table->dispatch[8],seas_stats_table->dispatch[9],
          seas_stats_table->dispatch[10],seas_stats_table->dispatch[11],seas_stats_table->dispatch[12],seas_stats_table->dispatch[13],seas_stats_table->dispatch[14]);
    if(k<0)
       goto error;
    if(k>=STATS_BUF_SIZE-j){
       j=STATS_BUF_SIZE-1;
       goto send;
    }
    j+=k;
    k=snprintf(&buf[j],STATS_BUF_SIZE-j,"UAS:event:    %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d %-5d\n",
          seas_stats_table->event[0],seas_stats_table->event[1],seas_stats_table->event[2],seas_stats_table->event[3],seas_stats_table->event[4],
          seas_stats_table->event[5],seas_stats_table->event[6],seas_stats_table->event[7],seas_stats_table->event[8],seas_stats_table->event[9],
          seas_stats_table->event[10],seas_stats_table->event[11],seas_stats_table->event[12],seas_stats_table->event[13],seas_stats_table->event[14]);
    if(k<0)
       goto error;
    if(k>=STATS_BUF_SIZE-j){
       j=STATS_BUF_SIZE-1;
       goto send;
    }
    j+=k;
    k=snprintf(&buf[j],STATS_BUF_SIZE-j,"Started Transactions: %d\nTerminated Transactions:%d\nReceived replies:%d\nReceived:%d\n",
          seas_stats_table->started_transactions,seas_stats_table->finished_transactions,seas_stats_table->received_replies,seas_stats_table->received);
    if(k<0)
       goto error;
    if(k>=STATS_BUF_SIZE-j){
       j=STATS_BUF_SIZE-1;
       goto send;
    }
    j+=k;
 send:
    lock_release(seas_stats_table->mutex);
    /*mutex is released*/
    while(writen<j){
       /* after a short write, go on from where the previous one stopped */
       k=write(sock,buf+writen,j-writen);
       if(k<0){
          if(errno==EINTR)
             continue;
          /* EPIPE or any other failure: retrying would fail the same way */
          return -2;
       }
       writen+=k;
    }
    return writen;
 error:/*mutex is locked*/
    lock_release(seas_stats_table->mutex);
    return -1;
 }
 
 inline void stats_reply()
 {
    lock_get(seas_stats_table->mutex);
    seas_stats_table->received_replies++;
    lock_release(seas_stats_table->mutex);
 }