/** * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com) * * Copyright (C) 2012 Vicente Hernando Ara (System One: www.systemonenoc.com) * - for: redis array reply support * * Copyright (C) 2017 Carsten Bock (ng-voice GmbH) * - for: Cluster support * * This file is part of Kamailio, a free SIP server. * * Kamailio is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version * * Kamailio is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * */ #include #include #include #include #include #include #include "../../core/mem/mem.h" #include "../../core/dprint.h" #include "../../core/hashes.h" #include "../../core/ut.h" #include "redis_client.h" #define redisCommandNR(a...) (int)({ void *__tmp; __tmp = redisCommand(a); \ if (__tmp) freeReplyObject(__tmp); __tmp ? 0 : -1;}) static redisc_server_t * _redisc_srv_list=NULL; static redisc_reply_t *_redisc_rpl_list=NULL; extern int init_without_redis; extern int redis_connect_timeout_param; extern int redis_cmd_timeout_param; extern int redis_cluster_param; extern int redis_disable_time_param; extern int redis_allowed_timeouts_param; extern int redis_flush_on_reconnect_param; extern int redis_allow_dynamic_nodes_param; /* backwards compatibility with hiredis < 0.12 */ #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12) typedef char *sds; sds sdscatlen(sds s, const void *t, size_t len); int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len); #else #define redis_append_formatted_command redisAppendFormattedCommand #endif /** * */ int redisc_init(void) { char addr[256], pass[256], unix_sock_path[256], sentinel_group[256]; unsigned int port, db, sock = 0, haspass = 0, sentinel_master = 1; int i, row; redisc_server_t *rsrv=NULL; param_t *pit = NULL; struct timeval tv_conn; struct timeval tv_cmd; tv_conn.tv_sec = (int) redis_connect_timeout_param / 1000; tv_conn.tv_usec = (int) (redis_connect_timeout_param % 1000) * 1000; tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000; tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000; if(_redisc_srv_list==NULL) { LM_ERR("no redis servers defined\n"); return -1; } for(rsrv=_redisc_srv_list; rsrv; rsrv=rsrv->next) { char sentinels[MAXIMUM_SENTINELS][256]; uint8_t sentinels_count = 0; port = 6379; db = 0; haspass = 0; sock = 0; memset(addr, 0, sizeof(addr)); memset(pass, 0, sizeof(pass)); memset(unix_sock_path, 0, sizeof(unix_sock_path)); for (pit = rsrv->attrs; pit; pit=pit->next) { if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) { snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s", pit->body.len, pit->body.s); sock = 1; } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) { snprintf(addr, sizeof(addr)-1, "%.*s", pit->body.len, pit->body.s); } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) { if(str2int(&pit->body, &port) < 0) port = 6379; } else if(pit->name.len==2 && strncmp(pit->name.s, "db", 2)==0) { if(str2int(&pit->body, &db) < 0) db = 0; } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) { snprintf(pass, sizeof(pass)-1, "%.*s", pit->body.len, pit->body.s); haspass = 1; } else if(pit->name.len==14 && strncmp(pit->name.s, "sentinel_group", 14)==0) { snprintf(sentinel_group, sizeof(sentinel_group)-1, "%.*s", pit->body.len, pit->body.s); } else if(pit->name.len==15 && strncmp(pit->name.s, "sentinel_master", 15)==0) { if(str2int(&pit->body, &sentinel_master) < 0) sentinel_master = 1; } else if(pit->name.len==8 && strncmp(pit->name.s, "sentinel", 8)==0) { if( sentinels_count < MAXIMUM_SENTINELS ){ snprintf(sentinels[sentinels_count], sizeof(sentinels[sentinels_count])-1, "%.*s", pit->body.len, pit->body.s); sentinels_count++; } else { LM_ERR("too many sentinels, maximum %d supported.\n", MAXIMUM_SENTINELS); return -1; } } } // if sentinels are provided, we need to connect to them and retrieve the redis server // address / port if(sentinels_count > 0) { for(i= 0; i< sentinels_count; i++) { char *sentinelAddr = sentinels[i]; char *pos; redisContext *redis; redisReply *res, *res2; port = 6379; if( (pos = strchr(sentinelAddr, ':')) != NULL ) { port = atoi(pos+1); pos[i] = '\0'; } redis = redisConnectWithTimeout(sentinelAddr, port, tv_conn); if( redis ) { if(sentinel_master != 0) { res = redisCommand(redis, "SENTINEL get-master-addr-by-name %s", sentinel_group); if( res && (res->type == REDIS_REPLY_ARRAY) && (res->elements == 2) ) { strncpy(addr, res->element[0]->str, res->element[0]->len + 1); port = atoi(res->element[1]->str); LM_DBG("sentinel replied: %s:%d\n", addr, port); } } else { res = redisCommand(redis, "SENTINEL slaves %s", sentinel_group); if( res && (res->type == REDIS_REPLY_ARRAY) ) { for(row = 0; row< res->elements; row++){ res2 = res->element[row]; for(i= 0; i< res2->elements; i+= 2) { if( strncmp(res2->element[i]->str, "ip", 2) == 0 ) { strncpy(addr, res2->element[i+1]->str, res2->element[i+1]->len); addr[res2->element[i+1]->len] = '\0'; } else if( strncmp(res2->element[i]->str, "port", 4) == 0) { port = atoi(res2->element[i+1]->str); break; } } } LM_DBG("slave for %s: %s:%d\n", sentinel_group, addr, port); } } } } } if(sock != 0) { LM_DBG("Connecting to unix socket: %s\n", unix_sock_path); rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv_conn); } else { LM_DBG("Connecting to %s:%d\n", addr, port); rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn); } LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); if(!rsrv->ctxRedis) { LM_ERR("Failed to create REDIS-Context.\n"); goto err; } if (rsrv->ctxRedis->err) { LM_ERR("Failed to create REDIS returned an error: %s\n", rsrv->ctxRedis->errstr); goto err2; } if ((haspass != 0) && redisc_check_auth(rsrv, pass)) { LM_ERR("Authentication failed.\n"); goto err2; } if (redisSetTimeout(rsrv->ctxRedis, tv_cmd)) { LM_ERR("Failed to set timeout.\n"); goto err2; } if (redisCommandNR(rsrv->ctxRedis, "PING")) { LM_ERR("Failed to send PING (REDIS returned %s).\n", rsrv->ctxRedis->errstr); goto err2; } if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis, "SELECT %i", db)) { LM_ERR("Failed to send \"SELECT %i\" (REDIS returned \"%s\"," " and not in cluster mode).\n", db, rsrv->ctxRedis->errstr); goto err2; } } return 0; err2: if (sock != 0) { LM_ERR("error communicating with redis server [%.*s]" " (unix:%s db:%d): %s\n", rsrv->sname->len, rsrv->sname->s, unix_sock_path, db, rsrv->ctxRedis->errstr); } else { LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n", rsrv->sname->len, rsrv->sname->s, addr, port, db, rsrv->ctxRedis->errstr); } if (init_without_redis==1) { LM_WARN("failed to initialize redis connections, but initializing" " module anyway.\n"); return 0; } return -1; err: if (sock != 0) { LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n", rsrv->sname->len, rsrv->sname->s, unix_sock_path, db); } else { LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n", rsrv->sname->len, rsrv->sname->s, addr, port, db); } if (init_without_redis==1) { LM_WARN("failed to initialize redis connections, but initializing" " module anyway.\n"); return 0; } return -1; } /** * */ int redisc_destroy(void) { redisc_reply_t *rpl, *next_rpl; redisc_server_t *rsrv=NULL; redisc_server_t *rsrv1=NULL; rpl = _redisc_rpl_list; while(rpl != NULL) { next_rpl = rpl->next; if(rpl->rplRedis) freeReplyObject(rpl->rplRedis); if(rpl->rname.s != NULL) pkg_free(rpl->rname.s); pkg_free(rpl); rpl = next_rpl; } _redisc_rpl_list = NULL; if(_redisc_srv_list==NULL) return -1; rsrv=_redisc_srv_list; while(rsrv!=NULL) { rsrv1 = rsrv; rsrv=rsrv->next; if (rsrv1->ctxRedis!=NULL) redisFree(rsrv1->ctxRedis); free_params(rsrv1->attrs); pkg_free(rsrv1); } _redisc_srv_list = NULL; return 0; } /** * */ int redisc_add_server(char *spec) { param_t *pit=NULL; param_hooks_t phooks; redisc_server_t *rsrv=NULL; str s; s.s = spec; s.len = strlen(spec); if(s.s[s.len-1]==';') s.len--; if (parse_params(&s, CLASS_ANY, &phooks, &pit)<0) { LM_ERR("failed parsing params value\n"); goto error; } rsrv = (redisc_server_t*)pkg_malloc(sizeof(redisc_server_t)); if(rsrv==NULL) { LM_ERR("no more pkg\n"); goto error; } memset(rsrv, 0, sizeof(redisc_server_t)); rsrv->attrs = pit; rsrv->spec = spec; for (pit = rsrv->attrs; pit; pit=pit->next) { if(pit->name.len==4 && strncmp(pit->name.s, "name", 4)==0) { rsrv->sname = &pit->body; rsrv->hname = get_hash1_raw(rsrv->sname->s, rsrv->sname->len); break; } } if(rsrv->sname==NULL) { LM_ERR("no server name\n"); goto error; } rsrv->next = _redisc_srv_list; _redisc_srv_list = rsrv; return 0; error: if(pit!=NULL) free_params(pit); if(rsrv!=NULL) pkg_free(rsrv); return -1; } /** * */ redisc_server_t *redisc_get_server(str *name) { redisc_server_t *rsrv=NULL; unsigned int hname; hname = get_hash1_raw(name->s, name->len); LM_DBG("Hash %u (%.*s)\n", hname, name->len, name->s); rsrv=_redisc_srv_list; while(rsrv!=NULL) { LM_DBG("Entry %u (%.*s)\n", rsrv->hname, rsrv->sname->len, rsrv->sname->s); if(rsrv->hname==hname && rsrv->sname->len==name->len && strncmp(rsrv->sname->s, name->s, name->len)==0) return rsrv; rsrv=rsrv->next; } LM_DBG("No entry found.\n"); return NULL; } /** * */ int redisc_reconnect_server(redisc_server_t *rsrv) { char addr[256], pass[256], unix_sock_path[256], sentinel_group[256]; unsigned int port, db, sock = 0, haspass = 0, sentinel_master = 1; char sentinels[MAXIMUM_SENTINELS][256]; uint8_t sentinels_count = 0; int i, row; param_t *pit = NULL; struct timeval tv_conn; struct timeval tv_cmd; tv_conn.tv_sec = (int) redis_connect_timeout_param / 1000; tv_conn.tv_usec = (int) (redis_connect_timeout_param % 1000) * 1000; tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000; tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000; memset(addr, 0, sizeof(addr)); port = 6379; db = 0; memset(pass, 0, sizeof(pass)); memset(unix_sock_path, 0, sizeof(unix_sock_path)); for (pit = rsrv->attrs; pit; pit=pit->next) { if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) { snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s", pit->body.len, pit->body.s); sock = 1; } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) { snprintf(addr, sizeof(addr)-1, "%.*s", pit->body.len, pit->body.s); } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) { if(str2int(&pit->body, &port) < 0) port = 6379; } else if(pit->name.len==2 && strncmp(pit->name.s, "db", 2)==0) { if(str2int(&pit->body, &db) < 0) db = 0; } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) { snprintf(pass, sizeof(pass)-1, "%.*s", pit->body.len, pit->body.s); haspass = 1; } else if(pit->name.len==14 && strncmp(pit->name.s, "sentinel_group", 14)==0) { snprintf(sentinel_group, sizeof(sentinel_group)-1, "%.*s", pit->body.len, pit->body.s); } else if(pit->name.len==15 && strncmp(pit->name.s, "sentinel_master", 15)==0) { if(str2int(&pit->body, &sentinel_master) < 0) sentinel_master = 1; } else if(pit->name.len==8 && strncmp(pit->name.s, "sentinel", 8)==0) { if( sentinels_count < MAXIMUM_SENTINELS ){ snprintf(sentinels[sentinels_count], sizeof(sentinels[sentinels_count])-1, "%.*s", pit->body.len, pit->body.s); sentinels_count++; } else { LM_ERR("too many sentinels, maximum %d supported.\n", MAXIMUM_SENTINELS); return -1; } } } // if sentinels are provided, we need to connect to them and retrieve the redis server // address / port if(sentinels_count > 0) { for(i= 0; i< sentinels_count; i++) { char *sentinelAddr = sentinels[i]; char *pos; redisContext *redis; redisReply *res, *res2; port = 6379; if( (pos = strchr(sentinelAddr, ':')) != NULL ) { port = atoi(pos+1); pos[i] = '\0'; } redis = redisConnectWithTimeout(sentinelAddr, port, tv_conn); if( redis ) { if(sentinel_master != 0) { res = redisCommand(redis, "SENTINEL get-master-addr-by-name %s", sentinel_group); if( res && (res->type == REDIS_REPLY_ARRAY) && (res->elements == 2) ) { strncpy(addr, res->element[0]->str, res->element[0]->len + 1); port = atoi(res->element[1]->str); LM_DBG("sentinel replied: %s:%d\n", addr, port); } } else { res = redisCommand(redis, "SENTINEL slaves %s", sentinel_group); if( res && (res->type == REDIS_REPLY_ARRAY) ) { for(row = 0; row< res->elements; row++){ res2 = res->element[row]; for(i= 0; i< res2->elements; i+= 2) { if( strncmp(res2->element[i]->str, "ip", 2) == 0 ) { strncpy(addr, res2->element[i+1]->str, res2->element[i+1]->len); addr[res2->element[i+1]->len] = '\0'; } else if( strncmp(res2->element[i]->str, "port", 4) == 0) { port = atoi(res2->element[i+1]->str); break; } } } LM_DBG("slave for %s: %s:%d\n", sentinel_group, addr, port); } } } } } LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); if(rsrv->ctxRedis!=NULL) { redisFree(rsrv->ctxRedis); rsrv->ctxRedis = NULL; } if(sock != 0) { rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv_conn); } else { rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn); } LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); if(!rsrv->ctxRedis) goto err; if (rsrv->ctxRedis->err) goto err2; if ((haspass) && redisc_check_auth(rsrv, pass)) goto err2; if (redisSetTimeout(rsrv->ctxRedis, tv_cmd)) goto err2; if (redisCommandNR(rsrv->ctxRedis, "PING")) goto err2; if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis, "SELECT %i", db)) goto err2; if (redis_flush_on_reconnect_param) if (redisCommandNR(rsrv->ctxRedis, "FLUSHALL")) goto err2; return 0; err2: if (sock != 0) { LM_ERR("error communicating with redis server [%.*s]" " (unix:%s db:%d): %s\n", rsrv->sname->len, rsrv->sname->s, unix_sock_path, db, rsrv->ctxRedis->errstr); } else { LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n", rsrv->sname->len, rsrv->sname->s, addr, port, db, rsrv->ctxRedis->errstr); } err: if (sock != 0) { LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n", rsrv->sname->len, rsrv->sname->s, unix_sock_path, db); } else { LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n", rsrv->sname->len, rsrv->sname->s, addr, port, db); } return -1; } /** * */ int redisc_append_cmd(str *srv, str *res, str *cmd, ...) { redisc_server_t *rsrv=NULL; redisc_reply_t *rpl; char c; va_list ap; va_start(ap, cmd); if(srv==NULL || cmd==NULL || res==NULL) { LM_ERR("invalid parameters"); goto error_cmd; } if(srv->len==0 || res->len==0 || cmd->len==0) { LM_ERR("invalid parameters"); goto error_cmd; } rsrv = redisc_get_server(srv); if(rsrv==NULL) { LM_ERR("no redis server found: %.*s\n", srv->len, srv->s); goto error_cmd; } if(rsrv->ctxRedis==NULL) { LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s); goto error_cmd; } if (rsrv->piped.pending_commands >= MAXIMUM_PIPELINED_COMMANDS) { LM_ERR("Too many pipelined commands, maximum is %d\n", MAXIMUM_PIPELINED_COMMANDS); goto error_cmd; } rpl = redisc_get_reply(res); if(rpl==NULL) { LM_ERR("no redis reply id found: %.*s\n", res->len, res->s); goto error_cmd; } STR_VTOZ(cmd->s[cmd->len], c); rsrv->piped.commands[rsrv->piped.pending_commands].len = redisvFormatCommand( &rsrv->piped.commands[rsrv->piped.pending_commands].s, cmd->s, ap); if (rsrv->piped.commands[rsrv->piped.pending_commands].len < 0) { LM_ERR("Invalid redis command : %s\n",cmd->s); goto error_cmd; } rsrv->piped.replies[rsrv->piped.pending_commands]=rpl; rsrv->piped.pending_commands++; STR_ZTOV(cmd->s[cmd->len], c); va_end(ap); return 0; error_cmd: va_end(ap); return -1; } /** * */ int redisc_exec_pipelined_cmd(str *srv) { redisc_server_t *rsrv=NULL; if (srv == NULL) { LM_ERR("invalid parameters"); return -1; } if (srv->len == 0) { LM_ERR("invalid parameters"); return -1; } rsrv = redisc_get_server(srv); if (rsrv == NULL) { LM_ERR("no redis server found: %.*s\n", srv->len, srv->s); return -1; } if (rsrv->ctxRedis == NULL) { LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s); return -1; } return redisc_exec_pipelined(rsrv); } /** * */ int redisc_create_pipelined_message(redisc_server_t *rsrv) { int i; if (rsrv->ctxRedis->err) { LM_DBG("Reconnecting server because of error %d: \"%s\"", rsrv->ctxRedis->err,rsrv->ctxRedis->errstr); if (redisc_reconnect_server(rsrv)) { LM_ERR("unable to reconnect to REDIS server: %.*s\n", rsrv->sname->len,rsrv->sname->s); return -1; } } for (i=0;ipiped.pending_commands;i++) { if (redis_append_formatted_command(rsrv->ctxRedis, rsrv->piped.commands[i].s,rsrv->piped.commands[i].len) != REDIS_OK) { LM_ERR("Error while appending command %d",i); return -1; } } return 0; } /** * */ void redisc_free_pipelined_cmds(redisc_server_t *rsrv) { int i; for (i=0;ipiped.pending_commands;i++) { free(rsrv->piped.commands[i].s); rsrv->piped.commands[i].len=0; } rsrv->piped.pending_commands=0; } /** * */ int redisc_exec_pipelined(redisc_server_t *rsrv) { redisc_reply_t *rpl; int i; LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s); /* if server is disabled do nothing unless the disable time has passed */ if (redis_check_server(rsrv)) { goto srv_disabled; } if (rsrv->piped.pending_commands == 0) { LM_WARN("call for redis_cmd without any pipelined commands\n"); return -1; } if(rsrv->ctxRedis==NULL) { LM_ERR("no redis context for server: %.*s\n", rsrv->sname->len,rsrv->sname->s); goto error_exec; } /* send the commands and retrieve the first reply */ rpl=rsrv->piped.replies[0]; if(rpl->rplRedis!=NULL) { /* clean up previous redis reply */ freeReplyObject(rpl->rplRedis); rpl->rplRedis = NULL; } redisc_create_pipelined_message(rsrv); redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis); if (rpl->rplRedis == NULL) { /* null reply, reconnect and try again */ if (rsrv->ctxRedis->err) { LM_DBG("Redis error: %s\n", rsrv->ctxRedis->errstr); } if (redisc_create_pipelined_message(rsrv) == 0) { redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis); if (rpl->rplRedis == NULL) { redis_count_err_and_disable(rsrv); LM_ERR("Unable to read reply\n"); goto error_exec; } } else { redis_count_err_and_disable(rsrv); goto error_exec; } } LM_DBG_redis_reply(rpl->rplRedis); /* replies are received just retrieve them */ for (i=1;ipiped.pending_commands;i++) { rpl=rsrv->piped.replies[i]; if(rpl->rplRedis!=NULL) { /* clean up previous redis reply */ freeReplyObject(rpl->rplRedis); rpl->rplRedis = NULL; } if (redisGetReplyFromReader(rsrv->ctxRedis, (void**) &rpl->rplRedis) != REDIS_OK) { LM_ERR("Unable to read reply\n"); continue; } if (rpl->rplRedis == NULL) { LM_ERR("Trying to read reply for command %.*s but nothing in buffer!", rsrv->piped.commands[i].len,rsrv->piped.commands[i].s); continue; } LM_DBG_redis_reply(rpl->rplRedis); } redisc_free_pipelined_cmds(rsrv); rsrv->disable.consecutive_errors = 0; return 0; error_exec: redisc_free_pipelined_cmds(rsrv); return -1; srv_disabled: redisc_free_pipelined_cmds(rsrv); return -2; } int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) { redisc_server_t *rsrv_new; char buffername[100]; unsigned int port; str addr = {0, 0}, tmpstr = {0, 0}, name = {0, 0}; int server_len = 0; char spec_new[100]; if(redis_cluster_param) { LM_DBG("Redis replied: \"%.*s\"\n", (int)reply->len, reply->str); if((reply->len > 7) && (strncmp(reply->str, "MOVED", 5) == 0)) { port = 6379; if(strchr(reply->str, ':') > 0) { tmpstr.s = strchr(reply->str, ':') + 1; tmpstr.len = reply->len - (tmpstr.s - reply->str); if(str2int(&tmpstr, &port) < 0) port = 6379; LM_DBG("Port \"%.*s\" [%i] => %i\n", tmpstr.len, tmpstr.s, tmpstr.len, port); } else { LM_ERR("No Port in REDIS MOVED Reply (%.*s)\n", (int)reply->len, reply->str); return 0; } if(strchr(reply->str + 6, ' ') > 0) { addr.len = tmpstr.s - strchr(reply->str + 6, ' ') - 2; addr.s = strchr(reply->str + 6, ' ') + 1; LM_DBG("Host \"%.*s\" [%i]\n", addr.len, addr.s, addr.len); } else { LM_ERR("No Host in REDIS MOVED Reply (%.*s)\n", (int)reply->len, reply->str); return 0; } memset(buffername, 0, sizeof(buffername)); name.len = snprintf(buffername, sizeof(buffername), "%.*s:%i", addr.len, addr.s, port); name.s = buffername; LM_DBG("Name of new connection: %.*s\n", name.len, name.s); rsrv_new = redisc_get_server(&name); if(rsrv_new) { LM_DBG("Reusing Connection\n"); *rsrv = rsrv_new; return 1; } else if(redis_allow_dynamic_nodes_param) { /* New param redis_allow_dynamic_nodes_param: * if set, we allow ndb_redis to add nodes that were * not defined explicitly in the module configuration */ char *server_new; memset(spec_new, 0, sizeof(spec_new)); /* For now the only way this can work is if * the new node is accessible with default * parameters for sock and db */ server_len = snprintf(spec_new, sizeof(spec_new) - 1, "name=%.*s;addr=%.*s;port=%i", name.len, name.s, addr.len, addr.s, port); if(server_len<0 || server_len>sizeof(spec_new) - 1) { LM_ERR("failed to print server spec string\n"); return 0; } server_new = (char *)pkg_malloc(server_len + 1); if(server_new == NULL) { LM_ERR("Error allocating pkg mem\n"); return 0; } strncpy(server_new, spec_new, server_len); server_new[server_len] = '\0'; if(redisc_add_server(server_new) == 0) { rsrv_new = redisc_get_server(&name); if(rsrv_new) { *rsrv = rsrv_new; /* Need to connect to the new server now */ if(redisc_reconnect_server(rsrv_new) == 0) { LM_DBG("Connected to the new server with name: " "%.*s\n", name.len, name.s); return 1; } else { LM_ERR("ERROR connecting to the new server with " "name: %.*s\n", name.len, name.s); return 0; } } else { /* Adding the new node failed * - cannot perform redirection */ LM_ERR("No new connection with name (%.*s) was " "created\n", name.len, name.s); } } else { LM_ERR("Could not add a new connection with name %.*s\n", name.len, name.s); pkg_free(server_new); } } else { LM_ERR("No Connection with name (%.*s)\n", name.len, name.s); } } } return 0; } /** * */ int redisc_exec(str *srv, str *res, str *cmd, ...) { redisc_server_t *rsrv=NULL; redisc_reply_t *rpl; char c; va_list ap, ap2, ap3, ap4; int ret = -1; va_start(ap, cmd); va_copy(ap2, ap); va_copy(ap3, ap); va_copy(ap4, ap); if(srv==NULL || cmd==NULL || res==NULL) { LM_ERR("invalid parameters"); goto error; } if(srv->len==0 || res->len==0 || cmd->len==0) { LM_ERR("invalid parameters"); goto error; } STR_VTOZ(cmd->s[cmd->len], c); rsrv = redisc_get_server(srv); if(rsrv==NULL) { LM_ERR("no redis server found: %.*s\n", srv->len, srv->s); goto error_exec; } LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); if(rsrv->ctxRedis==NULL) { LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s); goto error_exec; } LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); if (rsrv->piped.pending_commands != 0) { LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer." " Automatically call redis_execute"); redisc_exec_pipelined(rsrv); } /* if server is disabled do nothing unless the disable time has passed */ if (redis_check_server(rsrv)) { goto srv_disabled; } rpl = redisc_get_reply(res); if(rpl==NULL) { LM_ERR("no redis reply id found: %.*s\n", res->len, res->s); goto error_exec; } if(rpl->rplRedis!=NULL) { /* clean up previous redis reply */ freeReplyObject(rpl->rplRedis); rpl->rplRedis = NULL; } rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap ); if(rpl->rplRedis == NULL) { /* null reply, reconnect and try again */ if(rsrv->ctxRedis->err) { LM_DBG("Redis error: %s\n", rsrv->ctxRedis->errstr); } if(redisc_reconnect_server(rsrv)==0) { rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2); if (rpl->rplRedis ==NULL) { redis_count_err_and_disable(rsrv); goto error_exec; } } else { redis_count_err_and_disable(rsrv); LM_ERR("unable to reconnect to redis server: %.*s\n", srv->len, srv->s); STR_ZTOV(cmd->s[cmd->len], c); goto error_exec; } } if (check_cluster_reply(rpl->rplRedis, &rsrv)) { LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); if(rsrv->ctxRedis==NULL) { LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s); goto error_exec; } LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); if(rpl->rplRedis!=NULL) { /* clean up previous redis reply */ freeReplyObject(rpl->rplRedis); rpl->rplRedis = NULL; } rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap3 ); if(rpl->rplRedis == NULL) { /* null reply, reconnect and try again */ if(rsrv->ctxRedis->err) { LM_DBG("Redis error: %s\n", rsrv->ctxRedis->errstr); } if(redisc_reconnect_server(rsrv)==0) { rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap4); if(rpl->rplRedis == NULL) { redis_count_err_and_disable(rsrv); goto error_exec; } } else { LM_ERR("unable to reconnect to redis server: %.*s\n", srv->len, srv->s); STR_ZTOV(cmd->s[cmd->len], c); goto error_exec; } } } LM_DBG("rpl->rplRedis->type:%d\n", rpl->rplRedis->type); if(rpl->rplRedis->type == REDIS_REPLY_ERROR) { LM_ERR("Redis error:%.*s\n", (int)rpl->rplRedis->len, rpl->rplRedis->str); goto error_exec; } STR_ZTOV(cmd->s[cmd->len], c); rsrv->disable.consecutive_errors = 0; va_end(ap); va_end(ap2); va_end(ap3); va_end(ap4); LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); return 0; error_exec: STR_ZTOV(cmd->s[cmd->len], c); ret = -1; goto error; srv_disabled: STR_ZTOV(cmd->s[cmd->len], c); ret = -2; goto error; error: va_end(ap); va_end(ap2); va_end(ap3); va_end(ap4); return ret; } /** * Executes a redis command. * Command is coded using a vector of strings, and a vector of lengths. * * @param rsrv Pointer to a redis_server_t structure. * @param argc number of elements in the command vector. * @param argv vector of zero terminated strings forming the command. * @param argvlen vector of command string lengths or NULL. * @return redisReply structure or NULL if there was an error. */ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv, const size_t *argvlen) { redisReply *res=NULL; if(rsrv==NULL) { LM_ERR("no redis context found for server %.*s\n", (rsrv)?rsrv->sname->len:0, (rsrv)?rsrv->sname->s:""); return NULL; } LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); if(rsrv->ctxRedis==NULL) { LM_ERR("no redis context found for server %.*s\n", (rsrv)?rsrv->sname->len:0, (rsrv)?rsrv->sname->s:""); return NULL; } if(argc<=0) { LM_ERR("invalid parameters\n"); return NULL; } if(argv==NULL || *argv==NULL) { LM_ERR("invalid parameters\n"); return NULL; } again: res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen); /* null reply, reconnect and try again */ if(rsrv->ctxRedis->err) { LM_DBG("Redis error: %s\n", rsrv->ctxRedis->errstr); } if(res) { if (check_cluster_reply(res, &rsrv)) { goto again; } return res; } if(redisc_reconnect_server(rsrv)==0) { res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen); if (res) { if (check_cluster_reply(res, &rsrv)) { goto again; } } } else { LM_ERR("Unable to reconnect to server: %.*s\n", rsrv->sname->len, rsrv->sname->s); return NULL; } return res; } /** * */ redisc_reply_t *redisc_get_reply(str *name) { redisc_reply_t *rpl; unsigned int hid; hid = get_hash1_raw(name->s, name->len); for(rpl=_redisc_rpl_list; rpl; rpl=rpl->next) { if(rpl->hname==hid && rpl->rname.len==name->len && strncmp(rpl->rname.s, name->s, name->len)==0) return rpl; } /* not found - add a new one */ rpl = (redisc_reply_t*)pkg_malloc(sizeof(redisc_reply_t)); if(rpl==NULL) { LM_ERR("no more pkg\n"); return NULL; } memset(rpl, 0, sizeof(redisc_reply_t)); rpl->hname = hid; rpl->rname.s = (char*)pkg_malloc(name->len+1); if(rpl->rname.s==NULL) { LM_ERR("no more pkg.\n"); pkg_free(rpl); return NULL; } strncpy(rpl->rname.s, name->s, name->len); rpl->rname.len = name->len; rpl->rname.s[name->len] = '\0'; rpl->next = _redisc_rpl_list; _redisc_rpl_list = rpl; return rpl; } /** * */ int redisc_free_reply(str *name) { redisc_reply_t *rpl; unsigned int hid; if(name==NULL || name->len==0) { LM_ERR("invalid parameters"); return -1; } hid = get_hash1_raw(name->s, name->len); rpl = _redisc_rpl_list; while(rpl) { if(rpl->hname==hid && rpl->rname.len==name->len && strncmp(rpl->rname.s, name->s, name->len)==0) { if(rpl->rplRedis) { freeReplyObject(rpl->rplRedis); rpl->rplRedis = NULL; } return 0; } rpl = rpl->next; } /* reply entry not found. */ return -1; } int redisc_check_auth(redisc_server_t *rsrv, char *pass) { redisReply *reply; int retval = 0; reply = redisCommand(rsrv->ctxRedis, "AUTH %s", pass); if(!reply) { LM_ERR("Redis authentication error\n"); return -1; } if (reply->type == REDIS_REPLY_ERROR) { LM_ERR("Redis authentication error\n"); retval = -1; } freeReplyObject(reply); return retval; } /* backwards compatibility with hiredis < 0.12 */ #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12) int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len) { sds newbuf; newbuf = sdscatlen(c->obuf,cmd,len); if (newbuf == NULL) { c->err = REDIS_ERR_OOM; strcpy(c->errstr,"Out of memory"); return REDIS_ERR; } c->obuf = newbuf; return REDIS_OK; } #endif int redis_check_server(redisc_server_t *rsrv) { if (rsrv->disable.disabled) { if (get_ticks() > rsrv->disable.restore_tick) { LM_NOTICE("REDIS server %.*s re-enabled", rsrv->sname->len, rsrv->sname->s); rsrv->disable.disabled = 0; rsrv->disable.consecutive_errors = 0; } else { return 1; } } return 0; } int redis_count_err_and_disable(redisc_server_t *rsrv) { if (redis_allowed_timeouts_param < 0) { return 0; } rsrv->disable.consecutive_errors++; if (rsrv->disable.consecutive_errors > redis_allowed_timeouts_param) { rsrv->disable.disabled=1; rsrv->disable.restore_tick=get_ticks() + redis_disable_time_param; LM_WARN("REDIS server %.*s disabled for %d seconds", rsrv->sname->len, rsrv->sname->s, redis_disable_time_param); return 1; } return 0; } void print_redis_reply(int log_level, redisReply *rpl,int offset) { int i; char padding[MAXIMUM_NESTED_KEYS + 1]; if(!is_printable(log_level)) return; if (!rpl) { LM_ERR("Unexpected null reply"); return; } if (offset > MAXIMUM_NESTED_KEYS) { LM_ERR("Offset is too big"); return; } for (i=0;itype) { case REDIS_REPLY_STRING: LOG(log_level,"%sstring reply: [%s]", padding, rpl->str); break; case REDIS_REPLY_INTEGER: LOG(log_level,"%sinteger reply: %lld", padding, rpl->integer); break; case REDIS_REPLY_ARRAY: LOG(log_level,"%sarray reply with %d elements", padding, (int)rpl->elements); for (i=0; i < rpl->elements; i++) { LOG(log_level,"%selement %d:",padding,i); print_redis_reply(log_level,rpl->element[i],offset+1); } break; case REDIS_REPLY_NIL: LOG(log_level,"%snil reply",padding); break; case REDIS_REPLY_STATUS: LOG(log_level,"%sstatus reply: %s", padding, rpl->str); break; case REDIS_REPLY_ERROR: LOG(log_level,"%serror reply: %s", padding, rpl->str); break; } }