modules/db_cassandra/dbcassa_base.cpp
c0a92201
 /*
  * $Id$
  *
  * CASSANDRA module interface
  *
  * Copyright (C) 2012 1&1 Internet AG
  *
  * This file is part of Kamailio, a free SIP server.
  *
  * Kamailio is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation; either version 2 of the License, or
  * (at your option) any later version
  *
  * Kamailio is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
  * You should have received a copy of the GNU General Public License 
  * along with this program; if not, write to the Free Software 
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  *
  * History:
  * --------
  * 2012-01  first version (Anca Vamanu)
  */
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <sys/time.h>
 #include <poll.h>
 #include <iostream>
 #include <boost/lexical_cast.hpp>
 #include <protocol/TBinaryProtocol.h>
 #include <transport/TSocket.h>
 #include <transport/TTransportUtils.h>
 
 extern "C" {
 #include "../../timer.h"
 #include "../../mem/mem.h"
 #include "dbcassa_table.h"
 }
 
 #include "Cassandra.h"
 #include "dbcassa_base.h"
 
 namespace at  = apache::thrift;
 namespace att = apache::thrift::transport;
 namespace atp = apache::thrift::protocol;
 namespace oac = org::apache::cassandra;
 
 static const char cassa_key_delim  = ' ';
 static const int  cassa_max_key_len= 512;
 
 #define MAX_ROWS_NO        128     /* TODO: make this configurable or dynamic */
 int row_slices[MAX_ROWS_NO][2];
 
 /*
  * ----         Cassandra Connection Section               ----
  *  */
 
 struct cassa_con {
 	struct db_id* id;           /*!< Connection identifier       */
 	unsigned int ref;           /*!< Reference count             */
 	struct pool_con* next;      /*!< Next connection in the pool */
 
 	str db_name;                /*!< Database name as str        */
 	oac::CassandraClient* con;  /*!< Cassandra connection        */
 };
 
 #define CON_CASSA(db_con)    ((struct cassa_con*)db_con->tail)
 
 /*!
  * \brief Open connection to Cassandra cluster
  * \param db_id
  */
 oac::CassandraClient* dbcassa_open(struct db_id* id)
 {
 	try {
 		boost::shared_ptr<att::TSocket> socket(new att::TSocket(id->host, id->port));
 		boost::shared_ptr<att::TTransport> transport(new att::TFramedTransport (socket));
 		boost::shared_ptr<atp::TProtocol> protocol(new atp::TBinaryProtocol(transport));
 
 		socket->setConnTimeout(cassa_conn_timeout);
 		socket->setSendTimeout(cassa_send_timeout);
 		socket->setRecvTimeout(cassa_recv_timeout);
 
 		std::auto_ptr<oac::CassandraClient> cassa_client(new oac::CassandraClient(protocol));
 
 		transport->open();
 		if (!transport->isOpen()) {
 			LM_ERR("Failed to open transport to Cassandra\n");
 			return 0;
 		}
 
 		/* database name ->  keyspace */
 
 		cassa_client->set_keyspace(id->database);
 		if(id->username && id->password) {
 			oac::AuthenticationRequest au_req;
 			std::map<std::string, std::string>  cred;
 			cred.insert(std::pair<std::string, std::string>("username", id->username));
 			cred.insert(std::pair<std::string, std::string>("password", id->password));
 			au_req.credentials = cred;
 			try {
 				cassa_client->login(au_req);
 			} catch (const oac::AuthenticationException& autx) {
 				LM_ERR("Authentication failure: Credentials not valid, %s\n", autx.why.c_str());
 			} catch (const oac::AuthorizationException & auzx) {
 				LM_ERR("Authentication failure: Credentials not valid for the selected database, %s\n", auzx.why.c_str());
 			}
 		}
 
 		LM_DBG("Opened connection to Cassandra cluster  %s:%d\n", id->host, id->port);
 		return cassa_client.release();
 
 	} catch (const oac::InvalidRequestException &irx) {
 		LM_ERR("Database does not exist %s, %s\n", id->database, irx.why.c_str());
 	} catch (const at::TException &tx) {
 		LM_ERR("Failed to open connection to Cassandra cluster %s:%d, %s\n",
 				id->database, id->port, tx.what());
 	} catch (const std::exception &ex) {
 		LM_ERR("Failed: %s\n", ex.what());
 	} catch (...) {
 		LM_ERR("Failed to open connection to Cassandra cluster\n");
 	}
 
 	return 0;
 }
 
 /*!
  * \brief Create new DB connection structure
  * \param db_id
  */
 void* db_cassa_new_connection(struct db_id* id)
 {
 	struct cassa_con* ptr;
 
 	if (!id) {
 		LM_ERR("invalid db_id parameter value\n");
 		return 0;
 	}
 
 	if (id->port) {
 		LM_DBG("opening connection: cassa://xxxx:xxxx@%s:%d/%s\n", ZSW(id->host),
 			id->port, ZSW(id->database));
 	} else {
 		LM_DBG("opening connection: cassa://xxxx:xxxx@%s/%s\n", ZSW(id->host),
 			ZSW(id->database));
 	}
 
 	ptr = (struct cassa_con*)pkg_malloc(sizeof(struct cassa_con));
 	if (!ptr) {
 		LM_ERR("failed trying to allocated %lu bytes for connection structure."
 				"\n", (unsigned long)sizeof(struct cassa_con));
 		return 0;
 	}
 	LM_DBG("%p=pkg_malloc(%lu)\n", ptr, (unsigned long)sizeof(struct cassa_con));
 
 	memset(ptr, 0, sizeof(struct cassa_con));
 
 	ptr->db_name.s = id->database;
 	ptr->db_name.len = strlen(id->database);
 	ptr->id = id;
 	ptr->ref = 1;
 
 	ptr->con = dbcassa_open(id);
 	if(!ptr->con) {
 		LM_ERR("Failed to open connection to Cassandra cluster\n");
 		pkg_free(ptr);
 		return 0;
 	}
 	return ptr;
 }
 
 
 /*!
  * \brief Close Cassandra connection
  * \param CassandraConnection
  */
 void dbcassa_close(oac::CassandraClient* con)
 {
 	if(! con) return;
 
 	delete con;
 }
 
 /*!
  * \brief Close the connection and release memory
  * \param connection
  */
 void db_cassa_free_connection(struct pool_con* con)
 {
 	struct cassa_con * _c;
 
 	if (!con) return;
 
 	_c = (struct cassa_con*) con;
 	dbcassa_close(_c->con);
 	pkg_free(_c);
 }
 
 /*!
  * \brief Reconnect to Cassandra cluster
  * \param connection
  */
 void dbcassa_reconnect(struct cassa_con* con)
 {
 	dbcassa_close(con->con);
 	con->con = dbcassa_open(con->id);
 }
 
 
 /*
  * ----              DB Operations Section                          ----
  * */
 
 /*
  *	Util functions
  * */
 static int cassa_get_res_col(std::vector<oac::ColumnOrSuperColumn> result, int r_si, int r_fi, int prefix_len, db_key_t qcol)
 {
 	str res_col_name;
 
 	for (int i = r_si; i< r_fi; i++) {
 		res_col_name.s = (char*)result[i].column.name.c_str()+prefix_len;
 		res_col_name.len = (int)result[i].column.name.size() - prefix_len;
 
 		if(res_col_name.len == qcol->len &&
 				strncmp(res_col_name.s, qcol->s, qcol->len )==0)
 			return i;
 	}
 	return -1;
 }
 
 static int cassa_convert_result(db_key_t qcol, std::vector<oac::ColumnOrSuperColumn> result,
 		int r_si, int r_fi, int prefix_len, db_val_t* sr_cell)
 {
 	str col_val;
 	int idx_rescol;
 	oac::Column res_col;
 
 	idx_rescol = cassa_get_res_col(result, r_si, r_fi, prefix_len, qcol);
 	if(idx_rescol< 0) {
 		LM_DBG("Column not found in result %.*s\n", qcol->len, qcol->s);
 		sr_cell->nul  = 1;
 		return 0;
 	}
 	res_col = result[idx_rescol].column;
 
 	col_val.s = (char*)res_col.value.c_str();
 	if(!col_val.s) {
 		LM_DBG("Column not found in result %.*s- NULL\n", qcol->len, qcol->s);
 		sr_cell->nul  = 1;
 		return 0;
 	}
 	col_val.len = strlen(col_val.s);
 
 	sr_cell->nul  = 0;
 	sr_cell->free  = 0;
 
 	switch (sr_cell->type) {
 		case DB1_INT:
 			if(str2int(&col_val, (unsigned int*)&sr_cell->val.int_val) < 0) {
 				LM_ERR("Wrong value [%s] - len=%d, expected integer\n", col_val.s, col_val.len);
 				return -1;
 			}
 			break;
 		case DB1_BIGINT:
 			if(sscanf(col_val.s, "%lld", &sr_cell->val.ll_val) < 0) {
 				LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
 				return -1;
 			}
 			break;
 		case DB1_DOUBLE:
 			if(sscanf(col_val.s, "%lf", &sr_cell->val.double_val) < 0) {
 				LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
 				return -1;
 			}
 			break;
 		case DB1_STR:
 			pkg_str_dup(&sr_cell->val.str_val, &col_val);
 			sr_cell->free  = 1;
 			break;
 		case DB1_STRING:
 			col_val.len++;
 			pkg_str_dup(&sr_cell->val.str_val, &col_val);
 			sr_cell->val.str_val.len--;
 			sr_cell->val.str_val.s[col_val.len-1]='\0';
 			sr_cell->free  = 1;
 			break;
 		case DB1_BLOB:
 			pkg_str_dup(&sr_cell->val.blob_val, &col_val);
 			sr_cell->free  = 1;
 			break;
 		case DB1_BITMAP:
 			if(str2int(&col_val, &sr_cell->val.bitmap_val) < 0) {
 				LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
 				return -1;
 			}
 			break;
 		case DB1_DATETIME:
 			if(sscanf(col_val.s, "%ld", (long int*)&sr_cell->val.time_val) < 0) {
 				LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
 				return -1;
 			}
 			break;
 	}
 	return 0;
 }
 
 static char* dbval_to_string(db_val_t dbval, char* pk)
 {
 	switch(dbval.type) {
 		case DB1_STRING: strcpy(pk, dbval.val.string_val);
 						   pk+= strlen(dbval.val.string_val);
 						   break;
 		case DB1_STR:    memcpy(pk, dbval.val.str_val.s, dbval.val.str_val.len);
 						   pk+= dbval.val.str_val.len;
 						   break;
 		case DB1_INT:    pk+= sprintf(pk, "%d", dbval.val.int_val);
 						   break;
 		case DB1_BIGINT: pk+= sprintf(pk, "%lld", dbval.val.ll_val);
 						   break;
 		case DB1_DOUBLE: pk+= sprintf(pk, "%lf", dbval.val.double_val);
 						   break;
 		case DB1_BLOB:   pk+= sprintf(pk, "%.*s", dbval.val.blob_val.len, dbval.val.blob_val.s);
 						   break;
 		case DB1_BITMAP: pk+= sprintf(pk, "%u", dbval.val.bitmap_val);
 						   break;
 		case DB1_DATETIME:pk+= sprintf(pk, "%ld", (long int)dbval.val.time_val);
 						  break;
 	}
 	return pk;
 }
 
 
 int cassa_constr_key( const db_key_t* _k, const db_val_t* _v,
 		int _n, int key_len, dbcassa_column_p* key_array, int *no_kc, char* key)
 {
 	int i, j;
 	char* pk = key;
 
 	if(!key_array)
 		return 0;
 
 	for(j = 0; j< _n; j++) {
 		LM_DBG("query col = %.*s\n",  _k[j]->len,  _k[j]->s);
 	}
 
 	for(i = 0; i< key_len; i++) {
 		/* look in the received columns to search the key column */
 		for(j = 0; j< _n; j++) {
 			if(_k[j]->len == key_array[i]->name.len &&
 					!strncmp(_k[j]->s, key_array[i]->name.s, _k[j]->len))
 				break;
 		}
 		if(j == _n) {
 			LM_ERR("The key column with name [%.*s] not found in values\n", key_array[i]->name.len, key_array[i]->name.s);
 			break;
 		}
 		pk= dbval_to_string(_v[j], pk);
 		*(pk++) = cassa_key_delim;
 	}
 	if(pk > key)
 		*(--pk) = '\0';
 	else
 		*key = '\0';
 
 	if(no_kc)
 		*no_kc = i;
 
 	LM_DBG("key = %s\n", key);
 
 	return pk - key;
 }
 
 
 int cassa_result_separate_rows(std::vector<oac::ColumnOrSuperColumn> result) {
 	int rows_no =0, i = 0;
 	int res_size = result.size();
 
 	while(i< res_size) {
 		size_t found;
 		std::string curr_seckey;
 
 		found = result[i].column.name.find(cassa_key_delim);
 		if(found< 0) {
 			LM_ERR("Wrong formated column name - secondary key part not found [%s]\n",
 					result[i].column.name.c_str());
 			return -1;
 		}
 		curr_seckey = result[i].column.name.substr(0, found);
 
 		while(++i < res_size) {
 			if(result[i].column.name.compare(0, found, curr_seckey)) {
 				LM_DBG("Encountered a new secondary key %s - %s\n", result[i].column.name.c_str(), curr_seckey.c_str());
 				break;
 			}
 		}
 		/* the current row stretches until index 'i' and the corresponding key prefix has length 'found' */
 		row_slices[rows_no][0] = i;
 		row_slices[rows_no][1] = found +1;
 		rows_no++;
 	}
 
 	/* debug messages */
 	for(int i = 0; i< rows_no; i++) {
 		LM_DBG("Row %d until index %d with prefix len %d\n", i, row_slices[i][0], row_slices[i][1]);
 	}
 
 	return rows_no;
 }
 
 dbcassa_column_p cassa_search_col(dbcassa_table_p tbc, db_key_t col_name)
 {
 	dbcassa_column_p colp;
 
 	colp = tbc->cols;
 	while(colp) {
 		if(colp->name.len == col_name->len && !strncmp(colp->name.s, col_name->s, col_name->len))
 			return colp;
 		colp = colp->next;
 	}
 	return 0;
 }
 
13464e8d
 typedef std::vector<oac::ColumnOrSuperColumn>  ColumnVec;
 typedef std::auto_ptr<ColumnVec>  ColumnVecPtr;
c0a92201
 
 ColumnVecPtr cassa_translate_query(const db1_con_t* _h, const db_key_t* _k,
13464e8d
 		const db_val_t* _v, const db_key_t* _c, int _n, int _nc, int* ret_rows_no)
c0a92201
 {
 	char row_key[cassa_max_key_len];
 	char sec_key[cassa_max_key_len];
13464e8d
 	int key_len=0, seckey_len = 0;
c0a92201
 	int no_kc, no_sec_kc;
 	dbcassa_table_p tbc;
 
 	/** Lock table schema and construct primary and secondary key **/
13464e8d
 	if(_k) {
 		tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
 		if(!tbc) {
 			LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
 			return ColumnVecPtr(NULL);
 		}
 		cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc, row_key);
c0a92201
 
13464e8d
 		if(no_kc != tbc->key_len) {/* was not able to construct the whole key */
 			LM_ERR("Query not supported - key not provided\n");
 			dbcassa_lock_release(tbc);
 			return ColumnVecPtr(NULL);
 		}
 		key_len = tbc->key_len;
c0a92201
 
13464e8d
 		cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
 		seckey_len = tbc->seckey_len;
c0a92201
 
13464e8d
 		dbcassa_lock_release(tbc);
 	}
c0a92201
 
 	try {
 		oac::SlicePredicate sp;
 		if(seckey_len) { // seckey defined for this table
 			if(no_sec_kc == seckey_len) { // was able to build the complete secondary key
 				if(_c) { /* if queried for specific columns */
 					/* query for the specific columns */
 					for(int i=0; i< _nc; i++) {
 						std::string col_name = sec_key;
 						col_name.push_back(cassa_key_delim);
 						col_name.append(_c[i]->s);
 						sp.column_names.push_back(col_name);
 						LM_DBG("Query col: %s\n", col_name.c_str());
 					}
 					sp.__isset.column_names = true; // set
 				} else { /* query for columns starting with this secondary key */
 					oac::SliceRange sr;
 					sr.start = sec_key;
 					sr.start.push_back(cassa_key_delim);
 					sr.finish = sec_key;
 					sr.finish.push_back(cassa_key_delim +1);
 					sp.slice_range = sr;
 					sp.__isset.slice_range = true; // set
 				}
 			} else {  /* query all columns */
 				oac::SliceRange sr;
 				sr.start = "";
 				sr.finish = "";
 				sp.slice_range = sr;
 				sp.__isset.slice_range = true; // set
 			}
 		} else { /* the table doesn't have any secondary key defined */
 			if(_c) {
 				for(int i=0; i< _nc; i++) {
 					sp.column_names.push_back(_c[i]->s);
 					LM_DBG("Query col: %s\n", _c[i]->s);
 				}
 				LM_DBG("get %d columns\n", _nc);
 				sp.__isset.column_names = true; // set
 			} else {
 				/* return all columns */
 				oac::SliceRange sr;
 				sr.start = "";
 				sr.finish = "";
 				sp.slice_range = sr;
 				sp.__isset.slice_range = true; // set
 				LM_DBG("get all columns\n");
 			}
 		}
 
 		unsigned int retr = 0;
 		oac::ColumnParent cparent;
 		cparent.column_family = _h->table->s;
 		ColumnVecPtr cassa_result(new std::vector<oac::ColumnOrSuperColumn>);
 		do {
 			if(CON_CASSA(_h)->con) {
 				try {
13464e8d
 
 					if(_k) {
 						CON_CASSA(_h)->con->get_slice(*cassa_result, row_key, cparent, sp, oac::ConsistencyLevel::ONE);
 						*ret_rows_no = 1;
 					} else {
 						oac::KeyRange keyRange;
 						keyRange.start_key = "";
 						keyRange.start_key = "";
 						std::vector<oac::KeySlice> key_slice_vect;
 						keyRange.__isset.start_key = 1;
 						keyRange.__isset.end_key = 1;
 						ColumnVec::iterator it = cassa_result->begin();
 
 						/* get in a loop 100 records at a time */
 						int rows_no =0;
 						while(1) {
 							CON_CASSA(_h)->con->get_range_slices(key_slice_vect, cparent, sp, keyRange, oac::ConsistencyLevel::ONE);
 							/* construct cassa_result */
 							LM_DBG("Retuned %d key slices\n", key_slice_vect.size());
 							for(unsigned int i = 0; i< key_slice_vect.size(); i++) {
 								if(key_slice_vect[i].columns.size()==0) {
 									continue;
 								}
 								cassa_result->insert(it, key_slice_vect[i].columns.begin(), key_slice_vect[i].columns.end());
 								it = cassa_result->begin();
 								row_slices[rows_no][0] = cassa_result->size();
 								row_slices[rows_no][1] = 0;
 								rows_no++;
 							}
 							if(key_slice_vect.size() < (unsigned int)keyRange.count)
 								break;
 						}
 
 						*ret_rows_no = rows_no;
 					}
 
c0a92201
 					return cassa_result;
 				} catch (const att::TTransportException &tx) {
 					LM_ERR("Failed to query: %s\n", tx.what());
 				}
 			}
 			dbcassa_reconnect(CON_CASSA(_h));
 		} while(cassa_auto_reconnect && retr++ < cassa_retries);
 
 	} catch (const oac::InvalidRequestException ir) {
 		LM_ERR("Failed Invalid query request: %s\n", ir.why.c_str());
 	} catch (const at::TException &tx) {
 		LM_ERR("Failed generic Thrift error: %s\n", tx.what());
 	} catch (const std::exception &ex) {
 		LM_ERR("Failed std error: %s\n", ex.what());
 	} catch (...) {
 		LM_ERR("Failed generic error\n");
 	}
 
 	LM_DBG("Query with get slice no_kc=%d tbc->key_len=%d  _n=%d\n", no_kc, key_len,_n);
 	return ColumnVecPtr(NULL);
 }
 
 
 /*
  *	The functions for the DB Operations: query, delete, update.
  * */
 
 /*
  * Query table for specified rows
  * _h: structure representing database connection
  * _k: key names
  * _op: operators
  * _v: values of the keys that must match
  * _c: column names to return
  * _n: number of key=values pairs to compare
  * _nc: number of columns to return
  * _o: order by the specified column
  */
 int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
 		const db_val_t* _v, const db_key_t* _c, int _n, int _nc,
 		const db_key_t _o, db1_res_t** _r)
 {
 	db1_res_t* db_res = 0;
 	int rows_no;
 	ColumnVecPtr cassa_result;
 	dbcassa_table_p tbc;
 	int seckey_len;
 
 	if (!_h || !CON_TABLE(_h) || !_r) {
 		LM_ERR("invalid parameter value\n");
 		return -1;
 	}
 	LM_DBG("query table=%s\n", _h->table->s);
 
 	/** Construct and send the query to Cassandra Cluster **/
 
13464e8d
 	cassa_result = cassa_translate_query(_h, _k, _v, _c, _n, _nc, &rows_no);
c0a92201
 
 	if(cassa_result.get() == NULL) {
 		LM_ERR("Failed to query Cassandra cluster\n");
 		return -1;
 	}
 
 	/* compare the number of queried cols with the key cols*/
 //	if(no_kc + no_sec_kc < _n) { /* TODO */
 		/* filter manually for the rest of the values */
 //	}
 
 	db_res = db_new_result();
 	if (!db_res) {
 		LM_ERR("no memory left\n");
 		goto error;
 	}
 	RES_COL_N(db_res)= _nc;
 	if(!db_allocate_columns(db_res, _nc) < 0) {
 		LM_ERR("no more memory\n");
 		goto error;
 	}
 
 	tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
 	if(!tbc) {
 		LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
 		return -1;
 	}
 
 	/** Convert the result from Cassandra **/
 	/* fill in the columns name and type */
 	for(int col = 0; col < _nc; col++) {
 		RES_NAMES(db_res)[col] = (str*)pkg_malloc(sizeof(str));
 		if (! RES_NAMES(db_res)[col]) {
 			LM_ERR("no private memory left\n");
 			dbcassa_lock_release(tbc);
 			db_free_columns(db_res);
 			goto error;
 		}
 
 		*RES_NAMES(db_res)[col]   = *_c[col];
 
 		/* search the column in table schema to get the type */
 		dbcassa_column_p colp = cassa_search_col(tbc, _c[col]);
 		if(!colp) {
 			LM_ERR("No column with name [%.*s] found\n", _c[col]->len, _c[col]->s);
 			dbcassa_lock_release(tbc);
 			db_free_columns(db_res);
 			goto error;
 		}
 		RES_TYPES(db_res)[col] = colp->type;
 
 		LM_DBG("RES_NAMES(%p)[%d]=[%.*s]\n", RES_NAMES(db_res)[col], col,
 				RES_NAMES(db_res)[col]->len, RES_NAMES(db_res)[col]->s);
 	}
13464e8d
 	/* TODO  if all columns asked - take from table schema */
c0a92201
 	seckey_len = tbc->seckey_len;
 	dbcassa_lock_release(tbc);
 
 	if(!cassa_result->size()) {
 		LM_DBG("The query returned no result\n");
 		RES_ROW_N(db_res) = 0;
 		goto done;
 	}
 
 	/* Initialize the row_slices vector for the case with one column and no secondary key */
13464e8d
 	if(rows_no == 1) {
 		row_slices[0][0]= cassa_result->size();
 		row_slices[0][1]= 0;
 
 		if(seckey_len) { /* if the table has a secondary key defined */
c0a92201
 			/* pass through the result once to see how many rows there are */
 			rows_no = cassa_result_separate_rows(*cassa_result);
 			if(rows_no < 0) {
 				LM_ERR("Wrong formated column names\n");
 				goto error;
 			}
 		}
 	}
 
 	RES_ROW_N(db_res) = rows_no;
 
 	if (db_allocate_rows(db_res) < 0) {
 		LM_ERR("could not allocate rows");
 		goto error;
 	}
 
 	for(int ri=0; ri < rows_no; ri++) {
 		if (db_allocate_row(db_res, &(RES_ROWS(db_res)[ri])) != 0) {
 			LM_ERR("could not allocate row");
 			goto error;
 		}
 
 		/* complete the row with the columns */
 		for(int col = 0; col< _nc; col++) {
 			RES_ROWS(db_res)[ri].values[col].type = RES_TYPES(db_res)[col];
 			cassa_convert_result(_c[col], *cassa_result, (ri>0?row_slices[ri-1][0]:0),  row_slices[ri][0],
 					row_slices[ri][1], &RES_ROWS(db_res)[ri].values[col]);
 		}
 	}
 
 done:
 	*_r = db_res;
 	LM_DBG("Exited with success\n");
 	return 1;
 
 error:
 	if(db_res)
 		db_free_result(db_res);
 	return -1;
 }
 
 /*
  * Insert or update the table for specified row key
  * _h: structure representing database connection
  * _k: key names
  * _op: operators
  * _v: values of the keys that must match
  * _uk: column names to update
  * _uv: values for the columns to update
  * _n: number of key=values pairs to compare
  * _un: number of columns to update
  */
 int db_cassa_modify(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
 		const db_key_t* _uk, const db_val_t* _uv, int _n, int _un)
 {
 	dbcassa_table_p tbc;
 	char row_key[cassa_max_key_len];
 	char sec_key[cassa_max_key_len];
 	int64_t ts = 0;
 	str ts_col_name={0, 0};
 	int seckey_len;
 	unsigned int curr_time = time(NULL);
 
 	if (!_h || !CON_TABLE(_h) || !_k || !_v) {
 		LM_ERR("invalid parameter value\n");
 		return -1;
 	}
 
 	LM_DBG("modify table=%s\n", _h->table->s);
 
 	/** Lock table schema and construct primary and secondary key **/
 	tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
 	if(!tbc) {
 		LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
 		return -1;
 	}
 	if(tbc->ts_col)
 		pkg_str_dup(&ts_col_name, (const str*)&tbc->ts_col->name);
 
 	cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, 0, row_key);
 	cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, 0, sec_key);
 	seckey_len = tbc->seckey_len;
 
 	dbcassa_lock_release(tbc);
 
 	/** Construct and send the query to Cassandra Cluster **/
 	try {
 		/* Set the columns */
 		std::vector<oac::Mutation> mutations;
 		for(int i=0; i< _un; i++) {
 			if(_uv[i].nul)
 				continue;
 
 			std::stringstream out;
 			std::string value;
 			int cont = 0;
 
 			switch(_uv[i].type) {
 				case DB1_INT:	out << _uv[i].val.int_val;
 								value = out.str();
 								break;
 				case DB1_BIGINT:out << _uv[i].val.ll_val;
 								value = out.str();
 								break;
 				case DB1_DOUBLE:out << _uv[i].val.double_val;
 								value = out.str();
 								break;
 				case DB1_BITMAP:out << _uv[i].val.bitmap_val;
 								value = out.str();
 								break;
 				case DB1_STRING:value = _uv[i].val.string_val;
 								break;
 				case DB1_STR:	if(!_uv[i].val.str_val.s) {
 									cont = 1;
 									break;
 								}
 								value = std::string(_uv[i].val.str_val.s, _uv[i].val.str_val.len);
 								break;
 				case DB1_BLOB:	value = std::string(_uv[i].val.blob_val.s, _uv[i].val.blob_val.len);
 								break;
 				case DB1_DATETIME:	unsigned int exp_time = (unsigned int)_uv[i].val.time_val;
 									out << exp_time;
 									value = out.str();
 									if(ts_col_name.s && ts_col_name.len==_uk[i]->len &&
 											strncmp(ts_col_name.s, _uk[i]->s, ts_col_name.len)==0) {
 										ts = exp_time;
 										LM_DBG("Found timestamp col [%.*s]\n", ts_col_name.len, ts_col_name.s);
 									}
 									break;
 			}
 			if (cont)
 				continue;
 
 			LM_DBG("ADDED column [%.*s] type [%d], value [%s]\n", _uk[i]->len, _uk[i]->s,
 				_uv[i].type, value.c_str());
 
 			oac::Mutation mut;
 			oac::ColumnOrSuperColumn col;
 			if(seckey_len) {
 				col.column.name = sec_key;
 				col.column.name.push_back(cassa_key_delim);
 				col.column.name.append(_uk[i]->s);
 			}
 			else
 				col.column.name = _uk[i]->s;
 			col.column.value = value;
 			col.column.__isset.value = true;
 			col.__isset.column = true;
 			col.column.timestamp = curr_time;
 			col.column.__isset.timestamp = true;
 			mut.column_or_supercolumn = col;
 			mut.__isset.column_or_supercolumn = true;
 			mutations.push_back(mut);
 		}
 		if(ts_col_name.s)
 			pkg_free(ts_col_name.s);
 		ts_col_name.s = 0;
 
 		if(ts) {
 			int32_t ttl = ts - curr_time;
 			LM_DBG("Set expires to %d seconds\n", ttl);
 			for(size_t mi=0; mi< mutations.size(); mi++) {
 				mutations[mi].column_or_supercolumn.column.ttl = ttl;
 				mutations[mi].column_or_supercolumn.column.__isset.ttl = true;
 			}
 		}
 
 		LM_DBG("Perform the mutation, add [%d] columns\n", (int)mutations.size());
 
 		std::map<std::string, std::vector<oac::Mutation> > innerMap;
 		innerMap.insert(std::pair<std::string, std::vector<oac::Mutation> > (_h->table->s, mutations));
 		std::map <std::string, std::map<std::string, std::vector<oac::Mutation> > > CFMap;
 		CFMap.insert(std::pair<std::string, std::map<std::string, std::vector<oac::Mutation> > >(row_key, innerMap));
 		unsigned int retr = 0;
 
 		do {
 			if(CON_CASSA(_h)->con) {
 				try{
 					CON_CASSA(_h)->con->batch_mutate(CFMap, oac::ConsistencyLevel::ONE);
 					return 1;
 				}  catch (const att::TTransportException &tx) {
 					LM_ERR("Failed to query: %s\n", tx.what());
 				}
 			}
 			dbcassa_reconnect(CON_CASSA(_h));
 		} while (cassa_auto_reconnect && retr++ < cassa_retries);
 
 	} catch (const oac::InvalidRequestException ir) {
 		LM_ERR("Failed Invalid query request: %s\n", ir.why.c_str());
 	} catch (const at::TException &tx) {
 		LM_ERR("Failed generic Thrift error: %s\n", tx.what());
 	} catch (const std::exception &ex) {
 		LM_ERR("Failed std error: %s\n", ex.what());
 	} catch (...) {
 		LM_ERR("Failed generic error\n");
 	}
 
 	LM_ERR("Insert/Update query failed\n");
 	return -1;
 }
 
 
 int db_cassa_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
 		int _n)
 {
 	LM_DBG("db_cassa_insert:\n");
 	return db_cassa_modify(_h, _k, _v, _k, _v, _n, _n);
 }
 
 
 int db_cassa_update(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
 		const db_val_t* _v, const db_key_t* _uk, const db_val_t* _uv,
 		int _n, int _un)
 {
 	LM_DBG("db_cassa_update:\n");
 	return db_cassa_modify(_h, _k, _v, _uk, _uv, _n, _un);
 }
 
 
 int db_cassa_free_result(db1_con_t* _h, db1_res_t* _r)
 {
 	return db_free_result(_r);
 }
 
 /*
  * Delete after primary or primary and secondary key
  * _h: structure representing database connection
  * _k: key names
  * _op: operators
  * _v: values of the keys that must match
  * _n: number of key=values pairs to compare
  */
 int db_cassa_delete(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
 		const db_val_t* _v, int _n)
 {
 	oac::CassandraClient* cassa_client = CON_CASSA(_h)->con;
 	char row_key[cassa_max_key_len];
 	char sec_key[cassa_max_key_len];
 	dbcassa_table_p tbc;
 	int no_kc, no_sec_kc;
 	unsigned int retr = 0;
 	int seckey_len;
 	oac::Mutation m;
 
 	if (!_h || !CON_TABLE(_h) || !_k || !_v) {
 		LM_ERR("invalid parameter value\n");
 		return -1;
 	}
 
 	LM_DBG("query table=%s\n", _h->table->s);
 
 	/* get the table schema and construct primary and secondary key */
 	tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
 	if(!tbc)
 	{
 		LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
 		return -1;
 	}
 
 	cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc,   row_key);
 	cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
 	seckey_len = tbc->seckey_len;
 
 	if (_n != no_kc && no_sec_kc == seckey_len) {
 		/* if the conditions are also for secondary key */
 		LM_DBG("Delete after primary and secondary key %s %s\n", row_key, sec_key);
 		dbcassa_column_p colp = tbc->cols;
 		try {
 			while(colp) {
 				std::string col_name = sec_key;
 				col_name.push_back(cassa_key_delim);
 				col_name.append(colp->name.s);
 				m.deletion.predicate.column_names.push_back(col_name);
 				colp = colp->next;
 			}
 		} catch (...) {
 			LM_ERR("Failed to construct the list of column names\n");
 			dbcassa_lock_release(tbc);
 			return -1;
 		}
 	}
 
 	dbcassa_lock_release(tbc);
 
 	for(int i=0; i < _n; i++)
 		LM_DBG("delete query col = %.*s\n", _k[i]->len, _k[i]->s);
 
 	if(no_kc == 0 ) {
 		LM_DBG("Delete operation not supported\n");
 		return -1;
 	}
 
 	try {
 		if (_n == no_kc) {
 			LM_DBG("Delete after row key %s\n", row_key);
 			oac::ColumnPath cp;
 			cp.column_family = _h->table->s;
 			do {
 				if(CON_CASSA(_h)->con) {
 					try {
 						cassa_client->remove(row_key, cp, (int64_t)time(0), oac::ConsistencyLevel::ONE);
 						return 1;
 					} catch  (const att::TTransportException &tx) {
 							LM_ERR("Failed to query: %s\n", tx.what());
 					}
 				}
 				dbcassa_reconnect(CON_CASSA(_h));
 			} while(cassa_auto_reconnect && retr++ < cassa_retries);
 		} else {
 
 			if(!seckey_len) {
 				LM_ERR("Delete operation not supported\n");
 				return -1;
 			}
 
 //			oac::Mutation m;
 			m.deletion.timestamp = (int64_t)time(0);
 			m.deletion.__isset.timestamp = true;
 			m.__isset.deletion = true;
 
 #if 0
 			/* push all columns for the corresponding secondary key */
 			tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
 			if(!tbc)
 			{
 				LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
 				return -1;
 			}
 			dbcassa_column_p colp = tbc->cols;
 			try {
 				while(colp) {
 					std::string col_name = sec_key;
 					col_name.push_back(cassa_key_delim);
 					col_name.append(colp->name.s);
 					m.deletion.predicate.column_names.push_back(col_name);
 					colp = colp->next;
 				}
 			} catch (...) {
 				LM_ERR("Failed to construct the list of column names\n");
 				dbcassa_lock_release(tbc);
 				return -1;
 			}
 			dbcassa_lock_release(tbc);
 #endif
 			m.deletion.__isset.predicate = true;
 			m.deletion.predicate.__isset.column_names = true; // set
 
 			std::vector<oac::Mutation> mutations;
 			mutations.push_back(m);
 
 			/* innerMap - column_family + mutations vector */
 			std::map<std::string, std::vector<oac::Mutation> > innerMap;
 			innerMap.insert(std::pair<std::string, std::vector<oac::Mutation> > (_h->table->s, mutations));
 			std::map <std::string, std::map<std::string, std::vector<oac::Mutation> > > CFMap;
 			CFMap.insert(std::pair<std::string, std::map<std::string, std::vector<oac::Mutation> > >(row_key, innerMap));
 
 			do {
 				if(CON_CASSA(_h)->con) {
 					try {
 						cassa_client->batch_mutate(CFMap, oac::ConsistencyLevel::ONE);
 						return 1;
 					} catch  (const att::TTransportException &tx) {
 							LM_ERR("Failed to query: %s\n", tx.what());
 					}
 				}
 				dbcassa_reconnect(CON_CASSA(_h));
 			} while(cassa_auto_reconnect && retr++ < cassa_retries);
 		}
 		return 1;
 	} catch (const oac::InvalidRequestException ir) {
 		LM_ERR("Invalid query: %s\n", ir.why.c_str());
 	} catch (const at::TException &tx) {
 		LM_ERR("Failed TException: %s\n", tx.what());
 	} catch (std::exception &e) {
 		LM_ERR("Failed: %s\n", e.what());
 	} catch (...) {
 		LM_ERR("Failed generic error\n");
 	}
 
 	return -1;
 }