Browse code

htable: Modify previous commit to create new API for RPC and keep old API for backwards compatibility

Thanks to @miconda for a hint!

Olle E. Johansson authored on 07/12/2021 09:09:29
Showing 1 changed files
... ...
@@ -410,6 +410,10 @@ error:
410 410
 	return -1;
411 411
 }
412 412
 
413
+/* Replay DMQ action
414
+
415
+Return 0 for non-error. All other return values are treated as errors.
416
+*/
413 417
 int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname,
414 418
 		int type, int_str* val, int mode) {
415 419
 
Browse code

htable: replicate the operation rm with sw

- GH #2573

Daniel-Constantin Mierla authored on 03/12/2020 12:50:58
Showing 1 changed files
... ...
@@ -350,7 +350,9 @@ error:
350 350
 	return 0;
351 351
 }
352 352
 
353
-int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode) {
353
+int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname,
354
+		int type, int_str* val, int mode)
355
+{
354 356
 
355 357
 	srjson_doc_t jdoc;
356 358
 
... ...
@@ -370,7 +372,8 @@ int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int
370 372
 		srjson_AddStrToObject(&jdoc, jdoc.root, "cname", cname->s, cname->len);
371 373
 	}
372 374
 
373
-	if (action==HT_DMQ_SET_CELL || action==HT_DMQ_SET_CELL_EXPIRE || action==HT_DMQ_RM_CELL_RE) {
375
+	if (action==HT_DMQ_SET_CELL || action==HT_DMQ_SET_CELL_EXPIRE
376
+			|| action==HT_DMQ_RM_CELL_RE || action==HT_DMQ_RM_CELL_SW) {
374 377
 		srjson_AddNumberToObject(&jdoc, jdoc.root, "type", type);
375 378
 		if (type&AVP_VAL_STR) {
376 379
 			srjson_AddStrToObject(&jdoc, jdoc.root, "strval", val->s.s, val->s.len);
... ...
@@ -407,7 +410,8 @@ error:
407 410
 	return -1;
408 411
 }
409 412
 
410
-int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode) {
413
+int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname,
414
+		int type, int_str* val, int mode) {
411 415
 
412 416
 	ht_t* ht;
413 417
 	ht = ht_get_table(htname);
... ...
@@ -416,7 +420,8 @@ int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int ty
416 420
 		return -1;
417 421
 	}
418 422
 
419
-	LM_DBG("replaying action %d on %.*s=>%.*s...\n", action, htname->len, htname->s, cname->len, cname->s);
423
+	LM_DBG("replaying action %d on %.*s=>%.*s...\n", action,
424
+			htname->len, htname->s, cname->len, cname->s);
420 425
 
421 426
 	if (action==HT_DMQ_SET_CELL) {
422 427
 		return ht_set_cell(ht, cname, type, val, mode);
... ...
@@ -426,6 +431,8 @@ int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int ty
426 431
 		return ht_del_cell(ht, cname);
427 432
 	} else if (action==HT_DMQ_RM_CELL_RE) {
428 433
 		return ht_rm_cell_re(&val->s, ht, mode);
434
+	} else if (action==HT_DMQ_RM_CELL_SW) {
435
+		return ht_rm_cell_op(&val->s, ht, mode, HT_RM_OP_SW);
429 436
 	} else {
430 437
 		LM_ERR("unrecognized action\n");
431 438
 		return -1;
Browse code

htable: make ht_dmq_init_sync variable extern in ht_dmq.c

- avoiding redeclaration, it is set via modparam

Daniel-Constantin Mierla authored on 16/01/2020 10:23:40
Showing 1 changed files
... ...
@@ -48,7 +48,7 @@ static str dmq_500_rpl  = str_init("Server Internal Error");
48 48
 static int dmq_cell_group_empty_size = 12; // {"cells":[]}
49 49
 static int dmq_cell_group_max_size = 60000;
50 50
 static ht_dmq_jdoc_cell_group_t ht_dmq_jdoc_cell_group;
51
-int ht_dmq_init_sync;
51
+extern int ht_dmq_init_sync;
52 52
 
53 53
 dmq_api_t ht_dmqb;
54 54
 dmq_peer_t* ht_dmq_peer = NULL;
Browse code

htable: fix a possible null pointer dereference in dmq startup error case

Henning Westerholt authored on 31/05/2019 10:42:21
Showing 1 changed files
... ...
@@ -588,11 +588,11 @@ int ht_dmq_handle_sync(srjson_doc_t* jdoc) {
588 588
 			if(ht==NULL) {
589 589
 				LM_WARN("unable to get table %.*s\n",
590 590
 						htname.len, (htname.s)?htname.s:"");
591
-			}
592
-
593
-			if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0) {
594
-				LM_WARN("unable to set cell %.*s in table %.*s\n",
595
-						cname.len, cname.s, ht->name.len, ht->name.s);
591
+			} else {
592
+				if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0) {
593
+					LM_WARN("unable to set cell %.*s in table %.*s\n",
594
+							cname.len, cname.s, ht->name.len, ht->name.s);
595
+				}
596 596
 			}
597 597
 		}
598 598
 
Browse code

htable: fix infinite loop during dmq sync of large tables

- reported by Enrico Bandiera (GH #1863)

Charles Chance authored on 21/02/2019 19:27:32
Showing 1 changed files
... ...
@@ -139,36 +139,43 @@ static int ht_dmq_cell_group_flush(dmq_node_t* node) {
139 139
 
140 140
 	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
141 141
 	srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells;
142
+	int ret = 0;
142 143
 
143 144
 	srjson_AddItemToObject(jdoc, jdoc->root, "cells", jdoc_cells);
144 145
 
145
-	LM_DBG("json[%s]\n", srjson_PrintUnformatted(jdoc, jdoc->root));
146
+	LM_DBG("jdoc size[%d]\n", ht_dmq_jdoc_cell_group.size);
146 147
 	jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root);
147 148
 	if(jdoc->buf.s==NULL) {
148 149
 		LM_ERR("unable to serialize data\n");
149
-		return -1;
150
+		ret = -1;
151
+		goto cleanup;
150 152
 	}
151 153
 	jdoc->buf.len = strlen(jdoc->buf.s);
152 154
 
153 155
 	LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s);
154 156
 	if (ht_dmq_send(&jdoc->buf, node)!=0) {
155 157
 		LM_ERR("unable to send data\n");
156
-		return -1;
158
+		ret = -1;
157 159
 	}
158 160
 
159
-	LM_DBG("jdoc size[%d]\n", ht_dmq_jdoc_cell_group.size);
161
+cleanup:
162
+
163
+	srjson_DeleteItemFromObject(jdoc, jdoc->root, "cells");
164
+	ht_dmq_jdoc_cell_group.count = 0;
165
+	ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
166
+
167
+	if(jdoc->buf.s!=NULL) {
168
+		jdoc->free_fn(jdoc->buf.s);
169
+		jdoc->buf.s = NULL;
170
+	}
160 171
 
161
-	srjson_Delete(jdoc, jdoc_cells);
162 172
 	ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc);
163 173
 	if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) {
164 174
 		LM_ERR("cannot re-create json cells array! \n");
165
-		return -1;
175
+		ret = -1;
166 176
 	}
167 177
 
168
-	ht_dmq_jdoc_cell_group.count = 0;
169
-	ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
170
-
171
-	return 0;
178
+	return ret;
172 179
 }
173 180
 
174 181
 static void ht_dmq_cell_group_destroy() {
Browse code

htable: safety checks for values replicated via dmq

Daniel-Constantin Mierla authored on 20/11/2018 11:33:33
Showing 1 changed files
... ...
@@ -575,15 +575,18 @@ int ht_dmq_handle_sync(srjson_doc_t* jdoc) {
575 575
 			}
576 576
 		}
577 577
 
578
-		ht = ht_get_table(&htname);
579
-		if(ht==NULL) {
580
-			LM_WARN("unable to get table %.*s\n",
581
-					htname.len, (htname.s)?htname.s:"");
582
-		}
578
+		if(htname.s!=NULL && htname.len>0 && cname.s!=NULL
579
+				&& cname.len>0) {
580
+			ht = ht_get_table(&htname);
581
+			if(ht==NULL) {
582
+				LM_WARN("unable to get table %.*s\n",
583
+						htname.len, (htname.s)?htname.s:"");
584
+			}
583 585
 
584
-		if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0) {
585
-			LM_WARN("unable to set cell %.*s in table %.*s\n",
586
-					cname.len, cname.s, ht->name.len, ht->name.s);
586
+			if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0) {
587
+				LM_WARN("unable to set cell %.*s in table %.*s\n",
588
+						cname.len, cname.s, ht->name.len, ht->name.s);
589
+			}
587 590
 		}
588 591
 
589 592
 		cell = cell->next;
Browse code

htable: use local variable for name in log message instead of null htable pointer

Daniel-Constantin Mierla authored on 18/11/2018 08:10:46
Showing 1 changed files
... ...
@@ -576,11 +576,15 @@ int ht_dmq_handle_sync(srjson_doc_t* jdoc) {
576 576
 		}
577 577
 
578 578
 		ht = ht_get_table(&htname);
579
-		if(ht==NULL)
580
-			LM_WARN("unable to get table %.*s\n", ht->name.len, ht->name.s);
579
+		if(ht==NULL) {
580
+			LM_WARN("unable to get table %.*s\n",
581
+					htname.len, (htname.s)?htname.s:"");
582
+		}
581 583
 
582
-		if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0)
583
-			LM_WARN("unable to set cell %.*s in table %.*s\n", cname.len, cname.s, ht->name.len, ht->name.s);
584
+		if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0) {
585
+			LM_WARN("unable to set cell %.*s in table %.*s\n",
586
+					cname.len, cname.s, ht->name.len, ht->name.s);
587
+		}
584 588
 
585 589
 		cell = cell->next;
586 590
 	}
Browse code

htable: init vars in ht_dmq_handle_sync() to avoid compile warnings

Daniel-Constantin Mierla authored on 17/11/2018 08:07:33
Showing 1 changed files
... ...
@@ -537,17 +537,16 @@ error:
537 537
 int ht_dmq_handle_sync(srjson_doc_t* jdoc) {
538 538
 	LM_DBG("handling sync\n");
539 539
 
540
-	srjson_t* cells;
541
-	srjson_t* cell;
542
-	srjson_t* it;
543
-	str htname;
544
-	str cname;
545
-	int type;
546
-	int_str val;
547
-	int expire;
548
-	ht_t* ht;
549
-	time_t now;
550
-
540
+	srjson_t* cells = NULL;
541
+	srjson_t* cell = NULL;
542
+	srjson_t* it = NULL;
543
+	str htname = STR_NULL;
544
+	str cname = STR_NULL;
545
+	int type = 0;
546
+	int_str val = {0};
547
+	int expire = 0;
548
+	ht_t* ht = NULL;
549
+	time_t now = 0;
551 550
 
552 551
 	cells = jdoc->root->child;
553 552
 	cell = cells->child;
Browse code

htable: added startup synchronization over dmq

Charles Chance authored on 28/09/2018 17:09:28
Showing 1 changed files
... ...
@@ -24,11 +24,6 @@
24 24
 #include "ht_dmq.h"
25 25
 #include "ht_api.h"
26 26
 
27
-static str ht_dmq_content_type = str_init("application/json");
28
-static str dmq_200_rpl  = str_init("OK");
29
-static str dmq_400_rpl  = str_init("Bad Request");
30
-static str dmq_500_rpl  = str_init("Server Internal Error");
31
-
32 27
 typedef struct _ht_dmq_repdata {
33 28
 	int action;
34 29
 	str htname;
... ...
@@ -39,10 +34,155 @@ typedef struct _ht_dmq_repdata {
39 34
 	int expire;
40 35
 } ht_dmq_repdata_t;
41 36
 
37
+typedef struct _ht_dmq_jdoc_cell_group {
38
+	int count;
39
+	int size;
40
+	srjson_doc_t jdoc;
41
+	srjson_t *jdoc_cells;
42
+} ht_dmq_jdoc_cell_group_t;
43
+
44
+static str ht_dmq_content_type = str_init("application/json");
45
+static str dmq_200_rpl  = str_init("OK");
46
+static str dmq_400_rpl  = str_init("Bad Request");
47
+static str dmq_500_rpl  = str_init("Server Internal Error");
48
+static int dmq_cell_group_empty_size = 12; // {"cells":[]}
49
+static int dmq_cell_group_max_size = 60000;
50
+static ht_dmq_jdoc_cell_group_t ht_dmq_jdoc_cell_group;
51
+int ht_dmq_init_sync;
52
+
42 53
 dmq_api_t ht_dmqb;
43 54
 dmq_peer_t* ht_dmq_peer = NULL;
44 55
 dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0};
45 56
 
57
+int ht_dmq_send(str* body, dmq_node_t* node);
58
+int ht_dmq_send_sync(dmq_node_t* node);
59
+int ht_dmq_handle_sync(srjson_doc_t* jdoc);
60
+
61
+static int ht_dmq_cell_group_init(void) {
62
+
63
+	if (ht_dmq_jdoc_cell_group.jdoc.root)
64
+		return 0; // already initialised
65
+
66
+	ht_dmq_jdoc_cell_group.count = 0;
67
+	ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
68
+
69
+	srjson_InitDoc(&ht_dmq_jdoc_cell_group.jdoc, NULL);
70
+
71
+	ht_dmq_jdoc_cell_group.jdoc.root = srjson_CreateObject(&ht_dmq_jdoc_cell_group.jdoc);
72
+	if (ht_dmq_jdoc_cell_group.jdoc.root==NULL) {
73
+		LM_ERR("cannot create json root object! \n");
74
+		return -1;
75
+	}
76
+
77
+	ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc);
78
+	if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) {
79
+		LM_ERR("cannot create json cells array! \n");
80
+		srjson_DestroyDoc(&ht_dmq_jdoc_cell_group.jdoc);
81
+		return -1;
82
+	}
83
+
84
+	return 0;
85
+}
86
+
87
+static int ht_dmq_cell_group_write(str* htname, ht_cell_t* ptr) {
88
+
89
+	// jsonify cell and add to array
90
+
91
+	str tmp;
92
+	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
93
+	srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells;
94
+	srjson_t * jdoc_cell = srjson_CreateObject(jdoc);
95
+
96
+	if(!jdoc_cell) {
97
+		LM_ERR("cannot create cell json root\n");
98
+		return -1;
99
+	}
100
+
101
+	// add json overhead
102
+	if(ptr->flags&AVP_VAL_STR) {
103
+		ht_dmq_jdoc_cell_group.size += 54; // {"htname":"","cname":"","type":,"strval":"","expire":}
104
+	} else {
105
+		ht_dmq_jdoc_cell_group.size += 52; // {"htname":"","cname":"","type":,"intval":,"expire":}
106
+	}
107
+
108
+	srjson_AddStrToObject(jdoc, jdoc_cell, "htname", htname->s, htname->len);
109
+	ht_dmq_jdoc_cell_group.size += htname->len;
110
+
111
+	srjson_AddStrToObject(jdoc, jdoc_cell, "cname", ptr->name.s, ptr->name.len);
112
+	ht_dmq_jdoc_cell_group.size += ptr->name.len;
113
+
114
+	if (ptr->flags&AVP_VAL_STR) {
115
+		srjson_AddNumberToObject(jdoc, jdoc_cell, "type", AVP_VAL_STR);
116
+		ht_dmq_jdoc_cell_group.size += 1;
117
+		srjson_AddStrToObject(jdoc, jdoc_cell, "strval", ptr->value.s.s, ptr->value.s.len);
118
+		ht_dmq_jdoc_cell_group.size += ptr->value.s.len;
119
+	} else {
120
+		srjson_AddNumberToObject(jdoc, jdoc_cell, "type", 0);
121
+		ht_dmq_jdoc_cell_group.size += 1;
122
+		srjson_AddNumberToObject(jdoc, jdoc_cell, "intval", ptr->value.n);
123
+		tmp.s = sint2str((long)ptr->value.n, &tmp.len);
124
+		ht_dmq_jdoc_cell_group.size += tmp.len;
125
+	}
126
+
127
+	srjson_AddNumberToObject(jdoc, jdoc_cell, "expire", ptr->expire);
128
+	tmp.s = sint2str((long)ptr->expire, &tmp.len);
129
+	ht_dmq_jdoc_cell_group.size += tmp.len;
130
+
131
+	srjson_AddItemToArray(jdoc, jdoc_cells, jdoc_cell);
132
+
133
+	ht_dmq_jdoc_cell_group.count++;
134
+
135
+	return 0;
136
+}
137
+
138
+static int ht_dmq_cell_group_flush(dmq_node_t* node) {
139
+
140
+	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
141
+	srjson_t *jdoc_cells = ht_dmq_jdoc_cell_group.jdoc_cells;
142
+
143
+	srjson_AddItemToObject(jdoc, jdoc->root, "cells", jdoc_cells);
144
+
145
+	LM_DBG("json[%s]\n", srjson_PrintUnformatted(jdoc, jdoc->root));
146
+	jdoc->buf.s = srjson_PrintUnformatted(jdoc, jdoc->root);
147
+	if(jdoc->buf.s==NULL) {
148
+		LM_ERR("unable to serialize data\n");
149
+		return -1;
150
+	}
151
+	jdoc->buf.len = strlen(jdoc->buf.s);
152
+
153
+	LM_DBG("sending serialized data %.*s\n", jdoc->buf.len, jdoc->buf.s);
154
+	if (ht_dmq_send(&jdoc->buf, node)!=0) {
155
+		LM_ERR("unable to send data\n");
156
+		return -1;
157
+	}
158
+
159
+	LM_DBG("jdoc size[%d]\n", ht_dmq_jdoc_cell_group.size);
160
+
161
+	srjson_Delete(jdoc, jdoc_cells);
162
+	ht_dmq_jdoc_cell_group.jdoc_cells = srjson_CreateArray(&ht_dmq_jdoc_cell_group.jdoc);
163
+	if (ht_dmq_jdoc_cell_group.jdoc_cells==NULL) {
164
+		LM_ERR("cannot re-create json cells array! \n");
165
+		return -1;
166
+	}
167
+
168
+	ht_dmq_jdoc_cell_group.count = 0;
169
+	ht_dmq_jdoc_cell_group.size = dmq_cell_group_empty_size;
170
+
171
+	return 0;
172
+}
173
+
174
+static void ht_dmq_cell_group_destroy() {
175
+
176
+	srjson_doc_t *jdoc = &ht_dmq_jdoc_cell_group.jdoc;
177
+
178
+	if(jdoc->buf.s!=NULL) {
179
+		jdoc->free_fn(jdoc->buf.s);
180
+		jdoc->buf.s = NULL;
181
+	}
182
+	srjson_DestroyDoc(jdoc);
183
+
184
+}
185
+
46 186
 /**
47 187
  * @brief add notification peer
48 188
  */
... ...
@@ -59,7 +199,7 @@ int ht_dmq_initialize()
59 199
 	}
60 200
 
61 201
 	not_peer.callback = ht_dmq_handle_msg;
62
-	not_peer.init_callback = NULL;
202
+	not_peer.init_callback = (ht_dmq_init_sync ? ht_dmq_request_sync : NULL);
63 203
 	not_peer.description.s = "htable";
64 204
 	not_peer.description.len = 6;
65 205
 	not_peer.peer_id.s = "htable";
... ...
@@ -76,14 +216,20 @@ error:
76 216
 	return -1;
77 217
 }
78 218
 
79
-int ht_dmq_broadcast(str* body)
80
-{
219
+int ht_dmq_send(str* body, dmq_node_t* node) {
81 220
 	if (!ht_dmq_peer) {
82 221
 		LM_ERR("ht_dmq_peer is null!\n");
83 222
 		return -1;
84 223
 	}
85
-	LM_DBG("sending broadcast...\n");
86
-	ht_dmqb.bcast_message(ht_dmq_peer, body, 0, &ht_dmq_resp_callback, 1, &ht_dmq_content_type);
224
+	if (node) {
225
+		LM_DBG("sending dmq message ...\n");
226
+		ht_dmqb.send_message(ht_dmq_peer, body, node,
227
+				&ht_dmq_resp_callback, 1, &ht_dmq_content_type);
228
+	} else {
229
+		LM_DBG("sending dmq broadcast...\n");
230
+		ht_dmqb.bcast_message(ht_dmq_peer, body, 0,
231
+				&ht_dmq_resp_callback, 1, &ht_dmq_content_type);
232
+	}
87 233
 	return 0;
88 234
 }
89 235
 
... ...
@@ -138,35 +284,45 @@ int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq
138 284
 		}
139 285
 	}
140 286
 
141
-	for(it=jdoc.root->child; it; it = it->next)
142
-	{
143
-		LM_DBG("found field: %s\n", it->string);
144
-		if (strcmp(it->string, "action")==0) {
145
-			action = SRJSON_GET_INT(it);
146
-		} else if (strcmp(it->string, "htname")==0) {
147
-			htname.s = it->valuestring;
148
-			htname.len = strlen(htname.s);
149
-		} else if (strcmp(it->string, "cname")==0) {
150
-			cname.s = it->valuestring;
151
-			cname.len = strlen(cname.s);
152
-		} else if (strcmp(it->string, "type")==0) {
153
-			type = SRJSON_GET_INT(it);
154
-		} else if (strcmp(it->string, "strval")==0) {
155
-			val.s.s = it->valuestring;
156
-			val.s.len = strlen(val.s.s);
157
-		} else if (strcmp(it->string, "intval")==0) {
158
-			val.n = SRJSON_GET_INT(it);
159
-		} else if (strcmp(it->string, "mode")==0) {
160
-			mode = SRJSON_GET_INT(it);
287
+	if (unlikely(strcmp(jdoc.root->child->string, "cells")==0)) {
288
+		ht_dmq_handle_sync(&jdoc);
289
+	} else {
290
+
291
+		for(it=jdoc.root->child; it; it = it->next)
292
+		{
293
+			LM_DBG("found field: %s\n", it->string);
294
+			if (strcmp(it->string, "action")==0) {
295
+				action = SRJSON_GET_INT(it);
296
+			} else if (strcmp(it->string, "htname")==0) {
297
+				htname.s = it->valuestring;
298
+				htname.len = strlen(htname.s);
299
+			} else if (strcmp(it->string, "cname")==0) {
300
+				cname.s = it->valuestring;
301
+				cname.len = strlen(cname.s);
302
+			} else if (strcmp(it->string, "type")==0) {
303
+				type = SRJSON_GET_INT(it);
304
+			} else if (strcmp(it->string, "strval")==0) {
305
+				val.s.s = it->valuestring;
306
+				val.s.len = strlen(val.s.s);
307
+			} else if (strcmp(it->string, "intval")==0) {
308
+				val.n = SRJSON_GET_INT(it);
309
+			} else if (strcmp(it->string, "mode")==0) {
310
+				mode = SRJSON_GET_INT(it);
311
+			} else {
312
+				LM_ERR("unrecognized field in json object\n");
313
+				goto invalid;
314
+			}
315
+		}
316
+
317
+		if (unlikely(action == HT_DMQ_SYNC)) {
318
+			ht_dmq_send_sync(dmq_node);
161 319
 		} else {
162
-			LM_ERR("unrecognized field in json object\n");
163
-			goto invalid;
320
+			if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
321
+				LM_ERR("failed to replay action\n");
322
+				goto error;
323
+			}
164 324
 		}
165
-	}
166 325
 
167
-	if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
168
-		LM_ERR("failed to replay action\n");
169
-		goto error;
170 326
 	}
171 327
 
172 328
 	srjson_DestroyDoc(&jdoc);
... ...
@@ -222,7 +378,7 @@ int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int
222 378
 	if(jdoc.buf.s!=NULL) {
223 379
 		jdoc.buf.len = strlen(jdoc.buf.s);
224 380
 		LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
225
-		if (ht_dmq_broadcast(&jdoc.buf)!=0) {
381
+		if (ht_dmq_send(&jdoc.buf, 0)!=0) {
226 382
 			goto error;
227 383
 		}
228 384
 		jdoc.free_fn(jdoc.buf.s);
... ...
@@ -264,9 +420,172 @@ int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int ty
264 420
 	} else if (action==HT_DMQ_RM_CELL_RE) {
265 421
 		return ht_rm_cell_re(&val->s, ht, mode);
266 422
 	} else {
267
-		LM_ERR("unrecognized action");
423
+		LM_ERR("unrecognized action\n");
424
+		return -1;
425
+	}
426
+}
427
+
428
+int ht_dmq_request_sync() {
429
+
430
+	srjson_doc_t jdoc;
431
+
432
+	LM_DBG("requesting sync from dmq peers\n");
433
+	srjson_InitDoc(&jdoc, NULL);
434
+
435
+	jdoc.root = srjson_CreateObject(&jdoc);
436
+	if(jdoc.root==NULL) {
437
+		LM_ERR("cannot create json root\n");
438
+		goto error;
439
+	}
440
+
441
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", HT_DMQ_SYNC);
442
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
443
+	if(jdoc.buf.s==NULL) {
444
+		LM_ERR("unable to serialize data\n");
445
+		goto error;
446
+	}
447
+	jdoc.buf.len = strlen(jdoc.buf.s);
448
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
449
+	if (ht_dmq_send(&jdoc.buf, 0)!=0) {
450
+		goto error;
451
+	}
452
+
453
+	jdoc.free_fn(jdoc.buf.s);
454
+	jdoc.buf.s = NULL;
455
+	srjson_DestroyDoc(&jdoc);
456
+	return 0;
457
+
458
+error:
459
+	if(jdoc.buf.s!=NULL) {
460
+		jdoc.free_fn(jdoc.buf.s);
461
+		jdoc.buf.s = NULL;
462
+	}
463
+	srjson_DestroyDoc(&jdoc);
464
+	return -1;
465
+}
466
+
467
+int ht_dmq_send_sync(dmq_node_t* node) {
468
+	ht_t *ht;
469
+	ht_cell_t *it;
470
+	time_t now;
471
+	int i;
472
+
473
+	ht = ht_get_root();
474
+	if(ht==NULL)
475
+	{
476
+		LM_DBG("no htables to sync!\n");
477
+		return 0;
478
+	}
479
+
480
+	if (ht_dmq_cell_group_init() < 0)
268 481
 		return -1;
482
+
483
+	now = time(NULL);
484
+
485
+	while (ht != NULL)
486
+	{
487
+		if (!ht->dmqreplicate)
488
+			goto skip;
489
+
490
+		for(i=0; i<ht->htsize; i++)
491
+		{
492
+			ht_slot_lock(ht, i);
493
+			it = ht->entries[i].first;
494
+			while(it)
495
+			{
496
+				if(ht->htexpire > 0) {
497
+					if (it->expire <= now) {
498
+						LM_DBG("skipping expired entry\n");
499
+						it = it->next;
500
+						continue;
501
+					}
502
+				}
503
+
504
+				if (ht_dmq_cell_group_write(&ht->name, it) < 0) {
505
+					ht_slot_unlock(ht, i);
506
+					goto error;
507
+				}
508
+
509
+				if (ht_dmq_jdoc_cell_group.size >= dmq_cell_group_max_size) {
510
+					LM_DBG("sending group count[%d]size[%d]\n", ht_dmq_jdoc_cell_group.count, ht_dmq_jdoc_cell_group.size);
511
+					if (ht_dmq_cell_group_flush(node) < 0) {
512
+						ht_slot_unlock(ht, i);
513
+						goto error;
514
+					}
515
+				}
516
+
517
+				it = it->next;
518
+			}
519
+			ht_slot_unlock(ht, i);
520
+		}
521
+
522
+skip:
523
+		ht = ht->next;
269 524
 	}
525
+
526
+	if (ht_dmq_cell_group_flush(node) < 0)
527
+		goto error;
528
+
529
+	ht_dmq_cell_group_destroy();
530
+	return 0;
531
+
532
+error:
533
+	ht_dmq_cell_group_destroy();
534
+	return -1;
535
+}
536
+
537
+int ht_dmq_handle_sync(srjson_doc_t* jdoc) {
538
+	LM_DBG("handling sync\n");
539
+
540
+	srjson_t* cells;
541
+	srjson_t* cell;
542
+	srjson_t* it;
543
+	str htname;
544
+	str cname;
545
+	int type;
546
+	int_str val;
547
+	int expire;
548
+	ht_t* ht;
549
+	time_t now;
550
+
551
+
552
+	cells = jdoc->root->child;
553
+	cell = cells->child;
554
+
555
+	now = time(NULL);
556
+
557
+	while (cell) {
558
+		for(it=cell->child; it; it = it->next) {
559
+			if (strcmp(it->string, "htname")==0) {
560
+				htname.s = it->valuestring;
561
+				htname.len = strlen(htname.s);
562
+			} else if (strcmp(it->string, "cname")==0) {
563
+				cname.s = it->valuestring;
564
+				cname.len = strlen(cname.s);
565
+			} else if (strcmp(it->string, "type")==0) {
566
+				type = SRJSON_GET_INT(it);
567
+			} else if (strcmp(it->string, "strval")==0) {
568
+				val.s.s = it->valuestring;
569
+				val.s.len = strlen(val.s.s);
570
+			} else if (strcmp(it->string, "intval")==0) {
571
+				val.n = SRJSON_GET_INT(it);
572
+			} else if (strcmp(it->string, "expire")==0) {
573
+				expire = SRJSON_GET_INT(it);
574
+			} else {
575
+				LM_WARN("unrecognized field in json object\n");
576
+			}
577
+		}
578
+
579
+		ht = ht_get_table(&htname);
580
+		if(ht==NULL)
581
+			LM_WARN("unable to get table %.*s\n", ht->name.len, ht->name.s);
582
+
583
+		if (ht_set_cell_ex(ht, &cname, type, &val, 0, expire - now) < 0)
584
+			LM_WARN("unable to set cell %.*s in table %.*s\n", cname.len, cname.s, ht->name.len, ht->name.s);
585
+
586
+		cell = cell->next;
587
+	}
588
+	return 0;
270 589
 }
271 590
 
272 591
 /**
Browse code

core, lib, modules: restructured source code tree

- new folder src/ to hold the source code for main project applications
- main.c is in src/
- all core files and subfolders are in src/core/
- modules are in src/modules/
- libs are in src/lib/
- application Makefiles are in src/
- application binary is built in src/ (src/kamailio)

Daniel-Constantin Mierla authored on 07/12/2016 11:03:51
Showing 1 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,280 @@
1
+/**
2
+ *
3
+ * Copyright (C) 2013 Charles Chance (Sipcentric Ltd)
4
+ *
5
+ * This file is part of Kamailio, a free SIP server.
6
+ *
7
+ * Kamailio is free software; you can redistribute it and/or modify
8
+ * it under the terms of the GNU General Public License as published by
9
+ * the Free Software Foundation; either version 2 of the License, or
10
+ * (at your option) any later version
11
+ *
12
+ * Kamailio is distributed in the hope that it will be useful,
13
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
+ * GNU General Public License for more details.
16
+ *
17
+ * You should have received a copy of the GNU General Public License
18
+ * along with this program; if not, write to the Free Software
19
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20
+ *
21
+ */
22
+
23
+
24
+#include "ht_dmq.h"
25
+#include "ht_api.h"
26
+
27
+static str ht_dmq_content_type = str_init("application/json");
28
+static str dmq_200_rpl  = str_init("OK");
29
+static str dmq_400_rpl  = str_init("Bad Request");
30
+static str dmq_500_rpl  = str_init("Server Internal Error");
31
+
32
+typedef struct _ht_dmq_repdata {
33
+	int action;
34
+	str htname;
35
+	str cname;
36
+	int type;
37
+	int intval;
38
+	str strval;
39
+	int expire;
40
+} ht_dmq_repdata_t;
41
+
42
+dmq_api_t ht_dmqb;
43
+dmq_peer_t* ht_dmq_peer = NULL;
44
+dmq_resp_cback_t ht_dmq_resp_callback = {&ht_dmq_resp_callback_f, 0};
45
+
46
+/**
47
+ * @brief add notification peer
48
+ */
49
+int ht_dmq_initialize()
50
+{
51
+	dmq_peer_t not_peer;
52
+
53
+	/* load the DMQ API */
54
+	if (dmq_load_api(&ht_dmqb)!=0) {
55
+		LM_ERR("cannot load dmq api\n");
56
+		return -1;
57
+	} else {
58
+		LM_DBG("loaded dmq api\n");
59
+	}
60
+
61
+	not_peer.callback = ht_dmq_handle_msg;
62
+	not_peer.init_callback = NULL;
63
+	not_peer.description.s = "htable";
64
+	not_peer.description.len = 6;
65
+	not_peer.peer_id.s = "htable";
66
+	not_peer.peer_id.len = 6;
67
+	ht_dmq_peer = ht_dmqb.register_dmq_peer(&not_peer);
68
+	if(!ht_dmq_peer) {
69
+		LM_ERR("error in register_dmq_peer\n");
70
+		goto error;
71
+	} else {
72
+		LM_DBG("dmq peer registered\n");
73
+	}
74
+	return 0;
75
+error:
76
+	return -1;
77
+}
78
+
79
+int ht_dmq_broadcast(str* body)
80
+{
81
+	if (!ht_dmq_peer) {
82
+		LM_ERR("ht_dmq_peer is null!\n");
83
+		return -1;
84
+	}
85
+	LM_DBG("sending broadcast...\n");
86
+	ht_dmqb.bcast_message(ht_dmq_peer, body, 0, &ht_dmq_resp_callback, 1, &ht_dmq_content_type);
87
+	return 0;
88
+}
89
+
90
+/**
91
+ * @brief ht dmq callback
92
+ */
93
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
94
+{
95
+	int content_length;
96
+	str body;
97
+	ht_dmq_action_t action = HT_DMQ_NONE;
98
+	str htname, cname;
99
+	int type = 0, mode = 0;
100
+	int_str val;
101
+	srjson_doc_t jdoc;
102
+	srjson_t *it = NULL;
103
+
104
+	/* received dmq message */
105
+	LM_DBG("dmq message received\n");
106
+
107
+	srjson_InitDoc(&jdoc, NULL);
108
+
109
+	if(!msg->content_length) {
110
+		LM_ERR("no content length header found\n");
111
+		goto invalid;
112
+	}
113
+	content_length = get_content_length(msg);
114
+	if(!content_length) {
115
+		LM_DBG("content length is 0\n");
116
+		goto invalid;
117
+	}
118
+
119
+	body.s = get_body(msg);
120
+	body.len = content_length;
121
+
122
+	if (!body.s) {
123
+		LM_ERR("unable to get body\n");
124
+		goto error;
125
+	}
126
+
127
+	/* parse body */
128
+	LM_DBG("body: %.*s\n", body.len, body.s);
129
+
130
+	jdoc.buf = body;
131
+
132
+	if(jdoc.root == NULL) {
133
+		jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
134
+		if(jdoc.root == NULL)
135
+		{
136
+			LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
137
+			goto invalid;
138
+		}
139
+	}
140
+
141
+	for(it=jdoc.root->child; it; it = it->next)
142
+	{
143
+		LM_DBG("found field: %s\n", it->string);
144
+		if (strcmp(it->string, "action")==0) {
145
+			action = SRJSON_GET_INT(it);
146
+		} else if (strcmp(it->string, "htname")==0) {
147
+			htname.s = it->valuestring;
148
+			htname.len = strlen(htname.s);
149
+		} else if (strcmp(it->string, "cname")==0) {
150
+			cname.s = it->valuestring;
151
+			cname.len = strlen(cname.s);
152
+		} else if (strcmp(it->string, "type")==0) {
153
+			type = SRJSON_GET_INT(it);
154
+		} else if (strcmp(it->string, "strval")==0) {
155
+			val.s.s = it->valuestring;
156
+			val.s.len = strlen(val.s.s);
157
+		} else if (strcmp(it->string, "intval")==0) {
158
+			val.n = SRJSON_GET_INT(it);
159
+		} else if (strcmp(it->string, "mode")==0) {
160
+			mode = SRJSON_GET_INT(it);
161
+		} else {
162
+			LM_ERR("unrecognized field in json object\n");
163
+			goto invalid;
164
+		}
165
+	}
166
+
167
+	if (ht_dmq_replay_action(action, &htname, &cname, type, &val, mode)!=0) {
168
+		LM_ERR("failed to replay action\n");
169
+		goto error;
170
+	}
171
+
172
+	srjson_DestroyDoc(&jdoc);
173
+	resp->reason = dmq_200_rpl;
174
+	resp->resp_code = 200;
175
+	return 0;
176
+
177
+invalid:
178
+	srjson_DestroyDoc(&jdoc);
179
+	resp->reason = dmq_400_rpl;
180
+	resp->resp_code = 400;
181
+	return 0;
182
+
183
+error:
184
+	srjson_DestroyDoc(&jdoc);
185
+	resp->reason = dmq_500_rpl;
186
+	resp->resp_code = 500;
187
+	return 0;
188
+}
189
+
190
+int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode) {
191
+
192
+	srjson_doc_t jdoc;
193
+
194
+	LM_DBG("replicating action to dmq peers...\n");
195
+
196
+	srjson_InitDoc(&jdoc, NULL);
197
+
198
+	jdoc.root = srjson_CreateObject(&jdoc);
199
+	if(jdoc.root==NULL) {
200
+		LM_ERR("cannot create json root\n");
201
+		goto error;
202
+	}
203
+
204
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action);
205
+	srjson_AddStrToObject(&jdoc, jdoc.root, "htname", htname->s, htname->len);
206
+	if (cname!=NULL) {
207
+		srjson_AddStrToObject(&jdoc, jdoc.root, "cname", cname->s, cname->len);
208
+	}
209
+
210
+	if (action==HT_DMQ_SET_CELL || action==HT_DMQ_SET_CELL_EXPIRE || action==HT_DMQ_RM_CELL_RE) {
211
+		srjson_AddNumberToObject(&jdoc, jdoc.root, "type", type);
212
+		if (type&AVP_VAL_STR) {
213
+			srjson_AddStrToObject(&jdoc, jdoc.root, "strval", val->s.s, val->s.len);
214
+		} else {
215
+			srjson_AddNumberToObject(&jdoc, jdoc.root, "intval", val->n);
216
+		}
217
+	}
218
+
219
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "mode", mode);
220
+
221
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
222
+	if(jdoc.buf.s!=NULL) {
223
+		jdoc.buf.len = strlen(jdoc.buf.s);
224
+		LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
225
+		if (ht_dmq_broadcast(&jdoc.buf)!=0) {