Browse code

db_redis: use cluster api

Riccardo Villa authored on 05/01/2022 11:05:41
Showing 3 changed files
... ...
@@ -25,6 +25,15 @@
25 25
 #include "redis_table.h"
26 26
 #include "redis_dbase.h"
27 27
 
28
+#ifdef WITH_HIREDIS_CLUSTER
29
+static unsigned int MAX_URL_LENGTH = 1023;
30
+#define redisCommand redisClusterCommand
31
+#define redisFree redisClusterFree
32
+#define redisCommandArgv redisClusterCommandArgv
33
+#define redisAppendCommandArgv redisClusterAppendCommandArgv
34
+#define redisGetReply redisClusterGetReply
35
+#endif
36
+
28 37
 extern int db_redis_verbosity;
29 38
 
30 39
 static void print_query(redis_key_t *query) {
... ...
@@ -104,21 +113,59 @@ static redis_key_t* db_redis_shift_query(km_redis_con_t *con) {
104 113
 int db_redis_connect(km_redis_con_t *con) {
105 114
     struct timeval tv;
106 115
     redisReply *reply;
116
+#ifndef WITH_HIREDIS_CLUSTER
107 117
     int db;
118
+#endif
108 119
 
109 120
     tv.tv_sec = 1;
110 121
     tv.tv_usec = 0;
111
-
122
+#ifndef WITH_HIREDIS_CLUSTER
112 123
     db = atoi(con->id->database);
124
+#endif
113 125
     reply = NULL;
114 126
 
115 127
     // TODO: introduce require_master mod-param and check if we're indeed master
116 128
     // TODO: on carrier, if we have db fail-over, the currently connected
117 129
     // redis server will become slave without dropping connections?
118
-
130
+#ifdef WITH_HIREDIS_CLUSTER
131
+    int status;
132
+    char hosts[MAX_URL_LENGTH];
133
+    char* host_begin;
134
+    char* host_end;
135
+    LM_DBG("connecting to redis cluster at %.*s\n", con->id->url.len, con->id->url.s);
136
+    host_begin = strstr(con->id->url.s, "redis://");
137
+    if (host_begin) {
138
+        host_begin += 8;
139
+    } else {
140
+        LM_ERR("invalid url scheme\n");
141
+        goto err;
142
+    }
143
+    host_end = strstr(host_begin, "/");
144
+    if (! host_end) {
145
+        LM_ERR("invalid url: cannot find end of host part\n");
146
+        goto err;
147
+    }
148
+    if ((host_end - host_begin) > (MAX_URL_LENGTH-1)) {
149
+        LM_ERR("url too long\n");
150
+        goto err;
151
+    }
152
+    strncpy(hosts, host_begin, (host_end - host_begin));
153
+    hosts[MAX_URL_LENGTH-1] = '\0';
154
+    con->con = redisClusterContextInit();
155
+    if (! con->con) {
156
+        LM_ERR("no private memory left\n");
157
+        goto err;
158
+    }
159
+    redisClusterSetOptionAddNodes(con->con, hosts);
160
+    redisClusterSetOptionConnectTimeout(con->con, tv);
161
+    status = redisClusterConnect2(con->con);
162
+    if (status != REDIS_OK) {
163
+        LM_ERR("cannot open connection to cluster with hosts: %s, error: %s\n", hosts, con->con->errstr);
164
+        goto err;
165
+    }
166
+#else
119 167
     LM_DBG("connecting to redis at %s:%d\n", con->id->host, con->id->port);
120 168
     con->con = redisConnectWithTimeout(con->id->host, con->id->port, tv);
121
-
122 169
     if (!con->con) {
123 170
         LM_ERR("cannot open connection: %.*s\n", con->id->url.len, con->id->url.s);
124 171
         goto err;
... ...
@@ -128,6 +175,7 @@ int db_redis_connect(km_redis_con_t *con) {
128 175
             con->con->errstr);
129 176
         goto err;
130 177
     }
178
+#endif
131 179
 
132 180
     if (con->id->password) {
133 181
         reply = redisCommand(con->con, "AUTH %s", con->id->password);
... ...
@@ -144,6 +192,7 @@ int db_redis_connect(km_redis_con_t *con) {
144 192
         freeReplyObject(reply); reply = NULL;
145 193
     }
146 194
 
195
+#ifndef WITH_HIREDIS_CLUSTER
147 196
     reply = redisCommand(con->con, "PING");
148 197
     if (!reply) {
149 198
         LM_ERR("cannot ping server on connection %.*s: %s\n",
... ...
@@ -169,8 +218,10 @@ int db_redis_connect(km_redis_con_t *con) {
169 218
         goto err;
170 219
     }
171 220
     freeReplyObject(reply); reply = NULL;
221
+#endif
172 222
     LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
173 223
 
224
+#ifndef WITH_HIREDIS_CLUSTER
174 225
     reply = redisCommand(con->con, "SCRIPT LOAD %s", SREM_KEY_LUA);
175 226
     if (!reply) {
176 227
         LM_ERR("failed to load LUA script to server %.*s: %s\n",
... ...
@@ -194,6 +245,7 @@ int db_redis_connect(km_redis_con_t *con) {
194 245
     }
195 246
     strcpy(con->srem_key_lua, reply->str);
196 247
     freeReplyObject(reply); reply = NULL;
248
+#endif
197 249
     LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);
198 250
 
199 251
     return 0;
... ...
@@ -291,6 +343,63 @@ void db_redis_free_connection(struct pool_con* con) {
291 343
     pkg_free(_c);
292 344
 }
293 345
 
346
+#ifdef WITH_HIREDIS_CLUSTER
347
+void *db_redis_command_argv_to_node(km_redis_con_t *con, redis_key_t *query, cluster_node *node) {
348
+    char **argv = NULL;
349
+    int argc;
350
+    #define MAX_CMD_LENGTH 256
351
+    char cmd[MAX_CMD_LENGTH] = "";
352
+    size_t cmd_len = MAX_CMD_LENGTH - 1;
353
+    int i;
354
+
355
+    print_query(query);
356
+
357
+    argc = db_redis_key_list2arr(query, &argv);
358
+    if (argc < 0) {
359
+        LM_ERR("Failed to allocate memory for query array\n");
360
+        return NULL;
361
+    }
362
+    LM_DBG("query has %d args\n", argc);
363
+
364
+    for (i=0; i<argc; i++)
365
+    {
366
+        size_t arg_len = strlen(argv[i]);
367
+        if (arg_len > cmd_len)
368
+                        break;
369
+        strncat(cmd, argv[i], cmd_len);
370
+        cmd_len = cmd_len - arg_len;
371
+        if (cmd_len == 0)
372
+            break;
373
+        
374
+        if (i != argc - 1) {
375
+            strncat(cmd, " ", cmd_len);
376
+            cmd_len--;
377
+        }
378
+
379
+    }
380
+
381
+    LM_DBG("cmd is %s\n", cmd);
382
+
383
+    redisReply *reply = redisClusterCommandToNode(con->con, node, cmd);
384
+    if (con->con->err != REDIS_OK) {
385
+        LM_DBG("redis connection is gone, try reconnect. (%d:%s)\n", con->con->err, con->con->errstr);
386
+        if (db_redis_connect(con) != 0) {
387
+            LM_ERR("Failed to reconnect to redis db\n");
388
+            pkg_free(argv);
389
+            if (con->con) {
390
+                redisFree(con->con);
391
+                con->con = NULL;
392
+            }
393
+            return NULL;
394
+        }
395
+        reply = redisClusterCommandToNode(con->con, node, cmd);
396
+    }
397
+    pkg_free(argv);
398
+    return reply;
399
+}
400
+    
401
+#endif
402
+
294 403
 void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
295 404
     char **argv = NULL;
296 405
     int argc;
... ...
@@ -23,14 +23,19 @@
23 23
 #ifndef _REDIS_CONNECTION_H_
24 24
 #define _REDIS_CONNECTION_H_
25 25
 
26
+#ifdef WITH_HIREDIS_CLUSTER
27
+#include <hircluster.h>
28
+#else
26 29
 #ifdef WITH_HIREDIS_PATH
27 30
 #include <hiredis/hiredis.h>
28 31
 #else
29 32
 #include <hiredis.h>
30 33
 #endif
34
+#endif
31 35
 
32 36
 #include "db_redis_mod.h"
33 37
 
38
+#ifndef WITH_REDIS_CLUSTER
34 39
 #define db_redis_check_reply(con, reply, err) do { \
35 40
     if (!(reply) && !(con)->con) { \
36 41
         LM_ERR("Failed to fetch type entry: no connection to server\n"); \
... ...
@@ -49,6 +54,26 @@
49 54
         goto err; \
50 55
     } \
51 56
 } while(0);
57
+#else
58
+#define db_redis_check_reply(con, reply, err) do { \
59
+    if (!(reply) && !(con)->con) { \
60
+        LM_ERR("Failed to fetch type entry: no connection to server\n"); \
61
+        goto err; \
62
+    } \
63
+    if (!(reply)) { \
64
+        LM_ERR("Failed to fetch type entry: %s\n", \
65
+                (con)->con->errstr); \
66
+        redisClusterFree((con)->con); \
67
+        (con)->con = NULL; \
68
+        goto err; \
69
+    } \
70
+    if ((reply)->type == REDIS_REPLY_ERROR) { \
71
+        LM_ERR("Failed to fetch type entry: %s\n", \
72
+                (reply)->str); \
73
+        goto err; \
74
+    } \
75
+} while(0);
76
+#endif
52 77
 
53 78
 typedef struct redis_key redis_key_t;
54 79
 
... ...
@@ -61,8 +86,11 @@ typedef struct km_redis_con {
61 86
     struct db_id* id;
62 87
     unsigned int ref;
63 88
     struct pool_con* next;
64
-
89
+#ifdef WITH_HIREDIS_CLUSTER
90
+    redisClusterContext *con;
91
+#else
65 92
     redisContext *con;
93
+#endif
66 94
     redis_command_t *command_queue;
67 95
     unsigned int append_counter;
68 96
     struct str_hash_table tables;
... ...
@@ -86,4 +114,8 @@ void db_redis_consume_replies(km_redis_con_t *con);
86 114
 void db_redis_free_reply(redisReply **reply);
87 115
 const char *db_redis_get_error(km_redis_con_t *con);
88 116
 
117
+#ifdef WITH_HIREDIS_CLUSTER
118
+void *db_redis_command_argv_to_node(km_redis_con_t *con, redis_key_t *query, cluster_node *node);
119
+#endif
120
+
89 121
 #endif /* _REDIS_CONNECTION_H_ */
... ...
@@ -832,10 +832,23 @@ static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *matc
832 832
         goto err;
833 833
     }
834 834
 
835
+#ifdef WITH_HIREDIS_CLUSTER
836
+nodeIterator niter;
837
+cluster_node *node;
838
+initNodeIterator(&niter, con->con);
839
+while ((node = nodeNext(&niter)) != NULL) {
840
+    if (node->role != REDIS_ROLE_MASTER)
841
+        continue;
842
+    reply = db_redis_command_argv_to_node(con, query_v, node);
843
+    if (!reply) {
844
+        LM_ERR("Invalid null reply from node %s\n", node->addr);
845
+        goto err;
846
+    }
847
+
848
+#else
835 849
     reply = db_redis_command_argv(con, query_v);
836
-    db_redis_key_free(&query_v);
850
+#endif
837 851
     db_redis_check_reply(con, reply, err);
838
-
839 852
     keys_list = reply;
840 853
 
841 854
 #endif
... ...
@@ -880,6 +893,10 @@ static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *matc
880 893
     } while (cursor > 0);
881 894
 #endif
882 895
 
896
+#ifdef WITH_HIREDIS_CLUSTER
897
+    }
898
+#endif    
899
+
883 900
     // for full table scans, we have to manually match all given keys
884 901
     // but only do this once for repeated invocations
885 902
     if (!*manual_keys) {
... ...
@@ -898,6 +915,8 @@ static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *matc
898 915
     if (reply) {
899 916
         db_redis_free_reply(&reply);
900 917
     }
918
+    
919
+    db_redis_key_free(&query_v);
901 920
 
902 921
     LM_DBG("got %lu entries by scan\n", (unsigned long) i);
903 922
     return 0;
... ...
@@ -1636,6 +1655,10 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con
1636 1655
     redis_key_t *type_key;
1637 1656
     redis_key_t *set_key;
1638 1657
 
1658
+#ifdef WITH_HIREDIS_CLUSTER
1659
+    long long scard;
1660
+#endif
1661
+
1639 1662
     if (!*keys_count && do_table_scan) {
1640 1663
         if (!ts_scan_start)
1641 1664
             LM_WARN("performing full table scan on table '%.*s' while performing delete\n",
... ...
@@ -1806,6 +1829,57 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con
1806 1829
         for (type_key = type_keys, set_key = set_keys; type_key;
1807 1830
                 type_key = type_key->next, set_key = set_key->next) {
1808 1831
 
1832
+#ifdef WITH_HIREDIS_CLUSTER
1833
+            if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) {
1834
+                LM_ERR("Failed to add srem command to post-delete query\n");
1835
+                goto error;
1836
+            }
1837
+            if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
1838
+                LM_ERR("Failed to add key to delete query\n");
1839
+                goto error;
1840
+            }
1841
+            if (db_redis_key_add_str(&query_v, key) != 0) {
1842
+                LM_ERR("Failed to add key to delete query\n");
1843
+                goto error;
1844
+            }
1845
+            reply = db_redis_command_argv(con, query_v);
1846
+            db_redis_key_free(&query_v);
1847
+            db_redis_check_reply(con, reply, error);
1848
+            db_redis_free_reply(&reply);
1849
+
1850
+            if (db_redis_key_add_string(&query_v, "SCARD", 5) != 0) {
1851
+                LM_ERR("Failed to add scard command to post-delete query\n");
1852
+                goto error;
1853
+            }
1854
+            if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
1855
+                LM_ERR("Failed to add key to delete query\n");
1856
+                goto error;
1857
+            }
1858
+            reply = db_redis_command_argv(con, query_v);
1859
+            db_redis_key_free(&query_v);
1860
+            db_redis_check_reply(con, reply, error);
1861
+            scard = reply->integer;
1862
+            db_redis_free_reply(&reply);
1863
+
1864
+            if (scard != 0)
1865
+                continue;
1866
+            
1867
+            if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) {
1868
+                LM_ERR("Failed to add srem command to post-delete query\n");
1869
+                goto error;
1870
+            }
1871
+            if (db_redis_key_add_str(&query_v, &set_key->key) != 0) {
1872
+                LM_ERR("Failed to add key to delete query\n");
1873
+                goto error;
1874
+            }
1875
+            if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
1876
+                LM_ERR("Failed to add key to delete query\n");
1877
+                goto error;
1878
+            }
1879
+            reply = db_redis_command_argv(con, query_v);
1880
+            db_redis_key_free(&query_v);
1881
+            db_redis_check_reply(con, reply, error);
1882
+#else
1809 1883
             if (db_redis_key_add_string(&query_v, "EVALSHA", 7) != 0) {
1810 1884
                 LM_ERR("Failed to add srem command to post-delete query\n");
1811 1885
                 goto error;
... ...
@@ -1834,6 +1908,7 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con
1834 1908
             db_redis_key_free(&query_v);
1835 1909
             db_redis_check_reply(con, reply, error);
1836 1910
             db_redis_free_reply(&reply);
1911
+#endif
1837 1912
         }
1838 1913
         LM_DBG("done with loop '%.*s'\n", k->key.len, k->key.s);
1839 1914
         db_redis_key_free(&type_keys);
... ...
@@ -1883,6 +1958,9 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con
1883 1958
     redis_key_t *new_type_keys = NULL;
1884 1959
     int new_type_keys_count = 0;
1885 1960
     redis_key_t *all_type_key;
1961
+#ifdef WITH_HIREDIS_CLUSTER
1962
+    long long scard;
1963
+#endif
1886 1964
 
1887 1965
     if (!(*keys_count) && do_table_scan) {
1888 1966
         LM_WARN("performing full table scan on table '%.*s' while performing update\n",
... ...
@@ -2194,6 +2272,58 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con
2194 2272
 
2195 2273
                 db_redis_key_free(&query_v);
2196 2274
 
2275
+#ifdef WITH_HIREDIS_CLUSTER
2276
+                if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) {
2277
+                    LM_ERR("Failed to add srem command to post-delete query\n");
2278
+                    goto error;
2279
+                }
2280
+                if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
2281
+                    LM_ERR("Failed to add key to delete query\n");
2282
+                    goto error;
2283
+                }
2284
+                if (db_redis_key_add_str(&query_v, key) != 0) {
2285
+                    LM_ERR("Failed to add key to delete query\n");
2286
+                    goto error;
2287
+                }
2288
+                reply = db_redis_command_argv(con, query_v);
2289
+                db_redis_key_free(&query_v);
2290
+                db_redis_check_reply(con, reply, error);
2291
+                db_redis_free_reply(&reply);
2292
+
2293
+                if (db_redis_key_add_string(&query_v, "SCARD", 5) != 0) {
2294
+                    LM_ERR("Failed to add scard command to post-delete query\n");
2295
+                    goto error;
2296
+                }
2297
+                if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
2298
+                    LM_ERR("Failed to add key to delete query\n");
2299
+                    goto error;
2300
+                }
2301
+                reply = db_redis_command_argv(con, query_v);
2302
+                db_redis_key_free(&query_v);
2303
+                db_redis_check_reply(con, reply, error);
2304
+                scard = reply->integer;
2305
+                db_redis_free_reply(&reply);
2306
+
2307
+                if (scard != 0)
2308
+                    continue;
2309
+                
2310
+                if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) {
2311
+                    LM_ERR("Failed to add srem command to post-delete query\n");
2312
+                    goto error;
2313
+                }
2314
+                if (db_redis_key_add_str(&query_v, &set_key->key) != 0) {
2315
+                    LM_ERR("Failed to add key to delete query\n");
2316
+                    goto error;
2317
+                }
2318
+                if (db_redis_key_add_str(&query_v, &type_key->key) != 0) {
2319
+                    LM_ERR("Failed to add key to delete query\n");
2320
+                    goto error;
2321
+                }
2322
+                reply = db_redis_command_argv(con, query_v);
2323
+                db_redis_key_free(&query_v);
2324
+                db_redis_check_reply(con, reply, error);
2325
+                update_queries++;
2326
+#else
2197 2327
                 if (db_redis_key_add_string(&query_v, "EVAL", 4) != 0) {
2198 2328
                     LM_ERR("Failed to add srem command to post-delete query\n");
2199 2329
                     goto error;
... ...
@@ -2226,6 +2356,7 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con
2226 2356
                 }
2227 2357
 
2228 2358
                 db_redis_key_free(&query_v);
2359
+#endif
2229 2360
             }
2230 2361
         }
2231 2362