modules/db_cassandra/dbcassa_table.c
c0a92201
 /*
  * $Id$
  *
  * Copyright (C) 2012 1&1 Internet Development
  *
  * This file is part of Kamailio, a free SIP server.
  *
  * Kamailio is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation; either version 2 of the License, or
  * (at your option) any later version
  *
  * Kamailio is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
  * You should have received a copy of the GNU General Public License 
  * along with this program; if not, write to the Free Software 
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  * 
  * History:
  * --------
  * 2012-01 initial version (Anca Vamanu)
  *
  */
 
 #include <stdio.h>
 #include <string.h>
 #include <time.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <dirent.h>
 
 #include "../../mem/shm_mem.h"
 #include "../../mem/mem.h"
 #include "../../dprint.h"
1aedf95f
 #include "../../hashes.h"
c0a92201
 #include "../../lock_ops.h"
 
 #include "dbcassa_table.h"
 
 #define DBCASSA_TABLE_SIZE 16
 
 typedef struct  rw_lock {
 	gen_lock_t lock;
 	int reload_flag;
 	int data_refcnt;
 } rw_lock_t;
 
 typedef struct _dbcassa_tbl_htable
 {
 	rw_lock_t lock;
 	dbcassa_table_p dtp;
 } dbcassa_tbl_htable_t, *dbcassa_tbl_htable_p;
 
 static dbcassa_tbl_htable_p dbcassa_tbl_htable = NULL;
 extern str dbcassa_schema_path;
 static char full_path_buf[_POSIX_PATH_MAX + 1];
 
 /**
  * Check if file modified from last read
  * -1 - error
  *  0 - no change
  *  1 - changed
  */
 int dbcassa_check_mtime(time_t *mt)
 {
 	struct stat s;
 
 	if(stat(full_path_buf, &s) == 0)
 	{
 		if((int)s.st_mtime > (int)*mt)
 		{
 			*mt = s.st_mtime;
 			LM_DBG("[%s] was updated\n", full_path_buf);
 			return 1;
 		}
 	} else {
 		LM_DBG("stat failed on [%s]\n", full_path_buf);
 		return -1;
 	}
 	return 0;
 }
 
 /*
  *	Create new table structure
  *
  * */
 
 dbcassa_table_p dbcassa_table_new(const str *_tbname, const str *_dbname)
 {
 	struct stat s;
 	dbcassa_table_p dtp = NULL;
 	int size;
 
 	if(!_tbname || !_dbname) {
 		LM_ERR("Invalid parameters\n");
 		return 0;
 	}
 
 	size = sizeof(dbcassa_table_t)+_tbname->len+_dbname->len;
 	dtp = (dbcassa_table_p)shm_malloc(size);
 	if(!dtp) {
 		LM_ERR("No more shared memory\n");
 		return 0;
 	}
 
 	memset(dtp, 0, size);
 	size = sizeof(dbcassa_table_t);
 	dtp->name.s = (char*)dtp + size;
 	memcpy(dtp->name.s, _tbname->s, _tbname->len);
 	dtp->name.len = _tbname->len;
 	size+= _tbname->len;
 
 	dtp->dbname.s = (char*)dtp + size;
 	memcpy(dtp->dbname.s, _dbname->s, _dbname->len);
 	dtp->dbname.len = _dbname->len;
 
 	if(stat(full_path_buf, &s) == 0) {
 		dtp->mt = s.st_mtime;
 		LM_DBG("mtime is %d\n", (int)s.st_mtime);
 	}
 
 	return dtp;
 }
 
 dbcassa_column_p dbcassa_column_new(char *_s, int _l)
 {
 	dbcassa_column_p dcp;
 	int size;
 
 	size = sizeof(dbcassa_column_t) + _l+ 1;
 	dcp = (dbcassa_column_p)shm_malloc(size);
 	if(!dcp) {
 		LM_ERR("No more shared memory\n");
 		return 0;
 	}
 	memset(dcp, 0, size);
 	dcp->name.s = (char*)dcp + sizeof(dbcassa_column_t);
 	memcpy(dcp->name.s, _s, _l);
 	dcp->name.len = _l;
 	dcp->name.s[_l] = '\0';
 
 	return dcp;
 }
 
 int dbcassa_column_free(dbcassa_column_p dcp)
 {
 	if(!dcp)
 		return -1;
 	shm_free(dcp);
 	return 0;
 }
 
 int dbcassa_table_free(dbcassa_table_p _dtp)
 {
 	dbcassa_column_p _cp, _cp0;
 	
 	if(!_dtp)
 		return -1;
 
 	/* cols*/
 	_cp = _dtp->cols;
 	while(_cp) {
 		_cp0=_cp;
 		_cp=_cp->next;
 		dbcassa_column_free(_cp0);
 	}
 	/* key */
 	if(_dtp->key)
 		shm_free(_dtp->key);
 	if(_dtp->sec_key)
 		shm_free(_dtp->sec_key);
 
 	shm_free(_dtp);
 
 	return 0;
 }
 
 /**
  * Load the table schema from file
  */
 dbcassa_table_p dbcassa_load_file(str* dbn, str* tbn)
 {
 #define KEY_MAX_LEN 10
 	FILE *fin=NULL;
 	char buf[4096];
 	int c, crow, ccol, bp, max_auto;
 	dbcassa_table_p dtp = 0;
 	dbcassa_column_p colp= 0;
 	dbcassa_column_p key[KEY_MAX_LEN];
 	dbcassa_column_p sec_key[KEY_MAX_LEN];
 
 	enum {DBCASSA_FLINE_ST, DBCASSA_NLINE_ST, DBCASSA_NLINE2_ST} state;
 
 	memset(key, 0, KEY_MAX_LEN*sizeof(dbcassa_column_p));
 	memset(sec_key, 0, KEY_MAX_LEN*sizeof(dbcassa_column_p));
 
 	LM_DBG("loading file [%s]\n", full_path_buf);
 	fin = fopen(full_path_buf, "rt");
 	if(!fin) {
 		LM_ERR("Failed to open file\n");
 		return 0;
 	}
 
 	dtp = dbcassa_table_new(tbn, dbn);
 	if(!dtp)
 		goto done;
 
 	state = DBCASSA_FLINE_ST;
 	crow = ccol = -1;
 	c = fgetc(fin);
 	max_auto = 0;
 	while(c!=EOF) {
 		switch(state) {
 			case DBCASSA_FLINE_ST:
 				bp = 0;
 				while(c==DBCASSA_DELIM_C)
 					c = fgetc(fin);
 				if(c==DBCASSA_DELIM_R && !dtp->cols)
 					goto clean;
 				if(c==DBCASSA_DELIM_R) {
 					if(dtp->nrcols <= 0)
 						goto clean;
 					
 					state = DBCASSA_NLINE_ST;
 					c = fgetc(fin);
 					break;
 				}
 				while(c!=DBCASSA_DELIM_C && c!='(' && c!=DBCASSA_DELIM_R) {
 					if(c==EOF)
 						goto clean;
 					buf[bp++] = c;
 					c = fgetc(fin);
 				}
 				colp = dbcassa_column_new(buf, bp);
 				if(!colp)
 					goto clean;
 				LM_DBG("new col [%.*s]\n", bp, buf);
 				while(c==DBCASSA_DELIM_C)
 					c = fgetc(fin);
 				if(c!='(')
 					goto clean;
 				c = fgetc(fin);
 				while(c==DBCASSA_DELIM_C)
 					c = fgetc(fin);
 
 				switch(c) {
 					case 's':
 					case 'S':
 						colp->type = DB1_STR;
 						LM_DBG("column[%d] is STR!\n", ccol+1);
 					break;
 					case 'i':
 					case 'I':
 						colp->type = DB1_INT;
 						LM_DBG("column[%d] is INT!\n", ccol+1);
 					break;
 					case 'd':
 					case 'D':
 						colp->type = DB1_DOUBLE;
 						LM_DBG("column[%d] is DOUBLE!\n", ccol+1);
 					break;
 					case 't':
 					case 'T':
 						colp->type = DB1_DATETIME;
 						LM_DBG("column[%d] is TIME! Timestamp col has name [%s]\n", ccol+1, colp->name.s);
 						if(dtp->ts_col) {
 							LM_ERR("You can have only one column with type timestamp\n");
 							goto clean;
 						}
 						dtp->ts_col = colp;
 					break;
 					default:
 						LM_DBG("wrong column type!\n");
 						goto clean;
 				}
 
 				while(c!='\n' && c!=EOF && c!=')' && c!= ',') {
 					if(colp->type == DB1_STR && (c=='i'|| c=='I')) {
 						colp->type = DB1_STRING;
 						LM_DBG("column[%d] is actually STRING!\n", ccol+1);
 					}
 					c = fgetc(fin);
 				}
 
 				if(c == ')') {
 					//LM_DBG("c=%c!\n", c);
 					colp->next = dtp->cols;
 					dtp->cols = colp;
 					dtp->nrcols++;
 					c = fgetc(fin);
 				}
 				else
 					goto clean;
 				ccol++;
 			break;
 
 			case DBCASSA_NLINE_ST:
 			case DBCASSA_NLINE2_ST:
 				// unique key
 				while(c==DBCASSA_DELIM_C)
 					c = fgetc(fin);
 				if(c == DBCASSA_DELIM_R) {
 					state = DBCASSA_NLINE2_ST;
 					c = fgetc(fin);
 					break;
 				}
 
 				if(c == EOF)
 					break;
 				bp= 0;
 				while(c!=DBCASSA_DELIM_C && c!=DBCASSA_DELIM_R)
 				{
 					if(c==EOF)
 						break;
 					buf[bp++] = c;
 					c = fgetc(fin);
 				}
 				colp = dtp->cols;
 				while(colp) {
 					if(bp==colp->name.len && strncmp(colp->name.s, buf, bp)==0) {
 						if(state == DBCASSA_NLINE_ST)
 							key[dtp->key_len++] = colp;
 						else
 							sec_key[dtp->seckey_len++] = colp;
 						break;
 					}
 					colp = colp->next;
 				}
 				if(!colp) {
 					LM_ERR("Undefined column in key [%.*s]\n", bp, buf);
 					goto clean;
 				}
 				break;
 		}
 	}
 
 	/* copy the keys into the table */
 	if(dtp->key_len) {
 		dtp->key = (dbcassa_column_p*)
 				shm_malloc(dtp->key_len*sizeof(dbcassa_column_p));
 		if(!dtp->key) {
 			LM_ERR("No more share memory\n");
 			goto clean;
 		}
 		for(ccol = 0; ccol< dtp->key_len; ccol++) {
 			dtp->key[ccol] = key[ccol];
 			LM_DBG("col [%.*s] in primary key\n", key[ccol]->name.len, key[ccol]->name.s);
 		}
 	}
 	if(dtp->seckey_len) {
 		dtp->sec_key = (dbcassa_column_p*)
 				shm_malloc(dtp->seckey_len*sizeof(dbcassa_column_p));
 		if(!dtp->sec_key) {
 			LM_ERR("No more share memory\n");
 			goto clean;
 		}
 		for(ccol = 0; ccol< dtp->seckey_len; ccol++) {
 			dtp->sec_key[ccol] = sec_key[ccol];
 			LM_DBG("col [%.*s] in secondary key\n", sec_key[ccol]->name.len, sec_key[ccol]->name.s);
 		}
 	}
 
 done:
 	if(fin)
 		fclose(fin);
 	return dtp;
 clean:
 	if(fin)
 		fclose(fin);
 	if(dtp)
 		dbcassa_table_free(dtp);
 	return NULL;
 }
 
 
 #define ref_read_data(rw_lock) \
 do {\
 	again:\
 	lock_get( &rw_lock.lock ); \
 	if (rw_lock.reload_flag) { \
 		lock_release( &rw_lock.lock ); \
 		usleep(5); \
 		goto again; \
 	} \
 	rw_lock.data_refcnt++; \
 	lock_release( &rw_lock.lock ); \
 } while(0)
 
 
 #define unref_read_data(rw_lock) \
 do {\
 	lock_get( &rw_lock.lock ); \
 	rw_lock.data_refcnt--; \
 	lock_release( &rw_lock.lock ); \
 } while(0)
 
 
 #define ref_write_data(rw_lock)\
 do {\
 	lock_get( &rw_lock.lock ); \
 	rw_lock.reload_flag = 1; \
 	lock_release( &rw_lock.lock ); \
 	while (rw_lock.data_refcnt) \
 		usleep(10); \
 } while(0)
 
 
 #define unref_write_data(rw_lock)\
 	rw_lock.reload_flag = 0;
 
 /*
  *	Search the table schema
  * */
 dbcassa_table_p dbcassa_db_search_table(int hashidx, int hash,
 		const str* dbn, const str *tbn)
 {
 	dbcassa_table_p tbc = NULL;
 	ref_read_data(dbcassa_tbl_htable[hashidx].lock);
 
 	tbc = dbcassa_tbl_htable[hashidx].dtp;
 	while(tbc) {
 		LM_DBG("found dbname=%.*s, table=%.*s\n", tbc->dbname.len, tbc->dbname.s, tbc->name.len, tbc->name.s);
 		if(tbc->hash==hash && tbc->dbname.len == dbn->len
 			&& tbc->name.len == tbn->len
 			&& !strncasecmp(tbc->dbname.s, dbn->s, dbn->len)
 			&& !strncasecmp(tbc->name.s, tbn->s, tbn->len))
 			return tbc;
 		tbc = tbc->next;
 	}
 	unref_read_data(dbcassa_tbl_htable[hashidx].lock);
 	return NULL;
 }
 
 
 /**
  * Get the table schema. If the file was updated, update the table schema.
  */
 dbcassa_table_p dbcassa_db_get_table(const str* dbn, const str *tbn)
 {
 	dbcassa_table_p tbc = NULL, old_tbc= NULL, new_tbc= NULL, prev_tbc= NULL;
 	int hash;
 	int hashidx;
 	int len;
 
 	if(!dbn || !tbn ) {
 		LM_ERR("invalid parameter");
 		return NULL;
 	}
 
 	hash = core_hash(dbn, tbn, DBCASSA_TABLE_SIZE);
 	hashidx = hash % DBCASSA_TABLE_SIZE;
 
 	ref_read_data(dbcassa_tbl_htable[hashidx].lock);
 
 	tbc = dbcassa_tbl_htable[hashidx].dtp;
 
 	while(tbc) {
 		LM_DBG("found dbname=%.*s, table=%.*s\n", tbc->dbname.len, tbc->dbname.s, tbc->name.len, tbc->name.s);
 		if(tbc->hash==hash && tbc->dbname.len == dbn->len
 				&& tbc->name.len == tbn->len
 				&& !strncasecmp(tbc->dbname.s, dbn->s, dbn->len)
 				&& !strncasecmp(tbc->name.s, tbn->s, tbn->len)) {
 
 			memcpy(full_path_buf + dbcassa_schema_path.len, dbn->s, dbn->len);
 			len = dbcassa_schema_path.len + dbn->len;
 			full_path_buf[len++] = '/';
 			memcpy(full_path_buf + len, tbn->s, tbn->len);
 			full_path_buf[len + tbn->len] = '\0';
 
 			if(dbcassa_check_mtime(&tbc->mt) == 0)
 				return tbc;
 			old_tbc = tbc;
 			break;
 		}
 		tbc = tbc->next;
 	}
 	unref_read_data(dbcassa_tbl_htable[hashidx].lock);
 	if(!old_tbc)
 		return NULL;
 
 	/* the file has changed - load again the schema */
 	new_tbc = dbcassa_load_file((str*)dbn, (str*)tbn);
 	if(!new_tbc)
 	{
 		LM_ERR("could not load database from file [%.*s]\n", tbn->len, tbn->s);
 		return NULL;
 	}
 	new_tbc->hash = hashidx;
 
 	/* lock for write */
 	ref_write_data(dbcassa_tbl_htable[hashidx].lock);
 	tbc = dbcassa_tbl_htable[hashidx].dtp;
 
 	while(tbc) {
 		if(tbc == old_tbc)
 			break;
 		prev_tbc = tbc;
 		tbc = tbc->next;
 	}
 
 	/* somebody else might have rewritten it in the mean time? just return the existing one */
 	if(!tbc) {
 		unref_write_data(dbcassa_tbl_htable[hashidx].lock);
 		return dbcassa_db_search_table(hashidx, hash, dbn, tbn);
 	}
 
 	/* replace the table */
 	new_tbc->next = old_tbc->next;
 	if(prev_tbc)
 		prev_tbc->next = new_tbc;
 	else
 		dbcassa_tbl_htable[hashidx].dtp = new_tbc;
 	dbcassa_table_free(old_tbc);
 	unref_write_data(dbcassa_tbl_htable[hashidx].lock);
 
 	/* lock for read, search the table and return */
 	return dbcassa_db_search_table(hashidx, hash, dbn, tbn);
 }
 
 /*
  *	Read all table schemas at startup
  * */
 int dbcassa_read_table_schemas(void)
 {
 	int i, j;
 	str db_name, tb_name;
 	DIR* srcdir = opendir(dbcassa_schema_path.s);
 	DIR* db_dir;
 	struct dirent* dent;
 	int fn_len = dbcassa_schema_path.len;
 	struct stat fstat;
 	int dir_len;
 	dbcassa_table_p tbc;
 	unsigned int hashidx;
 
 	/* init tables' hash table */
 	if (!dbcassa_tbl_htable) {
 		dbcassa_tbl_htable = (dbcassa_tbl_htable_p)shm_malloc(DBCASSA_TABLE_SIZE*
 					sizeof(dbcassa_tbl_htable_t));
 		if(dbcassa_tbl_htable==NULL)
 		{
 			LM_CRIT("no enough shm mem\n");
 			return -1;
 		}
 		memset(dbcassa_tbl_htable, 0, DBCASSA_TABLE_SIZE*sizeof(dbcassa_tbl_htable_t));
 		for(i=0; i<DBCASSA_TABLE_SIZE; i++)
 		{
 			if (lock_init(&dbcassa_tbl_htable[i].lock.lock)==0)
 			{
 				LM_CRIT("cannot init tables' sem's\n");
 				for(j=i-1; j>=0; j--)
 					lock_destroy(&dbcassa_tbl_htable[j].rw_lock.lock);
 				return -1;
 			}
 		}
 	}
 
 	memset(full_path_buf, 0, _POSIX_PATH_MAX);
 	strcpy(full_path_buf, dbcassa_schema_path.s);
 	if (full_path_buf[dbcassa_schema_path.len - 1] != '/') {
 		full_path_buf[fn_len++]= '/';
 		dbcassa_schema_path.len++;
 	}
 
 	if (srcdir == NULL) {
 		perror("opendir");
 		return -1;
 	}
 	LM_DBG("Full name= %.*s\n", fn_len, full_path_buf);
 
 	while((dent = readdir(srcdir)) != NULL)
 	{
 		if(strcmp(dent->d_name, ".") == 0 || strcmp(dent->d_name, "..") == 0)
 			continue;
 
 		/* Calculate full name, check we are in file length limits */
 		if ((fn_len + strlen(dent->d_name) + 1) > _POSIX_PATH_MAX)
 			continue;
 
 		db_name.s = dent->d_name;
 		db_name.len = strlen(dent->d_name);
 		
 		strcpy(full_path_buf+fn_len, dent->d_name);
 		dir_len = fn_len + db_name.len;
 
 		LM_DBG("Full dir name= %.*s\n", dir_len, full_path_buf);
 
 		if (stat(full_path_buf, &fstat) < 0) {
 			LM_ERR("stat failed %s\n", strerror(errno));
 			continue;
 		}
 
 		if (!S_ISDIR(fstat.st_mode))  {
 			LM_ERR("not a directory\n");
 			continue;
 		}
 
 		/*
 		if (fstatat(dirfd(srcdir), dent->d_name, &st) < 0)
 		{
 			perror(dent->d_name);
 			continue;
 		}
 		*/
 
 		LM_DBG("Found database %s\n", dent->d_name);
 		db_dir = opendir(full_path_buf);
 		if(!db_dir) {
 			LM_ERR("Failed to open dictory %s\n", full_path_buf);
 			continue;
 		}
 		full_path_buf[dir_len++]= '/';
 		while((dent = readdir(db_dir)) != NULL)
 		{
 			if(strcmp(dent->d_name, ".") == 0 || strcmp(dent->d_name, "..") == 0)
 				continue;
 			LM_DBG("database table %s\n", dent->d_name);
 			if(dir_len + strlen(dent->d_name)+1 > _POSIX_PATH_MAX) {
 				LM_ERR("File len too large\n");
 				continue;
 			}
 			strcpy(full_path_buf+dir_len, dent->d_name);
 
 			tb_name.s = dent->d_name;
 			tb_name.len = strlen(dent->d_name);
 
 			LM_DBG("File path= %s\n", full_path_buf);
 			tbc = dbcassa_load_file(&db_name, &tb_name);
 			if(!tbc)
 			{
 				LM_ERR("could not load database from file [%s]\n", tb_name.s);
 				return -1;
 			}
 			hashidx = core_hash(&db_name, &tb_name, DBCASSA_TABLE_SIZE);
 			tbc->hash = hashidx;
 			tbc->next = dbcassa_tbl_htable[hashidx].dtp;
 			dbcassa_tbl_htable[hashidx].dtp = tbc;
 		}
 		closedir(db_dir);
 	}
 	closedir(srcdir);
 
 
 	return 0;
 }
 
 /*
  *	Destroy table schema table at shutdown
  * */
 void dbcassa_destroy_htable(void)
 {
 	int i;
 	dbcassa_table_p tbc, tbc0;
 
 	/* destroy tables' hash table*/
 	if(dbcassa_tbl_htable==0)
 		return;
 
 	for(i=0; i<DBCASSA_TABLE_SIZE; i++) {
 		lock_destroy(&dbcassa_tbl_htable[i].rw_lock.lock);
 		tbc = dbcassa_tbl_htable[i].dtp;
 		while(tbc) {
 			tbc0 = tbc;
 			tbc = tbc->next;
 			dbcassa_table_free(tbc0);
 		}
 	}
 	shm_free(dbcassa_tbl_htable);
 }
 
 void dbcassa_lock_release(dbcassa_table_p tbc)
 {
 	unref_read_data(dbcassa_tbl_htable[tbc->hash].lock);
 }