src/modules/db_redis/redis_dbase.c
53e746b5
 /*
  * Copyright (C) 2018 Andreas Granig (sipwise.com)
  *
  * This file is part of Kamailio, a free SIP server.
  *
  * Kamailio is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation; either version 2 of the License, or
  * (at your option) any later version
  *
  * Kamailio is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
  * You should have received a copy of the GNU General Public License
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  *
  */
 
 #include <stdlib.h>
 #include <time.h>
 
 #include "db_redis_mod.h"
 #include "redis_connection.h"
 #include "redis_dbase.h"
 #include "redis_table.h"
 
9da53bae
 #define TIMESTAMP_STR_LENGTH 19
 
53e746b5
 /*
  * Recursively print a redis reply to the debug log.
  *
  * Strings, integers and nil replies are printed directly, array replies
  * are printed element by element. Any other reply type (and a missing
  * reply) is only reported as not printable.
  */
 static void db_redis_dump_reply(redisReply *reply) {
     size_t i; // same type as reply->elements, avoids signed/unsigned compare

     if (!reply) {
         // array elements may be missing; nothing to dereference then
         LM_DBG("not printing invalid reply type\n");
         return;
     }

     if (reply->type == REDIS_REPLY_STRING) {
         LM_DBG("%s\n", reply->str);
     } else if (reply->type == REDIS_REPLY_INTEGER) {
         LM_DBG("%lld\n", reply->integer);
     } else if (reply->type == REDIS_REPLY_NIL) {
         LM_DBG("<null>\n");
     } else if (reply->type == REDIS_REPLY_ARRAY) {
         LM_DBG("printing %lu elements in array reply\n", (unsigned long) reply->elements);
         for (i = 0; i < reply->elements; ++i) {
             db_redis_dump_reply(reply->element[i]);
         }
     } else {
         LM_DBG("not printing invalid reply type\n");
     }
 }
 
 // TODO: utilize auto-expiry? on insert/update, also update expire value
 // of mappings
 
 /*
  * Initialize the database module for the given URL.
  * This has to be the first function called; it hands the URL to
  * db_do_init() together with db_redis_new_connection as callback and
  * returns whatever db_do_init() returns.
  */
 db1_con_t* db_redis_init(const str* _url) {
     db1_con_t *handle;

     handle = db_do_init(_url, (void *)db_redis_new_connection);
     return handle;
 }
 
 /*
  * Shut down the database module for the given handle.
  * This has to be the last function called on _h; the handle is passed to
  * db_do_close() together with db_redis_free_connection as callback.
  */
 void db_redis_close(db1_con_t* _h)
 {
     LM_DBG("closing redis db connection\n");

     db_do_close(_h, db_redis_free_connection);
 }
 
56ad1423
 
 /*
  * Create a new result object.
  * Returns the object obtained from db_new_result(), or NULL if that
  * call did not deliver one.
  */
 static db1_res_t* db_redis_new_result(void) {
     return db_new_result();
 }
 
53e746b5
 /*
  * Convert a database value into a newly allocated, 0-terminated string.
  *
  * v     value to convert
  * _str  out: on success holds a pkg_malloc'ed buffer and its length
  *       (without the terminating 0); the caller releases it with
  *       pkg_free(). A null value yields _str->s == NULL and _str->len == 0.
  *
  * Returns 0 on success and -1 on failure. On failure no buffer is left
  * in _str. DB1_BLOB and unknown value types are rejected.
  */
 static int db_redis_val2str(const db_val_t *v, str *_str) {
     const char *s;
     const str *tmpstr;
     int vtype = VAL_TYPE(v);
     int needed;
     size_t written;
     struct tm _time;

     _str->s = NULL;
     _str->len = 32; // default for numbers

     if (VAL_NULL(v)) {
         LM_DBG("converting <null> value to str\n");
         _str->len = 0;
         return 0;
     }

     switch (vtype) {
         case DB1_INT:
             LM_DBG("converting int value %d to str\n", VAL_INT(v));
             _str->s = (char*)pkg_malloc(_str->len);
             if (!_str->s) goto memerr;
             snprintf(_str->s, _str->len, "%d", VAL_INT(v));
             _str->len = strlen(_str->s);
             break;
         case DB1_UINT:
             LM_DBG("converting uint value %u to str\n", VAL_UINT(v));
             _str->s = (char*)pkg_malloc(_str->len);
             if (!_str->s) goto memerr;
             snprintf(_str->s, _str->len, "%u", VAL_UINT(v));
             _str->len = strlen(_str->s);
             break;
         case DB1_BIGINT:
             LM_DBG("converting bigint value %lld to str\n", VAL_BIGINT(v));
             _str->s = (char*)pkg_malloc(_str->len);
             if (!_str->s) goto memerr;
             // zero-padded to at least 10 digits
             snprintf(_str->s, _str->len, "%010lld", VAL_BIGINT(v));
             _str->len = strlen(_str->s);
             break;
         case DB1_UBIGINT:
             LM_DBG("converting ubigint value %llu to str\n", VAL_UBIGINT(v));
             _str->s = (char*)pkg_malloc(_str->len);
             if (!_str->s) goto memerr;
             // zero-padded to at least 10 digits
             snprintf(_str->s, _str->len, "%010llu", VAL_UBIGINT(v));
             _str->len = strlen(_str->s);
             break;
         case DB1_STRING:
             s = VAL_STRING(v);
             if (!s) {
                 // value not flagged as null but without data: treat it
                 // like a null value instead of crashing in strlen()
                 LM_DBG("converting <null> value to str\n");
                 _str->len = 0;
                 return 0;
             }
             _str->len = strlen(s);
             LM_DBG("converting string value '%s' with len %d to str\n", s, _str->len);
             _str->s = (char*)pkg_malloc(_str->len + 1);
             if (!_str->s) goto memerr;
             memcpy(_str->s, s, _str->len);
             _str->s[_str->len] = '\0';
             break;
         case DB1_STR:
             tmpstr = &(VAL_STR(v));
             if (tmpstr->len < 0 || (!tmpstr->s && tmpstr->len > 0)) {
                 LM_ERR("Invalid str value, unable to convert\n");
                 goto err;
             }
             LM_DBG("converting str value '%.*s' with len %d to str\n", tmpstr->len, tmpstr->s, tmpstr->len);
             // copy manually to add 0 termination
             _str->s = (char*)pkg_malloc(tmpstr->len + 1);
             if (!_str->s) goto memerr;
             _str->len = tmpstr->len;
             if (_str->len > 0)
                 memcpy(_str->s, tmpstr->s, _str->len);
             _str->s[_str->len] = '\0';
             break;
         case DB1_DATETIME:
             LM_DBG("converting datetime value %ld to str\n", (long) VAL_TIME(v));
             _str->s = (char*)pkg_malloc(_str->len);
             if (!_str->s) goto memerr;
             if (!localtime_r(&(VAL_TIME(v)), &_time)) {
                 LM_ERR("Failed to convert datetime value to local time\n");
                 goto err;
             }
             // strftime() returns 0 if the result did not fit; the buffer
             // content is not usable in that case
             written = strftime(_str->s, _str->len, "%Y-%m-%d %H:%M:%S", &_time);
             if (written == 0) {
                 LM_ERR("Failed to format datetime value\n");
                 goto err;
             }
             _str->len = (int) written;
             break;
         case DB1_DOUBLE:
             LM_DBG("converting double value %f to str\n", VAL_DOUBLE(v));
             // "%.6f" of a large value needs more than the default 32
             // bytes, so measure first instead of truncating silently
             needed = snprintf(NULL, 0, "%.6f", VAL_DOUBLE(v));
             if (needed < 0) {
                 LM_ERR("Failed to format double value\n");
                 goto err;
             }
             _str->s = (char*)pkg_malloc(needed + 1);
             if (!_str->s) goto memerr;
             snprintf(_str->s, needed + 1, "%.6f", VAL_DOUBLE(v));
             _str->len = needed;
             break;
         case DB1_BITMAP:
             LM_DBG("converting bitmap value %u to str\n", VAL_BITMAP(v));
             _str->s = (char*)pkg_malloc(_str->len);
             if (!_str->s) goto memerr;
             snprintf(_str->s, _str->len, "%u", VAL_BITMAP(v));
             _str->len = strlen(_str->s);
             break;
         case DB1_BLOB:
         default:
             LM_ERR("Unsupported val type %d\n", vtype);
             goto err;
     }

     return 0;

 memerr:
     LM_ERR("Failed to allocate memory to convert value to string\n");
 err:
     // never hand a half-filled buffer back to the caller
     if (_str->s) {
         pkg_free(_str->s);
         _str->s = NULL;
     }
     _str->len = 0;
     return -1;
 }
 
56ad1423
 /*
  * Build a one-row, one-column result holding the version of a table as
  * stored in the connection's table definitions (con->tables). No redis
  * command is issued by this function.
  *
  * _h          connection handle, only used to free the result on error
  * con         redis connection holding the table definitions
  * table_name  name of the table whose version is requested
  * _r          out: the new result on success, NULL if building it failed
  *
  * Returns 0 on success, -1 if the table is unknown or memory ran out.
  */
 static int db_redis_return_version(const db1_con_t* _h, km_redis_con_t *con, const str *table_name,
     db1_res_t **_r) {

     struct str_hash_entry *table_e;
     redis_table_t *table;
     db_val_t* dval;
     db_row_t* drow;

     LM_DBG("get table version\n");

     table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
     if (!table_e) {
         LM_ERR("query to undefined table '%.*s', define it in schema file!\n",
                 table_name->len, table_name->s);
         return -1;
     }
     table = (redis_table_t*)table_e->u.p;

     *_r = db_redis_new_result();
     if (!*_r) {
         LM_ERR("Failed to allocate memory for result\n");
         return -1;
     }
     RES_NUM_ROWS(*_r) = 1;
     RES_COL_N(*_r) = 1;
     RES_ROW_N(*_r) = 1;
     if (db_allocate_rows(*_r) != 0) {
         LM_ERR("Failed to allocate memory for rows\n");
         goto err;
     }
     if (db_allocate_columns(*_r, 1) != 0) {
         LM_ERR("Failed to allocate memory for result columns\n");
         goto err;
     }

     drow = &(RES_ROWS(*_r)[0]);

     if (db_allocate_row(*_r, drow) != 0) {
         LM_ERR("Failed to allocate row %d\n", RES_NUM_ROWS(*_r));
         goto err;
     }

     dval = &(ROW_VALUES(drow)[0]);

     VAL_TYPE(dval) = DB1_INT;
     VAL_NULL(dval) = 0;
     VAL_INT(dval) = table->version;

     LM_DBG("returning short-cut table version %d for table '%.*s'\n",
         table->version, table_name->len, table_name->s);

     return 0;

 err:
     if (*_r) {
         db_redis_free_result((db1_con_t*)_h, *_r);
         // do not leave a pointer to the freed result with the caller
         *_r = NULL;
     }
     return -1;
 }
 
a72c4e8e
 static int db_redis_build_entry_manual_keys(redis_table_t *table, const db_key_t *_k,
d12eba62
         const db_val_t *_v, const int _n, int **manual_keys, int *manual_key_count) {
53e746b5
 
     // TODO: we also put keys here which are already part of type mapping!
     // there must be removed for performance reasons
 
     redis_key_t *key = NULL;
 
     *manual_keys = (int*)pkg_malloc(_n * sizeof(int));
     if (! *manual_keys) {
         LM_ERR("Failed to allocate memory for manual key indices\n");
         goto err;
     }
     memset(*manual_keys, 0, _n * sizeof(int));
     *manual_key_count = 0;
 
     for (key = table->entry_keys; key; key = key->next) {
         int subkey_found = 0;
a72c4e8e
         int i;
3b412293
         *manual_key_count = 0;
53e746b5
         LM_DBG("checking for existence of entry key '%.*s' in query to get manual key\n",
                 key->key.len, key->key.s);
a72c4e8e
         for (i = 0; i < _n; ++i) {
53e746b5
             const db_key_t k = _k[i];
             if (!str_strcmp(&key->key, (str*)k)) {
                 LM_DBG("found key in entry key\n");
                 subkey_found = 1;
                 break;
             } else {
                 (*manual_keys)[*manual_key_count] = i;
                 (*manual_key_count)++;
             }
         }
         if (!subkey_found) {
             break;
         }
     }
     return 0;
 
 err:
     if (*manual_keys) {
         pkg_free(*manual_keys);
         *manual_keys = NULL;
     }
     return -1;
 }
 
a72c4e8e
 static int db_redis_find_query_key(redis_key_t *key, const str *table_name,
9da53bae
         redis_table_t *table,
         str *type_name, const db_key_t *_k, const db_val_t *_v, const db_op_t *_op, const int _n,
         str *key_name, int *key_found, uint64_t *ts_scan_start) {
53e746b5
 
     unsigned int len;
     str val = {NULL, 0};
 
     *key_found = 1;
     key_name->len = 0;
     key_name->s = NULL;
 
     for (; key; key = key->next) {
         int subkey_found = 0;
a72c4e8e
         int i;
53e746b5
         LM_DBG("checking for existence of entry key '%.*s' in query\n",
                 key->key.len, key->key.s);
a72c4e8e
         for (i = 0; i < _n; ++i) {
53e746b5
             const db_key_t k = _k[i];
             const db_val_t v = _v[i];
9da53bae
             const db_op_t op = _op ? _op[i] : NULL;
53e746b5
 
             if (VAL_NULL(&v)) {
                 LM_DBG("Skipping null value for given key '%.*s'\n",
                         k->len, k->s);
                 break;
9da53bae
             } else if (op && strcmp(op, OP_EQ)
                     && !((VAL_TYPE(&v) == DB1_DATETIME || VAL_TYPE(&v) == DB1_BIGINT
                             || VAL_TYPE(&v) == DB1_UBIGINT)
                         && (!strcmp(op, OP_LT) || !strcmp(op, OP_GT)))) {
                 LM_DBG("Skipping non-EQ op (%s) for given key '%.*s'\n",
                         op, k->len, k->s);
                 break;
53e746b5
             } else if (!str_strcmp(&key->key, (str*)k)) {
                 LM_DBG("found key in entry key\n");
                 if (db_redis_val2str(&v, &val) != 0) goto err;
                 if (val.s == NULL) {
                     LM_DBG("key value in entry key is null, skip key\n");
                     subkey_found = 0;
                     break;
                 }
                 if (!key_name->len) {
9da53bae
                     // <version>:<table_name>:<type>::<val>
                     len = table->version_code.len + table_name->len + 1 + type_name->len + 2 + val.len + 1; //snprintf writes term 0 char
53e746b5
                     key_name->s = (char*)pkg_malloc(len);
                     if (!key_name->s) {
                         LM_ERR("Failed to allocate key memory\n");
                         goto err;
                     }
9da53bae
                     snprintf(key_name->s, len, "%.*s%.*s:%.*s::%.*s",
                             table->version_code.len, table->version_code.s,
53e746b5
                             table_name->len, table_name->s,
                             type_name->len, type_name->s,
                             val.len, val.s);
                     key_name->len = len-1; // subtract the term 0 char
 
                 } else {
                     // :<val>
                     key_name->s = (char*)pkg_realloc(key_name->s, key_name->len + val.len + 2);
                     if (!key_name->s) {
                         LM_ERR("Failed to allocate key memory\n");
                         goto err;
                     }
                     snprintf(key_name->s + key_name->len, 1 + val.len + 1, ":%.*s",
                             val.len, val.s);
                     key_name->len += (1 + val.len);
                 }
9da53bae
                 if (op && (VAL_TYPE(&v) == DB1_DATETIME || VAL_TYPE(&v) == DB1_BIGINT
                             || VAL_TYPE(&v) == DB1_UBIGINT)
                         && (!strcmp(op, OP_LT) || !strcmp(op, OP_GT))) {
                     // Special case: we support matching < or > against timestamps and ints using a special
                     // key scanning method. We do this only for a single timestamp/int occurance, and we
                     // still do a table scan, just not a full table scan.
                     if (!ts_scan_start) {
                         LM_DBG("key '%.*s' for type '%.*s' found as timestamp or int, but table scans "
                                 "not supported, unable to use this type\n",
                                 key->key.len, key->key.s, type_name->len, type_name->s);
                         break;
                     }
                     // ts_scan_start is: 31 bits of current full key length, 31 bits of this value length,
                     // one bit of directionality, one bit of length variable indicator
                     if (VAL_TYPE(&v) == DB1_DATETIME && *ts_scan_start == 0 && val.len == TIMESTAMP_STR_LENGTH) {
                         *ts_scan_start = key_name->len | ((uint64_t) TIMESTAMP_STR_LENGTH << 31);
                         if (!strcmp(op, OP_LT))
                             *ts_scan_start |= 0x8000000000000000ULL;
                         LM_DBG("preparing for timestamp range scan at key offset %llx\n",
                                     (unsigned long long) *ts_scan_start);
                         *key_found = 0; // this forces a table scan using the new match key
                     }
                     else if ((VAL_TYPE(&v) == DB1_BIGINT
                                 || VAL_TYPE(&v) == DB1_UBIGINT) && *ts_scan_start == 0) {
                         *ts_scan_start = key_name->len | ((uint64_t) val.len << 31);
                         *ts_scan_start |= 0x4000000000000000ULL; // length is variable
                         if (!strcmp(op, OP_LT))
                             *ts_scan_start |= 0x8000000000000000ULL;
                         LM_DBG("preparing for int range scan at key offset %llx\n",
                                     (unsigned long long) *ts_scan_start);
                         *key_found = 0; // this forces a table scan using the new match key
                     }
                 }
53e746b5
                 LM_DBG("entry key so far is '%.*s'\n", key_name->len, key_name->s);
                 subkey_found = 1;
                 pkg_free(val.s);
                 val.s = NULL;
                 break;
             }
         }
         if (!subkey_found) {
             LM_DBG("key '%.*s' for type '%.*s' not found, unable to use this type\n",
                     key->key.len, key->key.s, type_name->len, type_name->s);
             if (key_name->s) {
                 pkg_free(key_name->s);
                 key_name->s = NULL;
                 key_name->len = 0;
             }
             *key_found = 0;
             break;
         }
     }
 
9da53bae
     // for value-less master keys
     if (!key_name->len) {
         // <version>:<table_name>:<type>
         len = table->version_code.len + table_name->len + 1 + type_name->len + 1;
         key_name->s = (char*)pkg_malloc(len);
         if (!key_name->s) {
             LM_ERR("Failed to allocate key memory\n");
             goto err;
         }
         snprintf(key_name->s, len, "%.*s%.*s:%.*s",
                 table->version_code.len, table->version_code.s,
                 table_name->len, table_name->s,
                 type_name->len, type_name->s);
         key_name->len = len-1;
     }
 
53e746b5
     return 0;
 
 err:
     if (val.s)
         pkg_free(val.s);
     if(key_name->s) {
         pkg_free(key_name->s);
         key_name->s = NULL;
         key_name->len = 0;
     }
     return -1;
 }
 
 /*
  * Build the entry key for the given columns and values and add it to the
  * key list.
  *
  * The key is derived from the entry key definition of the table; every
  * column of that definition has to be present in _k/_v.
  *
  * keys        in/out: key list the entry key is added to; freed on error
  * keys_count  out: set to 1 on success
  *
  * Returns 0 on success, -1 if the table is unknown, the entry key cannot
  * be built from the given columns, or memory ran out.
  */
 static int db_redis_build_entry_keys(km_redis_con_t *con, const str *table_name,
         const db_key_t *_k, const db_val_t *_v, const int _n,
         redis_key_t **keys, int *keys_count) {

     struct str_hash_entry *table_e;
     redis_table_t *table;
     int key_found = 0;
     str type_name = str_init("entry");
     str keyname = {NULL, 0};

     LM_DBG("build entry keys\n");

     table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
     if (!table_e) {
         LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!\n",
                 table_name->len, table_name->s);
         return -1;
     }
     table = (redis_table_t*)table_e->u.p;

     if (db_redis_find_query_key(table->entry_keys, table_name, table, &type_name,
                 _k, _v, NULL, _n, &keyname, &key_found, NULL) != 0) {
         goto err;
     }
     if (!key_found) {
         LM_ERR("Failed to create direct entry key, no matching key definition\n");
         goto err;
     }
     if (db_redis_key_add_str(keys, &keyname) != 0) {
         LM_ERR("Failed to add key string\n");
         goto err;
     }
     LM_DBG("found suitable entry key '%.*s' for query\n",
             (*keys)->key.len, (*keys)->key.s);
     *keys_count = 1;

     // the list holds its own copy of the key by now
     if (keyname.s)
         pkg_free(keyname.s);

     return 0;

 err:
     db_redis_key_free(keys);
     if (keyname.s)
         pkg_free(keyname.s);
     return -1;
 }
 
 /*
  * Collect the key column names of all type definitions of a table.
  *
  * keys        out: list with one element per key column of every type;
  *             NULL if the table has none, freed again on error
  * keys_count  out: number of elements in the list, 0 on error
  *
  * Returns 0 on success, -1 if the table is unknown or a key could not be
  * added to the list.
  */
 static int db_redis_get_keys_for_all_types(km_redis_con_t *con, const str *table_name,
         redis_key_t **keys, int *keys_count) {

     struct str_hash_entry *table_e;
     redis_table_t *table;
     redis_type_t *type;
     redis_key_t *key;

     *keys = NULL;
     *keys_count = 0;

     table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
     if (!table_e) {
         LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!\n",
                 table_name->len, table_name->s);
         return -1;
     }
     table = (redis_table_t*)table_e->u.p;

     for (type = table->types; type; type = type->next) {
         for (key = type->keys; key; key = key->next) {
             if (db_redis_key_add_str(keys, &key->key) != 0) {
                 LM_ERR("Failed to add key string\n");
                 goto err;
             }
             (*keys_count)++;
         }
     }

     return 0;

 err:
     db_redis_key_free(keys);
     // the list is gone, so the count must not claim otherwise
     *keys_count = 0;
     return -1;
 }
 
 /*
  * Build the type keys matching the given columns and values.
  *
  * For every type definition of the table whose key columns are all
  * present in _k/_v, the resulting key is added to `keys`. If set_keys is
  * given, the key of the parent set of that type,
  *   <version_code><table_name>::index::<type>
  * is added to it as well.
  *
  * keys        out: list of type keys; freed again on error
  * set_keys    in/out, may be NULL: list the parent set keys are added to.
  *             NOTE(review): entries already added are not released here
  *             on error - confirm that the caller cleans this list up.
  * keys_count  out: number of elements in `keys`, 0 on error
  *
  * Returns 0 on success, -1 if the table is unknown or memory ran out.
  */
 static int db_redis_build_type_keys(km_redis_con_t *con, const str *table_name,
         const db_key_t *_k, const db_val_t *_v, const int _n,
         redis_key_t **keys, redis_key_t **set_keys, int *keys_count) {

     struct str_hash_entry *table_e;
     redis_table_t *table;
     redis_type_t *type;
     // function scope, so that the error path can release it
     str keyname = {NULL, 0};

     *keys = NULL;
     *keys_count = 0;

     LM_DBG("build type keys\n");

     table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
     if (!table_e) {
         LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!\n",
                 table_name->len, table_name->s);
         return -1;
     }
     table = (redis_table_t*)table_e->u.p;

     for (type = table->types; type; type = type->next) {
         int key_found = 0;

         keyname.s = NULL;
         keyname.len = 0;

         if (db_redis_find_query_key(type->keys, table_name, table, &type->type,
                     _k, _v, NULL, _n, &keyname, &key_found, NULL) != 0) {
             goto err;
         }
         if (key_found) {
             if (db_redis_key_add_str(keys, &keyname) != 0) {
                 LM_ERR("Failed to add query key to key list\n");
                 goto err;
             }
             (*keys_count)++;
             LM_DBG("found key '%.*s' for type '%.*s'\n",
                     keyname.len, keyname.s,
                     type->type.len, type->type.s);

             if (set_keys) {
                 // add key for parent set
                 // <version>:<table>::index::<type>
                 pkg_free(keyname.s);
                 keyname.len = table->version_code.len + table_name->len + 9 + type->type.len;
                 keyname.s = pkg_malloc(keyname.len + 1);
                 if (!keyname.s) {
                     LM_ERR("Failed to allocate memory for parent set key\n");
                     goto err;
                 }
                 snprintf(keyname.s, keyname.len + 1, "%.*s%.*s::index::%.*s",
                         table->version_code.len, table->version_code.s,
                         table_name->len, table_name->s,
                         type->type.len, type->type.s);
                 if (db_redis_key_add_str(set_keys, &keyname) != 0) {
                     LM_ERR("Failed to add query key to set key list\n");
                     goto err;
                 }
             }
         }
         if (keyname.s) {
             pkg_free(keyname.s);
             keyname.s = NULL;
         }
     }

     return 0;

 err:
     LM_ERR("Failed to get type key\n");
     if (keyname.s)
         pkg_free(keyname.s);
     db_redis_key_free(keys);
     *keys_count = 0;
     return -1;
 }
 
 /*
  * Determine the redis entry keys to fetch for a query.
  *
  * 1. If the conditions in _k/_v/_op match the entry key definition of
  *    the table, that single entry key is used.
  * 2. Otherwise the type definitions are tried in order. For the first
  *    one matching the conditions, its set is read with SMEMBERS and the
  *    returned members become the query keys. A type that only allows a
  *    range scan hands its key over in ts_scan_key instead.
  * 3. If query keys were found, the list of columns to compare manually
  *    is built.
  *
  * query_keys         out: list of entry keys; freed again on error
  * query_keys_count   out: number of entry keys, 0 on error
  * manual_keys        out: index array, see db_redis_build_entry_manual_keys();
  *                    released on error if set
  * manual_keys_count  out: number of used slots in *manual_keys
  * do_table_scan      out: set to 1 on entry and cleared only if a type
  *                    query returned an empty set
  * ts_scan_start      in/out: encoded range scan position, see
  *                    db_redis_find_query_key(); NULL disables range scans
  * ts_scan_key        out: key to use for the range scan; ownership of the
  *                    buffer passes to the caller
  *
  * Returns 0 on success, -1 on error.
  */
 static int db_redis_build_query_keys(km_redis_con_t *con, const str *table_name,
         const db_key_t *_k, const db_val_t *_v, const db_op_t *_op, const int _n,
         redis_key_t **query_keys, int *query_keys_count, int **manual_keys, int *manual_keys_count,
         int *do_table_scan, uint64_t *ts_scan_start, str *ts_scan_key) {

     struct str_hash_entry *table_e;
     redis_table_t *table;
     redis_type_t *type;
     redis_key_t *key;
     str keyname;
     int key_found;
     redisReply *reply = NULL;
     str typename = str_init(REDIS_DIRECT_PREFIX);

     *query_keys = NULL;
     *query_keys_count = 0;
     *do_table_scan = 1;

     LM_DBG("build query keys\n");

     table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
     if (!table_e) {
         LM_ERR("query to undefined table '%.*s', define in db_redis keys parameter!\n",
                 table_name->len, table_name->s);
         return -1;
     }
     table = (redis_table_t*)table_e->u.p;

     // check if given keys directly match entry key
     keyname.s = NULL;
     keyname.len = 0;
     key = table->entry_keys;

     if (db_redis_find_query_key(key, table_name, table, &typename, _k, _v, _op, _n, &keyname, &key_found, NULL) != 0) {
         goto err;
     }
     if (key_found) {
         LM_DBG("found suitable entry key '%.*s' for query\n",
                 keyname.len, keyname.s);
         if (db_redis_key_add_str(query_keys, &keyname) != 0) {
             LM_ERR("Failed to add key name to query keys\n");
             goto err;
         }
         *query_keys_count = 1;
         pkg_free(keyname.s);
         keyname.s = NULL;
     } else {
         if (keyname.s)
             pkg_free(keyname.s);
         keyname.s = NULL;
         LM_DBG("no direct entry key found, checking type keys\n");
         for (type = table->types; type; type = type->next) {
             key = type->keys;
             LM_DBG("checking type '%.*s'\n", type->type.len, type->type.s);
             if (db_redis_find_query_key(key, table_name, table, &type->type, _k, _v, _op, _n, &keyname,
                         &key_found, ts_scan_start) != 0) {
                 goto err;
             }
             if (key_found) {
                 redis_key_t *query_v = NULL;
                 char *prefix = "SMEMBERS";

                 if (db_redis_key_add_string(&query_v, prefix, strlen(prefix)) != 0) {
                     LM_ERR("Failed to add smembers command to query\n");
                     db_redis_key_free(&query_v);
                     goto err;
                 }
                 if (db_redis_key_add_str(&query_v, &keyname) != 0) {
                     LM_ERR("Failed to add key name to smembers query\n");
                     db_redis_key_free(&query_v);
                     goto err;
                 }

                 reply = db_redis_command_argv(con, query_v);
                 pkg_free(keyname.s);
                 keyname.s = NULL;
                 db_redis_key_free(&query_v);
                 db_redis_check_reply(con, reply, err);
                 if (reply->type == REDIS_REPLY_ARRAY) {
                     if (reply->elements == 0) {
                         LM_DBG("type query returned empty list\n");
                         *query_keys_count = 0;
                         *do_table_scan = 0;
                         db_redis_free_reply(&reply);
                         break;
                     } else {
                         size_t i; // same type as reply->elements
                         LM_DBG("populating query keys list with result of type query\n");
                         *query_keys_count = reply->elements;
                         for (i = 0; i < reply->elements; ++i) {
                             redisReply *subreply = reply->element[i];
                             if (subreply->type == REDIS_REPLY_STRING) {
                                 LM_DBG("adding resulting entry key '%s' from type query\n", subreply->str);
                                 if (db_redis_key_prepend_string(query_keys, subreply->str, strlen(subreply->str)) != 0) {
                                     LM_ERR("Failed to add query key\n");
                                     goto err;
                                 }
                             } else {
                                 LM_ERR("Unexpected entry key type in type query, expecting a string\n");
                                 goto err;
                             }
                         }
                     }
                 } else {
                     LM_ERR("Unexpected reply for type query, expecting an array\n");
                     goto err;
                 }

                 db_redis_free_reply(&reply);
                 break;
             }
             else if (keyname.s && ts_scan_start && ts_scan_key && *ts_scan_start) {
                 LM_DBG("will use key '%.*s' at offset %llx for timestamp/int range scan\n",
                         keyname.len, keyname.s, (unsigned long long) *ts_scan_start);
                 // NOTE(review): a key stored here by an earlier type is
                 // overwritten without being released - confirm whether the
                 // caller initializes ts_scan_key so it could be freed first
                 *ts_scan_key = keyname;
                 keyname.s = NULL;
             }
             else if (keyname.s) {
                 pkg_free(keyname.s);
                 keyname.s = NULL;
             }
         }
     }

     if (*query_keys_count > 0) {
         LM_DBG("building manual keys\n");
         if (db_redis_build_entry_manual_keys(table, _k, _v, _n, manual_keys, manual_keys_count) != 0) {
             LM_ERR("Failed to build manual entry key list\n");
             goto err;

         }
     }

     return 0;
 err:
     if (keyname.s) {
         pkg_free(keyname.s);
         keyname.s = NULL;
     }
     if (reply) {
         db_redis_free_reply(&reply);
     }
     db_redis_key_free(query_keys);
     // the key list is gone, so the count must not claim otherwise
     *query_keys_count = 0;
     if (*manual_keys) {
         pkg_free(*manual_keys);
         *manual_keys = NULL;
     }
     return -1;
 }
 
9da53bae
 static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *match_pattern,
         const int _n,
53e746b5
         redis_key_t **query_keys, int *query_keys_count,
9da53bae
         int **manual_keys, int *manual_keys_count, unsigned int match_count_start_val) {
53e746b5
 
     size_t i = 0;
     redis_key_t *query_v = NULL;
     redisReply *reply = NULL;
9da53bae
     redisReply *keys_list = NULL;
a72c4e8e
     size_t j;
     int l;
53e746b5
 
 
9da53bae
 #undef USE_SCAN
 
 #ifdef USE_SCAN
 
     char cursor_str[32] = "";
     unsigned long cursor = 0;
     unsigned int match_count = match_count_start_val;
     char match_count_str[16];
53e746b5
 
     do {
         snprintf(cursor_str, sizeof(cursor_str), "%lu", cursor);
 
         if (db_redis_key_add_string(&query_v, "SCAN", 4) != 0) {
             LM_ERR("Failed to add scan command to scan query\n");
             goto err;
         }
         if (db_redis_key_add_string(&query_v, cursor_str, strlen(cursor_str)) != 0) {
             LM_ERR("Failed to add cursor to scan query\n");
             goto err;
         }
         if (db_redis_key_add_string(&query_v, "MATCH", 5) != 0) {
             LM_ERR("Failed to add match command to scan query\n");
             goto err;
         }
9da53bae
         if (db_redis_key_add_string(&query_v, match_pattern->s, match_pattern->len) != 0) {
53e746b5
             LM_ERR("Failed to add match pattern to scan query\n");
             goto err;
         }
cc6865cd
         if (db_redis_key_add_string(&query_v, "COUNT", 5) != 0) {
             LM_ERR("Failed to add count command to scan query\n");
             goto err;
         }
9da53bae
         l = snprintf(match_count_str, sizeof(match_count_str), "%u", match_count);
         if (l <= 0) {
             LM_ERR("Failed to print integer for scan query\n");
             goto err;
         }
         if (db_redis_key_add_string(&query_v, match_count_str, l) != 0) {
cc6865cd
             LM_ERR("Failed to add count value to scan query\n");
             goto err;
         }
53e746b5
 
         reply = db_redis_command_argv(con, query_v);
         db_redis_key_free(&query_v);
         db_redis_check_reply(con, reply, err);
         if (reply->type != REDIS_REPLY_ARRAY) {
             LM_ERR("Invalid reply type for scan on table '%.*s', expected array\n",
9da53bae
                     match_pattern->len, match_pattern->s);
53e746b5
             goto err;
         }
         if (reply->elements != 2) {
             LM_ERR("Invalid number of reply elements for scan on table '%.*s', expected 2, got %lu\n",
9da53bae
                     match_pattern->len, match_pattern->s, reply->elements);
53e746b5
             goto err;
         }
 
         if (reply->element[0]->type == REDIS_REPLY_STRING) {
             cursor = atol(reply->element[0]->str);
         } else if (reply->element[0]->type == REDIS_REPLY_INTEGER) {
             // should not happen, but play it safe
             cursor = reply->element[0]->integer;
         } else {
             LM_ERR("Invalid cursor type for scan on table '%.*s', expected string or integer\n",
9da53bae
                     match_pattern->len, match_pattern->s);
53e746b5
             goto err;
         }
cc6865cd
         LM_DBG("cursor is %lu\n", cursor);
53e746b5
 
9da53bae
         keys_list = reply->element[1];
 
 #else // use KEYS
 
     if (db_redis_key_add_string(&query_v, "KEYS", 4) != 0) {
         LM_ERR("Failed to add scan command to scan query\n");
         goto err;
     }
     if (db_redis_key_add_string(&query_v, match_pattern->s, match_pattern->len) != 0) {
         LM_ERR("Failed to add match pattern to scan query\n");
         goto err;
     }
 
     reply = db_redis_command_argv(con, query_v);
     db_redis_key_free(&query_v);
     db_redis_check_reply(con, reply, err);
 
     keys_list = reply;
 
 #endif
 
         if (keys_list->type != REDIS_REPLY_ARRAY) {
53e746b5
             LM_ERR("Invalid content type for scan on table '%.*s', expected array\n",
9da53bae
                     match_pattern->len, match_pattern->s);
53e746b5
             goto err;
         }
 
9da53bae
         *query_keys_count += keys_list->elements;
53e746b5
 
9da53bae
         for (j = 0; j < keys_list->elements; ++i, ++j) {
             redisReply *key = keys_list->element[j];
53e746b5
             if (!key) {
                 LM_ERR("Invalid null key at cursor result index %lu while scanning table '%.*s'\n",
384c2ab8
                         (unsigned long) j, match_pattern->len, match_pattern->s);
53e746b5
                 goto err;
             }
             if (key->type != REDIS_REPLY_STRING) {
                 LM_ERR("Invalid key type at cursor result index %lu while scanning table '%.*s', expected string\n",
384c2ab8
                         (unsigned long) j, match_pattern->len, match_pattern->s);
53e746b5
                 goto err;
             }
cc6865cd
             if (db_redis_key_prepend_string(query_keys, key->str, strlen(key->str)) != 0) {
                 LM_ERR("Failed to prepend redis key\n");
53e746b5
                 goto err;
             }
         }
9da53bae
 
 #ifdef USE_SCAN
         // exponential increase and falloff, hovering around 1000 results
         if (keys_list->elements > 1300 && match_count > 500)
             match_count /= 2;
         else if (keys_list->elements < 700 && match_count < 500000)
             match_count *= 2;
 #endif
 
53e746b5
         db_redis_free_reply(&reply);
9da53bae
 
 #ifdef USE_SCAN
53e746b5
     } while (cursor > 0);
9da53bae
 #endif
53e746b5
 
     // for full table scans, we have to manually match all given keys
9da53bae
     // but only do this once for repeated invocations
     if (!*manual_keys) {
         *manual_keys_count = _n;
         *manual_keys = (int*)pkg_malloc(*manual_keys_count * sizeof(int));
         if (! *manual_keys) {
             LM_ERR("Failed to allocate memory for manual keys\n");
             goto err;
         }
         memset(*manual_keys, 0, *manual_keys_count * sizeof(int));
         for (l = 0; l < _n; ++l) {
             (*manual_keys)[l] = l;
         }
53e746b5
     }
 
     if (reply) {
         db_redis_free_reply(&reply);
     }
cc6865cd
 
384c2ab8
     LM_DBG("got %lu entries by scan\n", (unsigned long) i);
53e746b5
     return 0;
 
 err:
     if (reply)
         db_redis_free_reply(&reply);
     db_redis_key_free(&query_v);
     db_redis_key_free(query_keys);
     *query_keys_count = 0;
     if (*manual_keys) {
         pkg_free(*manual_keys);
         *manual_keys = NULL;
     }
     return -1;
 }
 
9da53bae
 /*
  * Collect the redis entry keys for a scan-based lookup on table_name.
  *
  * ts_scan_start == 0: full table scan. All keys matching the pattern
  * "<version_code><table_name>:entry::*" are stored directly in query_keys.
  *
  * ts_scan_start != 0: timestamp/int range scan. ts_scan_start is decoded
  * as a bit field describing the value embedded in ts_scan_key:
  *   bit 63      set for a "less than" scan, clear for "greater than"
  *   bit 62      set if the value's string length is variable (int scan)
  *   bits 31-61  length of the value
  *   bits 0-30   offset of the END of the value inside ts_scan_key
  * A series of glob patterns covering the requested range is generated
  * from ts_scan_key; the keys matching them are treated as redis sets
  * whose members (fetched with SMEMBERS) are the entry keys stored in
  * query_keys.
  *
  * ts_scan_key->s is assumed to be NUL-terminated at ts_scan_key->len, as
  * the buffer sizing and the suffix copy below rely on it.
  *
  * On entry, *query_keys, *query_keys_count, *manual_keys and
  * *manual_keys_count are reset. manual_keys/manual_keys_count are filled
  * by db_redis_scan_query_keys_pattern().
  *
  * Returns 0 on success, non-zero on failure. On failure of the range scan
  * the key list and manual key array are released again.
  */
 static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
         const int _n,
         redis_key_t **query_keys, int *query_keys_count,
         int **manual_keys, int *manual_keys_count, uint64_t ts_scan_start, const str *ts_scan_key) {

     struct str_hash_entry *table_e;
     redis_table_t *table;
     char *match = NULL;
     int ret;
     redisReply *reply = NULL;
     redis_key_t *set_key = NULL;
     redis_key_t *set_keys = NULL;
     int set_keys_count = 0;
     // unsigned indices: they are compared against unsigned lengths below
     unsigned int i, j;
     size_t m;
     int scan_lt, scan_len_variable;
     unsigned int scan_end, scan_offset, scan_length;
     const char *suffix;

     *query_keys = NULL;
     *query_keys_count = 0;
     *manual_keys = NULL;
     *manual_keys_count = 0;

     table_e = str_hash_get(&con->tables, table_name->s, table_name->len);
     if (!table_e) {
         LM_ERR("query to undefined table '%.*s', define it in schema file!\n",
                 table_name->len, table_name->s);
         return -1;
     }
     table = (redis_table_t*)table_e->u.p;

     if (!ts_scan_start) {
         // full table scan
         match = (char*)pkg_malloc(table->version_code.len
                 + table_name->len + 10); // length of ':entry::*' plus \0
         if (!match) {
             LM_ERR("Failed to allocate memory for match pattern\n");
             return -1;
         }
         int len = sprintf(match, "%.*s%.*s:entry::*",
                 table->version_code.len, table->version_code.s,
                 table_name->len, table_name->s);
         str match_pattern = {match, len};
         ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n, query_keys, query_keys_count,
                 manual_keys, manual_keys_count, 1000);
         pkg_free(match);
         return ret;
     }

     // timestamp range scan
     // ex: 2019-07-17 17:33:16
     // if >, we match: [3-9]???-??-?? ??:??:??, 2[1-9]??-??-?? ??:??:??, 20[2-9]?-??-?? ??:??:??, etc
     // if <, we match: [0-1]???-??-?? ??:??:??, 200..., 201[0-8]..., etc
     // the maximum match string length is ts_scan_key->len with one character replaced by 5 ('[a-b]')
     //
     // int range scan
     // ex: 12345
     // if >, we match: 2????, 1[3-9]???, ..., plus ?????*
     // if <. we match: ?, ??, ???, ????, 1[0-1]???, 12[0-2]??, etc
     //    ... however we expect a minimum length of 10 digits as per BIGINT printf format

     scan_lt = (ts_scan_start & 0x8000000000000000ULL) ? 1 : 0;
     scan_len_variable = (ts_scan_start & 0x4000000000000000ULL) ? 1 : 0;
     scan_end = ts_scan_start & 0x7fffffffULL;
     scan_length = (ts_scan_start >> 31) & 0x7fffffffULL;

     // the value must lie completely inside the scan key, otherwise the
     // offset computation wraps around and the pattern buffer is overrun
     if (ts_scan_key->len < 0 || scan_length > scan_end
             || scan_end > (unsigned int) ts_scan_key->len) {
         LM_ERR("Invalid range scan position (end %u, len %u) for match key '%.*s'\n",
                 scan_end, scan_length, ts_scan_key->len, ts_scan_key->s);
         return -1;
     }
     scan_offset = scan_end - scan_length;
     suffix = ts_scan_key->s + scan_end;

     // worst case: one character replaced by a 5 character range, plus \0
     match = pkg_malloc(ts_scan_key->len + 6);
     if (!match) {
         LM_ERR("Failed to allocate memory for match pattern\n");
         return -1;
     }

     LM_DBG("running timestamp/int range matching: lt %i, lv %i, off %u, len %u\n",
             scan_lt, scan_len_variable, scan_offset, scan_length);

     if (scan_lt && scan_len_variable) {
         // match shorter strings

         // copy unchanged prefix
         memcpy(match, ts_scan_key->s, scan_offset);

         // append a number of ?. minimum string length is 10 digits
         // ("i + 1 < scan_length" instead of "i < scan_length - 1" so that
         // a zero length does not wrap around)
         for (i = 0; i + 1 < scan_length; i++) {
             int len = scan_offset + i;
             char match_char = ts_scan_key->s[len];
             // skip non-numbers
             if (match_char < '0' || match_char > '9') {
                 match[len] = match_char;
                 continue;
             }
             // append a single ?
             match[len] = '?';
             // append unchanged suffix
             strcpy(match + len + 1, suffix);
             len = strlen(match);

             // minimum bigint printf string length
             if (i < 10)
                 continue;

             str match_pattern = {match, len};
             LM_DBG("running timestamp/int range matching using pattern '%.*s'\n", len, match);

             ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n, &set_keys, &set_keys_count,
                     manual_keys, manual_keys_count, 5000);
             if (ret)
                 goto out;
         }
     }

     for (i = 0; i < scan_length; i++) {
         int len = scan_offset + i;
         char match_char = ts_scan_key->s[len];
         // skip non-numbers
         if (match_char < '0' || match_char > '9')
             continue;
         // skip numbers that are at the edge of their match range
         if (match_char == '0' && scan_lt)
             continue;
         if (match_char == '1' && scan_lt && i == 0) // no leading 0
             continue;
         if (match_char == '9' && !scan_lt)
             continue;

         // copy unchanged prefix
         memcpy(match, ts_scan_key->s, len);
         // append range matcher
         if (scan_lt)
             len += sprintf(match + len, "[0-%c]", match_char - 1);
         else
             len += sprintf(match + len, "[%c-9]", match_char + 1);
         // finish with trailing ?s
         for (j = i + 1; j < scan_length; j++) {
             match_char = ts_scan_key->s[scan_offset + j];
             // skip non-numbers
             if (match_char < '0' || match_char > '9') {
                 match[len++] = match_char;
                 continue;
             }
             match[len++] = '?';
         }
         // append unchanged suffix
         strcpy(match + len, suffix);
         len = strlen(match);

         str match_pattern = {match, len};
         LM_DBG("running timestamp/int range matching using pattern '%.*s'\n", len, match);

         ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n, &set_keys, &set_keys_count,
                 manual_keys, manual_keys_count, 5000);
         if (ret)
             goto out;
     }

     if (!scan_lt && scan_len_variable) {
         // match longer strings
         int len = sprintf(match, "%.*s*%s", scan_offset + scan_length, ts_scan_key->s, suffix);

         str match_pattern = {match, len};
         LM_DBG("running timestamp/int range matching using pattern '%.*s'\n", len, match);

         ret = db_redis_scan_query_keys_pattern(con, &match_pattern, _n, &set_keys, &set_keys_count,
                 manual_keys, manual_keys_count, 5000);
         if (ret)
             goto out;
     }

     // we now have a list of matching type keys in set_keys. now we have to iterate through them
     // and retrieve the set members, and finally build our actual key list

     ret = -1;

     for (set_key = set_keys; set_key; set_key = set_key->next) {
         LM_DBG("pulling set members from key '%.*s'\n", set_key->key.len, set_key->key.s);

         redis_key_t *query_v = NULL;
         if (db_redis_key_add_string(&query_v, "SMEMBERS", 8) != 0) {
             LM_ERR("Failed to add smembers command to query\n");
             db_redis_key_free(&query_v);
             goto out;
         }
         if (db_redis_key_add_str(&query_v, &set_key->key) != 0) {
             LM_ERR("Failed to add key name to smembers query\n");
             db_redis_key_free(&query_v);
             goto out;
         }

         reply = db_redis_command_argv(con, query_v);
         db_redis_key_free(&query_v);
         db_redis_check_reply(con, reply, out);

         if (reply->type != REDIS_REPLY_ARRAY) {
             LM_ERR("Unexpected reply for type query, expecting an array\n");
             goto out;
         }

         LM_DBG("adding %i keys returned from set\n", (int) reply->elements);

         for (m = 0; m < reply->elements; m++) {
             if (reply->element[m]->type != REDIS_REPLY_STRING) {
                 LM_ERR("Unexpected entry key type in type query, expecting a string\n");
                 goto out;
             }
             if (db_redis_key_prepend_string(query_keys, reply->element[m]->str, strlen(reply->element[m]->str))
                     != 0) {
                 LM_ERR("Failed to prepend redis key\n");
                 goto out;
             }
             LM_DBG("adding key '%s'\n", reply->element[m]->str);
         }
         *query_keys_count += reply->elements;

         db_redis_free_reply(&reply);
     }

     ret = 0;

 out:
     pkg_free(match);
     db_redis_key_free(&set_keys);
     db_redis_free_reply(&reply);
     if (ret) {
         // hand nothing back to the caller on failure
         db_redis_key_free(query_keys);
         *query_keys_count = 0;
         if (*manual_keys) {
             pkg_free(*manual_keys);
             *manual_keys = NULL;
         }
     }
     return ret;
 }
 
53e746b5
 static int db_redis_compare_column(db_key_t k, db_val_t *v, db_op_t op, redisReply *reply) {
     int i_value;
     long long ll_value;
     double d_value;
     str *tmpstr;
     char tmp[32] = "";
0cc1e167
     struct tm _time;
53e746b5
 
     int vtype = VAL_TYPE(v);
 
     if (VAL_NULL(v) && reply->type == REDIS_REPLY_NIL) {
         LM_DBG("comparing matching NULL values\n");
         return 0;
     } else if (VAL_NULL(v) || reply->type == REDIS_REPLY_NIL) {
         LM_DBG("comparing non-matching NULL values\n");
         return -1;
     }
 
     switch (vtype) {
         case DB1_INT:
             i_value = atoi(reply->str);
             LM_DBG("comparing INT %d %s %d\n", i_value, op, VAL_INT(v));
             if (!strcmp(op, OP_EQ)) {
                 if (i_value == VAL_INT(v))
                     return 0;
             } else if (!strcmp(op, OP_LT)) {
                 if (i_value < VAL_INT(v))
                     return 0;
             } else if (!strcmp(op, OP_GT)) {
                 if (i_value > VAL_INT(v))
                     return 0;
             } else if (!strcmp(op, OP_LEQ)) {
                 if (i_value <= VAL_INT(v))
                     return 0;
             } else if (!strcmp(op, OP_GEQ)) {
                 if (i_value >= VAL_INT(v))
                     return 0;
             } else if (!strcmp(op, OP_NEQ)) {
                 if (i_value != VAL_INT(v))
                     return 0;
             } else if (!strcmp(op, OP_BITWISE_AND)) {
                 if (i_value & VAL_INT(v))
                     return 0;
             } else {
                 LM_ERR("Unsupported op type '%s'\n", op);
                 return -1;
             }
             return -1;
         case DB1_BIGINT:
             ll_value = atoll(reply->str);
             LM_DBG("comparing BIGINT %lld %s %lld\n", ll_value, op, VAL_BIGINT(v));
             if (!strcmp(op, OP_EQ)) {
                 if (ll_value == VAL_BIGINT(v))
                     return 0;
             } else if (!strcmp(op, OP_LT)) {
                 if (ll_value < VAL_BIGINT(v))
                     return 0;
             } else if (!strcmp(op, OP_GT)) {
                 if (ll_value > VAL_BIGINT(v))
                     return 0;
             } else if (!strcmp(op, OP_LEQ)) {
                 if (ll_value <= VAL_BIGINT(v))
                     return 0;
             } else if (!strcmp(op, OP_GEQ)) {
                 if (ll_value >= VAL_BIGINT(v))
                     return 0;
             } else if (!strcmp(op, OP_NEQ)) {
                 if (ll_value != VAL_BIGINT(v))
                     return 0;
             } else if (!strcmp(op, OP_BITWISE_AND)) {
                 if (ll_value & VAL_BIGINT(v))
                     return 0;
             } else {
                 LM_ERR("Unsupported op type '%s'\n", op);
                 return -1;
             }
             return -1;
         case DB1_STRING:
             LM_DBG("comparing STRING %s %s %s\n", reply->str, op, VAL_STRING(v));
             if (!strcmp(op, OP_EQ)) {
                 return (strcmp(reply->str, VAL_STRING(v)) == 0) ? 0 : -1;
             } else if (!strcmp(op, OP_LT)) {
                 return (strcmp(reply->str, VAL_STRING(v)) < 0) ? 0 : -1;
             } else if (!strcmp(op, OP_GT)) {
                 return (strcmp(reply->str, VAL_STRING(v)) > 0) ? 0 : -1;
             } else if (!strcmp(op, OP_LEQ)) {
                 return (strcmp(reply->str, VAL_STRING(v)) <= 0) ? 0 : -1;
             } else if (!strcmp(op, OP_GEQ)) {
                 return (strcmp(reply->str, VAL_STRING(v)) >= 0) ? 0 : -1;
             } else if (!strcmp(op, OP_NEQ)) {
                 return (strcmp(reply->str, VAL_STRING(v)) != 0) ? 0 : -1;
             } else {
                 LM_ERR("Unsupported op type '%s'\n", op);
                 return -1;
             }
             return -1;
         case DB1_STR:
             tmpstr = (struct _str*) &(VAL_STR(v));
             LM_DBG("comparing STR %s %s %.*s\n", reply->str, op, tmpstr->len, tmpstr->s);
             if (!strcmp(op, OP_EQ)) {
                 return (strncmp(reply->str, tmpstr->s, tmpstr->len) == 0) ? 0 : -1;
             } else if (!strcmp(op, OP_LT)) {
                 return (strncmp(reply->str, tmpstr->s, tmpstr->len) < 0) ? 0 : -1;
             } else if (!strcmp(op, OP_GT)) {
                 return (strncmp(reply->str, tmpstr->s, tmpstr->len) > 0) ? 0 : -1;
             } else if (!strcmp(op, OP_LEQ)) {
                 return (strncmp(reply->str, tmpstr->s, tmpstr->len) <= 0) ? 0 : -1;
             } else if (!strcmp(op, OP_GEQ)) {
                 return (strncmp(reply->str, tmpstr->s, tmpstr->len) >= 0) ? 0 : -1;
             } else if (!strcmp(op, OP_NEQ)) {
                 return (strncmp(reply->str, tmpstr->s, tmpstr->len) != 0) ? 0 : -1;
             } else {
                 LM_ERR("Unsupported op type '%s'\n", op);
                 return -1;
             }
             return -1;
         case DB1_DOUBLE:
             d_value = atof(reply->str);
             LM_DBG("comparing DOUBLE %f %s %f\n", d_value, op, VAL_DOUBLE(v));
             if (!strcmp(op, OP_EQ)) {
                 return (d_value == VAL_DOUBLE(v)) ? 0 : -1;
             } else if (!strcmp(op, OP_LT)) {
                 return (d_value < VAL_DOUBLE(v)) ? 0 : -1;
             } else if (!strcmp(op, OP_GT)) {
                 return (d_value > VAL_DOUBLE(v)) ? 0 : -1;
             } else if (!strcmp(op, OP_LEQ)) {
                 return (d_value <= VAL_DOUBLE(v)) ? 0 : -1;
             } else if (!strcmp(op, OP_GEQ)) {
                 return (d_value >= VAL_DOUBLE(v)) ? 0 : -1;
             } else if (!strcmp(op, OP_NEQ)) {
                 return (d_value != VAL_DOUBLE(v)) ? 0 : -1;
             } else {
                 LM_ERR("Unsupported op type '%s'\n", op);
                 return -1;
             }
             return -1;
         case DB1_DATETIME:
             // TODO: insert int value to db for faster comparison!
0cc1e167
             localtime_r(&(VAL_TIME(v)), &_time);
             strftime(tmp, sizeof(tmp), "%Y-%m-%d %H:%M:%S", &_time);
53e746b5
             LM_DBG("comparing DATETIME %s %s %s\n", reply->str, op, tmp);
             if (!strcmp(op, OP_EQ)) {
                 return (strcmp(reply->str, tmp) == 0) ? 0 : -1;
             } else if (!strcmp(op, OP_LT)) {
                 return (strcmp(reply->str, tmp) < 0) ? 0 : -1;
             } else if (!strcmp(op, OP_GT)) {
                 return (strcmp(reply->str, tmp) > 0) ? 0 : -1;
             } else if (!strcmp(op, OP_LEQ)) {
                 return (strcmp(reply->str, tmp) <= 0) ? 0 : -1;
             } else if (!strcmp(op, OP_GEQ)) {
                 return (strcmp(reply->str, tmp) >= 0) ? 0 : -1;
             } else if (!strcmp(op, OP_NEQ)) {
                 return (strcmp(reply->str, tmp) != 0) ? 0 : -1;
             } else {
                 LM_ERR("Unsupported op type '%s'\n", op);
                 return -1;
             }
             return -1;
         case DB1_BITMAP:
             i_value = atoi(reply->str);
             LM_DBG("comparing BITMAP %d %s %d\n", i_value, op, VAL_BITMAP(v));
             if (!strcmp(op, OP_EQ)) {
                 if (i_value == VAL_BITMAP(v))
                     return 0;
             } else if (!strcmp(op, OP_LT)) {
                 if (i_value < VAL_BITMAP(v))
                     return 0;
             } else if (!strcmp(op, OP_GT)) {
                 if (i_value > VAL_BITMAP(v))
                     return 0;
             } else if (!strcmp(op, OP_LEQ)) {
                 if (i_value <= VAL_BITMAP(v))
                     return 0;
             } else if (!strcmp(op, OP_GEQ)) {
                 if (i_value >= VAL_BITMAP(v))
                     return 0;
             } else if (!strcmp(op, OP_NEQ)) {
                 if (i_value != VAL_BITMAP(v))
                     return 0;
             } else if (!strcmp(op, OP_BITWISE_AND)) {
                 if (i_value & VAL_BITMAP(v))
                     return 0;
             } else {
                 LM_ERR("Unsupported op type '%s'\n", op);
                 return -1;
             }
             return -1;
         case DB1_BLOB:
         default:
             LM_ERR("Unsupported val type %d\n", vtype);
             return -1;
     }
 }
 
 /*
  * Convert one redis array reply into the next row of the result set _r.
  *
  * The reply is expected to contain the manual_keys_count filter columns
  * first, followed by the requested result columns. The filter columns are
  * compared against _k/_v/_op (indexed through manual_keys); if one of
  * them does not match, the row is silently dropped. Otherwise a new row
  * is appended to _r and the remaining elements are converted with
  * db_str2val(). The column types are looked up in the schema while the
  * first row is converted and reused from RES_TYPES(_r) afterwards.
  *
  * The caller must have allocated enough rows in RES_ROWS(_r); the row
  * counters of _r are incremented here.
  *
  * Returns 0 if the row was added or skipped (empty reply, filter
  * mismatch), -1 on error.
  */
 static int db_redis_convert_row(km_redis_con_t *con, db1_res_t* _r, const db_key_t *_k,
         const db_val_t *_v, const db_op_t *_op,
         redisReply *reply, const str *table_name, const db_key_t* _c, int _nc,
         int *manual_keys, int manual_keys_count) {
     db_val_t* dval;
     db_row_t* drow;
     size_t col;
     size_t filter_cols;

     if (reply->type != REDIS_REPLY_ARRAY) {
         LM_ERR("Unexpected redis reply type, expecting array\n");
         return -1;
     }

     if (!reply->elements) {
         LM_DBG("skip empty row\n");
         return 0;
     }

     // manually filter non-matching replies; only the leading
     // manual_keys_count elements of the reply are filter columns
     filter_cols = reply->elements;
     if ((size_t) manual_keys_count < filter_cols)
         filter_cols = manual_keys_count;
     for (col = 0; col < filter_cols; ++col) {
         int idx = manual_keys[col];
         db_key_t k = _k[idx];
         db_val_t v = _v[idx];
         db_op_t o = _op[idx];
         LM_DBG("manually filtering key '%.*s'\n",
                 k->len, k->s);
         if (db_redis_compare_column(k, &v, o, reply->element[col]) != 0) {
             LM_DBG("column %lu does not match, ignore row\n", (unsigned long) col);
             return 0;
         }
     }

     // a reply shorter than the filter columns cannot hold a row; check it
     // explicitly instead of relying on the unsigned subtraction below
     // wrapping around
     if (reply->elements < (size_t) manual_keys_count) {
         LM_ERR("Invalid number of columns, expecting at least %d filter columns, got %lu\n",
                 manual_keys_count, (unsigned long) reply->elements);
         return -1;
     }

     RES_NUM_ROWS(_r) = RES_ROW_N(_r) = RES_NUM_ROWS(_r) + 1;
     drow = &(RES_ROWS(_r)[RES_NUM_ROWS(_r)-1]);

     if (db_allocate_row(_r, drow) != 0) {
         LM_ERR("Failed to allocate row %d\n", RES_NUM_ROWS(_r));
         return -1;
     }

     if (reply->elements - manual_keys_count > RES_COL_N(_r)) {
         LM_ERR("Invalid number of columns at row %d/%d, expecting %d, got %lu\n",
                 RES_NUM_ROWS(_r), RES_ROW_N(_r), RES_COL_N(_r), (unsigned long) reply->elements - manual_keys_count);
         return -1;
     }
     for (col = manual_keys_count; col < reply->elements; ++col) {
         // colidx addresses the result column, redisidx the reply element
         size_t colidx = col - manual_keys_count;
         size_t redisidx = col;
         int coltype;
         redisReply *col_val = reply->element[redisidx];
         str *col_name = _c[colidx];

         LM_DBG("converting column #%lu of row #%d\n", (unsigned long) colidx, RES_ROW_N(_r));

         if (col_val->type != REDIS_REPLY_STRING &&
             col_val->type != REDIS_REPLY_NIL) {

             LM_ERR("Invalid column value type in column '%.*s' of row %d, expecting string or null\n",
                     col_name->len, col_name->s, RES_NUM_ROWS(_r));
             return -1;
         }

         // resolve the column type once, while converting the first row
         if (RES_NUM_ROWS(_r) == 1) {
             coltype = db_redis_schema_get_column_type(con, table_name, col_name);
             RES_TYPES(_r)[colidx] = coltype;
         } else {
             coltype = RES_TYPES(_r)[colidx];
         }

         dval = &(ROW_VALUES(drow)[colidx]);
         VAL_TYPE(dval) = coltype;

         if (col_val->type == REDIS_REPLY_NIL) {
             VAL_NULL(dval) = 1;
         } else {
             if (db_str2val(coltype, dval, col_val->str, strlen(col_val->str), 1) != 0) {
                 LM_ERR("Failed to convert redis column '%.*s' to db value\n",
                         col_name->len, col_name->s);
                 return -1;
             }
         }
     }

     return 0;
 }
 
 /*
  * Run a query against redis and build the result set.
  *
  * _h            - DB connection handle, used for the current table name
  * con           - redis connection the commands are sent on
  * _k/_v/_op/_n  - filter keys, values and operators (_n entries)
  * _c/_nc        - names and number of the result columns
  * _r            - receives the newly allocated result; set to NULL on
  *                 failure
  * keys          - list of redis entry keys to fetch; filled by a table
  *                 scan if *keys_count is 0 and do_table_scan is set
  * manual_keys   - indexes into _k of the columns that have to be compared
  *                 manually for every fetched row
  * ts_scan_start, ts_scan_key - range scan hint, passed through to
  *                 db_redis_scan_query_keys()
  *
  * For every key an EXISTS and an HMGET (HGETALL if no columns are
  * requested at all) command are appended to the connection. The replies
  * are read back in batches: after every 1000th key, and after the last
  * key for whatever is still outstanding.
  *
  * Returns 0 on success, -1 on failure.
  */
 static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, const db_key_t* _k,
         const db_val_t* _v, const db_op_t *_op, const db_key_t* _c,
         const int _n, const int _nc, db1_res_t** _r,
         redis_key_t **keys, int *keys_count,
         int **manual_keys, int *manual_keys_count, int do_table_scan, uint64_t ts_scan_start,
         const str *ts_scan_key) {

     redisReply *reply = NULL;
     redis_key_t *query_v = NULL;
     int num_rows = 0;
     redis_key_t *key;
     int i, j, max;

     *_r = db_redis_new_result();
     if (!*_r) {
         LM_ERR("Failed to allocate memory for result\n");
         goto error;
     }

     if (db_allocate_columns(*_r, _nc) != 0) {
         LM_ERR("Failed to allocate memory for result columns\n");
         goto error;
     }
     RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
     RES_COL_N(*_r) = _nc;

     if (!(*keys_count) && do_table_scan) {
         if(_n > 0) {
             LM_WARN("performing full table scan on table '%.*s' while doing the query\n",
                 CON_TABLE(_h)->len, CON_TABLE(_h)->s);
             for(i = 0; i < _n; ++i) {
                 LM_WARN("  scan key %d is '%.*s'\n",
                         i, _k[i]->len, _k[i]->s);
             }
         } else {
             LM_DBG("loading full table: '%.*s\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
         }
         if (db_redis_scan_query_keys(con, CON_TABLE(_h), _n,
                     keys, keys_count,
                     manual_keys, manual_keys_count, ts_scan_start, ts_scan_key) != 0) {
             LM_ERR("failed to scan query keys\n");
             goto error;
         }
     }

     // we allocate best case scenario (all rows match)
     RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = *keys_count;
     if (db_allocate_rows(*_r) != 0) {
         LM_ERR("Failed to allocate memory for rows\n");
         // no rows exist; reset the counters before the result is released
         RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
         goto error;
     }
     RES_COL_N(*_r) = _nc;
     // reset and increment in convert_row
     RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;

     for (key = *keys; key; key = key->next) {
         redis_key_t *tmp = NULL;
         str *keyname = &(key->key);

         num_rows++;

         LM_DBG("checking key '%.*s' in redis\n", keyname->len, keyname->s);

         if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
             LM_ERR("Failed to add exists query to list\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, keyname) != 0) {
             LM_ERR("Failed to add key name to list\n");
             goto error;
         }
         if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
             LM_ERR("Failed to append redis command\n");
             goto error;
         }
         // drop the leading command element so that the key name can be
         // reused for the HMGET/HGETALL command built next
         tmp = db_redis_key_shift(&query_v);
         if (tmp)
             db_redis_key_free(&tmp);

         // construct HMGET query
         if (_nc + (*manual_keys_count) == 0) {
             if (db_redis_key_prepend_string(&query_v, "HGETALL", 7) != 0) {
                 LM_ERR("Failed to add hgetall query to list\n");
                 goto error;
             }
         } else {
             if (db_redis_key_prepend_string(&query_v, "HMGET", 5) != 0) {
                 LM_ERR("Failed to add hmget query to list\n");
                 goto error;
             }
         }

         // we put the manual comparison columns first, so we can skip them
         // easily in result, for the cost of potential duplicate column returns
         for (j = 0; j < *manual_keys_count; ++j) {
             int idx = (*manual_keys)[j];
             str *k_name = _k[idx];
             if (db_redis_key_add_str(&query_v, k_name) != 0) {
                 LM_ERR("Failed to add manual key to query list\n");
                 goto error;
             }
         }
         for (j = 0; j < _nc; ++j) {
             str *k_name = _c[j];
             if (db_redis_key_add_str(&query_v, k_name) != 0) {
                 LM_ERR("Failed to add manual key to query list\n");
                 goto error;
             }
         }

         if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
             LM_ERR("Failed to append redis command\n");
             goto error;
         }

         db_redis_key_free(&query_v);
         query_v = NULL;

         // decide how many outstanding key replies to read now: a full
         // batch after every 1000th key, the remainder after the last key.
         // if the total is an exact multiple of 1000 the remainder is a
         // full batch, not zero
         max = 0;
         if (*keys_count == num_rows) {
             max = (*keys_count) % 1000;
             if (max == 0)
                 max = 1000;
         } else if (num_rows % 1000 == 0) {
             max = 1000;
         }

         if (max) {
             LM_DBG("fetching next %d results\n", max);
             for (i = 0; i < max; ++i) {
                 // get reply for EXISTS query
                 if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
                     LM_ERR("Failed to get reply for query: %s\n",
                             db_redis_get_error(con));
                     goto error;
                 }
                 db_redis_check_reply(con, reply, error);
                 if (reply->integer == 0) {
                     LM_DBG("key does not exist, returning no row for query\n");
                     db_redis_free_reply(&reply);
                     // also free next reply, as this is a null row for the HMGET
                     if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
                         LM_ERR("Failed to get reply for query: %s\n",
                                 db_redis_get_error(con));
                         goto error;
                     }
                     db_redis_check_reply(con, reply, error);
                     db_redis_free_reply(&reply);
                     continue;
                 }
                 db_redis_free_reply(&reply);

                 // get reply for actual HMGET query
                 if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
                     LM_ERR("Failed to get reply for query: %s\n",
                             db_redis_get_error(con));
                     goto error;
                 }
                 db_redis_check_reply(con, reply, error);
                 if (reply->type != REDIS_REPLY_ARRAY) {
                     LM_ERR("Unexpected reply, expected array\n");
                     goto error;
                 }
                 LM_DBG("dumping full query reply for row\n");
                 db_redis_dump_reply(reply);

                 if (db_redis_convert_row(con, *_r, _k, _v, _op, reply, CON_TABLE(_h), _c, _nc, *manual_keys, *manual_keys_count)) {
                     LM_ERR("Failed to convert redis reply for row\n");
                     goto error;
                 }
                 db_redis_free_reply(&reply);
             }
         }
     }

     LM_DBG("done performing query\n");
     return 0;

 error:
     LM_ERR("failed to perform the query\n");
     db_redis_key_free(&query_v);
     if(reply)
         db_redis_free_reply(&reply);
     if(*_r) {
         db_redis_free_result((db1_con_t*)_h, *_r); *_r = NULL;
     }
     return -1;
 }
 
 static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, const db_key_t* _k,
         const db_val_t* _v, const db_op_t *_op, const int _n,
3b412293
         redis_key_t **keys, int *keys_count,
9da53bae
         int **manual_keys, int *manual_keys_count, int do_table_scan, uint64_t ts_scan_start,
         const str *ts_scan_key) {
53e746b5
 
3b412293
     int i = 0, j = 0;
53e746b5
     redis_key_t *k = NULL;
     int type_keys_count = 0;
     int all_type_keys_count = 0;
a72c4e8e
     size_t col;
53e746b5
 
     redisReply *reply = NULL;
     redis_key_t *query_v = NULL;
     redis_key_t *type_keys = NULL;
9da53bae
     redis_key_t *set_keys = NULL;
53e746b5
     redis_key_t *all_type_keys = NULL;
     db_val_t *db_vals = NULL;
     db_key_t *db_keys = NULL;
a72c4e8e
     redis_key_t *type_key;
9da53bae
     redis_key_t *set_key;
53e746b5
 
3b412293
     if (!*keys_count && do_table_scan) {
9da53bae
         if (!ts_scan_start)
             LM_WARN("performing full table scan on table '%.*s' while performing delete\n",
                     CON_TABLE(_h)->len, CON_TABLE(_h)->s);
         else
             LM_WARN("performing table scan on table '%.*s' while performing delete using match key "
                     "'%.*s' at offset %llx\n",
                     CON_TABLE(_h)->len, CON_TABLE(_h)->s,
                     ts_scan_key->len, ts_scan_key->s, (unsigned long long) ts_scan_start);
3b412293
         for(i = 0; i < _n; ++i) {
             LM_WARN("  scan key %d is '%.*s'\n",
                     i, _k[i]->len, _k[i]->s);
         }
9da53bae
         if (db_redis_scan_query_keys(con, CON_TABLE(_h), _n,
3b412293
                     keys, keys_count,
9da53bae
                     manual_keys, manual_keys_count, ts_scan_start, ts_scan_key) != 0) {
53e746b5
             LM_ERR("failed to scan query keys\n");
             goto error;
         }
     }
 
     // TODO: this should be moved to redis_connection structure
     // and be parsed at startup:
     //
     // fetch list of keys in all types
     if (db_redis_get_keys_for_all_types(con, CON_TABLE(_h),
                 &all_type_keys, &all_type_keys_count) != 0) {
             LM_ERR("failed to get full list of type keys\n");
             goto error;
     }
 
cc6865cd
     LM_DBG("delete all keys\n");
3b412293
     for (k = *keys; k; k = k->next) {
53e746b5
         redis_key_t *all_type_key;
         str *key = &k->key;
         redis_key_t *tmp = NULL;
         int row_match;
cc6865cd
         LM_DBG("delete key '%.*s'\n", key->len, key->s);
53e746b5
 
         if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
             LM_ERR("Failed to add exists command to pre-delete query\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, key) != 0) {
             LM_ERR("Failed to add key name to pre-delete query\n");
             goto error;
         }
 
         // TODO: pipeline commands!
         reply = db_redis_command_argv(con, query_v);
         db_redis_check_reply(con, reply, error);
         if (reply->integer == 0) {
             LM_DBG("key does not exist in redis, skip deleting\n");
             db_redis_free_reply(&reply);
3b412293
             db_redis_key_free(&query_v);
53e746b5
             continue;
         }
         db_redis_free_reply(&reply);
3b412293
         tmp = db_redis_key_shift(&query_v);
53e746b5
         if (tmp)
             db_redis_key_free(&tmp);
 
         if (db_redis_key_prepend_string(&query_v, "HMGET", 5) != 0) {
             LM_ERR("Failed to set hmget command to pre-delete query\n");
             goto error;
         }
 
         // add all manual keys to query
3b412293
         for (j = 0; j < *manual_keys_count; ++j) {
             int idx = (*manual_keys)[j];
53e746b5
             str *col = _k[idx];
 
             if (db_redis_key_add_str(&query_v, col) != 0) {
                 LM_ERR("Failed to add manual key to pre-delete query\n");
                 goto error;
             }
         }
         // add all type keys to query
         for (all_type_key = all_type_keys; all_type_key; all_type_key = all_type_key->next) {
             if (db_redis_key_add_str(&query_v, &all_type_key->key) != 0) {
                 LM_ERR("Failed to add type key to pre-delete query\n");
                 goto error;
             }
         }
         reply = db_redis_command_argv(con, query_v);
         db_redis_key_free(&query_v);
         db_redis_check_reply(con, reply, error);
 
         LM_DBG("dumping full query reply\n");
         db_redis_dump_reply(reply);
 
         // manually filter non-matching replies
         row_match = 1;
a72c4e8e
         for (col = 0; col < reply->elements; ++col) {
3b412293
             if (col < *manual_keys_count) {
                 int idx = (*manual_keys)[col];
53e746b5
                 db_key_t k = _k[idx];
                 db_val_t v = _v[idx];
                 db_op_t o = _op[idx];
                 LM_DBG("manually filtering key '%.*s'\n",
                         k->len, k->s);
                 if (db_redis_compare_column(k, &v, o, reply->element[col]) != 0) {
384c2ab8
                     LM_DBG("column %lu does not match, ignore row\n", (unsigned long) col);
53e746b5
                     row_match = 0;
                     break;
                 }
             }
         }
         if (!row_match) {
             db_redis_free_reply(&reply);
             continue;
         } else {
             LM_DBG("row matches manual filtering, proceed with deletion\n");
         }
 
         db_keys = (db_key_t*) pkg_malloc(all_type_keys_count * sizeof(db_key_t));
         if (!db_keys) {
             LM_ERR("Failed to allocate memory for db type keys\n");
             goto error;
         }
         for (j = 0, tmp = all_type_keys; tmp; ++j, tmp = tmp->next) {
             db_keys[j] = &tmp->key;
         }
 
         db_vals = (db_val_t*) pkg_malloc(all_type_keys_count * sizeof(db_val_t));
         if (!db_vals) {
             LM_ERR("Failed to allocate memory for manual db vals\n");
             goto error;
         }
 
         for (j = 0, all_type_key = all_type_keys; all_type_key; ++j, all_type_key = all_type_key->next) {
             db_val_t *v = &(db_vals[j]);
             str *key = &all_type_key->key;
3b412293
             char *value = reply->element[*manual_keys_count + j]->str;
53e746b5
             int coltype = db_redis_schema_get_column_type(con, CON_TABLE(_h), key);
             if (value == NULL) {
                 VAL_NULL(v) = 1;
             } else if (db_str2val(coltype, v, value, strlen(value), 0) != 0) {
                 LM_ERR("Failed to convert redis reply column to db value\n");
                 goto error;
             }
         }
         if (db_redis_build_type_keys(con, CON_TABLE(_h), db_keys, db_vals, all_type_keys_count,
9da53bae
                     &type_keys, &set_keys, &type_keys_count) != 0) {
53e746b5
             LM_ERR("failed to build type keys\n");
             goto error;
         }
         pkg_free(db_keys);
22bd6ca0
         db_keys = NULL;
53e746b5
         pkg_free(db_vals);
22bd6ca0
         db_vals = NULL;
53e746b5
         db_redis_free_reply(&reply);
 
         if (db_redis_key_add_string(&query_v, "DEL", 3) != 0) {
             LM_ERR("Failed to add del command to delete query\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, key) != 0) {
             LM_ERR("Failed to add key to delete query\n");
             goto error;
         }
 
         reply = db_redis_command_argv(con, query_v);
         db_redis_key_free(&query_v);
         db_redis_check_reply(con, reply, error);
         db_redis_free_reply(&reply);
 
9da53bae
         for (type_key = type_keys, set_key = set_keys; type_key;
                 type_key = type_key->next, set_key = set_key->next) {
 
             if (db_redis_key_add_string(&query_v, "EVALSHA", 7) != 0) {
                 LM_ERR("Failed to add srem command to post-delete query\n");
                 goto error;
             }
             if (db_redis_key_add_string(&query_v, con->srem_key_lua, strlen(con->srem_key_lua)) != 0) {
                 LM_ERR("Failed to add srem command to post-delete query\n");
                 goto error;
             }
             if (db_redis_key_add_string(&query_v, "3", 1) != 0) {
53e746b5
                 LM_ERR("Failed to add srem command to post-delete query\n");
                 goto error;
             }
             if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
                 LM_ERR("Failed to add key to delete query\n");
                 goto error;
             }
9da53bae
             if (db_redis_key_add_str(&query_v, &set_key->key) != 0) {
                 LM_ERR("Failed to add key to delete query\n");
                 goto error;
             }
53e746b5
             if (db_redis_key_add_str(&query_v, key) != 0) {
                 LM_ERR("Failed to add key to delete query\n");
                 goto error;
             }
             reply = db_redis_command_argv(con, query_v);
             db_redis_key_free(&query_v);
             db_redis_check_reply(con, reply, error);
             db_redis_free_reply(&reply);
         }
cc6865cd
         LM_DBG("done with loop '%.*s'\n", k->key.len, k->key.s);
3b412293
         db_redis_key_free(&type_keys);
9da53bae
         db_redis_key_free(&set_keys);
53e746b5
     }
     db_redis_key_free(&all_type_keys);
22bd6ca0
     db_redis_key_free(&query_v);
53e746b5
 
     return 0;
 
 error:
     LM_ERR("failed to perform the delete\n");
     if(reply)
         db_redis_free_reply(&reply);
     if (db_keys)
         pkg_free(db_keys);
     if (db_vals)
         pkg_free(db_vals);
     db_redis_key_free(&query_v);
     db_redis_key_free(&type_keys);
9da53bae
     db_redis_key_free(&set_keys);
53e746b5
     db_redis_key_free(&all_type_keys);
     return -1;
 }
 
 /*
  * Apply an update to every row addressed by *keys and keep the type maps
  * in sync with the changed column values.
  *
  * _h: structure representing database connection
  * con: redis connection the commands are pipelined on
  * _k, _v, _op: key names, values and operators of the update condition; the
  *     columns listed in *manual_keys are compared here against the stored row
  * _uk, _uv: column names and new values to write
  * _n: number of key=value pairs in the condition
  * _nu: number of columns to update
  * keys, keys_count: entry keys to update; when no keys were built and
  *     do_table_scan is set, they are filled by scanning the table
  * manual_keys, manual_keys_count: indexes into _k of manually filtered columns
  * ts_scan_start, ts_scan_key: forwarded to the table scan
  *
  * The work is done in three passes:
  *   1. append an EXISTS and an HMGET command per entry key
  *   2. read those replies in the same order, filter the rows and append the
  *      HMSET plus the type-map maintenance commands
  *   3. read the replies of everything appended in pass 2
  *
  * Returns 0 on success, -1 on error.
  */
 static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, const db_key_t* _k,
         const db_val_t* _v, const db_op_t *_op, const db_key_t* _uk, const db_val_t *_uv,
         const int _n, const int _nu,
         redis_key_t **keys, int *keys_count,
         int **manual_keys, int *manual_keys_count, int do_table_scan, uint64_t ts_scan_start,
         const str *ts_scan_key) {

     redisReply *reply = NULL;
     redis_key_t *query_v = NULL;
     int update_queries = 0;
     redis_key_t *key;
     int i;
     int j;
     size_t col;
     redis_key_t *all_type_keys = NULL;
     int all_type_keys_count = 0;
     db_val_t *db_vals = NULL;
     db_key_t *db_keys = NULL;
     redis_key_t *type_keys = NULL;
     redis_key_t *set_keys = NULL;
     int type_keys_count = 0;
     redis_key_t *new_type_keys = NULL;
     int new_type_keys_count = 0;
     redis_key_t *all_type_key;

     if (!(*keys_count) && do_table_scan) {
         LM_WARN("performing full table scan on table '%.*s' while performing update\n",
                 CON_TABLE(_h)->len, CON_TABLE(_h)->s);
         for(i = 0; i < _n; ++i) {
             LM_WARN("  scan key %d is '%.*s'\n",
                     i, _k[i]->len, _k[i]->s);
         }
         if (db_redis_scan_query_keys(con, CON_TABLE(_h), _n,
                     keys, keys_count,
                     manual_keys, manual_keys_count, ts_scan_start, ts_scan_key) != 0) {
             LM_ERR("failed to scan query keys\n");
             goto error;
         }
     }

     // TODO: this should be moved to redis_connection structure
     // and be parsed at startup:
     //
     // fetch list of keys in all types
     if (db_redis_get_keys_for_all_types(con, CON_TABLE(_h),
                 &all_type_keys, &all_type_keys_count) != 0) {
             LM_ERR("failed to get full list of type keys\n");
             goto error;
     }

     // type keys as they result from the values being written
     if (db_redis_build_type_keys(con, CON_TABLE(_h), _uk, _uv, _nu,
                 &new_type_keys, NULL, &new_type_keys_count) != 0) {
         LM_ERR("failed to build type keys\n");
         goto error;
     }
     LM_DBG("%i new type keys\n", new_type_keys_count);

     // pass 1: queue EXISTS + HMGET for every entry key
     for (key = *keys; key; key = key->next) {
         str *keyname = &key->key;

         LM_DBG("fetching row for '%.*s' from redis for update\n", keyname->len, keyname->s);


         if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
             LM_ERR("Failed to set exists command to pre-update exists query\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, keyname) != 0) {
             LM_ERR("Failed to add key name to pre-update exists query\n");
             goto error;
         }
         if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
             LM_ERR("Failed to append redis command\n");
             goto error;
         }
         db_redis_key_free(&query_v);

         // construct HMGET query
         if (db_redis_key_add_string(&query_v, "HMGET", 5) != 0) {
             LM_ERR("Failed to set hgetall command to pre-update hget query\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, keyname) != 0) {
             LM_ERR("Failed to add key name to pre-update exists query\n");
             goto error;
         }

         for (j = 0; j < *manual_keys_count; ++j) {
             int idx = (*manual_keys)[j];
             str *k_name = _k[idx];
             if (db_redis_key_add_str(&query_v, k_name) != 0) {
                 LM_ERR("Failed to add manual key name to hget query\n");
                 goto error;
             }
         }
         // add all type keys to query
         for (all_type_key = all_type_keys; all_type_key; all_type_key = all_type_key->next) {
             if (db_redis_key_add_str(&query_v, &all_type_key->key) != 0) {
                 LM_ERR("Failed to add type key to pre-update query\n");
                 goto error;
             }
         }

         if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
             LM_ERR("Failed to append redis command\n");
             goto error;
         }
         db_redis_key_free(&query_v);
     }

     // pass 2: consume the EXISTS/HMGET replies and queue the actual updates
     for (key = *keys; key; key = key->next) {
         redis_key_t *tmp = NULL;
         redis_key_t *type_key;
         redis_key_t *set_key;
         redis_key_t *new_type_key;
         int row_match;

         LM_DBG("fetching replies for '%.*s' from redis for update\n", key->key.len, key->key.s);

         // get reply for EXISTS query
         if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
             LM_ERR("Failed to get reply for query: %s\n",
                     db_redis_get_error(con));
             goto error;
         }
         db_redis_check_reply(con, reply, error);
         if (reply->integer == 0) {
             LM_DBG("key does not exist, returning no row for query\n");
             db_redis_free_reply(&reply);
             // also free next reply, as this is a null row for the HMGET
             LM_DBG("also fetch hmget reply after non-existent key\n");
             if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
                 LM_ERR("Failed to get reply for query: %s\n",
                         db_redis_get_error(con));
                 goto error;
             }
             db_redis_check_reply(con, reply, error);
             db_redis_free_reply(&reply);
             LM_DBG("continue fetch reply loop\n");
             continue;
         }
         db_redis_free_reply(&reply);

         // get reply for actual HMGET query
         if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
             LM_ERR("Failed to get reply for query: %s\n",
                     db_redis_get_error(con));
             goto error;
         }
         db_redis_check_reply(con, reply, error);
         if (reply->type != REDIS_REPLY_ARRAY) {
             LM_ERR("Unexpected reply, expected array\n");
             goto error;
         }
         LM_DBG("dumping full query reply for row\n");
         db_redis_dump_reply(reply);

         // manually filter non-matching replies; the manual columns are the
         // first *manual_keys_count elements of the reply
         row_match = 1;
         for (col = 0; col < reply->elements && col < (size_t) *manual_keys_count; ++col) {
             int idx = (*manual_keys)[col];
             db_key_t fk = _k[idx];
             db_val_t fv = _v[idx];
             db_op_t fo = _op[idx];
             LM_DBG("manually filtering key '%.*s'\n",
                     fk->len, fk->s);
             if (db_redis_compare_column(fk, &fv, fo, reply->element[col]) != 0) {
                 LM_DBG("column %lu does not match, ignore row\n", (unsigned long) col);
                 row_match = 0;
                 break;
             }
         }
         if (!row_match) {
             // release the HMGET reply here, the next iteration overwrites
             // the pointer with the following reply
             db_redis_free_reply(&reply);
             continue;
         } else {
             LM_DBG("row matches manual filtering, proceed with update\n");
         }

         // rebuild the type keys of the stored row from the fetched type columns
         db_keys = (db_key_t*) pkg_malloc(all_type_keys_count * sizeof(db_key_t));
         if (!db_keys) {
             LM_ERR("Failed to allocate memory for db type keys\n");
             goto error;
         }
         for (j = 0, tmp = all_type_keys; tmp; ++j, tmp = tmp->next) {
             db_keys[j] = &tmp->key;
         }

         db_vals = (db_val_t*) pkg_malloc(all_type_keys_count * sizeof(db_val_t));
         if (!db_vals) {
             LM_ERR("Failed to allocate memory for manual db vals\n");
             goto error;
         }

         for (j = 0, all_type_key = all_type_keys; all_type_key; ++j, all_type_key = all_type_key->next) {
             db_val_t *v = &(db_vals[j]);
             str *type_col = &all_type_key->key;
             size_t pos = (size_t) *manual_keys_count + (size_t) j;
             char *value;
             int coltype;

             // never index past what the server actually returned
             if (pos >= reply->elements) {
                 LM_ERR("Unexpected reply, missing value for type key '%.*s'\n",
                         type_col->len, type_col->s);
                 goto error;
             }
             value = reply->element[pos]->str;
             coltype = db_redis_schema_get_column_type(con, CON_TABLE(_h), type_col);
             if (value == NULL) {
                 VAL_NULL(v) = 1;
             } else if (db_str2val(coltype, v, value, strlen(value), 0) != 0) {
                 LM_ERR("Failed to convert redis reply column to db value\n");
                 goto error;
             }
         }
         if (db_redis_build_type_keys(con, CON_TABLE(_h), db_keys, db_vals, all_type_keys_count,
                     &type_keys, &set_keys, &type_keys_count) != 0) {
             LM_ERR("failed to build type keys\n");
             goto error;
         }
         pkg_free(db_keys);
         db_keys = NULL;
         pkg_free(db_vals);
         db_vals = NULL;
         db_redis_free_reply(&reply);

         // queue the HMSET writing the new column values
         if (db_redis_key_add_string(&query_v, "HMSET", 5) != 0) {
             LM_ERR("Failed to add hmset command to update query\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, &key->key) != 0) {
             LM_ERR("Failed to add key to update query\n");
             goto error;
         }

         for (i = 0; i < _nu; ++i) {
             str *uk = _uk[i];
             str uv = {NULL, 0};

             if (db_redis_val2str(&(_uv[i]), &uv) != 0) {
                 LM_ERR("Failed to convert update value to string\n");
                 goto error;
             }
             // uv.s is owned by this iteration, release it on every exit path
             if (db_redis_key_add_str(&query_v, uk) != 0) {
                 LM_ERR("Failed to add key to update query\n");
                 if (uv.s)
                     pkg_free(uv.s);
                 goto error;
             }
             if (db_redis_key_add_str(&query_v, &uv) != 0) {
                 LM_ERR("Failed to add key to update query\n");
                 if (uv.s)
                     pkg_free(uv.s);
                 goto error;
             }
             if (uv.s)
                 pkg_free(uv.s);
         }
         update_queries++;
         if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
             LM_ERR("Failed to append redis command\n");
             goto error;
         }

         db_redis_key_free(&query_v);

         // for every type key of the stored row, look for a new type key with
         // the same prefix (text before the first "::") but a different value;
         // type_keys and set_keys are walked in step
         for (type_key = type_keys, set_key = set_keys; type_key;
                 type_key = type_key->next, set_key = set_key->next) {
             char *prefix;
             int prefix_len;

             LM_DBG("checking for update of type key '%.*s'\n",
                     type_key->key.len, type_key->key.s);
             prefix = ser_memmem(type_key->key.s, "::", type_key->key.len, 2);
             if (!prefix || prefix == type_key->key.s) {
                 LM_DBG("Invalid key without :: '%.*s'\n",
                         type_key->key.len, type_key->key.s);
                 continue;
             }
             prefix_len = (int)(prefix - type_key->key.s);
             for (new_type_key = new_type_keys; new_type_key; new_type_key = new_type_key->next) {
                 // a key shorter than the prefix cannot share it; skipping it
                 // also keeps memcmp inside the buffer
                 if (new_type_key->key.len < prefix_len)
                     continue;
                 // compare prefix to see if this is the same key
                 if (memcmp(new_type_key->key.s, type_key->key.s, prefix_len))
                     continue;
                 LM_DBG("checking for update of type key against '%.*s'\n",
                         new_type_key->key.len, new_type_key->key.s);
                 if (!str_strcmp(&new_type_key->key, &type_key->key))
                     continue;

                 // add to new set key and delete from old

                 if (db_redis_key_add_string(&query_v, "SADD", 4) != 0) {
                     LM_ERR("Failed to set sadd command to post-update query\n");
                     goto error;
                 }
                 if (db_redis_key_add_str(&query_v, &new_type_key->key) != 0) {
                     LM_ERR("Failed to add map key to post-update query\n");
                     goto error;
                 }
                 if (db_redis_key_add_str(&query_v, &key->key) != 0) {
                     LM_ERR("Failed to set entry key to post-update query\n");
                     goto error;
                 }

                 update_queries++;
                 if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
                     LM_ERR("Failed to append redis command\n");
                     goto error;
                 }

                 db_redis_key_free(&query_v);

                 if (db_redis_key_add_string(&query_v, "SADD", 4) != 0) {
                     LM_ERR("Failed to set sadd command to post-update query\n");
                     goto error;
                 }
                 if (db_redis_key_add_str(&query_v, &set_key->key) != 0) {
                     LM_ERR("Failed to add map key to post-update query\n");
                     goto error;
                 }
                 if (db_redis_key_add_str(&query_v, &new_type_key->key) != 0) {
                     LM_ERR("Failed to set entry key to post-update query\n");
                     goto error;
                 }

                 update_queries++;
                 if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
                     LM_ERR("Failed to append redis command\n");
                     goto error;
                 }

                 db_redis_key_free(&query_v);

                 if (db_redis_key_add_string(&query_v, "EVAL", 4) != 0) {
                     LM_ERR("Failed to add srem command to post-delete query\n");
                     goto error;
                 }
                 if (db_redis_key_add_string(&query_v, SREM_KEY_LUA, strlen(SREM_KEY_LUA)) != 0) {
                     LM_ERR("Failed to add srem command to post-delete query\n");
                     goto error;
                 }
                 if (db_redis_key_add_string(&query_v, "3", 1) != 0) {
                     LM_ERR("Failed to add srem command to post-delete query\n");
                     goto error;
                 }
                 if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
                     LM_ERR("Failed to add key to delete query\n");
                     goto error;
                 }
                 if (db_redis_key_add_str(&query_v, &set_key->key) != 0) {
                     LM_ERR("Failed to add key to delete query\n");
                     goto error;
                 }
                 if (db_redis_key_add_str(&query_v, &key->key) != 0) {
                     LM_ERR("Failed to add key to delete query\n");
                     goto error;
                 }

                 update_queries++;
                 if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
                     LM_ERR("Failed to append redis command\n");
                     goto error;
                 }

                 db_redis_key_free(&query_v);
             }
         }

         db_redis_key_free(&type_keys);
         db_redis_key_free(&set_keys);
     }

     LM_DBG("getting replies for %d queries\n", update_queries);

     // pass 3: one reply per command queued in pass 2
     for (i = 0; i < update_queries; ++i) {
         if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
             LM_ERR("Failed to get reply for query: %s\n",
                     db_redis_get_error(con));
             goto error;
         }
         db_redis_check_reply(con, reply, error);
         db_redis_free_reply(&reply);
     }

     LM_DBG("done performing update\n");

     db_redis_key_free(&all_type_keys);
     db_redis_key_free(&new_type_keys);
     db_redis_consume_replies(con);
     return 0;

 error:
     LM_ERR("failed to perform the update\n");
     if (reply)
         db_redis_free_reply(&reply);
     // non-NULL only when a failure hit between allocation and release
     if (db_keys)
         pkg_free(db_keys);
     if (db_vals)
         pkg_free(db_vals);
     db_redis_key_free(&query_v);
     db_redis_key_free(&all_type_keys);
     db_redis_key_free(&type_keys);
     db_redis_key_free(&set_keys);
     db_redis_key_free(&new_type_keys);
     db_redis_consume_replies(con);
     return -1;
 }
 
 
 /*
  * Select rows from the current table (key prefix).
  * _h: structure representing database connection
  * _k: key names
  * _op: operators; when NULL every key is compared with "="
  * _v: values of the keys that must match
  * _c: column names to return
  * _n: number of key=values pairs to compare
  * _nc: number of columns to return
  * _o: order by the specified column (not implemented)
  * _r: receives the result set
  *
  * Returns 0 on success, -1 on failure.
  */
 int db_redis_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
         const db_val_t* _v, const db_key_t* _c, const int _n, const int _nc,
         const db_key_t _o, db1_res_t** _r)
 {
     km_redis_con_t *con = NULL;
     db_op_t *ops = NULL;
     int ops_owned = 0;
     redis_key_t *keys = NULL;
     int keys_count = 0;
     int *manual_keys = NULL;
     int manual_keys_count = 0;
     int do_table_scan = 0;
     uint64_t ts_scan_start = 0;
     str ts_scan_key = {0,};
     int rc = -1;
     int idx;

     // TODO: implement order-by
     // TODO: optimize mapping-based manual post-check (remove check for keys already
     // in type query key)

     if (_r == NULL) {
         LM_ERR("db result is null\n");
         return -1;
     }

     // bring the connection up again if it was dropped
     con = REDIS_CON(_h);
     if (con != NULL && con->con == NULL && db_redis_connect(con) != 0) {
         LM_ERR("Failed to reconnect to server\n");
         return -1;
     }
     if (con == NULL || con->id == NULL || con->con == NULL) {
         LM_ERR("connection to server is null\n");
         return -1;
     }
     if (CON_TABLE(_h)->s == NULL) {
         LM_ERR("prefix (table) name not set\n");
         return -1;
     }
     LM_DBG("querying prefix (table) '%.*s'\n",
             CON_TABLE(_h)->len, CON_TABLE(_h)->s);

     *_r = NULL;

     // a "select table_version from version where table_name = ..." lookup is
     // answered from the schema rather than from redis
     if (_nc == 1 && _n >= 1 && VAL_TYPE(&_v[0]) == DB1_STR
             && CON_TABLE(_h)->len == strlen("version")
             && strncmp(CON_TABLE(_h)->s, "version", CON_TABLE(_h)->len) == 0
             && _k[0]->len == strlen("table_name")
             && strncmp(_k[0]->s, "table_name", _k[0]->len) == 0
             && _c[0]->len == strlen("table_version")
             && strncmp(_c[0]->s, "table_version", _c[0]->len) == 0) {

         if (db_redis_return_version(_h, con, &VAL_STR(&_v[0]), _r) == 0) {
             return 0;
         }
         // the schema had no answer: fall through and query the table
     }

     if (_op != NULL) {
         ops = (db_op_t*)_op;
     } else {
         // no operators supplied: build a private list of "=" operators
         char *eq = "=";
         ops_owned = 1;
         ops = (db_op_t*)pkg_malloc(_n * sizeof(db_op_t));
         if (!ops) {
             LM_ERR("Failed to allocate memory for query op list\n");
             goto error;
         }
         for (idx = 0; idx < _n; ++idx) {
             ops[idx] = eq;
         }
     }

     if (_n <= 0) {
         LM_DBG("no columns given to build query keys, falling back to full table scan\n");
         keys_count = 0;
         do_table_scan = 1;
     } else {
         if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, ops, _n,
                     &keys, &keys_count, &manual_keys, &manual_keys_count,
                     &do_table_scan, &ts_scan_start, &ts_scan_key) != 0) {
             LM_ERR("failed to build query keys\n");
             goto error;
         }
         if (keys_count == 0) {
             if (do_table_scan) {
                 LM_DBG("unable to build query keys, falling back to full table scan\n");
             } else {
                 LM_DBG("query keys not member of suitable mapping, skip full table scan\n");
             }
         }
     }

     if (db_redis_perform_query(_h, con, _k, _v, ops, _c, _n, _nc, _r,
                 &keys, &keys_count, &manual_keys, &manual_keys_count,
                 do_table_scan, ts_scan_start, &ts_scan_key) != 0) {
         goto error;
     }

     LM_DBG("done performing query\n");
     rc = 0;
     goto cleanup;

 error:
     LM_ERR("failed to do the query\n");

 cleanup:
     // shared by the success and the failure path
     if (ops_owned && ops) {
         pkg_free(ops);
     }
     db_redis_key_free(&keys);
     if (manual_keys) {
         pkg_free(manual_keys);
     }
     if (ts_scan_key.s) {
         pkg_free(ts_scan_key.s);
     }
     db_redis_consume_replies(con);

     return rc;
 }
 
 /*
  * Raw query entry point.
  * Not implemented: the call is only logged at debug level and always
  * fails with -1. None of the arguments is used.
  */
 int db_redis_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r)
 {
     (void)_h;
     (void)_s;
     (void)_r;

     LM_DBG("perform redis raw query\n");

     return -1;
 }
 
 /*
  * Insert a row into specified table
  * _h: structure representing database connection
  * _k: key names
  * _v: values of the keys
  * _n: number of key=value pairs
  */
 int db_redis_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v, const int _n)
 {
     km_redis_con_t *con = NULL;
     redis_key_t *key = NULL;
     int keys_count = 0;
     redis_key_t *type_keys = NULL;
9da53bae
     redis_key_t *set_keys = NULL;
53e746b5
     int type_keys_count = 0;
     redis_key_t *query_v = NULL;
     redisReply *reply = NULL;
a72c4e8e
     int i;
     redis_key_t *k;
9da53bae
     redis_key_t *set_key;
53e746b5
 
     con = REDIS_CON(_h);
     if (con && con->con == NULL) {
         if (db_redis_connect(con) != 0) {
             LM_ERR("Failed to reconnect to server\n");
             return -1;
         }
     }
     if(con == NULL || con->id == NULL || con->con == NULL) {
         LM_ERR("connection to server is null\n");
         return -1;
     }
     if(CON_TABLE(_h)->s == NULL) {
         LM_ERR("prefix (table) name not set\n");
         return -1;
     } else {
         LM_DBG("inserting to prefix (table) '%.*s'\n",
                 CON_TABLE(_h)->len, CON_TABLE(_h)->s);
     }
 
     if (db_redis_build_entry_keys(con, CON_TABLE(_h), _k, _v, _n,
                 &key, &keys_count) != 0) {
         LM_ERR("failed to build entry keys\n");
         goto error;
     }
     if (db_redis_build_type_keys(con, CON_TABLE(_h), _k, _v, _n,
9da53bae
                 &type_keys, &set_keys, &type_keys_count) != 0) {
53e746b5
         LM_ERR("failed to build type keys\n");
         goto error;
     }
 
     if (db_redis_key_add_string(&query_v, "HMSET", 5) != 0) {
         LM_ERR("Failed to add hmset command to insert query\n");
         goto error;
     }
     if (db_redis_key_add_str(&query_v, &key->key) != 0) {
         LM_ERR("Failed to add key to insert query\n");
         goto error;
     }
 
a72c4e8e
     for (i = 0; i < _n; ++i) {
53e746b5
         str *k = _k[i];
         str v;
 
         if (db_redis_key_add_str(&query_v, k) != 0) {
             LM_ERR("Failed to add column name to insert query\n");
             goto error;
         }
         if (db_redis_val2str(&(_v[i]), &v) != 0) {
             LM_ERR("Failed to allocate memory for query value\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, &v) != 0) {
             LM_ERR("Failed to add column value to insert query\n");
             goto error;
         }
4348049e
         if (v.s)
             pkg_free(v.s);
53e746b5
     }
 
     reply = db_redis_command_argv(con, query_v);
     db_redis_key_free(&query_v);
     db_redis_check_reply(con, reply, error);
     db_redis_free_reply(&reply);
 
9da53bae
     for (k = type_keys, set_key = set_keys; k; k = k->next, set_key = set_key->next) {
53e746b5
         str *type_key = &k->key;
 
         LM_DBG("inserting entry key '%.*s' to type map '%.*s'\n",
             key->key.len, key->key.s, type_key->len, type_key->s);
 
         if (db_redis_key_add_string(&query_v, "SADD", 4) != 0) {
             LM_ERR("Failed to set sadd command to post-insert query\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, type_key) != 0) {
             LM_ERR("Failed to add map key to post-insert query\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, &key->key) != 0) {
             LM_ERR("Failed to set entry key to post-insert query\n");
             goto error;
         }
 
         reply = db_redis_command_argv(con, query_v);
         db_redis_key_free(&query_v);
         db_redis_check_reply(con, reply, error);
         db_redis_free_reply(&reply);
9da53bae
 
         if (db_redis_key_add_string(&query_v, "SADD", 4) != 0) {
             LM_ERR("Failed to set sadd command to post-insert query\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, &set_key->key) != 0) {
             LM_ERR("Failed to add map key to post-insert query\n");
             goto error;
         }
         if (db_redis_key_add_str(&query_v, type_key) != 0) {
             LM_ERR("Failed to set entry key to post-insert query\n");
             goto error;
         }
 
         reply = db_redis_command_argv(con, query_v);
         db_redis_key_free(&query_v);
         db_redis_check_reply(con, reply, error);
         db_redis_free_reply(&reply);
53e746b5
     }
 
     db_redis_key_free(&key);
     db_redis_key_free(&type_keys);
9da53bae
     db_redis_key_free(&set_keys);
53e746b5
     db_redis_consume_replies(con);
 
     return 0;
 
 error:
     db_redis_key_free(&key);
     db_redis_key_free(&type_keys);
9da53bae
     db_redis_key_free(&set_keys);
53e746b5
     db_redis_key_free(&query_v);
 
     if (reply)
         db_redis_free_reply(&reply);
 
     db_redis_consume_replies(con);
 
     LM_ERR("failed to do the insert\n");
     return -1;
 }
 
 /*
  * Delete a row from the specified table
  * _h: structure representing database connection
  * _k: key names
  * _o: operators
  * _v: values of the keys that must match
  * _n: number of key=value pairs
  */
 int db_redis_delete(const db1_con_t* _h, const db_key_t* _k,
         const db_op_t* _op, const db_val_t* _v, const int _n)
 {
     km_redis_con_t *con = NULL;
     redis_key_t *keys = NULL;
     int keys_count = 0;
     int *manual_keys = NULL;
     int manual_keys_count = 0;
     int free_op = 0;
     int do_table_scan = 0;
9da53bae
     uint64_t ts_scan_start = 0;
     str ts_scan_key = {0,};
53e746b5
     db_op_t *query_ops = NULL;
a72c4e8e
     int i;
53e746b5
 
     // TODO: optimize mapping-based manual post-check (remove check for keys already
     // in type query key)
 
     con = REDIS_CON(_h);
     if (con && con->con == NULL) {
         if (db_redis_connect(con) != 0) {
             LM_ERR("Failed to reconnect to server\n");
             return -1;
         }
     }
     if(con == NULL || con->id == NULL || con->con == NULL) {
         LM_ERR("connection to server is null\n");
         return -1;
     }
     if(CON_TABLE(_h)->s == NULL) {
         LM_ERR("prefix (table) name not set\n");
         return -1;
     } else {
         LM_DBG("deleting from prefix (table) '%.*s'\n",
                 CON_TABLE(_h)->len, CON_TABLE(_h)->s);
     }
 
     free_op = 0;
 
     if (_op == NULL) {
         char *op = "=";
         free_op = 1;
         query_ops = (db_op_t*)pkg_malloc(_n * sizeof(db_op_t));
         if (!query_ops) {
             LM_ERR("Failed to allocate memory for query op list\n");
             goto error;
         }
a72c4e8e
         for (i = 0; i < _n; ++i) {
53e746b5
             query_ops[i] = op;
         }
     } else {
         query_ops = (db_op_t*)_op;
     }
 
     if (_n > 0) {
         if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
9da53bae
                     &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan, &ts_scan_start,
                     &ts_scan_key) != 0) {
53e746b5
             LM_ERR("failed to build query keys\n");
             goto error;
         }
         if (!keys_count) {
             if (do_table_scan) {
                 LM_DBG("unable to build query keys, falling back to full table scan\n");
             } else {
                 LM_DBG("query keys not member of suitable mapping, skip full table scan\n");
             }
         }
     } else {
         LM_DBG("no columns given to build query keys, falling back to full table scan\n");
         keys_count = 0;
a472aaae
         do_table_scan = 1;
53e746b5
     }
 
     if (db_redis_perform_delete(_h, con, _k, _v, query_ops, _n,
9da53bae
         &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan, ts_scan_start, &ts_scan_key) != 0) {
53e746b5
         goto error;
     }
 
     LM_DBG("done performing delete\n");
 
     if (free_op && query_ops) {
         pkg_free(query_ops);
     }
     db_redis_key_free(&keys);
     if (manual_keys)
         pkg_free(manual_keys);
9da53bae
     if (ts_scan_key.s)
         pkg_free(ts_scan_key.s);
53e746b5
     db_redis_consume_replies(con);
 
     return 0;
 
 error:
     LM_ERR("failed to do the query\n");
     if (free_op && query_ops) {
         pkg_free(query_ops);
     }
     db_redis_key_free(&keys);
     if (manual_keys)
         pkg_free(manual_keys);
9da53bae
     if (ts_scan_key.s)
         pkg_free(ts_scan_key.s);
53e746b5
     db_redis_consume_replies(con);
     return -1;
 }
 
 /*
  * Update some rows in the specified table
  * _h: structure representing database connection
  * _k: key names
  * _op: operators
  * _v: values of the keys that must match
  * _uk: updated columns
  * _uv: updated values of the columns
  * _n: number of key=value pairs
  * _un: number of columns to update
  */
 int db_redis_update(const db1_con_t* _h, const db_key_t* _k,
         const db_op_t* _op, const db_val_t* _v, const db_key_t* _uk,
         const db_val_t* _uv, const int _n, const int _nu)
 {
     km_redis_con_t *con = NULL;
     int free_op = 0;
     int do_table_scan = 0;
9da53bae
     uint64_t ts_scan_start = 0;
     str ts_scan_key = {0,};
53e746b5
 
     redis_key_t *keys = NULL;
     int keys_count = 0;
     int *manual_keys = NULL;
     int manual_keys_count = 0;
     db_op_t *query_ops = NULL;
a72c4e8e
     int i;
53e746b5
 
     // TODO: optimize mapping-based manual post-check (remove check for keys already
     // in type query key)
 
     con = REDIS_CON(_h);
     if (con && con->con == NULL) {
         if (db_redis_connect(con) != 0) {
             LM_ERR("Failed to reconnect to server\n");
             return -1;
         }
     }
     if(con == NULL || con->id == NULL || con->con == NULL) {
         LM_ERR("connection to server is null\n");
         return -1;
     }
     if(CON_TABLE(_h)->s == NULL) {
         LM_ERR("prefix (table) name not set\n");
         return -1;
     } else {
         LM_DBG("updating prefix (table) '%.*s'\n",
                 CON_TABLE(_h)->len, CON_TABLE(_h)->s);
     }
 
     free_op = 0;
 
     if (_op == NULL) {
         char *op = "=";
         free_op = 1;
         query_ops = (db_op_t*)pkg_malloc(_n * sizeof(db_op_t));
         if (!query_ops) {
             LM_ERR("Failed to allocate memory for query op list\n");
             goto error;
         }
a72c4e8e
         for (i = 0; i < _n; ++i) {
53e746b5
             query_ops[i] = op;
         }
     } else {
         query_ops = (db_op_t*)_op;
     }
 
     if (_n > 0) {
         if (db_redis_build_query_keys(con, CON_TABLE(_h), _k, _v, query_ops, _n,
9da53bae
                     &keys, &keys_count, &manual_keys, &manual_keys_count, &do_table_scan, &ts_scan_start,
                     &ts_scan_key) != 0) {
53e746b5
             LM_ERR("failed to build query keys\n");
             goto error;
         }
         if (!keys_count) {
             if (do_table_scan) {
                 LM_DBG("unable to build query keys, falling back to full table scan\n");
             } else {
                 LM_DBG("query keys not member of suitable mapping, skip full table scan\n");
             }
         }
     } else {
         LM_DBG("no columns given to build query keys, falling back to full table scan\n");
         keys_count = 0;
     }
 
     if (db_redis_perform_update(_h, con, _k, _v, query_ops, _uk, _uv, _n, _nu,
9da53bae
         &keys, &keys_count, &manual_keys, &manual_keys_count, do_table_scan, ts_scan_start, &ts_scan_key) != 0) {
53e746b5
         goto error;
     }
 
     LM_DBG("done performing update\n");
 
     if (free_op && query_ops) {
         pkg_free(query_ops);
     }
     db_redis_key_free(&keys);
 
     if (manual_keys) {
         pkg_free(manual_keys);
     }
9da53bae
     if (ts_scan_key.s)
         pkg_free(ts_scan_key.s);
53e746b5
     db_redis_consume_replies(con);
     return 0;
 
 error:
     LM_ERR("failed to do the query\n");
     if (free_op && query_ops) {
         pkg_free(query_ops);
     }
     db_redis_key_free(&keys);
     if (manual_keys) {
         pkg_free(manual_keys);
     }
9da53bae
     if (ts_scan_key.s)
         pkg_free(ts_scan_key.s);
53e746b5
     db_redis_consume_replies(con);
     return -1;
 }
 
 /*
  * Intended to work just like insert, but replacing the row if it exists.
  * Currently a stub: none of the arguments are used, a debug message is
  * logged and -1 is always returned.
  */
 int db_redis_replace(const db1_con_t* _h, const db_key_t* _k,
         const db_val_t* _v, const int _n, const int _un, const int _m)
 {
     const int rc = -1;
 
     LM_DBG("perform redis replace\n");
 
     return rc;
 }
 
 /*
  * Store name of table that will be used by subsequent database functions
  * _h: structure representing database connection
  * _t: table name
  *
  * Thin wrapper: the work and the return value come from db_use_table().
  */
 int db_redis_use_table(db1_con_t* _h, const str* _t)
 {
     const int rc = db_use_table(_h, _t);
 
     return rc;
 }
 
 /*
  * Release a result set
  * _h: structure representing database connection (not used here)
  * _r: result to free
  *
  * Returns -1 when _r is NULL, otherwise hands _r to db_free_result() and
  * returns 0.
  */
 int db_redis_free_result(db1_con_t* _h, db1_res_t* _r)
 {
     LM_DBG("perform redis free result\n");
 
     if (_r != NULL) {
         db_free_result(_r);
         return 0;
     }
 
     return -1;
 }