Browse code

db_redis: use cluster api

Riccardo Villa authored on 05/01/2022 11:05:41 • Daniel-Constantin Mierla committed on 20/01/2022 11:30:22
Showing 1 changed files
... ...
@@ -25,6 +25,15 @@
25 25
 #include "redis_table.h"
26 26
 #include "redis_dbase.h"
27 27
 
28
+#ifdef WITH_HIREDIS_CLUSTER
29
+static unsigned int MAX_URL_LENGTH = 1023;
30
+#define redisCommand redisClusterCommand
31
+#define redisFree redisClusterFree
32
+#define redisCommandArgv redisClusterCommandArgv
33
+#define redisAppendCommandArgv redisClusterAppendCommandArgv
34
+#define redisGetReply redisClusterGetReply
35
+#endif
36
+
28 37
 extern int db_redis_verbosity;
29 38
 
30 39
 static void print_query(redis_key_t *query) {
... ...
@@ -104,21 +113,59 @@ static redis_key_t* db_redis_shift_query(km_redis_con_t *con) {
104 113
 int db_redis_connect(km_redis_con_t *con) {
105 114
     struct timeval tv;
106 115
     redisReply *reply;
116
+#ifndef WITH_HIREDIS_CLUSTER
107 117
     int db;
118
+#endif
108 119
 
109 120
     tv.tv_sec = 1;
110 121
     tv.tv_usec = 0;
111
-
122
+#ifndef WITH_HIREDIS_CLUSTER
112 123
     db = atoi(con->id->database);
124
+#endif
113 125
     reply = NULL;
114 126
 
115 127
     // TODO: introduce require_master mod-param and check if we're indeed master
116 128
     // TODO: on carrier, if we have db fail-over, the currently connected
117 129
     // redis server will become slave without dropping connections?
118
-
130
+#ifdef WITH_HIREDIS_CLUSTER
131
+    int status;
132
+    char hosts[MAX_URL_LENGTH];
133
+    char* host_begin;
134
+    char* host_end;
135
+    LM_DBG("connecting to redis cluster at %.*s\n", con->id->url.len, con->id->url.s);
136
+    host_begin = strstr(con->id->url.s, "redis://");
137
+    if (host_begin) {
138
+        host_begin += 8;
139
+    } else {
140
+        LM_ERR("invalid url scheme\n");
141
+        goto err;
142
+    }
143
+    host_end = strstr(host_begin, "/");
144
+    if (! host_end) {
145
+        LM_ERR("invalid url: cannot find end of host part\n");
146
+        goto err;
147
+    }
148
+    if ((host_end - host_begin) > (MAX_URL_LENGTH-1)) {
149
+        LM_ERR("url too long\n");
150
+        goto err;
151
+    }
152
+    strncpy(hosts, host_begin, (host_end - host_begin));
153
+    hosts[MAX_URL_LENGTH-1] = '\0';
154
+    con->con = redisClusterContextInit();
155
+    if (! con->con) {
156
+        LM_ERR("no private memory left\n");
157
+        goto err;
158
+    }
159
+    redisClusterSetOptionAddNodes(con->con, hosts);
160
+    redisClusterSetOptionConnectTimeout(con->con, tv);
161
+    status = redisClusterConnect2(con->con);
162
+    if (status != REDIS_OK) {
163
+        LM_ERR("cannot open connection to cluster with hosts: %s, error: %s\n", hosts, con->con->errstr);
164
+        goto err;
165
+    }
166
+#else
119 167
     LM_DBG("connecting to redis at %s:%d\n", con->id->host, con->id->port);
120 168
     con->con = redisConnectWithTimeout(con->id->host, con->id->port, tv);
121
-
122 169
     if (!con->con) {
123 170
         LM_ERR("cannot open connection: %.*s\n", con->id->url.len, con->id->url.s);
124 171
         goto err;
... ...
@@ -128,6 +175,7 @@ int db_redis_connect(km_redis_con_t *con) {
128 175
             con->con->errstr);
129 176
         goto err;
130 177
     }
178
+#endif
131 179
 
132 180
     if (con->id->password) {
133 181
         reply = redisCommand(con->con, "AUTH %s", con->id->password);
... ...
@@ -144,6 +192,7 @@ int db_redis_connect(km_redis_con_t *con) {
144 192
         freeReplyObject(reply); reply = NULL;
145 193
     }
146 194
 
195
+#ifndef WITH_HIREDIS_CLUSTER
147 196
     reply = redisCommand(con->con, "PING");
148 197
     if (!reply) {
149 198
         LM_ERR("cannot ping server on connection %.*s: %s\n",
... ...
@@ -169,8 +218,10 @@ int db_redis_connect(km_redis_con_t *con) {
169 218
         goto err;
170 219
     }
171 220
     freeReplyObject(reply); reply = NULL;
221
+#endif
172 222
     LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
173 223
 
224
+#ifndef WITH_HIREDIS_CLUSTER
174 225
     reply = redisCommand(con->con, "SCRIPT LOAD %s", SREM_KEY_LUA);
175 226
     if (!reply) {
176 227
         LM_ERR("failed to load LUA script to server %.*s: %s\n",
... ...
@@ -194,6 +245,7 @@ int db_redis_connect(km_redis_con_t *con) {
194 245
     }
195 246
     strcpy(con->srem_key_lua, reply->str);
196 247
     freeReplyObject(reply); reply = NULL;
248
+#endif
197 249
     LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
198 250
 
199 251
     return 0;
... ...
@@ -291,6 +343,63 @@ void db_redis_free_connection(struct pool_con* con) {
291 343
     pkg_free(_c);
292 344
 }
293 345
 
346
+#ifdef WITH_HIREDIS_CLUSTER
347
+void *db_redis_command_argv_to_node(km_redis_con_t *con, redis_key_t *query, cluster_node *node) {
348
+    char **argv = NULL;
349
+    int argc;
350
+    #define MAX_CMD_LENGTH 256
351
+    char cmd[MAX_CMD_LENGTH] = "";
352
+    size_t cmd_len = MAX_CMD_LENGTH - 1;
353
+    int i;
354
+
355
+    print_query(query);
356
+
357
+    argc = db_redis_key_list2arr(query, &argv);
358
+    if (argc < 0) {
359
+        LM_ERR("Failed to allocate memory for query array\n");
360
+        return NULL;
361
+    }
362
+    LM_DBG("query has %d args\n", argc);
363
+
364
+    for (i=0; i<argc; i++)
365
+    {
366
+        size_t arg_len = strlen(argv[i]);
367
+        if (arg_len > cmd_len)
368
+                        break;
369
+        strncat(cmd, argv[i], cmd_len);
370
+        cmd_len = cmd_len - arg_len;
371
+        if (cmd_len == 0)
372
+            break;
373
+        
374
+        if (i != argc - 1) {
375
+            strncat(cmd, " ", cmd_len);
376
+            cmd_len--;
377
+        }
378
+
379
+    }
380
+
381
+    LM_DBG("cmd is %s\n", cmd);
382
+
383
+    redisReply *reply = redisClusterCommandToNode(con->con, node, cmd);
384
+    if (con->con->err != REDIS_OK) {
385
+        LM_DBG("redis connection is gone, try reconnect. (%d:%s)\n", con->con->err, con->con->errstr);
386
+        if (db_redis_connect(con) != 0) {
387
+            LM_ERR("Failed to reconnect to redis db\n");
388
+            pkg_free(argv);
389
+            if (con->con) {
390
+                redisFree(con->con);
391
+                con->con = NULL;
392
+            }
393
+            return NULL;
394
+        }
395
+        reply = redisClusterCommandToNode(con->con, node, cmd);
396
+    }
397
+    pkg_free(argv);
398
+    return reply;
399
+}
400
+    
401
+#endif
402
+
294 403
 void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
295 404
     char **argv = NULL;
296 405
     int argc;
Browse code

db_redis: fix broken pipe issue, if redis server with timeout setting.

- issue #2764

FredWH authored on 28/06/2021 06:46:25 • Daniel-Constantin Mierla committed on 02/08/2021 13:00:18
Showing 1 changed files
... ...
@@ -305,7 +305,8 @@ void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
305 305
     LM_DBG("query has %d args\n", argc);
306 306
 
307 307
     redisReply *reply = redisCommandArgv(con->con, argc, (const char**)argv, NULL);
308
-    if (con->con->err == REDIS_ERR_EOF) {
308
+    if (con->con->err != REDIS_OK) {
309
+        LM_DBG("redis connection is gone, try reconnect. (%d:%s)\n", con->con->err, con->con->errstr);
309 310
         if (db_redis_connect(con) != 0) {
310 311
             LM_ERR("Failed to reconnect to redis db\n");
311 312
             pkg_free(argv);
... ...
@@ -344,7 +345,8 @@ int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query, int qu
344 345
     // this should actually never happen, because if all replies
345 346
     // are properly consumed for the previous command, it won't send
346 347
     // out a new query until redisGetReply is called
347
-    if (con->con->err == REDIS_ERR_EOF) {
348
+    if (con->con->err != REDIS_OK) {
349
+        LM_DBG("redis connection is gone, try reconnect. (%d:%s)\n", con->con->err, con->con->errstr);
348 350
         if (db_redis_connect(con) != 0) {
349 351
             LM_ERR("Failed to reconnect to redis db\n");
350 352
             pkg_free(argv);
... ...
@@ -374,8 +376,8 @@ int db_redis_get_reply(km_redis_con_t *con, void **reply) {
374 376
 
375 377
     *reply = NULL;
376 378
     ret = redisGetReply(con->con, reply);
377
-    if (con->con->err == REDIS_ERR_EOF) {
378
-        LM_DBG("redis connection is gone, try reconnect\n");
379
+    if (con->con->err != REDIS_OK) {
380
+        LM_DBG("redis connection is gone, try reconnect. (%d:%s)\n", con->con->err, con->con->errstr);
379 381
         con->append_counter = 0;
380 382
         if (db_redis_connect(con) != 0) {
381 383
             LM_ERR("Failed to reconnect to redis db\n");
... ...
@@ -396,7 +398,7 @@ int db_redis_get_reply(km_redis_con_t *con, void **reply) {
396 398
             db_redis_key_free(&query);
397 399
         }
398 400
         ret = redisGetReply(con->con, reply);
399
-        if (con->con->err != REDIS_ERR_EOF) {
401
+        if (con->con->err == REDIS_OK) {
400 402
             con->append_counter--;
401 403
         }
402 404
     } else {
Browse code

db_redis: performance improvements and fixes

- Support update of type key columns
- Support range scans on timestamp and int keys through a series of
wildcard matches when used with a < or > operator
- Support exponential increase and fallof for SCAN
- Pad bigint values to 10 digits for faster wildcard matching
- Use KEYS instead of SCAN by default for faster keys matching
- Support optional versioning of table names
- Simulate non-unique indexes through parent sets for O(1) counting of
entries

Richard Fuchs authored on 22/11/2019 13:35:49
Showing 1 changed files
... ...
@@ -23,6 +23,7 @@
23 23
 #include "db_redis_mod.h"
24 24
 #include "redis_connection.h"
25 25
 #include "redis_table.h"
26
+#include "redis_dbase.h"
26 27
 
27 28
 extern int db_redis_verbosity;
28 29
 
... ...
@@ -170,6 +171,31 @@ int db_redis_connect(km_redis_con_t *con) {
170 171
     freeReplyObject(reply); reply = NULL;
171 172
     LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
172 173
 
174
+    reply = redisCommand(con->con, "SCRIPT LOAD %s", SREM_KEY_LUA);
175
+    if (!reply) {
176
+        LM_ERR("failed to load LUA script to server %.*s: %s\n",
177
+                con->id->url.len, con->id->url.s, con->con->errstr);
178
+        goto err;
179
+    }
180
+    if (reply->type == REDIS_REPLY_ERROR) {
181
+        LM_ERR("failed to load LUA script to server %.*s: %s\n",
182
+                con->id->url.len, con->id->url.s, reply->str);
183
+        goto err;
184
+    }
185
+    if (reply->type != REDIS_REPLY_STRING) {
186
+        LM_ERR("failed to load LUA script to server %.*s: %i\n",
187
+                con->id->url.len, con->id->url.s, reply->type);
188
+        goto err;
189
+    }
190
+    if (reply->len >= sizeof(con->srem_key_lua)) {
191
+        LM_ERR("failed to load LUA script to server %.*s: %i >= %i\n",
192
+                con->id->url.len, con->id->url.s, (int) reply->len, (int) sizeof(con->srem_key_lua));
193
+        goto err;
194
+    }
195
+    strcpy(con->srem_key_lua, reply->str);
196
+    freeReplyObject(reply); reply = NULL;
197
+    LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
198
+
173 199
     return 0;
174 200
 
175 201
 err:
Browse code

db_redis: log message if failure to get reply

Daniel-Constantin Mierla authored on 19/11/2018 08:19:58
Showing 1 changed files
... ...
@@ -394,7 +394,9 @@ void db_redis_consume_replies(km_redis_con_t *con) {
394 394
     redis_key_t *query;
395 395
     while (con->append_counter > 0 && con->con && !con->con->err) {
396 396
         LM_DBG("consuming outstanding reply %u", con->append_counter);
397
-        db_redis_get_reply(con, (void**)&reply);
397
+        if(db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
398
+            LM_DBG("failure to get the reply\n");
399
+        }
398 400
         if (reply) {
399 401
             freeReplyObject(reply);
400 402
             reply = NULL;
Browse code

db_redis: fix reconnection failure bug

Richard Fuchs authored on 28/09/2018 21:42:11
Showing 1 changed files
... ...
@@ -392,7 +392,7 @@ void db_redis_free_reply(redisReply **reply) {
392 392
 void db_redis_consume_replies(km_redis_con_t *con) {
393 393
     redisReply *reply = NULL;
394 394
     redis_key_t *query;
395
-    while (con->append_counter > 0 && !con->con->err) {
395
+    while (con->append_counter > 0 && con->con && !con->con->err) {
396 396
         LM_DBG("consuming outstanding reply %u", con->append_counter);
397 397
         db_redis_get_reply(con, (void**)&reply);
398 398
         if (reply) {
Browse code

db_redis: control printing all db tables via parameter verbosity

- the list can be long and the output can make waching logs harder for
rpc commands that connect/disconnect each time

Daniel-Constantin Mierla authored on 26/09/2018 16:00:24
Showing 1 changed files
... ...
@@ -24,6 +24,8 @@
24 24
 #include "redis_connection.h"
25 25
 #include "redis_table.h"
26 26
 
27
+extern int db_redis_verbosity;
28
+
27 29
 static void print_query(redis_key_t *query) {
28 30
     redis_key_t *k;
29 31
 
... ...
@@ -217,7 +219,7 @@ km_redis_con_t* db_redis_new_connection(const struct db_id* id) {
217 219
         goto err;
218 220
     }
219 221
 
220
-    db_redis_print_all_tables(ptr);
222
+    if(db_redis_verbosity > 0) db_redis_print_all_tables(ptr);
221 223
 
222 224
     ptr->ref = 1;
223 225
     ptr->append_counter = 0;
Browse code

db_redis: fixed compile warning on testing array address

Daniel-Constantin Mierla authored on 24/09/2018 18:05:26
Showing 1 changed files
... ...
@@ -405,9 +405,9 @@ void db_redis_consume_replies(km_redis_con_t *con) {
405 405
 }
406 406
 
407 407
 const char *db_redis_get_error(km_redis_con_t *con) {
408
-    if (con && con->con && con->con->errstr) {
408
+    if (con && con->con && con->con->errstr[0]) {
409 409
         return con->con->errstr;
410 410
     } else {
411 411
         return "<broken redis connection>";
412 412
     }
413
-}
414 413
\ No newline at end of file
414
+}
Browse code

db_redis: Fix memleaks on delete

Make sure to release pkg memory on delete operations.
Improve error handling to avoid segfault on broken connection.
Warn on full table scans to help improve key definition.

Andreas Granig authored on 14/06/2018 13:49:11
Showing 1 changed files
... ...
@@ -403,3 +403,11 @@ void db_redis_consume_replies(km_redis_con_t *con) {
403 403
         db_redis_key_free(&query);
404 404
     }
405 405
 }
406
+
407
+const char *db_redis_get_error(km_redis_con_t *con) {
408
+    if (con && con->con && con->con->errstr) {
409
+        return con->con->errstr;
410
+    } else {
411
+        return "<broken redis connection>";
412
+    }
413
+}
406 414
\ No newline at end of file
Browse code

db_redis: Fix scanning large tables

* When querying large tables (e.g. pre-loading location by usrloc),
make sure to use O(1) when adding keys by prepending them to list.
* Increase batch size of redis scan command to reduce number of
redis queries.
* Batch creation of DB_ROW entries to free up memory allocated by
redis in heap regularly.
* Fix more issues reported by coverity.

Andreas Granig authored on 11/04/2018 15:28:32
Showing 1 changed files
... ...
@@ -25,7 +25,7 @@
25 25
 #include "redis_table.h"
26 26
 
27 27
 static void print_query(redis_key_t *query) {
28
-	redis_key_t *k;
28
+    redis_key_t *k;
29 29
 
30 30
     LM_DBG("Query dump:\n");
31 31
     for (k = query; k; k = k->next) {
... ...
@@ -355,6 +355,7 @@ int db_redis_get_reply(km_redis_con_t *con, void **reply) {
355 355
                 redisFree(con->con);
356 356
                 con->con = NULL;
357 357
             }
358
+            return -1;
358 359
         }
359 360
         // take commands from oldest to newest and re-do again,
360 361
         // but don't queue them once again in retry-mode
Browse code

db_redis: Fix various pointer and memory issues

Issues discovered by coverity:
* Fix mem leaks in error handling
* Fix potential null pointer deref
* Fix potential out-of-memory cases

Andreas Granig authored on 19/03/2018 16:37:29
Showing 1 changed files
... ...
@@ -339,6 +339,11 @@ int db_redis_get_reply(km_redis_con_t *con, void **reply) {
339 339
     int ret;
340 340
     redis_key_t *query;
341 341
 
342
+    if (!con || !con->con) {
343
+        LM_ERR("Internal error passing null connection\n");
344
+        return -1;
345
+    }
346
+
342 347
     *reply = NULL;
343 348
     ret = redisGetReply(con->con, reply);
344 349
     if (con->con->err == REDIS_ERR_EOF) {
Browse code

db_redis: Fix removal of correct queued command

When calling db_redis_get_reply, remove oldest queued command
instead of newest.

Andreas Granig authored on 27/02/2018 16:55:42
Showing 1 changed files
... ...
@@ -98,28 +98,6 @@ static redis_key_t* db_redis_shift_query(km_redis_con_t *con) {
98 98
     return query;
99 99
 }
100 100
 
101
-static redis_key_t* db_redis_pop_query(km_redis_con_t *con) {
102
-    redis_command_t **current;
103
-    redis_command_t *prev;
104
-    redis_key_t *query;
105
-
106
-    current = &con->command_queue;
107
-    if (!*current)
108
-        return NULL;
109
-
110
-    do {
111
-        query = (*current)->query;
112
-        prev = *current;
113
-        *current = (*current)->next;
114
-    } while (*current);
115
-
116
-    prev->next = NULL;
117
-    pkg_free(*current);
118
-    *current = NULL;
119
-
120
-    return query;
121
-}
122
-
123 101
 int db_redis_connect(km_redis_con_t *con) {
124 102
     struct timeval tv;
125 103
     redisReply *reply;
... ...
@@ -388,8 +366,8 @@ int db_redis_get_reply(km_redis_con_t *con, void **reply) {
388 366
             con->append_counter--;
389 367
         }
390 368
     } else {
391
-        LM_DBG("get_reply successful, popping query\n");
392
-        query = db_redis_pop_query(con);
369
+        LM_DBG("get_reply successful, removing query\n");
370
+        query = db_redis_shift_query(con);
393 371
         db_redis_key_free(&query);
394 372
         con->append_counter--;
395 373
     }
Browse code

db_redis: declare variable at beginning of function

- doing it inside for statement breaks compiling with less than C99
- reported in Jenkings debs building log

Daniel-Constantin Mierla authored on 21/02/2018 07:17:46
Showing 1 changed files
... ...
@@ -25,8 +25,10 @@
25 25
 #include "redis_table.h"
26 26
 
27 27
 static void print_query(redis_key_t *query) {
28
+	redis_key_t *k;
29
+
28 30
     LM_DBG("Query dump:\n");
29
-    for (redis_key_t *k = query; k; k = k->next) {
31
+    for (k = query; k; k = k->next) {
30 32
         LM_DBG("  %s\n", k->key.s);
31 33
     }
32 34
 }
Browse code

db_redis: re-do appended commands after reconnect

Since hiredis sends out the pipelined command only after
calling redisGetReply(), we need a mechamism to queue up all
appended commands, so we can re-queue them once the connection
is re-established.

This commit introduces a redis command queue used for pipelined
commands to re-do all appended commands in case of a connection drop
(which typically happens in db-mode write-through with very low
traffic, where redis would close the connection due to inactivity).

Andreas Granig authored on 20/02/2018 10:12:57
Showing 1 changed files
... ...
@@ -24,6 +24,100 @@
24 24
 #include "redis_connection.h"
25 25
 #include "redis_table.h"
26 26
 
27
+static void print_query(redis_key_t *query) {
28
+    LM_DBG("Query dump:\n");
29
+    for (redis_key_t *k = query; k; k = k->next) {
30
+        LM_DBG("  %s\n", k->key.s);
31
+    }
32
+}
33
+
34
+static int db_redis_push_query(km_redis_con_t *con, redis_key_t *query) {
35
+
36
+    redis_command_t *cmd = NULL;
37
+    redis_command_t *tmp = NULL;
38
+    redis_key_t *new_query = NULL;
39
+
40
+    if (!query)
41
+        return 0;
42
+
43
+    cmd = (redis_command_t*)pkg_malloc(sizeof(redis_command_t));
44
+    if (!cmd) {
45
+        LM_ERR("Failed to allocate memory for redis command\n");
46
+        goto err;
47
+    }
48
+
49
+    // duplicate query, as original one might be free'd after being
50
+    // appended
51
+    while(query) {
52
+         if (db_redis_key_add_str(&new_query, &query->key) != 0) {
53
+            LM_ERR("Failed to duplicate query\n");
54
+            goto err;
55
+        }
56
+        query = query->next;
57
+    }
58
+
59
+    cmd->query = new_query;
60
+    cmd->next = NULL;
61
+
62
+    if (!con->command_queue) {
63
+        con->command_queue = cmd;
64
+    } else {
65
+        tmp = con->command_queue;
66
+        while (tmp->next)
67
+            tmp = tmp->next;
68
+        tmp->next = cmd;
69
+    }
70
+
71
+    return 0;
72
+
73
+err:
74
+    if (new_query) {
75
+        db_redis_key_free(&new_query);
76
+    }
77
+    if (cmd) {
78
+        pkg_free(cmd);
79
+    }
80
+    return -1;
81
+}
82
+
83
+static redis_key_t* db_redis_shift_query(km_redis_con_t *con) {
84
+    redis_command_t *cmd;
85
+    redis_key_t *query;
86
+
87
+    query = NULL;
88
+    cmd = con->command_queue;
89
+
90
+    if (cmd) {
91
+        query = cmd->query;
92
+        con->command_queue = cmd->next;
93
+        pkg_free(cmd);
94
+    }
95
+
96
+    return query;
97
+}
98
+
99
+static redis_key_t* db_redis_pop_query(km_redis_con_t *con) {
100
+    redis_command_t **current;
101
+    redis_command_t *prev;
102
+    redis_key_t *query;
103
+
104
+    current = &con->command_queue;
105
+    if (!*current)
106
+        return NULL;
107
+
108
+    do {
109
+        query = (*current)->query;
110
+        prev = *current;
111
+        *current = (*current)->next;
112
+    } while (*current);
113
+
114
+    prev->next = NULL;
115
+    pkg_free(*current);
116
+    *current = NULL;
117
+
118
+    return query;
119
+}
120
+
27 121
 int db_redis_connect(km_redis_con_t *con) {
28 122
     struct timeval tv;
29 123
     redisReply *reply;
... ...
@@ -189,16 +283,6 @@ void db_redis_free_connection(struct pool_con* con) {
189 283
     pkg_free(_c);
190 284
 }
191 285
 
192
-
193
-static void print_query(redis_key_t *query) {
194
-	redis_key_t *k;
195
-
196
-    LM_DBG("Query dump:\n");
197
-    for (k = query; k; k = k->next) {
198
-        LM_DBG("  %s\n", k->key.s);
199
-    }
200
-}
201
-
202 286
 void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
203 287
     char **argv = NULL;
204 288
     int argc;
... ...
@@ -229,12 +313,17 @@ void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
229 313
     return reply;
230 314
 }
231 315
 
232
-int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query) {
316
+int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query, int queue) {
233 317
     char **argv = NULL;
234 318
     int ret, argc;
235 319
 
236 320
     print_query(query);
237 321
 
322
+    if (queue > 0 && db_redis_push_query(con, query) != 0) {
323
+        LM_ERR("Failed to queue redis command\n");
324
+        return -1;
325
+    }
326
+
238 327
     argc = db_redis_key_list2arr(query, &argv);
239 328
     if (argc < 0) {
240 329
         LM_ERR("Failed to allocate memory for query array\n");
... ...
@@ -243,6 +332,10 @@ int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query) {
243 332
     LM_DBG("query has %d args\n", argc);
244 333
 
245 334
     ret = redisAppendCommandArgv(con->con, argc, (const char**)argv, NULL);
335
+
336
+    // this should actually never happen, because if all replies
337
+    // are properly consumed for the previous command, it won't send
338
+    // out a new query until redisGetReply is called
246 339
     if (con->con->err == REDIS_ERR_EOF) {
247 340
         if (db_redis_connect(con) != 0) {
248 341
             LM_ERR("Failed to reconnect to redis db\n");
... ...
@@ -264,22 +357,40 @@ int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query) {
264 357
 
265 358
 int db_redis_get_reply(km_redis_con_t *con, void **reply) {
266 359
     int ret;
360
+    redis_key_t *query;
267 361
 
268 362
     *reply = NULL;
269 363
     ret = redisGetReply(con->con, reply);
270 364
     if (con->con->err == REDIS_ERR_EOF) {
365
+        LM_DBG("redis connection is gone, try reconnect\n");
366
+        con->append_counter = 0;
271 367
         if (db_redis_connect(con) != 0) {
272 368
             LM_ERR("Failed to reconnect to redis db\n");
273 369
             if (con->con) {
274 370
                 redisFree(con->con);
275 371
                 con->con = NULL;
276 372
             }
277
-            return ret;
373
+        }
374
+        // take commands from oldest to newest and re-do again,
375
+        // but don't queue them once again in retry-mode
376
+        while ((query = db_redis_shift_query(con))) {
377
+            LM_DBG("re-queueing appended command\n");
378
+            if (db_redis_append_command_argv(con, query, 0) != 0) {
379
+                LM_ERR("Failed to re-queue redis command");
380
+                return -1;
381
+            }
382
+            db_redis_key_free(&query);
278 383
         }
279 384
         ret = redisGetReply(con->con, reply);
280
-    }
281
-    if (!con->con->err)
385
+        if (con->con->err != REDIS_ERR_EOF) {
386
+            con->append_counter--;
387
+        }
388
+    } else {
389
+        LM_DBG("get_reply successful, popping query\n");
390
+        query = db_redis_pop_query(con);
391
+        db_redis_key_free(&query);
282 392
         con->append_counter--;
393
+    }
283 394
     return ret;
284 395
 }
285 396
 
... ...
@@ -292,6 +403,7 @@ void db_redis_free_reply(redisReply **reply) {
292 403
 
293 404
 void db_redis_consume_replies(km_redis_con_t *con) {
294 405
     redisReply *reply = NULL;
406
+    redis_key_t *query;
295 407
     while (con->append_counter > 0 && !con->con->err) {
296 408
         LM_DBG("consuming outstanding reply %u", con->append_counter);
297 409
         db_redis_get_reply(con, (void**)&reply);
... ...
@@ -300,4 +412,8 @@ void db_redis_consume_replies(km_redis_con_t *con) {
300 412
             reply = NULL;
301 413
         }
302 414
     }
415
+    while ((query = db_redis_shift_query(con))) {
416
+        LM_DBG("consuming queued command\n");
417
+        db_redis_key_free(&query);
418
+    }
303 419
 }
Browse code

db_redis: don't declare vars in for statement

- not compliant with C strict compile older than C99

Daniel-Constantin Mierla authored on 16/02/2018 07:50:27
Showing 1 changed files
... ...
@@ -191,8 +191,10 @@ void db_redis_free_connection(struct pool_con* con) {
191 191
 
192 192
 
193 193
 static void print_query(redis_key_t *query) {
194
+	redis_key_t *k;
195
+
194 196
     LM_DBG("Query dump:\n");
195
-    for (redis_key_t *k = query; k; k = k->next) {
197
+    for (k = query; k; k = k->next) {
196 198
         LM_DBG("  %s\n", k->key.s);
197 199
     }
198 200
 }
Browse code

db_redis: Use schema files and improve keys def

* Auto-generate schema files for redis from xml specs and use
them in module instead of having to define them as mod params.
* Allow key definition line by line with multiple "keys" mod params.
* Fetch table versions from schema to avoid having to populate them
in Redis.
* Fix reconnection issues on connection drops when Redis takes longer
to start.
* Fix documentation formatting issues.

Andreas Granig authored on 13/02/2018 13:55:35
Showing 1 changed files
... ...
@@ -39,6 +39,7 @@ int db_redis_connect(km_redis_con_t *con) {
39 39
     // TODO: on carrier, if we have db fail-over, the currently connected
40 40
     // redis server will become slave without dropping connections?
41 41
 
42
+    LM_DBG("connecting to redis at %s:%d\n", con->id->host, con->id->port);
42 43
     con->con = redisConnectWithTimeout(con->id->host, con->id->port, tv);
43 44
 
44 45
     if (!con->con) {
... ...
@@ -91,6 +92,7 @@ int db_redis_connect(km_redis_con_t *con) {
91 92
         goto err;
92 93
     }
93 94
     freeReplyObject(reply); reply = NULL;
95
+    LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
94 96
 
95 97
     return 0;
96 98
 
... ...
@@ -125,9 +127,9 @@ km_redis_con_t* db_redis_new_connection(const struct db_id* id) {
125 127
     ptr->id = (struct db_id*)id;
126 128
 
127 129
     /*
128
-    LM_DBG("trying to initialize connection to '%.*s' with schema '%.*s' and keys '%.*s'\n",
130
+    LM_DBG("trying to initialize connection to '%.*s' with schema path '%.*s' and keys '%.*s'\n",
129 131
             id->url.len, id->url.s,
130
-            redis_schema.len, redis_schema.s,
132
+            redis_schema_path.len, redis_schema_path.s,
131 133
             redis_keys.len, redis_keys.s);
132 134
     */
133 135
     LM_DBG("trying to initialize connection to '%.*s'\n",
... ...
@@ -209,12 +211,14 @@ void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
209 211
     LM_DBG("query has %d args\n", argc);
210 212
 
211 213
     redisReply *reply = redisCommandArgv(con->con, argc, (const char**)argv, NULL);
212
-    if (con->con->err == REDIS_ERR_EOF &&
213
-        strcmp(con->con->errstr,"Server closed the connection") == 0) {
214
-
214
+    if (con->con->err == REDIS_ERR_EOF) {
215 215
         if (db_redis_connect(con) != 0) {
216 216
             LM_ERR("Failed to reconnect to redis db\n");
217 217
             pkg_free(argv);
218
+            if (con->con) {
219
+                redisFree(con->con);
220
+                con->con = NULL;
221
+            }
218 222
             return NULL;
219 223
         }
220 224
         reply = redisCommandArgv(con->con, argc, (const char**)argv, NULL);
... ...
@@ -237,12 +241,14 @@ int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query) {
237 241
     LM_DBG("query has %d args\n", argc);
238 242
 
239 243
     ret = redisAppendCommandArgv(con->con, argc, (const char**)argv, NULL);
240
-    if (con->con->err == REDIS_ERR_EOF &&
241
-        strcmp(con->con->errstr,"Server closed the connection") == 0) {
242
-
244
+    if (con->con->err == REDIS_ERR_EOF) {
243 245
         if (db_redis_connect(con) != 0) {
244 246
             LM_ERR("Failed to reconnect to redis db\n");
245 247
             pkg_free(argv);
248
+            if (con->con) {
249
+                redisFree(con->con);
250
+                con->con = NULL;
251
+            }
246 252
             return ret;
247 253
         }
248 254
         ret = redisAppendCommandArgv(con->con, argc, (const char**)argv, NULL);
... ...
@@ -259,11 +265,13 @@ int db_redis_get_reply(km_redis_con_t *con, void **reply) {
259 265
 
260 266
     *reply = NULL;
261 267
     ret = redisGetReply(con->con, reply);
262
-    if (con->con->err == REDIS_ERR_EOF &&
263
-        strcmp(con->con->errstr,"Server closed the connection") == 0) {
264
-
268
+    if (con->con->err == REDIS_ERR_EOF) {
265 269
         if (db_redis_connect(con) != 0) {
266 270
             LM_ERR("Failed to reconnect to redis db\n");
271
+            if (con->con) {
272
+                redisFree(con->con);
273
+                con->con = NULL;
274
+            }
267 275
             return ret;
268 276
         }
269 277
         ret = redisGetReply(con->con, reply);
Browse code

db_redis: Implement db_redis generic db driver

This module implements a generic db driver for kamailio. It
requires a "schema" and "key" definition of "tables" and corresponding
keys for redis in the kamailio config file, otherwise it's supposed to
work with every module.

Implemented methods are query (w/o order-by), insert, update, delete.

Tested with usrloc and acc.

Andreas Granig authored on 07/02/2018 12:52:56
Showing 1 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,293 @@
1
+/*
2
+ * Copyright (C) 2018 Andreas Granig (sipwise.com)
3
+ *
4
+ * This file is part of Kamailio, a free SIP server.
5
+ *
6
+ * Kamailio is free software; you can redistribute it and/or modify
7
+ * it under the terms of the GNU General Public License as published by
8
+ * the Free Software Foundation; either version 2 of the License, or
9
+ * (at your option) any later version
10
+ *
11
+ * Kamailio is distributed in the hope that it will be useful,
12
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
+ * GNU General Public License for more details.
15
+ *
16
+ * You should have received a copy of the GNU General Public License
17
+ * along with this program; if not, write to the Free Software
18
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
+ */
20
+
21
+#include <stdlib.h>
22
+
23
+#include "db_redis_mod.h"
24
+#include "redis_connection.h"
25
+#include "redis_table.h"
26
+
27
+int db_redis_connect(km_redis_con_t *con) {
28
+    struct timeval tv;
29
+    redisReply *reply;
30
+    int db;
31
+
32
+    tv.tv_sec = 1;
33
+    tv.tv_usec = 0;
34
+
35
+    db = atoi(con->id->database);
36
+    reply = NULL;
37
+
38
+    // TODO: introduce require_master mod-param and check if we're indeed master
39
+    // TODO: on carrier, if we have db fail-over, the currently connected
40
+    // redis server will become slave without dropping connections?
41
+
42
+    con->con = redisConnectWithTimeout(con->id->host, con->id->port, tv);
43
+
44
+    if (!con->con) {
45
+        LM_ERR("cannot open connection: %.*s\n", con->id->url.len, con->id->url.s);
46
+        goto err;
47
+    }
48
+    if (con->con->err) {
49
+        LM_ERR("cannot open connection to %.*s: %s\n", con->id->url.len, con->id->url.s,
50
+            con->con->errstr);
51
+        goto err;
52
+    }
53
+
54
+    if (con->id->password) {
55
+        reply = redisCommand(con->con, "AUTH %s", con->id->password);
56
+        if (!reply) {
57
+            LM_ERR("cannot authenticate connection %.*s: %s\n",
58
+                    con->id->url.len, con->id->url.s, con->con->errstr);
59
+            goto err;
60
+        }
61
+        if (reply->type == REDIS_REPLY_ERROR) {
62
+            LM_ERR("cannot authenticate connection %.*s: %s\n",
63
+                    con->id->url.len, con->id->url.s, reply->str);
64
+            goto err;
65
+        }
66
+        freeReplyObject(reply); reply = NULL;
67
+    }
68
+
69
+    reply = redisCommand(con->con, "PING");
70
+    if (!reply) {
71
+        LM_ERR("cannot ping server on connection %.*s: %s\n",
72
+                con->id->url.len, con->id->url.s, con->con->errstr);
73
+        goto err;
74
+    }
75
+    if (reply->type == REDIS_REPLY_ERROR) {
76
+        LM_ERR("cannot ping server on connection %.*s: %s\n",
77
+                con->id->url.len, con->id->url.s, reply->str);
78
+        goto err;
79
+    }
80
+    freeReplyObject(reply); reply = NULL;
81
+
82
+    reply = redisCommand(con->con, "SELECT %i", db);
83
+    if (!reply) {
84
+        LM_ERR("cannot select db on connection %.*s: %s\n",
85
+                con->id->url.len, con->id->url.s, con->con->errstr);
86
+        goto err;
87
+    }
88
+    if (reply->type == REDIS_REPLY_ERROR) {
89
+        LM_ERR("cannot select db on connection %.*s: %s\n",
90
+                con->id->url.len, con->id->url.s, reply->str);
91
+        goto err;
92
+    }
93
+    freeReplyObject(reply); reply = NULL;
94
+
95
+    return 0;
96
+
97
+err:
98
+    if (reply)
99
+        freeReplyObject(reply);
100
+    if (con->con) {
101
+        redisFree(con->con);
102
+        con->con = NULL;
103
+    }
104
+    return -1;
105
+}
106
+
107
+/*! \brief
108
+ * Create a new connection structure,
109
+ * open the redis connection and set reference count to 1
110
+ */
111
+km_redis_con_t* db_redis_new_connection(const struct db_id* id) {
112
+    km_redis_con_t *ptr = NULL;
113
+
114
+    if (!id) {
115
+        LM_ERR("invalid id parameter value\n");
116
+        return 0;
117
+    }
118
+
119
+    ptr = (km_redis_con_t*)pkg_malloc(sizeof(km_redis_con_t));
120
+    if (!ptr) {
121
+        LM_ERR("no private memory left\n");
122
+        return 0;
123
+    }
124
+    memset(ptr, 0, sizeof(km_redis_con_t));
125
+    ptr->id = (struct db_id*)id;
126
+
127
+    /*
128
+    LM_DBG("trying to initialize connection to '%.*s' with schema '%.*s' and keys '%.*s'\n",
129
+            id->url.len, id->url.s,
130
+            redis_schema.len, redis_schema.s,
131
+            redis_keys.len, redis_keys.s);
132
+    */
133
+    LM_DBG("trying to initialize connection to '%.*s'\n",
134
+            id->url.len, id->url.s);
135
+    if (db_redis_parse_schema(ptr) != 0) {
136
+        LM_ERR("failed to parse 'schema' module parameter\n");
137
+        goto err;
138
+    }
139
+    if (db_redis_parse_keys(ptr) != 0) {
140
+        LM_ERR("failed to parse 'keys' module parameter\n");
141
+        goto err;
142
+    }
143
+
144
+    db_redis_print_all_tables(ptr);
145
+
146
+    ptr->ref = 1;
147
+    ptr->append_counter = 0;
148
+
149
+    if (db_redis_connect(ptr) != 0) {
150
+        LM_ERR("Failed to connect to redis db\n");
151
+        goto err;
152
+    }
153
+
154
+    LM_DBG("connection opened to %.*s\n", id->url.len, id->url.s);
155
+
156
+    return ptr;
157
+
158
+ err:
159
+    if (ptr) {
160
+        if (ptr->con) {
161
+            redisFree(ptr->con);
162
+        }
163
+        pkg_free(ptr);
164
+    }
165
+    return 0;
166
+}
167
+
168
+
169
+/*! \brief
170
+ * Close the connection and release memory
171
+ */
172
+void db_redis_free_connection(struct pool_con* con) {
173
+    km_redis_con_t * _c;
174
+
175
+    LM_DBG("freeing db_redis connection\n");
176
+
177
+    if (!con) return;
178
+
179
+    _c = (km_redis_con_t*) con;
180
+
181
+    if (_c->id) free_db_id(_c->id);
182
+    if (_c->con) {
183
+        redisFree(_c->con);
184
+    }
185
+
186
+    db_redis_free_tables(_c);
187
+    pkg_free(_c);