Browse code

dispatcher: algorithm 13

latency optimized round-robin with failover

Julien Chavanton authored on 29/09/2020 21:50:03
Showing 3 changed files
... ...
@@ -78,6 +78,7 @@
78 78
 #define DS_ALG_CALLLOAD 10
79 79
 #define DS_ALG_RELWEIGHT 11
80 80
 #define DS_ALG_PARALLEL 12
81
+#define DS_ALG_LATENCY 13
81 82
 
82 83
 /* increment call load */
83 84
 #define DS_LOAD_INC(dgrp, didx) do { \
... ...
@@ -2061,11 +2062,37 @@ int ds_select_dst_limit(sip_msg_t *msg, int set, int alg, uint32_t limit,
2061 2062
 		}
2062 2063
 	}
2063 2064
 
2064
-	LM_DBG("selected target destinations: %d\n", vstate.cnt);
2065
+	LM_NOTICE("selected target destinations: %d\n", vstate.cnt);
2065 2066
 
2066 2067
 	return ret;
2067 2068
 }
2068 2069
 
2070
+typedef struct sorted_ds {
2071
+	int idx;
2072
+	int priority;
2073
+} sorted_ds_t;
2074
+
2075
+int ds_manage_routes_fill_reodered_xavp(sorted_ds_t *ds_sorted, ds_set_t *idx, ds_select_state_t *rstate)
2076
+{
2077
+	int i;
2078
+	for(i=0; i < idx->nr && rstate->cnt < rstate->limit; i++) {
2079
+		if(ds_sorted[i].idx < 0 || ds_skip_dst(idx->dlist[i].flags)
2080
+				|| (ds_use_default != 0 && ds_sorted[i].idx == (idx->nr - 1))) {
2081
+			continue;
2082
+		}
2083
+		if(ds_add_xavp_record(idx, ds_sorted[i].idx, rstate->setid, rstate->alg,
2084
+					&rstate->lxavp)<0) {
2085
+			LM_ERR("failed to add destination in the xavp (%d/%d)\n",
2086
+					ds_sorted[i].idx, rstate->setid);
2087
+			return -1;
2088
+		}
2089
+		LM_ERR("destination added in the xavp (%d/%d)\n",
2090
+					ds_sorted[i].idx, rstate->setid);
2091
+		rstate->cnt++;
2092
+	}
2093
+	return 0;
2094
+}
2095
+
2069 2096
 int ds_manage_routes_fill_xavp(unsigned int hash, ds_set_t *idx, ds_select_state_t *rstate)
2070 2097
 {
2071 2098
 	int i;
... ...
@@ -2125,6 +2152,79 @@ int ds_manage_routes_fill_xavp(unsigned int hash, ds_set_t *idx, ds_select_state
2125 2152
 	return 0;
2126 2153
 }
2127 2154
 
2155
+
2156
+void ds_sorted_by_priority(sorted_ds_t * sorted_ds, int size) {
2157
+	int i,ii;
2158
+	for(i=0;i<size;++i) {
2159
+		for(ii=1;ii<size;++ii) {
2160
+			sorted_ds_t temp;
2161
+			if(sorted_ds[ii-1].priority < sorted_ds[ii].priority) {
2162
+				temp.idx = sorted_ds[ii].idx;
2163
+				temp.priority = sorted_ds[ii].priority;
2164
+				sorted_ds[ii].idx = sorted_ds[ii-1].idx;
2165
+				sorted_ds[ii].priority = sorted_ds[ii-1].priority;
2166
+				sorted_ds[ii-1].idx = temp.idx;
2167
+				sorted_ds[ii-1].priority = temp.priority;
2168
+			}
2169
+		}
2170
+	}
2171
+}
2172
+
2173
+int ds_manage_route_algo13(ds_set_t *idx, ds_select_state_t *rstate) {
2174
+	int hash = idx->last;
2175
+	int y = 0;
2176
+	int z = hash;
2177
+	int active_priority = 0;
2178
+	sorted_ds_t *ds_sorted = pkg_malloc(sizeof(sorted_ds_t) * idx->nr);
2179
+	if(ds_sorted == NULL) {
2180
+		LM_ERR("no more pkg\n");
2181
+		return -1;
2182
+	}
2183
+
2184
+	for(y=0; y<idx->nr ;y++) {
2185
+		int latency_proirity_handicap = 0;
2186
+		ds_dest_t * ds_dest = &idx->dlist[z];
2187
+		int gw_priority = ds_dest->priority;
2188
+		int gw_latency = ds_dest->latency_stats.estimate;
2189
+		int gw_inactive = ds_skip_dst(ds_dest->flags);
2190
+		if(!gw_inactive) {
2191
+			if(gw_latency > gw_priority && gw_priority > 0)
2192
+				latency_proirity_handicap = gw_latency / gw_priority;
2193
+			ds_dest->attrs.rpriority = gw_priority - latency_proirity_handicap;
2194
+			if(ds_dest->attrs.rpriority < 1 && gw_priority > 0)
2195
+				ds_dest->attrs.rpriority = 1;
2196
+			if(ds_dest->attrs.rpriority > active_priority) {
2197
+				hash = z;
2198
+				active_priority = ds_dest->attrs.rpriority;
2199
+			}
2200
+			ds_sorted[y].idx = z;
2201
+			ds_sorted[y].priority = ds_dest->attrs.rpriority;
2202
+			LM_NOTICE("[active]idx[%d]uri[%.*s]priority[%d-%d=%d]latency[%dms]flag[%d]\n",
2203
+				z, ds_dest->uri.len, ds_dest->uri.s,
2204
+				gw_priority, latency_proirity_handicap,
2205
+				ds_dest->attrs.rpriority, gw_latency, ds_dest->flags);
2206
+		} else {
2207
+			ds_sorted[y].idx = -1;
2208
+			ds_sorted[y].priority = -1;
2209
+			LM_NOTICE("[inactive]idx[%d]uri[%.*s]priority[%d]latency[%dms]flag[%d]",
2210
+				z, ds_dest->uri.len, ds_dest->uri.s,
2211
+				gw_priority, gw_latency, ds_dest->flags);
2212
+		}
2213
+		if(ds_use_default != 0 && idx->nr != 1)
2214
+			z = (z + 1) % (idx->nr - 1);
2215
+		else
2216
+			z = (z + 1) % idx->nr;
2217
+	}
2218
+	idx->last = hash % idx->nr;
2219
+	LM_NOTICE("priority[%d]gateway_selected[%d]next_index[%d]\n", active_priority, hash, idx->last);
2220
+	ds_sorted_by_priority(ds_sorted, idx->nr);
2221
+	for(y=0; y<idx->nr ;y++) {
2222
+		LM_NOTICE("ds_sorted:idx[%d]priority[%d]\n", ds_sorted[y].idx, ds_sorted[y].priority);
2223
+	}
2224
+	ds_manage_routes_fill_reodered_xavp(ds_sorted, idx, rstate);
2225
+	return hash;
2226
+}
2227
+
2128 2228
 /**
2129 2229
  *
2130 2230
  */
... ...
@@ -2134,6 +2234,7 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
2134 2234
 	unsigned int hash;
2135 2235
 	ds_set_t *idx = NULL;
2136 2236
 	int ulast = 0;
2237
+	int xavp_filled = 0;
2137 2238
 
2138 2239
 	if(msg == NULL) {
2139 2240
 		LM_ERR("bad parameters\n");
... ...
@@ -2259,6 +2360,13 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
2259 2360
 		case DS_ALG_PARALLEL: /* 12 - parallel dispatching */
2260 2361
 			hash = 0;
2261 2362
 			break;
2363
+		case DS_ALG_LATENCY: /* 13 - latency optimized round-robin with failover */
2364
+			hash = ds_manage_route_algo13(idx, rstate);
2365
+			if (hash < 0)
2366
+				return -1;
2367
+			xavp_filled = 1;
2368
+			ulast = 1;
2369
+			break;
2262 2370
 		default:
2263 2371
 			LM_WARN("algo %d not implemented - using first entry...\n",
2264 2372
 					rstate->alg);
... ...
@@ -2274,7 +2382,7 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
2274 2382
 	i = hash;
2275 2383
 
2276 2384
 	/* if selected address is inactive, find next active */
2277
-	while(ds_skip_dst(idx->dlist[i].flags)) {
2385
+	while(!xavp_filled && ds_skip_dst(idx->dlist[i].flags)) {
2278 2386
 		if(ds_use_default != 0 && idx->nr != 1)
2279 2387
 			i = (i + 1) % (idx->nr - 1);
2280 2388
 		else
... ...
@@ -2330,8 +2438,11 @@ int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
2330 2438
 		return 1;
2331 2439
 	}
2332 2440
 
2333
-	if (ds_manage_routes_fill_xavp(hash, idx, rstate) == -1)
2334
-		return -1;
2441
+	if(!xavp_filled) {
2442
+		if(ds_manage_routes_fill_xavp(hash, idx, rstate) == -1){
2443
+			return -1;
2444
+		}
2445
+	}
2335 2446
 
2336 2447
 	/* add default dst to last position in XAVP list */
2337 2448
 	if(ds_use_default != 0 && hash != idx->nr - 1
... ...
@@ -2701,12 +2812,22 @@ int ds_update_latency(int group, str *address, int code)
2701 2812
 			int latency_ms;
2702 2813
 			/* Destination address found, this is the gateway that was pinged. */
2703 2814
 			state = ds_dest->flags;
2815
+			if (!(state & DS_PROBING_DST)) {
2816
+				i++;
2817
+				continue;
2818
+			}
2704 2819
 			if (code == 408 && latency_stats->timeout < UINT32_MAX)
2705 2820
 				latency_stats->timeout++;
2706 2821
 			gettimeofday(&now, NULL);
2707 2822
 			latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
2708 2823
 		            + (now.tv_usec - latency_stats->start.tv_usec)/1000;
2709
-			latency_stats_update(latency_stats, latency_ms);
2824
+			if (code != 408)
2825
+				latency_stats_update(latency_stats, latency_ms);
2826
+
2827
+			LM_NOTICE("[%d]latency[%d]avg[%.2f][%.*s]code[%d]rweight[%d]\n",
2828
+					latency_stats->count, latency_ms,
2829
+					latency_stats->average, address->len, address->s,
2830
+					code, ds_dest->attrs.rweight);
2710 2831
 
2711 2832
 			/* Adjusting weight using congestion detection based on latency estimator. */
2712 2833
 			if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight > 0) {
... ...
@@ -190,6 +190,7 @@ typedef struct _ds_attrs {
190 190
 	int congestion_control;
191 191
 	str ping_from;
192 192
 	str obproxy;
193
+	int rpriority;
193 194
 } ds_attrs_t;
194 195
 
195 196
 typedef struct _ds_latency_stats {
... ...
@@ -1261,6 +1261,34 @@ modparam("dispatcher", "reload_delta", 1)
1261 1261
 				making sense in this case.
1262 1262
 				</para>
1263 1263
 			</listitem>
1264
+			<listitem>
1265
+				<para>
1266
+				<quote>13</quote> - latency optimized dispatching
1267
+				</para>
1268
+				<para>
1269
+				- The algorithm will load balance using round-robin prioritizing the gateways with the highest priority.
1270
+				</para>
1271
+				<para>
1272
+				- If ds_ping_latency_stats is active the algorithm be able to adjust the priority of the gateway automaticaly,
1273
+				the priority will be lowered by 1 point every time the latency ms is as high as the priority.
1274
+				</para>
1275
+				<example>
1276
+				<title><function>latency_optimized_dispatching</function> usage</title>
1277
+				<programlisting format="linespecific">
1278
+Using this simple formula : ADJUSTED_PRIORITY = PRIORITY - (ESTIMATED_LATENCY/PRIORITY)
1279
+
1280
+GATEWAY | PRIORITY | ESTIMATED | ADJUSTED | LOAD
1281
+   #    |          |  LATENCY  | PRIORITY | DISTRIBUTION
1282
+   1    |    30    |    21     |    30    | 33%
1283
+   2    |    30    |    91     |    27    | 0%
1284
+   3    |    30    |    61     |    28    | 0%
1285
+   4    |    30    |    19     |    30    | 33%
1286
+   5    |    30    |    32     |    29    | 0%
1287
+   6    |    30    |    0      |    30    | 33%
1288
+   7    |    30    |    201    |    24    | 0%
1289
+				</programlisting>
1290
+				</example>
1291
+			</listitem>
1264 1292
 			<listitem>
1265 1293
 				<para>
1266 1294
 				<quote>X</quote> - if the algorithm is not implemented, the