Browse code

htable: added startup synchronization over dmq

Charles Chance authored on 28/09/2018 17:09:28
Showing 4 changed files
... ...
@@ -647,6 +647,28 @@ modparam("htable", "db_expires", 1)
647 647
 ...
648 648
 modparam("htable", "enable_dmq", 1)
649 649
 ...
650
+</programlisting>
651
+		</example>
652
+	</section>
653
+	<section id="htable.p.dmq_init_sync">
654
+		<title><varname>dmq_init_sync</varname> (integer)</title>
655
+		<para>
656
+			If set to 1, will request synchronization from other nodes at startup. It applies
657
+			to all tables having the "dmqreplicate" parameter set. As above, it is important to 
658
+			ensure the definition (size, autoexpire etc.) of replicated tables is identical 
659
+			across all instances.
660
+		</para>
661
+		<para>
662
+		<emphasis>
663
+			Default value is 0.
664
+		</emphasis>
665
+		</para>
666
+		<example>
667
+		<title>Set <varname>dmq_init_sync</varname> parameter</title>
668
+		<programlisting format="linespecific">
669
+...
670
+modparam("htable", "dmq_init_sync", 1)
671
+...
650 672
 </programlisting>
651 673
 		</example>
652 674
 	</section>
... ...
@@ -24,11 +24,6 @@
24 24
 #include "ht_dmq.h"
25 25
 #include "ht_api.h"
26 26
 
27
-static str ht_dmq_content_type = str_init("application/json");
28
-static str dmq_200_rpl  = str_init("OK");
29
-static str dmq_400_rpl  = str_init("Bad Request");
30
-static str dmq_500_rpl  = str_init("Server Internal Error");
31
-
32 27
 typedef struct _ht_dmq_repdata {
33 28
 	int action;
34 29
 	str htname;
... ...
@@ -39,10 +34,155 @@ typedef struct _ht_dmq_repdata {
39 34
 	int expire;
40 35
 } ht_dmq_repdata_t;
41 36
 
37
+typedef struct _ht_dmq_jdoc_cell_group {
38
+	int count;
39
+	int size;
40
+	srjson_doc_t jdoc;
41
+	srjson_t *jdoc_cells;
42
+} ht_dmq_jdoc_cell_group_t;
43
+
44
+static str ht_dmq_content_type = str_init("application/json");
45
+static str dmq_200_rpl  = str_init("OK");
46
+static str dmq_400_rpl  = str_init("Bad Request");
47
+static str dmq_500_rpl  = str_init("Server Internal Error");
48
+static int dmq_cell_group_empty_size = 12; // {"cells":[]}
49
+static int dmq_cell_group_max_size = 60000;
50
+static ht_dmq_jdoc_cell_group_t ht_dmq_jdoc_cell_group;
51
+int ht_dmq_init_sync;
52
+
42 53
 dmq_api_t ht_dmqb;
43 54
 dmq_peer_t* ht_dmq_peer = NULL;
44 55
 dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0};
45 56
 
57
+int ht_dmq_send(str* body, dmq_node_t* node);
58
+int ht_dmq_send_sync(dmq_node_t* node);
59
+int ht_dmq_handle_sync(srjson_doc_t* jdoc);
60
+
61
+static int ht_dmq_cell_group_init(void) {
62
+
63
+	if (ht_dmq_jdoc_cell_group.jdoc.root)
64
+		return 0; // already initialised
65
+
66
+	ht_dmq_jdoc_cell_group.count = 0;
67
+	ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
68
+
69
+	srjson_InitDoc(&ht_dmq_jdoc_cell_group.jdoc, NULL);
70
+
71
+	ht_dmq_jdoc_cell_group.jdoc.root = srjson_CreateObject(&ht_dmq_jdoc_cell_group.jdoc);
72
+	if (ht_dmq_jdoc_cell_group.jdoc.root==NULL) {
73
+		LM_ERR("cannot create json root object! \n");
74
+		return -1;
75
+	}
76
+
77
+	ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc);
78
+	if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) {
79
+		LM_ERR("cannot create json cells array! \n");
80
+		srjson_DestroyDoc(&ht_dmq_jdoc_cell_group.jdoc);
81
+		return -1;
82
+	}
83
+
84
+	return 0;
85
+}
86
+
87
+static int ht_dmq_cell_group_write(str* htname, ht_cell_t* ptr) {
88
+
89
+	// jsonify cell and add to array
90
+
91
+	str tmp;
92
+	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
93
+	srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells;
94
+	srjson_t * jdoc_cell = srjson_CreateObject(jdoc);
95
+
96
+	if(!jdoc_cell) {
97
+		LM_ERR("cannot create cell json root\n");
98
+		return -1;
99
+	}
100
+
101
+	// add json overhead
102
+	if(ptr->flags&AVP_VAL_STR) {
103
+		ht_dmq_jdoc_cell_group.size += 54; // {"htname":"","cname":"","type":,"strval":"","expire":}
104
+	} else {
105
+		ht_dmq_jdoc_cell_group.size += 52; // {"htname":"","cname":"","type":,"intval":,"expire":}
106
+	}
107
+
108
+	srjson_AddStrToObject(jdoc, jdoc_cell, "htname", htname->s, htname->len);
109
+	ht_dmq_jdoc_cell_group.size += htname->len;
110
+
111
+	srjson_AddStrToObject(jdoc, jdoc_cell, "cname", ptr->name.s, ptr->name.len);
112
+	ht_dmq_jdoc_cell_group.size += ptr->name.len;
113
+
114
+	if (ptr->flags&AVP_VAL_STR) {
115
+		srjson_AddNumberToObject(jdoc, jdoc_cell, "type", AVP_VAL_STR);
116
+		ht_dmq_jdoc_cell_group.size += 1;
117
+		srjson_AddStrToObject(jdoc, jdoc_cell, "strval", ptr->value.s.s, ptr->value.s.len);
118
+		ht_dmq_jdoc_cell_group.size += ptr->value.s.len;
119
+	} else {
120
+		srjson_AddNumberToObject(jdoc, jdoc_cell, "type", 0);
121
+		ht_dmq_jdoc_cell_group.size += 1;
122
+		srjson_AddNumberToObject(jdoc, jdoc_cell, "intval", ptr->value.n);
123
+		tmp.s = sint2str((long)ptr->value.n, &tmp.len);
124
+		ht_dmq_jdoc_cell_group.size += tmp.len;
125
+	}
126
+
127
+	srjson_AddNumberToObject(jdoc, jdoc_cell, "expire", ptr->expire);
128
+	tmp.s = sint2str((long)ptr->expire, &tmp.len);
129
+	ht_dmq_jdoc_cell_group.size += tmp.len;
130
+
131
+	srjson_AddItemToArray(jdoc, jdoc_cells, jdoc_cell);
132
+
133
+	ht_dmq_jdoc_cell_group.count++;
134
+
135
+	return 0;
136
+}
137
+
138
+static int ht_dmq_cell_group_flush(dmq_node_t* node) {
139
+
140
+	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
141
+	srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells;
142
+
143
+	srjson_AddItemToObject(jdoc, jdoc->root, "cells", jdoc_cells);
144
+
145
+	LM_DBG("json[%s]\n", srjson_PrintUnformatted(jdoc, jdoc->root));
146
+	jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root);
147
+	if(jdoc->buf.s==NULL) {
148
+		LM_ERR("unable to serialize data\n");
149
+		return -1;
150
+	}
151
+	jdoc->buf.len = strlen(jdoc->buf.s);
152
+
153
+	LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s);
154
+	if (ht_dmq_send(&jdoc->buf, node)!=0) {
155
+		LM_ERR("unable to send data\n");
156
+		return -1;
157
+	}
158
+
159
+	LM_DBG("jdoc size[%d]\n", ht_dmq_jdoc_cell_group.size);
160
+
161
+	srjson_Delete(jdoc, jdoc_cells);
162
+	ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc);
163
+	if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) {
164
+		LM_ERR("cannot re-create json cells array! \n");
165
+		return -1;
166
+	}
167
+
168
+	ht_dmq_jdoc_cell_group.count = 0;
169
+	ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
170
+
171
+	return 0;
172
+}
173
+
174
+static void ht_dmq_cell_group_destroy() {
175
+
176
+	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
177
+
178
+	if(jdoc->buf.s!=NULL) {
179
+		jdoc->free_fn(jdoc->buf.s);
180
+		jdoc->buf.s = NULL;
181
+	}
182
+	srjson_DestroyDoc(jdoc);
183
+
184
+}
185
+
46 186
 /**
47 187
  * @brief add notification peer
48 188
  */
... ...
@@ -59,7 +199,7 @@ int ht_dmq_initialize()
59 199
 	}
60 200
 
61 201
 	not_peer.callback = ht_dmq_handle_msg;
62
-	not_peer.init_callback = NULL;
202
+	not_peer.init_callback = (ht_dmq_init_sync ? ht_dmq_request_sync : NULL);
63 203
 	not_peer.description.s = "htable";
64 204
 	not_peer.description.len = 6;
65 205
 	not_peer.peer_id.s = "htable";
... ...
@@ -76,14 +216,20 @@ error:
76 216
 	return -1;
77 217
 }
78 218
 
79
-int ht_dmq_broadcast(str* body)
80
-{
219
+int ht_dmq_send(str* body, dmq_node_t* node) {
81 220
 	if (!ht_dmq_peer) {
82 221
 		LM_ERR("ht_dmq_peer is null!\n");
83 222
 		return -1;
84 223
 	}
85
-	LM_DBG("sending broadcast...\n");
86
-	ht_dmqb.bcast_message(ht_dmq_peer, body, 0, &ht_dmq_resp_callback, 1, &ht_dmq_content_type);
224
+	if (node) {
225
+		LM_DBG("sending dmq message ...\n");
226
+		ht_dmqb.send_message(ht_dmq_peer, body, node,
227
+				&ht_dmq_resp_callback, 1, &ht_dmq_content_type);
228
+	} else {
229
+		LM_DBG("sending dmq broadcast...\n");
230
+		ht_dmqb.bcast_message(ht_dmq_peer, body, 0,
231
+				&ht_dmq_resp_callback, 1, &ht_dmq_content_type);
232
+	}
87 233
 	return 0;
88 234
 }
89 235
 
... ...
@@ -138,35 +284,45 @@ int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq
138 284
 		}
139 285
 	}
140 286
 
141
-	for(it=jdoc.root->child; it; it = it->next)
142
-	{
143
-		LM_DBG("found field: %s\n", it->string);
144
-		if (strcmp(it->string, "action")==0) {
145
-			action = SRJSON_GET_INT(it);
146
-		} else if (strcmp(it->string, "htname")==0) {
147
-			htname.s = it->valuestring;
148
-			htname.len = strlen(htname.s);
149
-		} else if (strcmp(it->string, "cname")==0) {
150
-			cname.s = it->valuestring;
151
-			cname.len = strlen(cname.s);
152
-		} else if (strcmp(it->string, "type")==0) {
153
-			type = SRJSON_GET_INT(it);
154
-		} else if (strcmp(it->string, "strval")==0) {
155
-			val.s.s = it->valuestring;
156
-			val.s.len = strlen(val.s.s);
157
-		} else if (strcmp(it->string, "intval")==0) {
158
-			val.n = SRJSON_GET_INT(it);
159
-		} else if (strcmp(it->string, "mode")==0) {
160
-			mode = SRJSON_GET_INT(it);
287
+	if (unlikely(strcmp(jdoc.root->child->string, "cells")==0)) {
288
+		ht_dmq_handle_sync(&jdoc);
289
+	} else {
290
+
291
+		for(it=jdoc.root->child; it; it = it->next)
292
+		{
293
+			LM_DBG("found field: %s\n", it->string);
294
+			if (strcmp(it->string, "action")==0) {
295
+				action = SRJSON_GET_INT(it);
296
+			} else if (strcmp(it->string, "htname")==0) {
297
+				htname.s = it->valuestring;
298
+				htname.len = strlen(htname.s);
299
+			} else if (strcmp(it->string, "cname")==0) {
300
+				cname.s = it->valuestring;
301
+				cname.len = strlen(cname.s);
302
+			} else if (strcmp(it->string, "type")==0) {
303
+				type = SRJSON_GET_INT(it);
304
+			} else if (strcmp(it->string, "strval")==0) {
305
+				val.s.s = it->valuestring;
306
+				val.s.len = strlen(val.s.s);
307
+			} else if (strcmp(it->string, "intval")==0) {
308
+				val.n = SRJSON_GET_INT(it);
309
+			} else if (strcmp(it->string, "mode")==0) {
310
+				mode = SRJSON_GET_INT(it);
311
+			} else {
312
+				LM_ERR("unrecognized field in json object\n");
313
+				goto invalid;
314
+			}
315
+		}
316
+
317
+		if (unlikely(action == HT_DMQ_SYNC)) {
318
+			ht_dmq_send_sync(dmq_node);
161 319
 		} else {
162
-			LM_ERR("unrecognized field in json object\n");
163
-			goto invalid;
320
+			if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
321
+				LM_ERR("failed to replay action\n");
322
+				goto error;
323
+			}
164 324
 		}
165
-	}
166 325
 
167
-	if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
168
-		LM_ERR("failed to replay action\n");
169
-		goto error;
170 326
 	}
171 327
 
172 328
 	srjson_DestroyDoc(&jdoc);
... ...
@@ -222,7 +378,7 @@ int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int
222 378
 	if(jdoc.buf.s!=NULL) {
223 379
 		jdoc.buf.len = strlen(jdoc.buf.s);
224 380
 		LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
225
-		if (ht_dmq_broadcast(&jdoc.buf)!=0) {
381
+		if (ht_dmq_send(&jdoc.buf, 0)!=0) {
226 382
 			goto error;
227 383
 		}
228 384
 		jdoc.free_fn(jdoc.buf.s);
... ...
@@ -264,9 +420,172 @@ int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int ty
264 420
 	} else if (action==HT_DMQ_RM_CELL_RE) {
265 421
 		return ht_rm_cell_re(&val->s, ht, mode);
266 422
 	} else {
267
-		LM_ERR("unrecognized action");
423
+		LM_ERR("unrecognized action\n");
424
+		return -1;
425
+	}
426
+}
427
+
428
+int ht_dmq_request_sync() {
429
+
430
+	srjson_doc_t jdoc;
431
+
432
+	LM_DBG("requesting sync from dmq peers\n");
433
+	srjson_InitDoc(&jdoc, NULL);
434
+
435
+	jdoc.root = srjson_CreateObject(&jdoc);
436
+	if(jdoc.root==NULL) {
437
+		LM_ERR("cannot create json root\n");
438
+		goto error;
439
+	}
440
+
441
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", HT_DMQ_SYNC);
442
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
443
+	if(jdoc.buf.s==NULL) {
444
+		LM_ERR("unable to serialize data\n");
445
+		goto error;
446
+	}
447
+	jdoc.buf.len = strlen(jdoc.buf.s);
448
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
449
+	if (ht_dmq_send(&jdoc.buf, 0)!=0) {
450
+		goto error;
451
+	}
452
+
453
+	jdoc.free_fn(jdoc.buf.s);
454
+	jdoc.buf.s = NULL;
455
+	srjson_DestroyDoc(&jdoc);
456
+	return 0;
457
+
458
+error:
459
+	if(jdoc.buf.s!=NULL) {
460
+		jdoc.free_fn(jdoc.buf.s);
461
+		jdoc.buf.s = NULL;
462
+	}
463
+	srjson_DestroyDoc(&jdoc);
464
+	return -1;
465
+}
466
+
467
+int ht_dmq_send_sync(dmq_node_t* node) {
468
+	ht_t *ht;
469
+	ht_cell_t *it;
470
+	time_t now;
471
+	int i;
472
+
473
+	ht = ht_get_root();
474
+	if(ht==NULL)
475
+	{
476
+		LM_DBG("no htables to sync!\n");
477
+		return 0;
478
+	}
479
+
480
+	if (ht_dmq_cell_group_init() < 0)
268 481
 		return -1;
482
+
483
+	now = time(NULL);
484
+
485
+	while (ht != NULL)
486
+	{
487
+		if (!ht->dmqreplicate)
488
+			goto skip;
489
+
490
+		for(i=0; i<ht->htsize; i++)
491
+		{
492
+			ht_slot_lock(ht, i);
493
+			it = ht->entries[i].first;
494
+			while(it)
495
+			{
496
+				if(ht->htexpire > 0) {
497
+					if (it->expire <= now) {
498
+						LM_DBG("skipping expired entry\n");
499
+						it = it->next;
500
+						continue;
501
+					}
502
+				}
503
+
504
+				if (ht_dmq_cell_group_write(&ht->name, it) < 0) {
505
+					ht_slot_unlock(ht, i);
506
+					goto error;
507
+				}
508
+
509
+				if (ht_dmq_jdoc_cell_group.size >= dmq_cell_group_max_size) {
510
+					LM_DBG("sending group count[%d]size[%d]\n", ht_dmq_jdoc_cell_group.count, ht_dmq_jdoc_cell_group.size);
511
+					if (ht_dmq_cell_group_flush(node) < 0) {
512
+						ht_slot_unlock(ht, i);
513
+						goto error;
514
+					}
515
+				}
516
+
517
+				it = it->next;
518
+			}
519
+			ht_slot_unlock(ht, i);
520
+		}
521
+
522
+skip:
523
+		ht = ht->next;
269 524
 	}
525
+
526
+	if (ht_dmq_cell_group_flush(node) < 0)
527
+		goto error;
528
+
529
+	ht_dmq_cell_group_destroy();
530
+	return 0;
531
+
532
+error:
533
+	ht_dmq_cell_group_destroy();
534
+	return -1;
535
+}
536
+
537
+int ht_dmq_handle_sync(srjson_doc_t* jdoc) {
538
+	LM_DBG("handling sync\n");
539
+
540
+	srjson_t* cells;
541
+	srjson_t* cell;
542
+	srjson_t* it;
543
+	str htname;
544
+	str cname;
545
+	int type;
546
+	int_str val;
547
+	int expire;
548
+	ht_t* ht;
549
+	time_t now;
550
+
551
+
552
+	cells = jdoc->root->child;
553
+	cell = cells->child;
554
+
555
+	now = time(NULL);
556
+
557
+	while (cell) {
558
+		for(it=cell->child; it; it = it->next) {
559
+			if (strcmp(it->string, "htname")==0) {
560
+				htname.s = it->valuestring;
561
+				htname.len = strlen(htname.s);
562
+			} else if (strcmp(it->string, "cname")==0) {
563
+				cname.s = it->valuestring;
564
+				cname.len = strlen(cname.s);
565
+			} else if (strcmp(it->string, "type")==0) {
566
+				type = SRJSON_GET_INT(it);
567
+			} else if (strcmp(it->string, "strval")==0) {
568
+				val.s.s = it->valuestring;
569
+				val.s.len = strlen(val.s.s);
570
+			} else if (strcmp(it->string, "intval")==0) {
571
+				val.n = SRJSON_GET_INT(it);
572
+			} else if (strcmp(it->string, "expire")==0) {
573
+				expire = SRJSON_GET_INT(it);
574
+			} else {
575
+				LM_WARN("unrecognized field in json object\n");
576
+			}
577
+		}
578
+
579
+		ht = ht_get_table(&htname);
580
+		if(ht==NULL)
581
+			LM_WARN("unable to get table %.*s\n", ht->name.len, ht->name.s);
582
+
583
+		if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0)
584
+			LM_WARN("unable to set cell %.*s in table %.*s\n", cname.len, cname.s, ht->name.len, ht->name.s);
585
+
586
+		cell = cell->next;
587
+	}
588
+	return 0;
270 589
 }
271 590
 
272 591
 /**
... ...
@@ -31,18 +31,22 @@ extern dmq_api_t ht_dmqb;
31 31
 extern dmq_peer_t* ht_dmq_peer;
32 32
 extern dmq_resp_cback_t ht_dmq_resp_callback;
33 33
 
34
+int ht_dmq_init_sync;
35
+
34 36
 typedef enum {
35 37
 		HT_DMQ_NONE,
36 38
         HT_DMQ_SET_CELL,
37 39
         HT_DMQ_SET_CELL_EXPIRE,
38 40
         HT_DMQ_DEL_CELL,
39
-        HT_DMQ_RM_CELL_RE
41
+        HT_DMQ_RM_CELL_RE,
42
+        HT_DMQ_SYNC
40 43
 } ht_dmq_action_t;
41 44
 
42 45
 int ht_dmq_initialize();
43 46
 int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
44 47
 int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
45 48
 int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
49
+int ht_dmq_request_sync();
46 50
 int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
47 51
 
48 52
 #endif
... ...
@@ -52,6 +52,7 @@ MODULE_VERSION
52 52
 int  ht_timer_interval = 20;
53 53
 int  ht_db_expires_flag = 0;
54 54
 int  ht_enable_dmq = 0;
55
+int  ht_dmq_init_sync = 0;
55 56
 int  ht_timer_procs = 0;
56 57
 static int ht_event_callback_mode = 0;
57 58
 
... ...
@@ -153,6 +154,7 @@ static param_export_t params[]={
153 154
 	{"timer_interval",      INT_PARAM, &ht_timer_interval},
154 155
 	{"db_expires",          INT_PARAM, &ht_db_expires_flag},
155 156
 	{"enable_dmq",          INT_PARAM, &ht_enable_dmq},
157
+	{"dmq_init_sync",       INT_PARAM, &ht_dmq_init_sync},
156 158
 	{"timer_procs",         PARAM_INT, &ht_timer_procs},
157 159
 	{"event_callback",      PARAM_STR, &ht_event_callback},
158 160
 	{"event_callback_mode", PARAM_INT, &ht_event_callback_mode},
... ...
@@ -218,7 +220,7 @@ static int mod_init(void)
218 220
 		}
219 221
 	}
220 222
 
221
-	if (ht_enable_dmq>0 && ht_dmq_initialize()!=0) {
223
+	if (ht_enable_dmq>0 && ht_dmq_initialize(ht_dmq_init_sync)!=0) {
222 224
 		LM_ERR("failed to initialize dmq integration\n");
223 225
 		return -1;
224 226
 	}