/* * Copyright (C) 2002-2003 Fhg Fokus * Copyright (C) 2007 iptego GmbH * * This file is part of SEMS, a free SIP media server. * * SEMS is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. This program is released under * the GPL with the additional exemption that compiling, linking, * and/or using OpenSSL is allowed. * * For a license to use the SEMS software under conditions * other than those described here, or to purchase support for this * software, please contact iptel.org by e-mail at the following addresses: * info@iptel.org * * SEMS is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "StatsUDPServer.h" #include "Statistics.h" #include "AmConfigReader.h" #include "AmSessionContainer.h" #include "AmUtils.h" #include "AmConfig.h" #include "log.h" #include "AmPlugIn.h" #include "AmApi.h" #include <string> using std::string; #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <errno.h> #include <unistd.h> #include <netinet/in_systm.h> #include <netinet/ip.h> #define CTRL_MSGBUF_SIZE 2048 // int msg_get_line(char*& msg_c, char* str, size_t len) // { // size_t l; // char* s=str; // if(!len) // return 0; // for(l=len; l && (*msg_c) && (*msg_c !='\n'); msg_c++ ){ // if(*msg_c!='\r'){ // *(s++) = *msg_c; // l--; // } // } // if(*msg_c) // msg_c++; // if(l>0){ // // We need one more character // // for trailing '\0'. // *s='\0'; // return int(s-str); // } // else // // buffer overran. // return -1; // } // int msg_get_param(char*& msg_c, string& p) // { // char line_buf[MSG_BUF_SIZE]; // if( msg_get_line(msg_c,line_buf,MSG_BUF_SIZE) != -1 ){ // if(!strcmp(".",line_buf)) // line_buf[0]='\0'; // p = line_buf; // return 0; // } // return -1; // } StatsUDPServer* StatsUDPServer::_instance=0; StatsUDPServer* StatsUDPServer::instance() { if(!_instance) { _instance = new StatsUDPServer(); if(_instance->init() != 0){ delete _instance; _instance = 0; } else { _instance->start(); } } return _instance; } StatsUDPServer::StatsUDPServer() : sd(0) { sc = AmSessionContainer::instance(); } StatsUDPServer::~StatsUDPServer() { if(sd) close(sd); } int StatsUDPServer::init() { string udp_addr; int udp_port = 0; int optval; AmConfigReader cfg; if(cfg.loadFile(add2path(AmConfig::ModConfigPath,1, MOD_NAME ".conf"))) return -1; udp_port = (int)cfg.getParameterInt("monit_udp_port",(unsigned int)-1); if(udp_port == -1){ ERROR("invalid port number in the monit_udp_port parameter\n "); return -1; } if(!udp_port) udp_port = DEFAULT_MONIT_UDP_PORT; DBG("udp_port = %i\n",udp_port); udp_addr = cfg.getParameter("monit_udp_ip",""); sd = socket(PF_INET,SOCK_DGRAM,0); if(sd == -1){ ERROR("could not open socket: %s\n",strerror(errno)); return -1; } /* set sock opts? */ optval=1; /* tos */ optval=IPTOS_LOWDELAY; if (setsockopt(sd, IPPROTO_IP, IP_TOS, (void*)&optval, sizeof(optval)) ==-1){ ERROR("WARNING: setsockopt(tos): %s\n", strerror(errno)); /* continue since this is not critical */ } struct sockaddr_in sa; memset(&sa,0,sizeof(sa)); sa.sin_family = AF_INET; sa.sin_port = htons(udp_port); if(!inet_aton(udp_addr.c_str(),(in_addr*)&sa.sin_addr.s_addr)){ // non valid address ERROR("invalid IP in the monit_udp_ip parameter\n"); return -1; } //bool socket_bound = false; //while(!socket_bound){ if( bind(sd,(sockaddr*)&sa,sizeof(struct sockaddr_in)) == -1 ){ ERROR("could not bind socket at port %i: %s\n",udp_port,strerror(errno)); //udp_port += 1; //sa.sin_port = htons(udp_port); return -1; } else { INFO("stats server listening on %s:%i\n",udp_addr.c_str(), udp_port); //socket_bound = true; } //} return 0; } void StatsUDPServer::run() { DBG("running StatsUDPServer...\n"); struct sockaddr_in addr; socklen_t addrlen = sizeof(struct sockaddr_in); char msg_buf[MSG_BUF_SIZE]; int msg_buf_s; while(true){ msg_buf_s = recvfrom(sd,msg_buf,MSG_BUF_SIZE,0,(sockaddr*)&addr,&addrlen); if(msg_buf_s == -1){ switch(errno){ case EINTR: case EAGAIN: continue; default: break; }; ERROR("recvfrom: %s\n",strerror(errno)); break; } //printf("received packet from: %s:%i\n", // inet_ntoa(addr.sin_addr),ntohs(addr.sin_port)); string reply; struct sockaddr_in reply_addr; if(execute(msg_buf,reply,reply_addr) == -1) continue; send_reply(reply,addr); } } static int msg_get_line(char*& msg_c, char* str, size_t len) { size_t l; char* s=str; if(!len) return 0; for(l=len; l && (*msg_c) && (*msg_c !='\n'); msg_c++ ){ *(s++) = *msg_c; l--; } if(*msg_c) msg_c++; if(l>0){ // We need one more character // for trailing '\0'. *s='\0'; return int(s-str); } else { ERROR("buffer too small (size=%u)\n",(unsigned int)len); // buffer overran. return -1; } } static int msg_get_param(char*& msg_c, string& p, char* line_buf, unsigned int size) { if( msg_get_line(msg_c,line_buf,size) != -1 ){ if(!strcmp(".",line_buf)) line_buf[0]='\0'; p = line_buf; } else { ERROR("msg_get_line failed\n"); return -1; } return 0; } int StatsUDPServer::execute(char* msg_buf, string& reply, struct sockaddr_in& addr) { char buffer[CTRL_MSGBUF_SIZE]; string cmd_str,reply_addr,reply_port; char *msg_c = msg_buf; msg_get_param(msg_c,cmd_str,buffer,CTRL_MSGBUF_SIZE); if(cmd_str == "calls") reply = "Active calls: " + int2str(AmSession::getSessionNum()) + "\n"; else if (cmd_str == "which") { reply = "calls - number of active calls (Session Container size)\n" "which - print available commands\n" "version - return SEMS version\n" "set_loglevel <loglevel> - set log level\n" "get_loglevel - get log level\n" "set_cpslimit <limit> - set maximum allowed CPS\n" "get_cpslimit - get maximum allowed CPS\n" "set_shutdownmode <1 or 0> - turns on and off shutdown mode\n" "get_shutdownmode - returns the shutdown mode's current state\n" "get_callsavg - get number of active calls (average since the last query)\n" "get_callsmax - get maximum of active calls since the last query\n" "get_cpsavg - get calls per second (5 sec average)\n" "get_cpsmax - get maximum of CPS since the last query\n" "DI <factory> <function> (<args>)* - invoke DI command\n" "\n" "When in shutdown mode, SEMS will answer with the configured 5xx errorcode to\n" "new INVITE and OPTIONS requests.\n" ; } else if (cmd_str == "version") { reply = SEMS_VERSION; } else if (cmd_str.length() > 4 && cmd_str.substr(0, 4) == "set_") { // setters if (cmd_str.substr(4, 8) == "loglevel") { if (!AmConfig::setLogLevel(&cmd_str.c_str()[13])) reply= "invalid loglevel value.\n"; else reply= "loglevel set to "+int2str(log_level)+".\n"; } else if (cmd_str.substr(4, 8) == "cpslimit") { int tmp; if(sscanf(&cmd_str.c_str()[13],"%u",&tmp) != 1) reply= "invalid CPS limit\n"; else { sc->setCPSLimit(tmp); reply= "CPS limit set to "+int2str(sc->getCPSLimit().first)+".\n"; } } else if (cmd_str.substr(4, 12) == "shutdownmode") { int tmp; if(sscanf(&cmd_str.c_str()[17],"%u",&tmp) != 1) reply= "invalid shutdownmode\n"; else { if(tmp) { AmConfig::ShutdownMode = true; reply= "Shutdownmode activated!\n"; } else { AmConfig::ShutdownMode = false; reply= "Shutdownmode deactivated!\n"; } } } else reply = "Unknown command: '" + cmd_str + "'\n"; } else if (cmd_str.length() > 4 && cmd_str.substr(0, 4) == "get_") { // setters if (cmd_str.substr(4, 8) == "loglevel") { reply= "loglevel is "+int2str(log_level)+".\n"; } else if(cmd_str.substr(4, 8) == "callsavg") reply = "Average active calls: " + int2str(AmSession::getAvgSessionNum()) + "\n"; else if(cmd_str.substr(4, 8) == "callsmax") reply = "Maximum active calls: " + int2str(AmSession::getMaxSessionNum()) + "\n"; else if(cmd_str.substr(4, 6) == "cpsavg") reply = "Average calls per second: " + int2str(sc->getAvgCPS()) + "\n"; else if(cmd_str.substr(4, 6) == "cpsmax") reply = "Maximum calls per second: " + int2str(sc->getMaxCPS()) + "\n"; else if(cmd_str.substr(4, 8) == "cpslimit") reply = "CPS hard limit: " + int2str(sc->getCPSLimit().first) + ", CPS limit: " + int2str(sc->getCPSLimit().second) + "\n"; else if (cmd_str.substr(4, 12) == "shutdownmode") { if(AmConfig::ShutdownMode) { reply= "Shutdownmode active!\n"; } else { reply= "Shutdownmode inactive!\n"; } } else reply = "Unknown command: '" + cmd_str + "'\n"; } else if (cmd_str.length() > 4 && cmd_str.substr(0, 3) == "DI ") { // Dynamic Invocation size_t p = cmd_str.find(' ', 4); string fact_name = cmd_str.substr(3, p-3); if (!fact_name.length()) { reply = "could not parse DI factory name.\n"; return 0; } size_t p2 = cmd_str.find(' ', p+1); if (p2 == string::npos) p2 = cmd_str.length(); string fct_name = cmd_str.substr(p+1, p2-p-1); p=p2+1; if (!fct_name.length()) { reply = "could not parse function name.\n"; return 0; } try { // args need to be stored in string vector, // because stl copyconstructor does not copy // underlying c_str vector<string> s_args; while (p<cmd_str.length()) { p2 = cmd_str.find(' ', p); if (p2 == string::npos) { if (p+1<cmd_str.length()) p2=cmd_str.length(); else break; } s_args.push_back(string(cmd_str.substr(p, p2-p))); p=p2+1; } AmArg args; for (vector<string>::iterator it = s_args.begin(); it != s_args.end(); it++) { args.push(it->c_str()); // DBG("mod '%s' added arg a '%s'\n", // fact_name.c_str(), // it->c_str()); } AmDynInvokeFactory* di_f = AmPlugIn::instance()->getFactory4Di(fact_name); if(!di_f){ reply = "could not get '" + fact_name + "' factory\n"; return 0; } AmDynInvoke* di = di_f->getInstance(); if(!di){ reply = "could not get DI instance from factory\n"; return 0; } AmArg ret; di->invoke(fct_name, args, ret); reply=AmArg::print(ret); } catch (const AmDynInvoke::NotImplemented& e) { reply = "Exception occured: AmDynInvoke::NotImplemented '"+ e.what+"'\n"; return 0; } catch (...) { reply = "Exception occured.\n"; return 0; } } else reply = "Unknown command: '" + cmd_str + "'\n"; return 0; } int StatsUDPServer::send_reply(const string& reply, const struct sockaddr_in& reply_addr) { int err = sendto(sd,reply.c_str(),reply.length()+1,0, (const sockaddr*)&reply_addr, sizeof(struct sockaddr_in)); return (err <= 0) ? -1 : 0; }