Browse code

dispatcher: congestion detection load balancing

Thanks to Amy Meyers for her help !

Julien Chavanton authored on 29/03/2018 23:14:21
Showing 4 changed files
... ...
@@ -269,6 +269,9 @@ int ds_set_attrs(ds_dest_t *dest, str *attrs)
269 269
 	for(pit = params_list; pit; pit = pit->next) {
270 270
 		if(pit->name.len == 4 && strncasecmp(pit->name.s, "duid", 4) == 0) {
271 271
 			dest->attrs.duid = pit->body;
272
+		} else if(pit->name.len == 2
273
+				  && strncasecmp(pit->name.s, "cc", 2) == 0) {
274
+			str2sint(&pit->body, &dest->attrs.congestion_control);
272 275
 		} else if(pit->name.len == 6
273 276
 				  && strncasecmp(pit->name.s, "weight", 6) == 0) {
274 277
 			str2sint(&pit->body, &dest->attrs.weight);
... ...
@@ -520,6 +523,7 @@ int dp_init_relative_weights(ds_set_t *dset)
520 523
 	if(dset == NULL || dset->dlist == NULL)
521 524
 		return -1;
522 525
 
526
+	lock_get(&dset->lock);
523 527
 	int rw_sum = 0;
524 528
 	/* find the sum of relative weights*/
525 529
 	for(j = 0; j < dset->nr; j++) {
... ...
@@ -529,6 +533,7 @@ int dp_init_relative_weights(ds_set_t *dset)
529 533
 	}
530 534
 
531 535
 	if(rw_sum == 0) {
536
+		lock_release(&dset->lock);
532 537
 		return 0;
533 538
 	}
534 539
 
... ...
@@ -540,11 +545,13 @@ int dp_init_relative_weights(ds_set_t *dset)
540 545
 
541 546
 		int current_slice =
542 547
 				dset->dlist[j].attrs.rweight * 100 / rw_sum; //truncate here;
548
+		LM_DBG("rw_sum[%d][%d][%d]\n",j, rw_sum, current_slice);
543 549
 		for(k = 0; k < current_slice; k++) {
544 550
 			dset->rwlist[t] = (unsigned int)j;
545 551
 			t++;
546 552
 		}
547 553
 	}
554
+
548 555
 	/* if the array was not completely filled (i.e., the sum of rweights is
549 556
 	 * less than 100 due to truncated), then use last address to fill the rest */
550 557
 	unsigned int last_insert =
... ...
@@ -557,7 +564,7 @@ int dp_init_relative_weights(ds_set_t *dset)
557 564
 	 * sending first 20 calls to it, but ensure that within a 100 calls,
558 565
 	 * 20 go to first address */
559 566
 	shuffle_uint100array(dset->rwlist);
560
-
567
+	lock_release(&dset->lock);
561 568
 	return 0;
562 569
 }
563 570
 
... ...
@@ -2290,6 +2297,8 @@ static inline void latency_stats_update(ds_latency_stats_t *latency_stats, int l
2290 2297
 		latency_stats->average = latency;
2291 2298
 		latency_stats->estimate = latency;
2292 2299
 	}
2300
+	/* train the average if stable after 10 samples */
2301
+	if (latency_stats->count > 10 && latency_stats->stdev < 0.5) latency_stats->count = 500000;
2293 2302
 	if (latency_stats->min > latency)
2294 2303
 		latency_stats->min = latency;
2295 2304
 	if (latency_stats->max < latency)
... ...
@@ -2329,29 +2338,81 @@ int ds_update_latency(int group, str *address, int code)
2329 2338
 		LM_ERR("destination set [%d] not found\n", group);
2330 2339
 		return -1;
2331 2340
 	}
2332
-
2333
-	while(i < idx->nr) {
2334
-		if(idx->dlist[i].uri.len == address->len
2335
-				&& strncasecmp(idx->dlist[i].uri.s, address->s, address->len)
2336
-						   == 0) {
2337
-
2338
-			/* destination address found */
2339
-			state = idx->dlist[i].flags;
2340
-			ds_latency_stats_t *latency_stats = &idx->dlist[i].latency_stats;
2341
-			if (code == 408 && latency_stats->timeout < UINT32_MAX) {
2341
+	int apply_rweights = 0;
2342
+	int all_gw_congested = 1;
2343
+	int total_congestion_ms = 0;
2344
+	lock_get(&idx->lock);
2345
+	while (i < idx->nr) {
2346
+		ds_dest_t *ds_dest = &idx->dlist[i];
2347
+		ds_latency_stats_t *latency_stats = &ds_dest->latency_stats;
2348
+		if (ds_dest->uri.len == address->len
2349
+				&& strncasecmp(ds_dest->uri.s, address->s, address->len) == 0) {
2350
+			/* Destination address found, this is the gateway that was pinged. */
2351
+			state = ds_dest->flags;
2352
+			if (code == 408 && latency_stats->timeout < UINT32_MAX)
2342 2353
 				latency_stats->timeout++;
2343
-			} else {
2344
-				struct timeval now;
2345
-				gettimeofday(&now, NULL);
2346
-				int latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
2347
-			            + (now.tv_usec - latency_stats->start.tv_usec)/1000;
2348
-				latency_stats_update(latency_stats, latency_ms);
2349
-				LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]\n", latency_stats->count, latency_ms,
2350
-					 latency_stats->average, address->len, address->s, code);
2354
+			struct timeval now;
2355
+			gettimeofday(&now, NULL);
2356
+			int latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
2357
+		            + (now.tv_usec - latency_stats->start.tv_usec)/1000;
2358
+			latency_stats_update(latency_stats, latency_ms);
2359
+
2360
+			int congestion_ms = latency_stats->estimate - latency_stats->average;
2361
+			if (congestion_ms < 0) congestion_ms = 0;
2362
+			total_congestion_ms += congestion_ms;
2363
+
2364
+			/* Adjusting weight using congestion detection based on latency estimator. */
2365
+			if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight) {
2366
+				int active_weight = ds_dest->attrs.weight - congestion_ms;
2367
+				if (active_weight <= 0) {
2368
+					active_weight = 0;
2369
+				} else {
2370
+					all_gw_congested = 0;
2371
+				}
2372
+				if (ds_dest->attrs.rweight != active_weight) {
2373
+					apply_rweights = 1;
2374
+					ds_dest->attrs.rweight = active_weight;
2375
+				}
2376
+				LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]rweight[%d]cms[%d]\n",
2377
+					latency_stats->count, latency_ms,
2378
+					latency_stats->average, address->len, address->s,
2379
+					code, ds_dest->attrs.rweight, congestion_ms);
2351 2380
 			}
2352
-		}
2381
+		} else {
2382
+			/* Another gateway in the set, we verify if it is congested. */
2383
+			int congestion_ms = latency_stats->estimate - latency_stats->average;
2384
+			if (congestion_ms < 0) congestion_ms = 0;
2385
+			total_congestion_ms += congestion_ms;
2386
+			int active_weight = ds_dest->attrs.weight - congestion_ms;
2387
+			if (active_weight > 0) all_gw_congested = 0;
2388
+		}
2389
+		if (!ds_dest->attrs.congestion_control) all_gw_congested = 0;
2353 2390
 		i++;
2354 2391
 	}
2392
+	/* All the GWs are above their congestion threshold, load distribution will now be based on
2393
+	 * the ratio of congestion_ms each GW is facing. */
2394
+	if (all_gw_congested) {
2395
+		i = 0;
2396
+		while (i < idx->nr) {
2397
+			ds_dest_t *ds_dest = &idx->dlist[i];
2398
+			ds_latency_stats_t *latency_stats = &ds_dest->latency_stats;
2399
+			int congestion_ms = latency_stats->estimate - latency_stats->average;
2400
+			/* We multiply by 2^4 to keep enough precision */
2401
+			int active_weight = (total_congestion_ms << 4) / congestion_ms;
2402
+			if (ds_dest->attrs.rweight != active_weight) {
2403
+				apply_rweights = 1;
2404
+				ds_dest->attrs.rweight = active_weight;
2405
+			}
2406
+			LM_DBG("all gw congested[%d][%d]latency_avg[%.2f][%.*s]code[%d]rweight[%d/%d:%d]cms[%d]\n",
2407
+				        total_congestion_ms, latency_stats->count, latency_stats->average,
2408
+				        address->len, address->s, code, total_congestion_ms, congestion_ms,
2409
+				        ds_dest->attrs.rweight, congestion_ms);
2410
+		i++;
2411
+		}
2412
+	}
2413
+
2414
+	lock_release(&idx->lock);
2415
+	if (apply_rweights) dp_init_relative_weights(idx);
2355 2416
 	return state;
2356 2417
 }
2357 2418
 
... ...
@@ -3099,7 +3160,7 @@ ds_set_t *ds_avl_insert(ds_set_t **root, int id, int *setn)
3099 3160
 		node->id = id;
3100 3161
 		node->longer = AVL_NEITHER;
3101 3162
 		*root = node;
3102
-
3163
+		lock_init(&node->lock);
3103 3164
 		avl_rebalance(rotation_top, id);
3104 3165
 
3105 3166
 		(*setn)++;
... ...
@@ -155,6 +155,7 @@ typedef struct _ds_attrs {
155 155
 	int maxload;
156 156
 	int weight;
157 157
 	int rweight;
158
+	int congestion_control;
158 159
 } ds_attrs_t;
159 160
 
160 161
 typedef struct _ds_latency_stats {
... ...
@@ -195,6 +196,7 @@ typedef struct _ds_set {
195 196
 	unsigned int rwlist[100];
196 197
 	struct _ds_set *next[2];
197 198
 	int longer;
199
+	gen_lock_t lock;
198 200
 } ds_set_t;
199 201
 /* clang-format on */
200 202
 
... ...
@@ -81,7 +81,7 @@
81 81
             <holder>Alessandro Arrichiello, Hewlett Packard</holder>
82 82
         </copyright>
83 83
 	<copyright>
84
-            <year>2017</year>
84
+            <year>2017, 2018</year>
85 85
             <holder>Julien chavanton, Flowroute</holder>
86 86
         </copyright>
87 87
    </bookinfo>
... ...
@@ -1110,6 +1110,19 @@ end
1110 1110
 				will be distributed as 25/50/25. After third host failing
1111 1111
 				distribution will be changed to 33/67/0.
1112 1112
 				</para>
1113
+				<para>
1114
+				Using this algorithm, you can also enable congestion control by setting the
1115
+				attribute 'cc=1', when 'cc' is enabled the 'rweight' attribute will also be
1116
+				used to control congestion tolerance. When facing congestion the weight of
1117
+				a gateway is lowered by 1 for every ms of estimated congestion, a 'rweight'
1118
+				value of 50 is recommended. See the example "configuring load balancing with
1119
+				congestion detection" bellow.
1120
+				</para>
1121
+				<para>
1122
+				The congestion estimation is done using an EWMA (see ds_latency_estimator_alpha).
1123
+				If all the gateways in a set are above their congestion threshold (weight), the
1124
+				load distribution is instead done using the ratio of estimated congestion ms.
1125
+				</para>
1113 1126
 			</listitem>
1114 1127
 			<listitem>
1115 1128
 				<para>
... ...
@@ -1150,6 +1163,48 @@ ds_select_dst("1", "$var(a)");
1150 1163
 ...
1151 1164
 ds_select_dst("1", "4", "3");
1152 1165
 ...
1166
+</programlisting>
1167
+		</example>
1168
+		<example>
1169
+		<title>configuring load balancing with congestion detection</title>
1170
+		<programlisting format="linespecific">
1171
+...
1172
+# sample of SQL provisioning statements
1173
+INSERT INTO "dispatcher" 
1174
+VALUES(1,1,'sip:192.168.0.1:5060',0,12,'rweight=50;weight=50;cc=1;','');
1175
+INSERT INTO "dispatcher" 
1176
+VALUES(2,1,'sip:192.168.0.2:5060',0,12,'rweight=50;weight=50;cc=1;','');
1177
+...
1178
+modparam("dispatcher", "ds_ping_interval", 1) # ping gateways once/second
1179
+modparam("dispatcher", "ds_ping_latency_stats", 1) # update congestion metrics
1180
+# configure the latency estimator
1181
+modparam("dispatcher", "ds_latency_estimator_alpha", 900)
1182
+...
1183
+if (!ds_select_dst("1", "11")) { # use relative weight based load distribution
1184
+...
1185
+# sample of output from 'kamcmd dispatcher.list'
1186
+DEST: {
1187
+	URI: sip:192.168.0.1:5060
1188
+	FLAGS: AP
1189
+	PRIORITY: 12
1190
+	ATTRS: {
1191
+		BODY: rweight=50;weight=50;cc=1 # configuration values
1192
+		DUID: 
1193
+		MAXLOAD: 0
1194
+		WEIGHT: 50
1195
+		RWEIGHT: 50
1196
+		SOCKET: 
1197
+	}
1198
+	LATENCY: {
1199
+		AVG: 20.104000
1200
+		STD: 1.273000
1201
+		# estimated congestion is currently 25ms = 45ms(EST) -20ms(AVG)
1202
+		EST: 45.005000
1203
+		MAX: 132
1204
+		TIMEOUT: 3
1205
+	}
1206
+}
1207
+...
1153 1208
 </programlisting>
1154 1209
 		</example>
1155 1210
 	</section>