Browse code

db_redis: re-do appended commands after reconnect

Since hiredis sends out the pipelined command only after
calling redisGetReply(), we need a mechamism to queue up all
appended commands, so we can re-queue them once the connection
is re-established.

This commit introduces a redis command queue used for pipelined
commands to re-do all appended commands in case of a connection drop
(which typically happens in db-mode write-through with very low
traffic, where redis would close the connection due to inactivity).

Andreas Granig authored on 20/02/2018 10:12:57
Showing 5 changed files
... ...
@@ -24,6 +24,100 @@
24 24
 #include "redis_connection.h"
25 25
 #include "redis_table.h"
26 26
 
27
+static void print_query(redis_key_t *query) {
28
+    LM_DBG("Query dump:\n");
29
+    for (redis_key_t *k = query; k; k = k->next) {
30
+        LM_DBG("  %s\n", k->key.s);
31
+    }
32
+}
33
+
34
+static int db_redis_push_query(km_redis_con_t *con, redis_key_t *query) {
35
+
36
+    redis_command_t *cmd = NULL;
37
+    redis_command_t *tmp = NULL;
38
+    redis_key_t *new_query = NULL;
39
+
40
+    if (!query)
41
+        return 0;
42
+
43
+    cmd = (redis_command_t*)pkg_malloc(sizeof(redis_command_t));
44
+    if (!cmd) {
45
+        LM_ERR("Failed to allocate memory for redis command\n");
46
+        goto err;
47
+    }
48
+
49
+    // duplicate query, as original one might be free'd after being
50
+    // appended
51
+    while(query) {
52
+         if (db_redis_key_add_str(&new_query, &query->key) != 0) {
53
+            LM_ERR("Failed to duplicate query\n");
54
+            goto err;
55
+        }
56
+        query = query->next;
57
+    }
58
+
59
+    cmd->query = new_query;
60
+    cmd->next = NULL;
61
+
62
+    if (!con->command_queue) {
63
+        con->command_queue = cmd;
64
+    } else {
65
+        tmp = con->command_queue;
66
+        while (tmp->next)
67
+            tmp = tmp->next;
68
+        tmp->next = cmd;
69
+    }
70
+
71
+    return 0;
72
+
73
+err:
74
+    if (new_query) {
75
+        db_redis_key_free(&new_query);
76
+    }
77
+    if (cmd) {
78
+        pkg_free(cmd);
79
+    }
80
+    return -1;
81
+}
82
+
83
+static redis_key_t* db_redis_shift_query(km_redis_con_t *con) {
84
+    redis_command_t *cmd;
85
+    redis_key_t *query;
86
+
87
+    query = NULL;
88
+    cmd = con->command_queue;
89
+
90
+    if (cmd) {
91
+        query = cmd->query;
92
+        con->command_queue = cmd->next;
93
+        pkg_free(cmd);
94
+    }
95
+
96
+    return query;
97
+}
98
+
99
+static redis_key_t* db_redis_pop_query(km_redis_con_t *con) {
100
+    redis_command_t **current;
101
+    redis_command_t *prev;
102
+    redis_key_t *query;
103
+
104
+    current = &con->command_queue;
105
+    if (!*current)
106
+        return NULL;
107
+
108
+    do {
109
+        query = (*current)->query;
110
+        prev = *current;
111
+        *current = (*current)->next;
112
+    } while (*current);
113
+
114
+    prev->next = NULL;
115
+    pkg_free(*current);
116
+    *current = NULL;
117
+
118
+    return query;
119
+}
120
+
27 121
 int db_redis_connect(km_redis_con_t *con) {
28 122
     struct timeval tv;
29 123
     redisReply *reply;
... ...
@@ -189,16 +283,6 @@ void db_redis_free_connection(struct pool_con* con) {
189 283
     pkg_free(_c);
190 284
 }
191 285
 
192
-
193
-static void print_query(redis_key_t *query) {
194
-	redis_key_t *k;
195
-
196
-    LM_DBG("Query dump:\n");
197
-    for (k = query; k; k = k->next) {
198
-        LM_DBG("  %s\n", k->key.s);
199
-    }
200
-}
201
-
202 286
 void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
203 287
     char **argv = NULL;
204 288
     int argc;
... ...
@@ -229,12 +313,17 @@ void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
229 313
     return reply;
230 314
 }
231 315
 
232
-int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query) {
316
+int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query, int queue) {
233 317
     char **argv = NULL;
234 318
     int ret, argc;
235 319
 
236 320
     print_query(query);
237 321
 
322
+    if (queue > 0 && db_redis_push_query(con, query) != 0) {
323
+        LM_ERR("Failed to queue redis command\n");
324
+        return -1;
325
+    }
326
+
238 327
     argc = db_redis_key_list2arr(query, &argv);
239 328
     if (argc < 0) {
240 329
         LM_ERR("Failed to allocate memory for query array\n");
... ...
@@ -243,6 +332,10 @@ int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query) {
243 332
     LM_DBG("query has %d args\n", argc);
244 333
 
245 334
     ret = redisAppendCommandArgv(con->con, argc, (const char**)argv, NULL);
335
+
336
+    // this should actually never happen, because if all replies
337
+    // are properly consumed for the previous command, it won't send
338
+    // out a new query until redisGetReply is called
246 339
     if (con->con->err == REDIS_ERR_EOF) {
247 340
         if (db_redis_connect(con) != 0) {
248 341
             LM_ERR("Failed to reconnect to redis db\n");
... ...
@@ -264,22 +357,40 @@ int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query) {
264 357
 
265 358
 int db_redis_get_reply(km_redis_con_t *con, void **reply) {
266 359
     int ret;
360
+    redis_key_t *query;
267 361
 
268 362
     *reply = NULL;
269 363
     ret = redisGetReply(con->con, reply);
270 364
     if (con->con->err == REDIS_ERR_EOF) {
365
+        LM_DBG("redis connection is gone, try reconnect\n");
366
+        con->append_counter = 0;
271 367
         if (db_redis_connect(con) != 0) {
272 368
             LM_ERR("Failed to reconnect to redis db\n");
273 369
             if (con->con) {
274 370
                 redisFree(con->con);
275 371
                 con->con = NULL;
276 372
             }
277
-            return ret;
373
+        }
374
+        // take commands from oldest to newest and re-do again,
375
+        // but don't queue them once again in retry-mode
376
+        while ((query = db_redis_shift_query(con))) {
377
+            LM_DBG("re-queueing appended command\n");
378
+            if (db_redis_append_command_argv(con, query, 0) != 0) {
379
+                LM_ERR("Failed to re-queue redis command");
380
+                return -1;
381
+            }
382
+            db_redis_key_free(&query);
278 383
         }
279 384
         ret = redisGetReply(con->con, reply);
280
-    }
281
-    if (!con->con->err)
385
+        if (con->con->err != REDIS_ERR_EOF) {
386
+            con->append_counter--;
387
+        }
388
+    } else {
389
+        LM_DBG("get_reply successful, popping query\n");
390
+        query = db_redis_pop_query(con);
391
+        db_redis_key_free(&query);
282 392
         con->append_counter--;
393
+    }
283 394
     return ret;
284 395
 }
285 396
 
... ...
@@ -292,6 +403,7 @@ void db_redis_free_reply(redisReply **reply) {
292 403
 
293 404
 void db_redis_consume_replies(km_redis_con_t *con) {
294 405
     redisReply *reply = NULL;
406
+    redis_key_t *query;
295 407
     while (con->append_counter > 0 && !con->con->err) {
296 408
         LM_DBG("consuming outstanding reply %u", con->append_counter);
297 409
         db_redis_get_reply(con, (void**)&reply);
... ...
@@ -300,4 +412,8 @@ void db_redis_consume_replies(km_redis_con_t *con) {
300 412
             reply = NULL;
301 413
         }
302 414
     }
415
+    while ((query = db_redis_shift_query(con))) {
416
+        LM_DBG("consuming queued command\n");
417
+        db_redis_key_free(&query);
418
+    }
303 419
 }
... ...
@@ -44,12 +44,20 @@
44 44
     } \
45 45
 } while(0);
46 46
 
47
+typedef struct redis_key redis_key_t;
48
+
49
+typedef struct redis_command {
50
+    redis_key_t *query;
51
+    struct redis_command *next;
52
+} redis_command_t;
53
+
47 54
 typedef struct km_redis_con {
48 55
     struct db_id* id;
49 56
     unsigned int ref;
50 57
     struct pool_con* next;
51 58
 
52 59
     redisContext *con;
60
+    redis_command_t *command_queue;
53 61
     unsigned int append_counter;
54 62
     struct str_hash_table tables;
55 63
 } km_redis_con_t;
... ...
@@ -65,7 +73,7 @@ void db_redis_free_connection(struct pool_con* con);
65 73
 
66 74
 int db_redis_connect(km_redis_con_t *con);
67 75
 void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query);
68
-int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query);
76
+int db_redis_append_command_argv(km_redis_con_t *con, redis_key_t *query, int queue);
69 77
 int db_redis_get_reply(km_redis_con_t *con, void **reply);
70 78
 void db_redis_consume_replies(km_redis_con_t *con);
71 79
 void db_redis_free_reply(redisReply **reply);
... ...
@@ -1088,7 +1088,7 @@ static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, cons
1088 1088
             LM_ERR("Failed to add key name to list\n");
1089 1089
             goto error;
1090 1090
         }
1091
-        if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
1091
+        if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
1092 1092
             LM_ERR("Failed to append redis command\n");
1093 1093
             goto error;
1094 1094
         }
... ...
@@ -1127,7 +1127,7 @@ static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, cons
1127 1127
             }
1128 1128
         }
1129 1129
 
1130
-        if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
1130
+        if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
1131 1131
             LM_ERR("Failed to append redis command\n");
1132 1132
             goto error;
1133 1133
         }
... ...
@@ -1451,7 +1451,7 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con
1451 1451
             LM_ERR("Failed to add key name to pre-update exists query\n");
1452 1452
             goto error;
1453 1453
         }
1454
-        if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
1454
+        if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
1455 1455
             LM_ERR("Failed to append redis command\n");
1456 1456
             goto error;
1457 1457
         }
... ...
@@ -1486,7 +1486,7 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con
1486 1486
             }
1487 1487
         }
1488 1488
 
1489
-        if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
1489
+        if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
1490 1490
             LM_ERR("Failed to append redis command\n");
1491 1491
             goto error;
1492 1492
         }
... ...
@@ -1601,7 +1601,7 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con
1601 1601
             pkg_free(v.s);
1602 1602
         }
1603 1603
         update_queries++;
1604
-        if (db_redis_append_command_argv(con, query_v) != REDIS_OK) {
1604
+        if (db_redis_append_command_argv(con, query_v, 1) != REDIS_OK) {
1605 1605
             LM_ERR("Failed to append redis command\n");
1606 1606
             goto error;
1607 1607
         }
... ...
@@ -77,7 +77,7 @@ const db_key_t* _uk, const db_val_t* _uv, const int _n, const int _un);
77 77
  * Just like insert, but replace the row if it exists
78 78
  */
79 79
 int db_redis_replace(const db1_con_t* handle, const db_key_t* keys, const db_val_t* vals,
80
-		const int n, const int _un, const int _m);
80
+        const int n, const int _un, const int _m);
81 81
 
82 82
 /*
83 83
  * Store name of table that will be used by
... ...
@@ -41,7 +41,7 @@ struct redis_type {
41 41
 
42 42
 typedef struct redis_table redis_table_t;
43 43
 struct redis_table {
44
-	int version;
44
+    int version;
45 45
     redis_key_t *entry_keys;
46 46
     redis_type_t *types;
47 47
     struct str_hash_table columns;