/*
 * Copyright (C) 2011 Stefan Sayer
 *
 * This file is part of SEMS, a free SIP media server.
 *
 * SEMS is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * For a license to use the sems software under conditions
 * other than those described here, or to purchase support for this
 * software, please contact iptel.org by e-mail at the following addresses:
 *    info@iptel.org
 *
 * SEMS is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License 
 * along with this program; if not, write to the Free Software 
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

#include "DBRegAgent.h"
#include "AmSession.h"
#include "AmEventDispatcher.h"

#include <unistd.h>
#include <stdlib.h>

EXPORT_MODULE_FACTORY(DBRegAgent);
DEFINE_MODULE_INSTANCE(DBRegAgent, MOD_NAME);

mysqlpp::Connection DBRegAgent::MainDBConnection(mysqlpp::use_exceptions);
mysqlpp::Connection DBRegAgent::ProcessorDBConnection(mysqlpp::use_exceptions);

string DBRegAgent::joined_query;
string DBRegAgent::registrations_table = "registrations";

double DBRegAgent::reregister_interval = 0.5;
double DBRegAgent::minimum_reregister_interval = -1;

bool DBRegAgent::enable_ratelimiting = false;
unsigned int DBRegAgent::ratelimit_rate = 0;
unsigned int DBRegAgent::ratelimit_per = 0;
bool DBRegAgent::ratelimit_slowstart = false;

bool DBRegAgent::delete_removed_registrations = true;
bool DBRegAgent::delete_failed_deregistrations = false;
bool DBRegAgent::save_contacts = true;
bool DBRegAgent::db_read_contact = false;
string DBRegAgent::contact_hostport;
string DBRegAgent::outbound_proxy;
bool DBRegAgent::save_auth_replies = false;

unsigned int DBRegAgent::error_retry_interval = 300;

static void _timer_cb(RegTimer* timer, long subscriber_id, int data2) {
  DBRegAgent::instance()->timer_cb(timer, subscriber_id, data2);
}

DBRegAgent::DBRegAgent(const string& _app_name)
  : AmDynInvokeFactory(_app_name),
    AmEventQueue(this),
    uac_auth_i(NULL)
{
}

DBRegAgent::~DBRegAgent() {
}

int DBRegAgent::onLoad()
{

  DBG("loading db_reg_agent....\n");

  AmDynInvokeFactory* uac_auth_f = AmPlugIn::instance()->getFactory4Di("uac_auth");
  if (uac_auth_f == NULL) {
    WARN("unable to get a uac_auth factory. "
	 "registrations will not be authenticated.\n");
    WARN("(do you want to load uac_auth module?)\n");
  } else {
    uac_auth_i = uac_auth_f->getInstance();
  }

  AmConfigReader cfg;
  if(cfg.loadFile(add2path(AmConfig::ModConfigPath,1, MOD_NAME ".conf")))
    return -1;

  expires = cfg.getParameterInt("expires", 7200);
  DBG("requesting registration expires of %u seconds\n", expires);

  if (cfg.hasParameter("reregister_interval")) {
    reregister_interval = -1;
    reregister_interval = atof(cfg.getParameter("reregister_interval").c_str());
    if (reregister_interval <= 0 || reregister_interval > 1) {
      ERROR("configuration value 'reregister_interval' could not be read. "
	    "needs to be 0 .. 1.0 (recommended: 0.5)\n");
      return -1;
    }
  }

  if (cfg.hasParameter("minimum_reregister_interval")) {
    minimum_reregister_interval = -1;
    minimum_reregister_interval = atof(cfg.getParameter("minimum_reregister_interval").c_str());
    if (minimum_reregister_interval <= 0 || minimum_reregister_interval > 1) {
      ERROR("configuration value 'minimum_reregister_interval' could not be read. "
	    "needs to be 0 .. reregister_interval (recommended: 0.4)\n");
      return -1;
    }

    if (minimum_reregister_interval >= reregister_interval) {
      ERROR("configuration value 'minimum_reregister_interval' must be smaller "
	    "than reregister_interval (recommended: 0.4)\n");
      return -1;
    }
  }

  enable_ratelimiting = cfg.getParameter("enable_ratelimiting") == "yes";
  if (enable_ratelimiting) {
    if (!cfg.hasParameter("ratelimit_rate") || !cfg.hasParameter("ratelimit_per")) {
      ERROR("if ratelimiting is enabled, ratelimit_rate and ratelimit_per must be set\n");
      return -1;
    }
    ratelimit_rate = cfg.getParameterInt("ratelimit_rate", 0);
    ratelimit_per = cfg.getParameterInt("ratelimit_per", 0);
    if (!ratelimit_rate || !ratelimit_per) {
      ERROR("ratelimit_rate and ratelimit_per must be > 0\n");
      return -1;
    }
    ratelimit_slowstart = cfg.getParameter("ratelimit_slowstart") == "yes";

  }

  delete_removed_registrations =
    cfg.getParameter("delete_removed_registrations", "yes") == "yes";

  delete_failed_deregistrations =
    cfg.getParameter("delete_failed_deregistrations", "no") == "yes";

  save_contacts =
    cfg.getParameter("save_contacts", "yes") == "yes";

  db_read_contact =
    cfg.getParameter("db_read_contact", "no") == "yes";

  save_auth_replies =
    cfg.getParameter("save_auth_replies", "no") == "yes";

  contact_hostport = cfg.getParameter("contact_hostport");

  outbound_proxy = cfg.getParameter("outbound_proxy");

  error_retry_interval = cfg.getParameterInt("error_retry_interval", 300);
  if (!error_retry_interval) {
    WARN("disabled retry on errors!\n");
  }

  string mysql_server, mysql_user, mysql_passwd, mysql_db;

  mysql_server = cfg.getParameter("mysql_server", "localhost");

  mysql_user = cfg.getParameter("mysql_user");
  if (mysql_user.empty()) {
    ERROR(MOD_NAME ".conf parameter 'mysql_user' is missing.\n");
    return -1;
  }

  mysql_passwd = cfg.getParameter("mysql_passwd");
  if (mysql_passwd.empty()) {
    ERROR(MOD_NAME ".conf parameter 'mysql_passwd' is missing.\n");
    return -1;
  }

  mysql_db = cfg.getParameter("mysql_db", "sems");

  try {

    MainDBConnection.set_option(new mysqlpp::ReconnectOption(true));
    // matched instead of changed rows in result, so we know when to create DB entry
    MainDBConnection.set_option(new mysqlpp::FoundRowsOption(true));
    MainDBConnection.connect(mysql_db.c_str(), mysql_server.c_str(),
                      mysql_user.c_str(), mysql_passwd.c_str());
    if (!MainDBConnection) {
      ERROR("Database connection failed: %s\n", MainDBConnection.error());
      return -1;
    }

    ProcessorDBConnection.set_option(new mysqlpp::ReconnectOption(true));
    // matched instead of changed rows in result, so we know when to create DB entry
    ProcessorDBConnection.set_option(new mysqlpp::FoundRowsOption(true));
    ProcessorDBConnection.connect(mysql_db.c_str(), mysql_server.c_str(),
                      mysql_user.c_str(), mysql_passwd.c_str());
    if (!ProcessorDBConnection) {
      ERROR("Database connection failed: %s\n", ProcessorDBConnection.error());
      return -1;
    }

  } catch (const mysqlpp::Exception& er) {
    // Catch-all for any MySQL++ exceptions
    ERROR("MySQL++ error: %s\n", er.what());
    return -1;
  }

  // register us as SIP event receiver for MOD_NAME
  AmEventDispatcher::instance()->addEventQueue(MOD_NAME,this);

  if (!AmPlugIn::registerDIInterface(MOD_NAME, this)) {
    ERROR("registering "MOD_NAME" DI interface\n");
    return -1;
  }

  joined_query = cfg.getParameter("joined_query");
  if (joined_query.empty()) {
    // todo: name!
    ERROR("joined_query must be set\n");
    return -1;
  }

  if (cfg.hasParameter("registrations_table")) {
    registrations_table = cfg.getParameter("registrations_table");
  }
  DBG("using registrations table '%s'\n", registrations_table.c_str());

  if (!loadRegistrations()) {
    ERROR("loading registrations from DB\n");
    return -1;
  }

  DBG("starting registration timer thread...\n");
  registration_scheduler.start();

  // run_tests();

  start();

  return 0;
}

void DBRegAgent::onUnload() {
  if (running) {
    running = false;
    registration_scheduler._timer_thread_running = false;
    DBG("unclean shutdown. Waiting for processing thread to stop.\n");
    for (int i=0;i<400;i++) {
      if (shutdown_finished && registration_scheduler._shutdown_finished)
	break;
      usleep(2000); // 2ms
    }

    if (!shutdown_finished || !registration_scheduler._shutdown_finished) {
      WARN("processing thread could not be stopped, process will probably crash\n");
    }
  }

  DBG("closing main DB connection\n");
  MainDBConnection.disconnect();
  DBG("closing auxiliary DB connection\n");
  ProcessorDBConnection.disconnect();
}

bool DBRegAgent::loadRegistrations() {
  try {
    time_t now_time = time(NULL);

    mysqlpp::Query query = DBRegAgent::MainDBConnection.query();

    string query_string, table;

    query_string = joined_query;

    DBG("querying all registrations with : '%s'\n",
	query_string.c_str());

    query << query_string;
    mysqlpp::UseQueryResult res = query.use();
    
    // mysqlpp::Row::size_type row_count = res.num_rows();
    // DBG("got %zd subscriptions\n", row_count);

    while (mysqlpp::Row row = res.fetch_row()) {
      int status = 0; 
      long subscriber_id = row[COLNAME_SUBSCRIBER_ID];

      string contact_uri;
      if (db_read_contact && row[COLNAME_CONTACT] != mysqlpp::null) {
	contact_uri = (string) row[COLNAME_CONTACT];
      }

      if (row[COLNAME_STATUS] != mysqlpp::null)
	status = row[COLNAME_STATUS];
      else {
	DBG("registration status entry for id %ld does not exist, creating...\n",
	    subscriber_id);
	createDBRegistration(subscriber_id, ProcessorDBConnection);
      }

      DBG("got subscriber '%s@%s' status %i\n",
	  string(row[COLNAME_USER]).c_str(), string(row[COLNAME_REALM]).c_str(),
	  status);      

      switch (status) {
      case REG_STATUS_INACTIVE:
      case REG_STATUS_PENDING: // try again
      case REG_STATUS_FAILED:  // try again
	{
	  createRegistration(subscriber_id,
			     (string)row[COLNAME_USER],
			     (string)row[COLNAME_PASS],
			     (string)row[COLNAME_REALM],
			     contact_uri
			     );
	  scheduleRegistration(subscriber_id);
	}; break;

      case REG_STATUS_ACTIVE:
	{
	  createRegistration(subscriber_id,
			     (string)row[COLNAME_USER],
			     (string)row[COLNAME_PASS],
			     (string)row[COLNAME_REALM],
			     contact_uri
			     );

	  time_t dt_expiry = now_time;
	  if (row[COLNAME_EXPIRY] != mysqlpp::null) {
	    dt_expiry = (time_t)((mysqlpp::DateTime)row[COLNAME_EXPIRY]);
	  }

	  time_t dt_registration_ts = now_time;
	  if (row[COLNAME_REGISTRATION_TS] != mysqlpp::null) {
	    dt_registration_ts = (time_t)((mysqlpp::DateTime)row[COLNAME_REGISTRATION_TS]);
	  }

	  DBG("got expiry '%ld, registration_ts %ld, now %ld'\n",
	      dt_expiry, dt_registration_ts, now_time);

	  if (dt_registration_ts > now_time) {
	    WARN("needed to sanitize last_registration timestamp TS from the %ld (now %ld) - "
		 "DB host time mismatch?\n", dt_registration_ts, now_time);
	    dt_registration_ts = now_time;
	  }

	  // if expired add to pending registrations, else schedule re-regstration
	  if (dt_expiry <= now_time) {
	    DBG("scheduling imminent re-registration for subscriber %ld\n", subscriber_id);
	    scheduleRegistration(subscriber_id);
	  } else {
	    setRegistrationTimer(subscriber_id, dt_expiry, dt_registration_ts, now_time);
	  }
	  
	}; break;
      case REG_STATUS_REMOVED:
	{
	  DBG("ignoring removed registration %ld %s@%s", subscriber_id,
	      ((string)row[COLNAME_USER]).c_str(), ((string)row[COLNAME_REALM]).c_str());
	} break;

      case REG_STATUS_TO_BE_REMOVED:
	{
	  DBG("Scheduling Deregister of registration %ld %s@%s", subscriber_id,
	      ((string)row[COLNAME_USER]).c_str(), ((string)row[COLNAME_REALM]).c_str());
	  createRegistration(subscriber_id,
			     (string)row[COLNAME_USER],
			     (string)row[COLNAME_PASS],
			     (string)row[COLNAME_REALM],
			     contact_uri
			     );
	  scheduleDeregistration(subscriber_id);
	};
      }
    }


  } catch (const mysqlpp::Exception& er) {
    // Catch-all for any MySQL++ exceptions
    ERROR("MySQL++ error: %s\n", er.what());
    return false;
  }

  return true;
}

/** create registration in our list */
void DBRegAgent::createRegistration(long subscriber_id,
				    const string& user,
				    const string& pass,
				    const string& realm,
				    const string& contact) {

  string contact_uri = contact;
  if (contact_uri.empty() && !contact_hostport.empty()) {
    contact_uri = "sip:"+ user + "@" + contact_hostport;
  }

  string handle = AmSession::getNewId();
  SIPRegistrationInfo reg_info(realm, user,
			       user, // name
			       user, // auth_user
			       pass,
			       outbound_proxy, // proxy
			       contact_uri // contact
			       );

  registrations_mut.lock();
  try {
    if (registrations.find(subscriber_id) != registrations.end()) {
      registrations_mut.unlock();
      WARN("registration with ID %ld already exists, removing\n", subscriber_id);
      removeRegistration(subscriber_id);
      clearRegistrationTimer(subscriber_id);
      registrations_mut.lock();
    }

    AmSIPRegistration* reg = new AmSIPRegistration(handle, reg_info, "" /*MOD_NAME*/);
    reg->setExpiresInterval(expires);

    registrations[subscriber_id] = reg;
    registration_ltags[handle] = subscriber_id;

    if (NULL != uac_auth_i) {
      DBG("enabling UAC Auth for new registration.\n");
      
      // get a sessionEventHandler from uac_auth
      AmArg di_args,ret;
      AmArg a;
      a.setBorrowedPointer(reg);
      di_args.push(a);
      di_args.push(a);
      
      uac_auth_i->invoke("getHandler", di_args, ret);
      if (!ret.size()) {
	ERROR("Can not add auth handler to new registration!\n");
      } else {
	AmObject* p = ret.get(0).asObject();
	if (p != NULL) {
	  AmSessionEventHandler* h = dynamic_cast<AmSessionEventHandler*>(p);	
	  if (h != NULL)
	    reg->setSessionEventHandler(h);
	}
      }
    }
  } catch (const AmArg::OutOfBoundsException& e) {
    ERROR("OutOfBoundsException");
  } catch (const AmArg::TypeMismatchException& e) {
    ERROR("TypeMismatchException");
  } catch (...) {
    ERROR("unknown exception occured\n");
  }

  registrations_mut.unlock();

  // register us as SIP event receiver for this ltag
  AmEventDispatcher::instance()->addEventQueue(handle,this);

  DBG("created new registration with ID %ld and ltag '%s'\n",
      subscriber_id, handle.c_str());
}

void DBRegAgent::updateRegistration(long subscriber_id,
				    const string& user,
				    const string& pass,
				    const string& realm,
				    const string& contact) {

  registrations_mut.lock();
  map<long, AmSIPRegistration*>::iterator it=registrations.find(subscriber_id);
  if (it == registrations.end()) {
    registrations_mut.unlock();
    WARN("updateRegistration - registration %ld %s@%s unknown, creating\n",
	 subscriber_id, user.c_str(), realm.c_str());
    createRegistration(subscriber_id, user, pass, realm, contact);
    scheduleRegistration(subscriber_id);
    return;
  }

  bool need_reregister = it->second->getInfo().domain != realm
    || it->second->getInfo().user != user
    || it->second->getInfo().pwd  != pass
    || it->second->getInfo().contact != contact;

  string old_realm = it->second->getInfo().domain;
  string old_user = it->second->getInfo().user;
  it->second->setRegistrationInfo(SIPRegistrationInfo(realm, user,
						      user, // name
						      user, // auth_user
						      pass,
						      outbound_proxy,   // proxy
						      contact)); // contact
  registrations_mut.unlock();
  if (need_reregister) {
    DBG("user/realm for registration %ld changed (%s@%s -> %s@%s). "
	"Triggering immediate re-registration\n",
	subscriber_id, old_user.c_str(), old_realm.c_str(), user.c_str(), realm.c_str());
    scheduleRegistration(subscriber_id);
  }
}

/** remove registration from our list */
void DBRegAgent::removeRegistration(long subscriber_id) {
  bool res = false;
  string handle;
  registrations_mut.lock();
  map<long, AmSIPRegistration*>::iterator it = registrations.find(subscriber_id);
  if (it != registrations.end()) {
    handle = it->second->getHandle();
    registration_ltags.erase(handle);
    delete it->second;
    registrations.erase(it);
    res = true;
  }
  registrations_mut.unlock();

  if (res) {
    // deregister us as SIP event receiver for this ltag
    AmEventDispatcher::instance()->delEventQueue(handle);

    DBG("removed registration with ID %ld\n", subscriber_id);
  } else {
    DBG("registration with ID %ld not found for removing\n", subscriber_id);
  }
}

/** schedule this registration to REGISTER (immediately) */
void DBRegAgent::scheduleRegistration(long subscriber_id) {
  if (enable_ratelimiting) {
    registration_processor.
      postEvent(new RegistrationActionEvent(RegistrationActionEvent::Register,
					    subscriber_id));
  } else {
    // use our own thread
    postEvent(new RegistrationActionEvent(RegistrationActionEvent::Register,
					  subscriber_id));
  }
  DBG("added to pending actions: REGISTER of %ld\n", subscriber_id);
}

/** schedule this registration to de-REGISTER (immediately) */
void DBRegAgent::scheduleDeregistration(long subscriber_id) {
  if (enable_ratelimiting) {
    registration_processor.
      postEvent(new RegistrationActionEvent(RegistrationActionEvent::Deregister,
					    subscriber_id));
  } else {
    // use our own thread
      postEvent(new RegistrationActionEvent(RegistrationActionEvent::Deregister,
					    subscriber_id));
  }
  DBG("added to pending actions: DEREGISTER of %ld\n", subscriber_id);
}

void DBRegAgent::process(AmEvent* ev) {

  if (ev->event_id == RegistrationActionEventID) {
    RegistrationActionEvent* reg_action_ev =
      dynamic_cast<RegistrationActionEvent*>(ev);
    if (reg_action_ev) {
      onRegistrationActionEvent(reg_action_ev);
      return;
    }
  }

  AmSipReplyEvent* sip_rep = dynamic_cast<AmSipReplyEvent*>(ev);
  if (sip_rep) {
      onSipReplyEvent(sip_rep);
    return;
  }

  if (ev->event_id == E_SYSTEM) {
    AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(ev);
    if(sys_ev){	
      DBG("Session received system Event\n");
      if (sys_ev->sys_event == AmSystemEvent::ServerShutdown) {
	running = false;
	registration_scheduler._timer_thread_running = false;
      }
      return;
    }
  }

  ERROR("unknown event received!\n");
}

// uses ProcessorDBConnection
void DBRegAgent::onRegistrationActionEvent(RegistrationActionEvent* reg_action_ev) {
  switch (reg_action_ev->action) {
  case RegistrationActionEvent::Register:
    {
      DBG("REGISTER of registration %ld\n", reg_action_ev->subscriber_id);
      registrations_mut.lock();
      map<long, AmSIPRegistration*>::iterator it=
	registrations.find(reg_action_ev->subscriber_id);
      if (it==registrations.end()) {
	DBG("ignoring scheduled REGISTER of unknown registration %ld\n",
	    reg_action_ev->subscriber_id);
      } else {
	if (!it->second->doRegistration()) {
	  updateDBRegistration(ProcessorDBConnection,
			       reg_action_ev->subscriber_id,
			       480, ERR_REASON_UNABLE_TO_SEND_REQUEST,
			       true, REG_STATUS_FAILED);
	  if (error_retry_interval) {
	    // schedule register-refresh after error_retry_interval
	    setRegistrationTimer(reg_action_ev->subscriber_id, error_retry_interval,
				 RegistrationActionEvent::Register);
	  }
	}
      }
      registrations_mut.unlock();
    } break;
  case RegistrationActionEvent::Deregister:
    {
      DBG("De-REGISTER of registration %ld\n", reg_action_ev->subscriber_id);
      registrations_mut.lock();
      map<long, AmSIPRegistration*>::iterator it=
	registrations.find(reg_action_ev->subscriber_id);
      if (it==registrations.end()) {
	DBG("ignoring scheduled De-REGISTER of unknown registration %ld\n",
	    reg_action_ev->subscriber_id);
      } else {
	if (!it->second->doUnregister()) {
	  if (delete_removed_registrations && delete_failed_deregistrations) {
	    DBG("sending de-Register failed - deleting registration %ld "
		"(delete_failed_deregistrations=yes)\n", reg_action_ev->subscriber_id);
	    deleteDBRegistration(reg_action_ev->subscriber_id, ProcessorDBConnection);
	  } else {
	    DBG("failed sending de-register, updating DB with REG_STATUS_TO_BE_REMOVED "
		ERR_REASON_UNABLE_TO_SEND_REQUEST "for subscriber %ld\n",
		reg_action_ev->subscriber_id);
	    updateDBRegistration(ProcessorDBConnection,
				 reg_action_ev->subscriber_id,
				 480, ERR_REASON_UNABLE_TO_SEND_REQUEST,
				 true, REG_STATUS_TO_BE_REMOVED);
	    // don't re-try de-registrations if sending failed
	  // if (error_retry_interval) {
	  //   // schedule register-refresh after error_retry_interval
	  //   setRegistrationTimer(reg_action_ev->subscriber_id, error_retry_interval,
	  // 			 RegistrationActionEvent::Deregister);
	  // }
	  }
	}
      }
      registrations_mut.unlock();
    } break;
  }
}

void DBRegAgent::createDBRegistration(long subscriber_id, mysqlpp::Connection& conn) {
  string insert_query = "insert into "+registrations_table+
    " (subscriber_id) values ("+
    long2str(subscriber_id)+");";

  try {
    mysqlpp::Query query = conn.query();
    query << insert_query;

    mysqlpp::SimpleResult res = query.execute();
    if (!res) {
      WARN("creating registration in DB with query '%s' failed: '%s'\n",
	   insert_query.c_str(), res.info());
    }
  }  catch (const mysqlpp::Exception& er) {
    // Catch-all for any MySQL++ exceptions
    ERROR("MySQL++ error: %s\n", er.what());
    return;
  }
}

void DBRegAgent::deleteDBRegistration(long subscriber_id, mysqlpp::Connection& conn) {
  string insert_query = "delete from "+registrations_table+
    " where subscriber_id=" +  long2str(subscriber_id)+";";

  try {
    mysqlpp::Query query = conn.query();
    query << insert_query;

    mysqlpp::SimpleResult res = query.execute();
    if (!res) {
      WARN("removing registration in DB with query '%s' failed: '%s'\n",
	   insert_query.c_str(), res.info());
    }
  }  catch (const mysqlpp::Exception& er) {
    // Catch-all for any MySQL++ exceptions
    ERROR("MySQL++ error: %s\n", er.what());
    return;
  }
}

void DBRegAgent::updateDBRegistration(mysqlpp::Connection& db_connection,
				      long subscriber_id, int last_code,
				      const string& last_reason,
				      bool update_status, int status,
				      bool update_ts, unsigned int expiry,
				      bool update_contacts, const string& contacts) {
  try {

    mysqlpp::Query query = db_connection.query();

    query << "update "+registrations_table+" set last_code="+ int2str(last_code) +", ";
    query << "last_reason=";
    query << mysqlpp::quote << last_reason;

    if (update_status) {
      query <<  ", registration_status="+int2str(status);
    }

    if (update_ts) {
      query << ", last_registration=NOW(), "
	"expiry=TIMESTAMPADD(SECOND,"+int2str(expiry)+", NOW())";
    }

    if (update_contacts) {
      query << ", contacts=" << mysqlpp::quote << contacts;
    }

    query << " where " COLNAME_SUBSCRIBER_ID "="+long2str(subscriber_id) + ";";
    string query_str = query.str();
    DBG("updating registration in DB with query '%s'\n", query_str.c_str());

    mysqlpp::SimpleResult res = query.execute();
    if (!res) {
      WARN("updating registration in DB with query '%s' failed: '%s'\n",
	   query_str.c_str(), res.info());
    } else {
      if (!res.rows()) {
	// should not happen - DB entry is created on load or on createRegistration
	DBG("creating registration DB entry for subscriber %ld\n", subscriber_id);
	createDBRegistration(subscriber_id, db_connection);
	query.reset();
	query << query_str;

	mysqlpp::SimpleResult res = query.execute();
	if (!res || !res.rows()) {
	  WARN("updating registration in DB with query '%s' failed: '%s'\n",
	       query_str.c_str(), res.info());
	}
      }
    }

  }  catch (const mysqlpp::Exception& er) {
    // Catch-all for any MySQL++ exceptions
    ERROR("MySQL++ error: %s\n", er.what());
    return;
  }

}

// uses MainDBConnection
void DBRegAgent::onSipReplyEvent(AmSipReplyEvent* ev) {
  if (!ev) return;

  DBG("received SIP reply event for '%s'\n", 
#ifdef HAS_OFFER_ANSWER
      ev->reply.from_tag.c_str()
#else
      ev->reply.local_tag.c_str()
#endif
      );
  registrations_mut.lock();

  string local_tag =
#ifdef HAS_OFFER_ANSWER
    ev->reply.from_tag;
#else
    ev->reply.local_tag;
#endif
    
  map<string, long>::iterator it=registration_ltags.find(local_tag);
  if (it!=registration_ltags.end()) {
    long subscriber_id = it->second;
    map<long, AmSIPRegistration*>::iterator r_it=registrations.find(subscriber_id);
    if (r_it != registrations.end()) {
      AmSIPRegistration* registration = r_it->second;
      if (!registration) {
	ERROR("Internal error: registration object missing\n");
	return;
      }
      unsigned int cseq_before = registration->getDlg()->cseq;

#ifdef HAS_OFFER_ANSWER
      registration->getDlg()->onRxReply(ev->reply);
#else
      registration->getDlg()->updateStatus(ev->reply);
#endif

      //update registrations set 
      bool update_status = false;
      int status = 0;
      bool update_ts = false;
      unsigned int expiry = 0;
      bool delete_status = false;
      bool auth_pending = false;

      if (ev->reply.code >= 300) {
	// REGISTER or de-REGISTER failed
	if ((ev->reply.code == 401 || ev->reply.code == 407) &&
	    // auth response codes
	    // processing reply triggered sending request: resent by auth
	    (cseq_before != registration->getDlg()->cseq)) {
	  DBG("received negative reply, but still in pending state (auth).\n");
	  auth_pending = true;
	} else {
	  if (!registration->getUnregistering()) {
	    // REGISTER failed - mark in DB
	    DBG("registration failed - mark in DB\n");
	    update_status = true;
	    status = REG_STATUS_FAILED;
	    if (error_retry_interval) {
	      // schedule register-refresh after error_retry_interval
	      setRegistrationTimer(subscriber_id, error_retry_interval,
				   RegistrationActionEvent::Register);
	    }
	  } else {
	    // de-REGISTER failed
	    if (delete_removed_registrations && delete_failed_deregistrations) {
	      DBG("de-Register failed - deleting registration %ld "
		  "(delete_failed_deregistrations=yes)\n", subscriber_id);
	      delete_status = true;
	    } else {
	      update_status = true;
	      status = REG_STATUS_TO_BE_REMOVED;
	    }
	  }
	}
      } else if (ev->reply.code >= 200) {
	// positive reply
	if (!registration->getUnregistering()) {
	  time_t now_time = time(0);
	  setRegistrationTimer(subscriber_id, registration->getExpiresTS(),
			       now_time, now_time);

	  update_status = true;
	  status = REG_STATUS_ACTIVE;

	  update_ts = true;
	  expiry = registration->getExpiresLeft();
	} else {
	  if (delete_removed_registrations) {
	    delete_status = true;
	  } else {
	    update_status = true;
	    status = REG_STATUS_REMOVED;
	  }
	}
      }

      // skip provisional replies & auth
      if (ev->reply.code >= 200 && !auth_pending) {
	// remove unregistered
	if (registration->getUnregistering()) {
	  registrations_mut.unlock();
	  removeRegistration(subscriber_id);
	  registrations_mut.lock();
	}
      }

      if (!delete_status) {
	if (auth_pending && !save_auth_replies) {
	  DBG("not updating DB with auth reply %u %s\n",
	      ev->reply.code, ev->reply.reason.c_str());
	} else {
	  DBG("update DB with reply %u %s\n", ev->reply.code, ev->reply.reason.c_str());
	  updateDBRegistration(MainDBConnection,
			       subscriber_id, ev->reply.code, ev->reply.reason,
			       update_status, status, update_ts, expiry,
			       save_contacts, ev->reply.contact);
	}
      } else {
	DBG("delete DB registration of subscriber %ld\n", subscriber_id);
	deleteDBRegistration(subscriber_id, MainDBConnection);
      }

    } else {
      ERROR("internal: inconsistent registration list\n");
    }
  } else {
    DBG("ignoring reply for unknown registration\n");
  }
  registrations_mut.unlock();
}

void DBRegAgent::run() {
  running = shutdown_finished = true;

  DBG("DBRegAgent thread: waiting 2 sec for server startup ...\n");
  sleep(2);
  
  mysqlpp::Connection::thread_start();

  if (enable_ratelimiting) {
    DBG("starting processor thread\n");
    registration_processor.start();
  }

  DBG("running DBRegAgent thread...\n");
  shutdown_finished = false;
  while (running) {
    processEvents();

    usleep(1000); // 1ms
  }

  DBG("DBRegAgent done, removing all registrations from Event Dispatcher...\n");
  registrations_mut.lock();
  for (map<string, long>::iterator it=registration_ltags.begin();
       it != registration_ltags.end(); it++) {
    AmEventDispatcher::instance()->delEventQueue(it->first);
  }
  registrations_mut.unlock();

  DBG("removing "MOD_NAME" registrations from Event Dispatcher...\n");
  AmEventDispatcher::instance()->delEventQueue(MOD_NAME);

  mysqlpp::Connection::thread_end();

  DBG("DBRegAgent thread stopped.\n");
  shutdown_finished = true;
}

void DBRegAgent::on_stop() {
  DBG("DBRegAgent on_stop()...\n");
  running = false;
}

void DBRegAgent::setRegistrationTimer(long subscriber_id, unsigned int timeout,
				      RegistrationActionEvent::RegAction reg_action) {
  DBG("setting Register timer for subscription %ld, timeout %u, reg_action %u\n",
      subscriber_id, timeout, reg_action);

  RegTimer* timer = NULL;
  map<long, RegTimer*>::iterator it=registration_timers.find(subscriber_id);
  if (it==registration_timers.end()) {
    DBG("timer object for subscription %ld not found\n", subscriber_id);
    timer = new RegTimer();
    timer->data1 = subscriber_id;
    timer->cb = _timer_cb;
    DBG("created timer object [%p] for subscription %ld\n", timer, subscriber_id);
  } else {
    timer = it->second;
    DBG("removing scheduled timer...\n");
    registration_scheduler.remove_timer(timer);
  }

  timer->data2 = reg_action;
  timer->expires = time(0) + timeout;

  DBG("placing timer for %ld in T-%u\n", subscriber_id, timeout);
  registration_scheduler.insert_timer(timer);

  registration_timers.insert(std::make_pair(subscriber_id, timer));

}

void DBRegAgent::setRegistrationTimer(long subscriber_id,
				      time_t expiry, time_t reg_start_ts,
				      time_t now_time) {
  DBG("setting re-Register timer for subscription %ld, expiry %ld, reg_start_t %ld\n",
      subscriber_id, expiry, reg_start_ts);

  RegTimer* timer = NULL;
  map<long, RegTimer*>::iterator it=registration_timers.find(subscriber_id);
  if (it==registration_timers.end()) {
    DBG("timer object for subscription %ld not found\n", subscriber_id);
    timer = new RegTimer();
    timer->data1 = subscriber_id;
    timer->cb = _timer_cb;
    DBG("created timer object [%p] for subscription %ld\n", timer, subscriber_id);
    registration_timers.insert(std::make_pair(subscriber_id, timer));
  } else {
    timer = it->second;
    DBG("removing scheduled timer...\n");
    registration_scheduler.remove_timer(timer);
  }

  timer->data2 = RegistrationActionEvent::Register;

  if (minimum_reregister_interval>0.0) {
    time_t t_expiry_max = reg_start_ts;
    time_t t_expiry_min = reg_start_ts;
    if (expiry > reg_start_ts)
      t_expiry_max+=(expiry - reg_start_ts) * reregister_interval;
    if (expiry > reg_start_ts)
      t_expiry_min+=(expiry - reg_start_ts) * minimum_reregister_interval;

    if (t_expiry_max < now_time) {
      // calculated interval completely in the past - immediate re-registration
      // by setting the timer to now
      t_expiry_max = now_time;
    }

    if (t_expiry_min > t_expiry_max)
      t_expiry_min = t_expiry_max;

    timer->expires = t_expiry_max;

    if (t_expiry_max == now_time) {
      // immediate re-registration
      DBG("calculated re-registration at TS <now> (%ld)"
	  "(reg_start_ts=%ld, reg_expiry=%ld, reregister_interval=%f, "
	  "minimum_reregister_interval=%f)\n",
	  t_expiry_max, reg_start_ts, expiry,
	  reregister_interval, minimum_reregister_interval);
      registration_scheduler.insert_timer(timer);
    } else {
      DBG("calculated re-registration at TS %ld .. %ld"
	  "(reg_start_ts=%ld, reg_expiry=%ld, reregister_interval=%f, "
	  "minimum_reregister_interval=%f)\n",
	  t_expiry_min, t_expiry_max, reg_start_ts, expiry,
	  reregister_interval, minimum_reregister_interval);
  
      registration_scheduler.insert_timer_leastloaded(timer, t_expiry_min, t_expiry_max);
    }
  } else {
    time_t t_expiry = reg_start_ts;
    if (expiry > reg_start_ts)
      t_expiry+=(expiry - reg_start_ts) * reregister_interval;

    if (t_expiry < now_time) {
      t_expiry = now_time;
      DBG("re-registering at TS <now> (%ld)\n", now_time);
    }

    DBG("calculated re-registration at TS %ld "
	"(reg_start_ts=%ld, reg_expiry=%ld, reregister_interval=%f)\n",
	t_expiry, reg_start_ts, expiry, reregister_interval);

    timer->expires = t_expiry;    
    registration_scheduler.insert_timer(timer);
  }
}

void DBRegAgent::clearRegistrationTimer(long subscriber_id) {
  DBG("removing timer for subscription %ld", subscriber_id);

  map<long, RegTimer*>::iterator it=registration_timers.find(subscriber_id);
  if (it==registration_timers.end()) {
    DBG("timer object for subscription %ld not found\n", subscriber_id);
      return;
  }
  DBG("removing timer [%p] from scheduler\n", it->second);
  registration_scheduler.remove_timer(it->second);

  DBG("deleting timer object [%p]\n", it->second);
  delete it->second;

  registration_timers.erase(it);
}

void DBRegAgent::removeRegistrationTimer(long subscriber_id) {
  DBG("removing timer object for subscription %ld", subscriber_id);

  map<long, RegTimer*>::iterator it=registration_timers.find(subscriber_id);
  if (it==registration_timers.end()) {
    DBG("timer object for subscription %ld not found\n", subscriber_id);
    return;
  }

  DBG("deleting timer object [%p]\n", it->second);
  delete it->second;

  registration_timers.erase(it);
}

void DBRegAgent::timer_cb(RegTimer* timer, long subscriber_id, int reg_action) {
  DBG("re-registration timer expired: subscriber %ld, timer=[%p], action %d\n",
      subscriber_id, timer, reg_action);

  registrations_mut.lock();
  removeRegistrationTimer(subscriber_id);
  registrations_mut.unlock();
  switch (reg_action) {
  case RegistrationActionEvent::Register:
    scheduleRegistration(subscriber_id); break;
  case RegistrationActionEvent::Deregister:
    scheduleDeregistration(subscriber_id); break;
  default: ERROR("internal: unknown reg_action %d for subscriber %ld timer event\n",
		 reg_action, subscriber_id);
  };
}


void DBRegAgent::DIcreateRegistration(int subscriber_id, const string& user, 
				      const string& pass, const string& realm,
				      const string& contact,
				      AmArg& ret) {
  DBG("DI method: createRegistration(%i, %s, %s, %s, %s)\n",
      subscriber_id, user.c_str(),
      pass.c_str(), realm.c_str(), contact.c_str());

  createRegistration(subscriber_id, user, pass, realm, contact);
  scheduleRegistration(subscriber_id);
  ret.push(200);
  ret.push("OK");
}

void DBRegAgent::DIupdateRegistration(int subscriber_id, const string& user, 
				      const string& pass, const string& realm,
				      const string& contact,
				      AmArg& ret) {
  DBG("DI method: updateRegistration(%i, %s, %s, %s)\n",
      subscriber_id, user.c_str(),
      pass.c_str(), realm.c_str());

  string contact_uri = contact;
  if (contact_uri.empty() && !contact_hostport.empty()) {
    contact_uri = "sip:"+ user + "@" + contact_hostport;
  }

  updateRegistration(subscriber_id, user, pass, realm, contact_uri);

  ret.push(200);
  ret.push("OK");
}

void DBRegAgent::DIremoveRegistration(int subscriber_id, AmArg& ret) {
  DBG("DI method: removeRegistration(%i)\n",
      subscriber_id);
  scheduleDeregistration(subscriber_id);

  registrations_mut.lock();
  clearRegistrationTimer(subscriber_id);
  registrations_mut.unlock();

  ret.push(200);
  ret.push("OK");
}

void DBRegAgent::DIrefreshRegistration(int subscriber_id, AmArg& ret) {
  DBG("DI method: refreshRegistration(%i)\n", subscriber_id);
  scheduleRegistration(subscriber_id);

  ret.push(200);
  ret.push("OK");
}

// ///////// DI API ///////////////////

void DBRegAgent::invoke(const string& method,
			const AmArg& args, AmArg& ret)
{
  if (method == "createRegistration"){
    args.assertArrayFmt("isss"); // subscriber_id, user, pass, realm
    string contact;
    if (args.size() > 4) {
      assertArgCStr(args.get(4));
      contact = args.get(4).asCStr();
    }
    DIcreateRegistration(args.get(0).asInt(), args.get(1).asCStr(), 
			 args.get(2).asCStr(),args.get(3).asCStr(),
			 contact, ret);
  } else if (method == "updateRegistration"){
    args.assertArrayFmt("isss"); // subscriber_id, user, pass, realm
    string contact;
    if (args.size() > 4) {
      assertArgCStr(args.get(4));
      contact = args.get(4).asCStr();
    }
    DIupdateRegistration(args.get(0).asInt(), args.get(1).asCStr(),
			 args.get(2).asCStr(),args.get(3).asCStr(),
			 contact, ret);
  } else if (method == "removeRegistration"){
    args.assertArrayFmt("i"); // subscriber_id
    DIremoveRegistration(args.get(0).asInt(), ret);
  } else if (method == "refreshRegistration"){
    args.assertArrayFmt("i"); // subscriber_id
    DIrefreshRegistration(args.get(0).asInt(), ret);
  }  else if(method == "_list"){
    ret.push(AmArg("createRegistration"));
    ret.push(AmArg("updateRegistration"));
    ret.push(AmArg("removeRegistration"));
    ret.push(AmArg("refreshRegistration"));
  }  else
    throw AmDynInvoke::NotImplemented(method);
}

// /////////////// processor thread /////////////////

DBRegAgentProcessorThread::DBRegAgentProcessorThread()
  : AmEventQueue(this), stopped(false) {
}

DBRegAgentProcessorThread::~DBRegAgentProcessorThread() {
}

void DBRegAgentProcessorThread::on_stop() {
}

void DBRegAgentProcessorThread::rateLimitWait() {
  DBG("applying rate limit %u initial requests per %us\n",
      DBRegAgent::ratelimit_rate, DBRegAgent::ratelimit_per);

  DBG("allowance before ratelimit: %f\n", allowance);

  struct timeval current;
  struct timeval time_passed;
  gettimeofday(&current, 0);
  timersub(&current, &last_check, &time_passed);
  memcpy(&last_check, &current, sizeof(struct timeval));
  double seconds_passed = (double)time_passed.tv_sec +
    (double)time_passed.tv_usec / 1000000.0;
  allowance += seconds_passed * 
    (double) DBRegAgent::ratelimit_rate / (double)DBRegAgent::ratelimit_per;

  if (allowance > (double)DBRegAgent::ratelimit_rate)
    allowance = (double)DBRegAgent::ratelimit_rate; // enough time passed, but limit to max
  if (allowance < 1.0) {
    useconds_t sleep_time = 1000000.0 * (1.0 - allowance) *
      ((double)DBRegAgent::ratelimit_per/(double)DBRegAgent::ratelimit_rate);
    DBG("not enough allowance (%f), sleeping %d useconds\n", allowance, sleep_time);
    usleep(sleep_time);
    allowance=0.0;
    gettimeofday(&last_check, 0);
  } else {
    allowance -= 1.0;
  }

  DBG("allowance left: %f\n", allowance);
}

void DBRegAgentProcessorThread::run() {
  DBG("DBRegAgentProcessorThread thread started\n");
  
  // register us as SIP event receiver for MOD_NAME_processor
  AmEventDispatcher::instance()->addEventQueue(MOD_NAME "_processor",this);

  mysqlpp::Connection::thread_start();

  // initialize ratelimit
  gettimeofday(&last_check, NULL);
  if (DBRegAgent::ratelimit_slowstart)
    allowance = 0.0;
  else
    allowance = DBRegAgent::ratelimit_rate;

  reg_agent = DBRegAgent::instance();
  while (!stopped) {
    waitForEvent();
    while (eventPending()) {
      rateLimitWait();
      processSingleEvent();
    }
  }

  mysqlpp::Connection::thread_end();

 DBG("DBRegAgentProcessorThread thread stopped\n"); 
}

void DBRegAgentProcessorThread::process(AmEvent* ev) {

  if (ev->event_id == E_SYSTEM) {
    AmSystemEvent* sys_ev = dynamic_cast<AmSystemEvent*>(ev);
    if(sys_ev){	
      DBG("Session received system Event\n");
      if (sys_ev->sys_event == AmSystemEvent::ServerShutdown) {
	DBG("stopping processor thread\n");
	stopped = true;
      }
      return;
    }
  }


  if (ev->event_id == RegistrationActionEventID) {
    RegistrationActionEvent* reg_action_ev =
      dynamic_cast<RegistrationActionEvent*>(ev);
    if (reg_action_ev) {
      reg_agent->onRegistrationActionEvent(reg_action_ev);
      return;
    }
  }

  ERROR("unknown event received!\n");
}
#if 0
void test_cb(RegTimer* tr, long data1, void* data2) {
  DBG("cb called: [%p], data %ld / [%p]\n", tr, data1, data2);
}

void DBRegAgent::run_tests() {

  registration_timer.start();

  struct timeval now;
  gettimeofday(&now, 0);

  RegTimer rt;
  rt.expires = now.tv_sec + 10; 
  rt.cb=test_cb;
  registration_scheduler.insert_timer(&rt);

  RegTimer rt2;
  rt2.expires = now.tv_sec + 5; 
  rt2.cb=test_cb;
  registration_scheduler.insert_timer(&rt2);

  RegTimer rt3;
  rt3.expires = now.tv_sec + 15; 
  rt3.cb=test_cb;
  registration_scheduler.insert_timer(&rt3);

  RegTimer rt4;
  rt4.expires = now.tv_sec - 1; 
  rt4.cb=test_cb;
  registration_scheduler.insert_timer(&rt4);

  RegTimer rt5;
  rt5.expires = now.tv_sec + 100000; 
  rt5.cb=test_cb;
  registration_scheduler.insert_timer(&rt5);

  RegTimer rt6;
  rt6.expires = now.tv_sec + 100; 
  rt6.cb=test_cb;
  registration_scheduler.insert_timer_leastloaded(&rt6, now.tv_sec+5, now.tv_sec+50);


  sleep(30);
  gettimeofday(&now, 0);

  RegTimer rt7;
  rt6.expires = now.tv_sec + 980; 
  rt6.cb=test_cb;
  registration_scheduler.insert_timer_leastloaded(&rt6, now.tv_sec+9980, now.tv_sec+9990);

   vector<RegTimer*> rts;

   for (int i=0;i<1000;i++) {
     RegTimer* t = new RegTimer();
     rts.push_back(t);
     t->expires = now.tv_sec + i;
     t->cb=test_cb;
     registration_scheduler.insert_timer_leastloaded(t, now.tv_sec, now.tv_sec+1000);
   }

  sleep(200);
}
#endif