Browse code

dispatcher: congestion control refactoring

- further decouple the latency update from congestion control
- decouple congestion control from weighted congestion control, and make
sure that misconfigurations (no positive weight and/or no cc attributes) are handled
gracefully
- fix an error in the doc

Julien Chavanton authored on 20/10/2020 17:46:25
Showing 2 changed files
... ...
@@ -2639,14 +2639,47 @@ static inline void latency_stats_update(ds_latency_stats_t *latency_stats, int l
2639 2639
 	}
2640 2640
 }
2641 2641
 
2642
+typedef struct congestion_control_state { /* Accumulator filled while ds_update_latency() walks the destinations of one set. */
2643
+	int gw_congested_count; /* counted gateways whose computed active weight is 0 */
2644
+	int gw_normal_count; /* counted gateways whose computed active weight is above 0 */
2645
+	int total_congestion_ms; /* running sum of the non-negative (estimate - average) values */
2646
+	int enabled; /* initialized to 1; cleared when a destination has no congestion_control attribute */
2647
+	int apply_rweights; /* set to 1 when an rweight changed, so the relative weights get re-initialized */
2648
+} congestion_control_state_t;
2649
+
2650
+int ds_update_weighted_congestion_control(congestion_control_state_t *cc, int weight, ds_latency_stats_t *latency_stats)
2651
+{ /* Returns weight minus the gateway's congestion in ms, floored at 0, and records the outcome in *cc. */
2652
+	int active_weight = 0;
2653
+	int congestion_ms = latency_stats->estimate - latency_stats->average; /* NOTE(review): average is logged with a float format elsewhere, so this presumably truncates a floating-point difference - confirm */
2654
+	if (weight <= 0) return 0; /* non-positive weight: return 0 and leave *cc untouched */
2655
+	if (congestion_ms < 0) congestion_ms = 0; /* an estimate below the average is not treated as congestion */
2656
+	cc->total_congestion_ms += congestion_ms;
2657
+	active_weight = weight - congestion_ms; /* the weight is lowered by 1 for every ms of congestion */
2658
+	if (active_weight < 0) active_weight = 0;
2659
+	if (active_weight == 0) { /* weight fully consumed: count this gateway as congested */
2660
+		cc->gw_congested_count++;
2661
+	} else {
2662
+		cc->gw_normal_count++;
2663
+	}
2664
+	return active_weight;
2665
+}
2666
+
2667
+void ds_init_congestion_control_state(congestion_control_state_t *cc)
2668
+{ /* Resets every counter in *cc to 0 and marks congestion control as enabled. */
2669
+	cc->gw_congested_count = 0;
2670
+	cc->gw_normal_count = 0;
2671
+	cc->total_congestion_ms = 0;
2672
+	cc->enabled = 1; /* ds_update_latency() clears this when a destination lacks the congestion_control attribute */
2673
+	cc->apply_rweights = 0;
2674
+}
2675
+
2642 2676
 int ds_update_latency(int group, str *address, int code)
2643 2677
 {
2644 2678
 	int i = 0;
2645 2679
 	int state = 0;
2646 2680
 	ds_set_t *idx = NULL;
2647
-	int apply_rweights = 0;
2648
-	int all_gw_congested = 1;
2649
-	int total_congestion_ms = 0;
2681
+	congestion_control_state_t cc;
2682
+	ds_init_congestion_control_state(&cc);
2650 2683
 
2651 2684
 	if(_ds_list == NULL || _ds_list_nr <= 0) {
2652 2685
 		LM_ERR("the list is null\n");
... ...
@@ -2666,7 +2699,6 @@ int ds_update_latency(int group, str *address, int code)
2666 2699
 				&& strncasecmp(ds_dest->uri.s, address->s, address->len) == 0) {
2667 2700
 			struct timeval now;
2668 2701
 			int latency_ms;
2669
-			int congestion_ms;
2670 2702
 			/* Destination address found, this is the gateway that was pinged. */
2671 2703
 			state = ds_dest->flags;
2672 2704
 			if (code == 408 && latency_stats->timeout < UINT32_MAX)
... ...
@@ -2676,43 +2708,28 @@ int ds_update_latency(int group, str *address, int code)
2676 2708
 		            + (now.tv_usec - latency_stats->start.tv_usec)/1000;
2677 2709
 			latency_stats_update(latency_stats, latency_ms);
2678 2710
 
2679
-			congestion_ms = latency_stats->estimate - latency_stats->average;
2680
-			if (congestion_ms < 0) congestion_ms = 0;
2681
-			total_congestion_ms += congestion_ms;
2682
-
2683 2711
 			/* Adjusting weight using congestion detection based on latency estimator. */
2684
-			if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight) {
2685
-				int active_weight = ds_dest->attrs.weight - congestion_ms;
2686
-				if (active_weight <= 0) {
2687
-					active_weight = 0;
2688
-				} else {
2689
-					all_gw_congested = 0;
2690
-				}
2712
+			if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight > 0) {
2713
+				int active_weight = ds_update_weighted_congestion_control(&cc, ds_dest->attrs.weight, latency_stats);
2691 2714
 				if (ds_dest->attrs.rweight != active_weight) {
2692
-					apply_rweights = 1;
2715
+					cc.apply_rweights = 1;
2693 2716
 					ds_dest->attrs.rweight = active_weight;
2694 2717
 				}
2695 2718
 				LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]rweight[%d]cms[%d]\n",
2696 2719
 					latency_stats->count, latency_ms,
2697 2720
 					latency_stats->average, address->len, address->s,
2698
-					code, ds_dest->attrs.rweight, congestion_ms);
2721
+					code, ds_dest->attrs.rweight, ds_dest->attrs.weight - active_weight);
2699 2722
 			}
2700
-		} else {
2701
-			/* Another gateway in the set, we verify if it is congested. */
2702
-			int congestion_ms;
2703
-			int active_weight;
2704
-			congestion_ms = latency_stats->estimate - latency_stats->average;
2705
-			if (congestion_ms < 0) congestion_ms = 0;
2706
-			total_congestion_ms += congestion_ms;
2707
-			active_weight = ds_dest->attrs.weight - congestion_ms;
2708
-			if (active_weight > 0) all_gw_congested = 0;
2723
+		} else if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight > 0) {
2724
+			/* This is another gateway in the set, we verify if it is congested. */
2725
+			ds_update_weighted_congestion_control(&cc, ds_dest->attrs.weight, latency_stats);
2709 2726
 		}
2710
-		if (!ds_dest->attrs.congestion_control) all_gw_congested = 0;
2727
+		if (!ds_dest->attrs.congestion_control) cc.enabled = 0;
2711 2728
 		i++;
2712 2729
 	}
2713 2730
 	/* All the GWs are above their congestion threshold, load distribution will now be based on
2714 2731
 	 * the ratio of congestion_ms each GW is facing. */
2715
-	if (all_gw_congested) {
2732
+	if (cc.enabled && cc.gw_congested_count > 1 && cc.gw_normal_count == 0) {
2716 2733
 		i = 0;
2717 2734
 		while (i < idx->nr) {
2718 2735
 			int congestion_ms;
... ...
@@ -2721,21 +2738,21 @@ int ds_update_latency(int group, str *address, int code)
2721 2738
 			ds_latency_stats_t *latency_stats = &ds_dest->latency_stats;
2722 2739
 			congestion_ms = latency_stats->estimate - latency_stats->average;
2723 2740
 			/* We multiply by 2^4 to keep enough precision */
2724
-			active_weight = (total_congestion_ms << 4) / congestion_ms;
2741
+			active_weight = (cc.total_congestion_ms << 4) / congestion_ms;
2725 2742
 			if (ds_dest->attrs.rweight != active_weight) {
2726
-				apply_rweights = 1;
2743
+				cc.apply_rweights = 1;
2727 2744
 				ds_dest->attrs.rweight = active_weight;
2728 2745
 			}
2729 2746
 			LM_DBG("all gw congested[%d][%d]latency_avg[%.2f][%.*s]code[%d]rweight[%d/%d:%d]cms[%d]\n",
2730
-				        total_congestion_ms, latency_stats->count, latency_stats->average,
2731
-				        address->len, address->s, code, total_congestion_ms, congestion_ms,
2747
+				        cc.total_congestion_ms, latency_stats->count, latency_stats->average,
2748
+				        ds_dest->uri.len, ds_dest->uri.s, code, cc.total_congestion_ms, congestion_ms,
2732 2749
 				        ds_dest->attrs.rweight, congestion_ms);
2733 2750
 		i++;
2734 2751
 		}
2735 2752
 	}
2736 2753
 
2737 2754
 	lock_release(&idx->lock);
2738
-	if (apply_rweights) dp_init_relative_weights(idx);
2755
+	if (cc.enabled && cc.apply_rweights) dp_init_relative_weights(idx);
2739 2756
 	return state;
2740 2757
 }
2741 2758
 
... ...
@@ -1241,7 +1241,7 @@ modparam("dispatcher", "reload_delta", 1)
1241 1241
 				</para>
1242 1242
 				<para>
1243 1243
 				Using this algorithm, you can also enable congestion control by setting the
1244
-				attibute 'cc=1', when 'cc' is enabled the 'rweight' attribute will also be
1244
+				attribute 'cc=1', when 'cc' is enabled the 'weight' attribute will also be
1245 1245
 				used to control congestion tolerance. When facing congestion the weight of
1246 1246
 				a gateway is lowered by 1 for every ms of estimated congestion, a 'rweight'
1247 1247
 				value of 50 is recommended. See the example "configuring load balancing with