Browse code

db_redis: Fix scanning large tables

* When querying large tables (e.g. pre-loading location by usrloc),
make sure to use O(1) when adding keys by prepending them to list.
* Increase batch size of redis scan command to reduce number of
redis queries.
* Batch creation of DB_ROW entries to free up memory allocated by
redis in heap regularly.
* Fix more issues reported by coverity.

Andreas Granig authored on 11/04/2018 15:28:32
Showing 3 changed files
... ...
@@ -25,7 +25,7 @@
25 25
 #include "redis_table.h"
26 26
 
27 27
 static void print_query(redis_key_t *query) {
28
-	redis_key_t *k;
28
+    redis_key_t *k;
29 29
 
30 30
     LM_DBG("Query dump:\n");
31 31
     for (k = query; k; k = k->next) {
... ...
@@ -355,6 +355,7 @@ int db_redis_get_reply(km_redis_con_t *con, void **reply) {
355 355
                 redisFree(con->con);
356 356
                 con->con = NULL;
357 357
             }
358
+            return -1;
358 359
         }
359 360
         // take commands from oldest to newest and re-do again,
360 361
         // but don't queue them once again in retry-mode
... ...
@@ -671,6 +671,14 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
671 671
             LM_ERR("Failed to add match pattern to scan query\n");
672 672
             goto err;
673 673
         }
674
+        if (db_redis_key_add_string(&query_v, "COUNT", 5) != 0) {
675
+            LM_ERR("Failed to add count command to scan query\n");
676
+            goto err;
677
+        }
678
+        if (db_redis_key_add_string(&query_v, "1000", 5) != 0) {
679
+            LM_ERR("Failed to add count value to scan query\n");
680
+            goto err;
681
+        }
674 682
         pkg_free(match); match = NULL;
675 683
 
676 684
         reply = db_redis_command_argv(con, query_v);
... ...
@@ -697,6 +705,7 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
697 705
                     table_name->len, table_name->s);
698 706
             goto err;
699 707
         }
708
+        LM_DBG("cursor is %lu\n", cursor);
700 709
 
701 710
         if (reply->element[1]->type != REDIS_REPLY_ARRAY) {
702 711
             LM_ERR("Invalid content type for scan on table '%.*s', expected array\n",
... ...
@@ -723,8 +732,8 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
723 732
                         j, table_name->len, table_name->s);
724 733
                 goto err;
725 734
             }
726
-            if (db_redis_key_add_string(query_keys, key->str, strlen(key->str)) != 0) {
727
-                LM_ERR("Failed to add redis key\n");
735
+            if (db_redis_key_prepend_string(query_keys, key->str, strlen(key->str)) != 0) {
736
+                LM_ERR("Failed to prepend redis key\n");
728 737
                 goto err;
729 738
             }
730 739
         }
... ...
@@ -746,6 +755,8 @@ static int db_redis_scan_query_keys(km_redis_con_t *con, const str *table_name,
746 755
     if (reply) {
747 756
         db_redis_free_reply(&reply);
748 757
     }
758
+
759
+    LM_DBG("got %lu entries by scan\n", i);
749 760
     return 0;
750 761
 
751 762
 err:
... ...
@@ -1053,7 +1064,7 @@ static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, cons
1053 1064
     redis_key_t *query_v = NULL;
1054 1065
     int num_rows = 0;
1055 1066
     redis_key_t *key;
1056
-    int j;
1067
+    int i, j, max;
1057 1068
 
1058 1069
     *_r = db_redis_new_result();
1059 1070
     if (!*_r) {
... ...
@@ -1078,6 +1089,16 @@ static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, cons
1078 1089
         }
1079 1090
     }
1080 1091
 
1092
+    // we allocate best case scenario (all rows match)
1093
+    RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = *keys_count;
1094
+    if (db_allocate_rows(*_r) != 0) {
1095
+        LM_ERR("Failed to allocate memory for rows\n");
1096
+        return -1;
1097
+    }
1098
+    RES_COL_N(*_r) = _nc;
1099
+    // reset and increment in convert_row
1100
+    RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
1101
+
1081 1102
     for (key = *keys; key; key = key->next) {
1082 1103
         redis_key_t *tmp = NULL;
1083 1104
         str *keyname = &(key->key);
... ...
@@ -1140,58 +1161,58 @@ static int db_redis_perform_query(const db1_con_t* _h, km_redis_con_t *con, cons
1140 1161
 
1141 1162
         db_redis_key_free(&query_v);
1142 1163
         query_v = NULL;
1143
-    }
1144
-
1145
-    // we allocate best case scenario (all rows match)
1146
-    RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = num_rows;
1147
-    if (db_allocate_rows(*_r) != 0) {
1148
-        LM_ERR("Failed to allocate memory for rows\n");
1149
-        return -1;
1150
-    }
1151
-    RES_COL_N(*_r) = _nc;
1152
-    // reset and increment in convert_row
1153
-    RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
1154 1164
 
1155
-    for (key = *keys; key; key = key->next) {
1156
-        // get reply for EXISTS query
1157
-        if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
1158
-            LM_ERR("Failed to get reply for query: %s\n",
1159
-                    con->con->errstr);
1160
-            goto error;
1161
-        }
1162
-        db_redis_check_reply(con, reply, error);
1163
-        if (reply->integer == 0) {
1164
-            LM_DBG("key does not exist, returning no row for query\n");
1165
-            db_redis_free_reply(&reply);
1166
-            // also free next reply, as this is a null row for the HMGET
1167
-            db_redis_get_reply(con, (void**)&reply);
1168
-            db_redis_check_reply(con, reply, error);
1169
-            db_redis_free_reply(&reply);
1170
-            continue;
1171
-        }
1172
-        db_redis_free_reply(&reply);
1165
+        max = 0;
1166
+        if (*keys_count == num_rows)
1167
+            max = (*keys_count) % 1000;
1168
+        else if (num_rows % 1000 == 0)
1169
+            max = 1000;
1170
+
1171
+        if (max) {
1172
+            LM_DBG("fetching next %d results\n", max);
1173
+            for (i = 0; i < max; ++i) {
1174
+                // get reply for EXISTS query
1175
+                if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
1176
+                    LM_ERR("Failed to get reply for query: %s\n",
1177
+                            con->con->errstr);
1178
+                    goto error;
1179
+                }
1180
+                db_redis_check_reply(con, reply, error);
1181
+                if (reply->integer == 0) {
1182
+                    LM_DBG("key does not exist, returning no row for query\n");
1183
+                    db_redis_free_reply(&reply);
1184
+                    // also free next reply, as this is a null row for the HMGET
1185
+                    db_redis_get_reply(con, (void**)&reply);
1186
+                    db_redis_check_reply(con, reply, error);
1187
+                    db_redis_free_reply(&reply);
1188
+                    continue;
1189
+                }
1190
+                db_redis_free_reply(&reply);
1173 1191
 
1174
-        // get reply for actual HMGET query
1175
-        if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
1176
-            LM_ERR("Failed to get reply for query: %s\n",
1177
-                    con->con->errstr);
1178
-            goto error;
1179
-        }
1180
-        db_redis_check_reply(con, reply, error);
1181
-        if (reply->type != REDIS_REPLY_ARRAY) {
1182
-            LM_ERR("Unexpected reply, expected array\n");
1183
-            goto error;
1184
-        }
1185
-        LM_DBG("dumping full query reply for row\n");
1186
-        db_redis_dump_reply(reply);
1192
+                // get reply for actual HMGET query
1193
+                if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
1194
+                    LM_ERR("Failed to get reply for query: %s\n",
1195
+                            con->con->errstr);
1196
+                    goto error;
1197
+                }
1198
+                db_redis_check_reply(con, reply, error);
1199
+                if (reply->type != REDIS_REPLY_ARRAY) {
1200
+                    LM_ERR("Unexpected reply, expected array\n");
1201
+                    goto error;
1202
+                }
1203
+                LM_DBG("dumping full query reply for row\n");
1204
+                db_redis_dump_reply(reply);
1187 1205
 
1188
-        if (db_redis_convert_row(con, *_r, _k, _v, _op, reply, CON_TABLE(_h), _c, _nc, *manual_keys, *manual_keys_count)) {
1189
-            LM_ERR("Failed to convert redis reply for row\n");
1190
-            goto error;
1206
+                if (db_redis_convert_row(con, *_r, _k, _v, _op, reply, CON_TABLE(_h), _c, _nc, *manual_keys, *manual_keys_count)) {
1207
+                    LM_ERR("Failed to convert redis reply for row\n");
1208
+                    goto error;
1209
+                }
1210
+                db_redis_free_reply(&reply);
1211
+            }
1191 1212
         }
1192
-        db_redis_free_reply(&reply);
1193 1213
     }
1194 1214
 
1215
+    LM_DBG("done performing query\n");
1195 1216
     return 0;
1196 1217
 
1197 1218
 error:
... ...
@@ -1199,7 +1220,7 @@ error:
1199 1220
     db_redis_key_free(&query_v);
1200 1221
     if(reply)
1201 1222
         db_redis_free_reply(&reply);
1202
-    if(_r && *_r) {
1223
+    if(*_r) {
1203 1224
         db_redis_free_result((db1_con_t*)_h, *_r); *_r = NULL;
1204 1225
     }
1205 1226
     return -1;
... ...
@@ -1244,13 +1265,13 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con
1244 1265
             goto error;
1245 1266
     }
1246 1267
 
1247
-    LM_DBG("+++ delete all keys\n");
1268
+    LM_DBG("delete all keys\n");
1248 1269
     for (k = keys; k; k = k->next) {
1249 1270
         redis_key_t *all_type_key;
1250 1271
         str *key = &k->key;
1251 1272
         redis_key_t *tmp = NULL;
1252 1273
         int row_match;
1253
-        LM_DBG("+++ delete key '%.*s'\n", key->len, key->s);
1274
+        LM_DBG("delete key '%.*s'\n", key->len, key->s);
1254 1275
 
1255 1276
         if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
1256 1277
             LM_ERR("Failed to add exists command to pre-delete query\n");
... ...
@@ -1399,7 +1420,7 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con
1399 1420
         }
1400 1421
 
1401 1422
         //db_redis_key_free(&type_keys);
1402
-        LM_DBG("+++ done with loop '%.*s'\n", k->key.len, k->key.s);
1423
+        LM_DBG("done with loop '%.*s'\n", k->key.len, k->key.s);
1403 1424
     }
1404 1425
     db_redis_key_free(&type_keys);
1405 1426
     db_redis_key_free(&all_type_keys);
... ...
@@ -1677,6 +1698,7 @@ int db_redis_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
1677 1698
         LM_ERR("db result is null\n");
1678 1699
         return -1;
1679 1700
     }
1701
+
1680 1702
     con = REDIS_CON(_h);
1681 1703
     if (con && con->con == NULL) {
1682 1704
         if (db_redis_connect(con) != 0) {
... ...
@@ -505,6 +505,8 @@ int db_redis_parse_keys(km_redis_con_t *con) {
505 505
     p = start = redis_keys.s;
506 506
     state = DBREDIS_KEYS_TABLE_ST;
507 507
     do {
508
+        type = NULL;
509
+        key = NULL;
508 510
         switch(state) {
509 511
             case DBREDIS_KEYS_TABLE_ST:
510 512
                 while(p != end && *p != '=')
... ...
@@ -605,6 +607,10 @@ int db_redis_parse_keys(km_redis_con_t *con) {
605 607
     return 0;
606 608
 
607 609
 err:
610
+    if (type)
611
+        pkg_free(type);
612
+    if (key)
613
+        pkg_free(key);
608 614
     db_redis_free_tables(con);
609 615
     return -1;
610 616
 }
... ...
@@ -627,7 +633,8 @@ int db_redis_parse_schema(km_redis_con_t *con) {
627 633
     char full_path[_POSIX_PATH_MAX + 1];
628 634
     int path_len;
629 635
     struct stat fstat;
630
-    char c;
636
+    unsigned char c;
637
+    int cc;
631 638
 
632 639
     enum {
633 640
         DBREDIS_SCHEMA_COLUMN_ST,
... ...
@@ -651,6 +658,10 @@ int db_redis_parse_schema(km_redis_con_t *con) {
651 658
     }
652 659
 
653 660
     dir_name = (char*)pkg_malloc((redis_schema_path.len + 1) * sizeof(char));
661
+    if (!dir_name) {
662
+        LM_ERR("Failed to allocate memory for schema directory name\n");
663
+        goto err;
664
+    }
654 665
     strncpy(dir_name, redis_schema_path.s, redis_schema_path.len);
655 666
     dir_name[redis_schema_path.len] = '\0';
656 667
     srcdir = opendir(dir_name);
... ...
@@ -718,14 +729,15 @@ int db_redis_parse_schema(km_redis_con_t *con) {
718 729
                 goto err;
719 730
             }
720 731
 
721
-            c = fgetc(fin);
732
+            cc = fgetc(fin);
733
+            c = (unsigned char)cc;
722 734
 
723 735
             if (c == '\r')
724 736
                 continue;
725 737
             //LM_DBG("parsing char %c, buf is '%s' at pos %lu\n", c, buf, bufpos);
726 738
             switch(state) {
727 739
                 case DBREDIS_SCHEMA_COLUMN_ST:
728
-                    if (c == EOF) {
740
+                    if (cc == EOF) {
729 741
                         LM_ERR("Unexpected end of file in schema column name of file %s\n", full_path);
730 742
                         goto err;
731 743
                     }
... ...
@@ -751,7 +763,7 @@ int db_redis_parse_schema(km_redis_con_t *con) {
751 763
                     LM_DBG("found column name '%.*s'\n", column_name.len, column_name.s);
752 764
                     break;
753 765
                 case DBREDIS_SCHEMA_TYPE_ST:
754
-                    if (c == EOF) {
766
+                    if (cc == EOF) {
755 767
                         LM_ERR("Unexpected end of file in schema column type of file %s\n", full_path);
756 768
                         goto err;
757 769
                     }
... ...
@@ -791,7 +803,7 @@ int db_redis_parse_schema(km_redis_con_t *con) {
791 803
                     bufptr = buf;
792 804
                     break;
793 805
                 case DBREDIS_SCHEMA_VERSION_ST:
794
-                    if (c != '\n' && c != EOF) {
806
+                    if (c != '\n' && cc != EOF) {
795 807
                         *bufptr = c;
796 808
                         bufptr++;
797 809
                         continue;
... ...
@@ -804,7 +816,7 @@ int db_redis_parse_schema(km_redis_con_t *con) {
804 816
                     goto fileend;
805 817
                     break;
806 818
             }
807
-        } while (c != EOF);
819
+        } while (cc != EOF);
808 820
 
809 821
 fileend:
810 822
         fclose(fin);