Browse code

dispatcher: relative weight distribution added

- it is possible to assign "rweight" (relative weight) param to each host in destination group.
rweight is in the integer range from 1 to 100.
Active host usage probability is rweight/(sum of all active host rweights in destination group).
So INACTIVE/DISABLED destinations are removed from probability calculation.

Savolainen Dmitri authored on 20/06/2015 19:02:28
Showing 3 changed files
... ...
@@ -88,6 +88,8 @@ int *next_idx   = NULL;
88 88
 static void ds_run_route(struct sip_msg *msg, str *uri, char *route);
89 89
 
90 90
 void destroy_list(int);
91
+void shuffle_uint100array(unsigned int* arr);
92
+int ds_reinit_rweight_on_state_change(int old_state, int new_state, ds_set_t *dset);
91 93
 
92 94
 /**
93 95
  *
... ...
@@ -131,12 +133,13 @@ int ds_print_sets(void)
131 133
 	{
132 134
 		for(i=0; i<si->nr; i++)
133 135
 		{
134
-			LM_DBG("dst>> %d %.*s %d %d (%.*s,%d,%d)\n", si->id,
136
+			LM_DBG("dst>> %d %.*s %d %d (%.*s,%d,%d,%d)\n", si->id,
135 137
 					si->dlist[i].uri.len, si->dlist[i].uri.s,
136 138
 					si->dlist[i].flags, si->dlist[i].priority,
137 139
 					si->dlist[i].attrs.duid.len, si->dlist[i].attrs.duid.s,
138 140
 					si->dlist[i].attrs.maxload,
139
-					si->dlist[i].attrs.weight);
141
+					si->dlist[i].attrs.weight,
142
+					si->dlist[i].attrs.rweight);
140 143
 		}
141 144
 		si = si->next;
142 145
 	}
... ...
@@ -217,7 +220,18 @@ int ds_set_attrs(ds_dest_t *dest, str *attrs)
217 220
 		} else if(pit->name.len==6
218 221
 				&& strncasecmp(pit->name.s, "socket", 6)==0) {
219 222
 			dest->attrs.socket = pit->body;
223
+		}else if(pit->name.len==7
224
+				&& strncasecmp(pit->name.s, "rweight", 7)==0) {
225
+			int tmp_rweight;
226
+			str2sint(&pit->body, &tmp_rweight);
227
+			if ( tmp_rweight>=1 && tmp_rweight<=100 ) {
228
+				dest->attrs.rweight = tmp_rweight;
229
+			}
230
+			else{
231
+				LM_ERR("rweight %d not in 1-100 range; skipped", tmp_rweight);
232
+			}
220 233
 		}
234
+
221 235
 	}
222 236
 	return 0;
223 237
 }
... ...
@@ -397,6 +411,81 @@ err:
397 411
 	return -1;
398 412
 }
399 413
 
414
+
415
+/* for internal usage; arr must be arr[100] */
416
+void shuffle_uint100array(unsigned int* arr){
417
+	if (arr == NULL)
418
+		return;
419
+	int k;
420
+	int j;
421
+	unsigned int t;
422
+	srand(time(0));
423
+	for (j=0; j<100; j++)
424
+	{
425
+		k = j + (rand() % (100-j));
426
+		t = arr[j];
427
+		arr[j] = arr[k];
428
+		arr[k] = t;
429
+	}
430
+}
431
+
432
+
433
+/**
434
+ * Initialize the relative weight distribution for a destination set
435
+ * - fill the array of 0..99 elements where to keep the index of the
436
+ *   destination address to be used. The Nth call will use
437
+ *   the address with the index at possition N%100
438
+ */
439
+int dp_init_relative_weights(ds_set_t *dset)
440
+{
441
+	int j;
442
+	int k;
443
+	int t;
444
+
445
+	if(dset==NULL || dset->dlist==NULL)
446
+		return -1;
447
+	
448
+	int rw_sum = 0;
449
+	/* find the sum of relative weights*/
450
+	for(j=0; j<dset->nr; j++){
451
+		if( ds_skip_dst(dset->dlist[j].flags ) )
452
+			continue;
453
+		rw_sum += dset->dlist[j].attrs.rweight;
454
+	}
455
+
456
+	if (rw_sum == 0){
457
+		return 0;
458
+	}
459
+
460
+	/* fill the array based on the relative weight of each destination */
461
+	t = 0;
462
+	for(j=0; j<dset->nr; j++)
463
+	{
464
+		if( ds_skip_dst(dset->dlist[j].flags ) )
465
+			continue;	
466
+
467
+		int current_slice = dset->dlist[j].attrs.rweight*100/rw_sum;  //truncate here; 
468
+		for(k=0; k<current_slice; k++)
469
+		{
470
+			dset->rwlist[t] = (unsigned int)j;
471
+			t++;
472
+		}
473
+	}
474
+	/* if the array was not completely filled (i.e., the sum of weights is
475
+	 * less than 100 due to truncated), then use last address to fill the rest */
476
+	for(; t<100; t++)
477
+		dset->rwlist[t] = (unsigned int)(dset->nr-1);
478
+
479
+	/* shuffle the content of the array in order to mix the selection
480
+	 * of the addresses (e.g., if first address has weight=20, avoid
481
+	 * sending first 20 calls to it, but ensure that within a 100 calls,
482
+	 * 20 go to first address */
483
+	shuffle_uint100array(dset->rwlist);
484
+
485
+	return 0;
486
+}
487
+
488
+
400 489
 /**
401 490
  * Initialize the weight distribution for a destination set
402 491
  * - fill the array of 0..99 elements where to keep the index of the
... ...
@@ -441,14 +530,7 @@ randomize:
441 530
 	 * of the addresses (e.g., if first address has weight=20, avoid
442 531
 	 * sending first 20 calls to it, but ensure that within a 100 calls,
443 532
 	 * 20 go to first address */
444
-	srand(time(0));
445
-	for (j=0; j<100; j++)
446
-	{
447
-		k = j + (rand() % (100-j));
448
-		t = (int)dset->wlist[j];
449
-		dset->wlist[j] = dset->wlist[k];
450
-		dset->wlist[k] = (unsigned int)t;
451
-	}
533
+	shuffle_uint100array(dset->wlist);
452 534
 
453 535
 	return 0;
454 536
 }
... ...
@@ -488,6 +570,7 @@ int reindex_dests(int list_idx, int setn)
488 570
 		}
489 571
 		sp->dlist = dp0;
490 572
 		dp_init_weights(sp);
573
+		dp_init_relative_weights(sp);
491 574
 	}
492 575
 
493 576
 	LM_DBG("found [%d] dest sets\n", setn);
... ...
@@ -1799,6 +1882,10 @@ int ds_select_dst_limit(sip_msg_t *msg, int set, int alg, unsigned int limit, in
1799 1882
 				}
1800 1883
 			}
1801 1884
 			break;
1885
+		case 11: /* relative weight based distribution */
1886
+			hash = idx->rwlist[idx->rwlast];
1887
+			idx->rwlast = (idx->rwlast+1) % 100;
1888
+			break;
1802 1889
 		default:
1803 1890
 			LM_WARN("algo %d not implemented - using first entry...\n", alg);
1804 1891
 			hash = 0;
... ...
@@ -2292,6 +2379,8 @@ int ds_update_state(sip_msg_t *msg, int group, str *address, int state)
2292 2379
 				if(ds_skip_dst(old_state) && !ds_skip_dst(idx->dlist[i].flags))
2293 2380
 					ds_run_route(msg, address, "dispatcher:dst-up");
2294 2381
 			}
2382
+			if (idx->dlist[i].attrs.rweight > 0)
2383
+				ds_reinit_rweight_on_state_change(old_state, idx->dlist[i].flags, idx);
2295 2384
 
2296 2385
 			return 0;
2297 2386
 		}
... ...
@@ -2343,6 +2432,26 @@ static void ds_run_route(sip_msg_t *msg, str *uri, char *route)
2343 2432
 	set_route_type(backup_rt);
2344 2433
 }
2345 2434
 
2435
+
2436
+/**
2437
+ recalculate relative states if some destination state was changed
2438
+ */
2439
+int ds_reinit_rweight_on_state_change(int old_state, int new_state, ds_set_t *dset)
2440
+{
2441
+	if (dset == NULL){
2442
+		LM_ERR("destination set is null\n");
2443
+		return -1;
2444
+	}
2445
+	if (	(!ds_skip_dst(old_state) && ds_skip_dst(new_state)) ||
2446
+		(ds_skip_dst(old_state) && !ds_skip_dst(new_state)) )
2447
+	{
2448
+		dp_init_relative_weights(dset);
2449
+	}
2450
+
2451
+	return 0;
2452
+}
2453
+
2454
+
2346 2455
 /**
2347 2456
  *
2348 2457
  */
... ...
@@ -2370,10 +2479,15 @@ int ds_reinit_state(int group, str *address, int state)
2370 2479
 				&& strncasecmp(idx->dlist[i].uri.s, address->s,
2371 2480
 					address->len)==0)
2372 2481
 		{
2482
+			int old_state = idx->dlist[i].flags;
2373 2483
 			/* reset the bits used for states */
2374 2484
 			idx->dlist[i].flags &= ~(DS_STATES_ALL);
2375 2485
 			/* set the new states */
2376 2486
 			idx->dlist[i].flags |= state;
2487
+			if (idx->dlist[i].attrs.rweight > 0){
2488
+				ds_reinit_rweight_on_state_change(old_state, idx->dlist[i].flags, idx);
2489
+			}
2490
+
2377 2491
 			return 0;
2378 2492
 		}
2379 2493
 	}
... ...
@@ -149,6 +149,7 @@ typedef struct _ds_attrs
149 149
 	str socket;
150 150
 	int maxload;
151 151
 	int weight;
152
+	int rweight;
152 153
 } ds_attrs_t;
153 154
 
154 155
 typedef struct _ds_dest
... ...
@@ -172,8 +173,10 @@ typedef struct _ds_set
172 173
 	int nr;				/*!< number of items in dst set */
173 174
 	int last;			/*!< last used item in dst set (round robin) */
174 175
 	int wlast;			/*!< last used item in dst set (by weight) */
176
+	int rwlast;			/*!< last used item in dst set (by relaitive weight) */
175 177
 	ds_dest_t *dlist;
176 178
 	unsigned int wlist[100];
179
+	unsigned int rwlist[100];
177 180
 	struct _ds_set *next;
178 181
 } ds_set_t;
179 182
 
... ...
@@ -1211,12 +1211,13 @@ static void dispatcher_rpc_list(rpc_t* rpc, void* ctx)
1211 1211
 					rpc->fault(ctx, 500, "Internal error creating dest struct");
1212 1212
 					return;
1213 1213
 				}
1214
-				if(rpc->struct_add(wh, "SSddS",
1214
+				if(rpc->struct_add(wh, "SSdddS",
1215 1215
 							"BODY", &(list->dlist[j].attrs.body),
1216 1216
 							"DUID", (list->dlist[j].attrs.duid.s)?
1217 1217
 							&(list->dlist[j].attrs.duid):&data,
1218 1218
 							"MAXLOAD", list->dlist[j].attrs.maxload,
1219 1219
 							"WEIGHT", list->dlist[j].attrs.weight,
1220
+							"RWEIGHT", list->dlist[j].attrs.rweight,
1220 1221
 							"SOCKET", (list->dlist[j].attrs.socket.s)?
1221 1222
 							&(list->dlist[j].attrs.socket):&data)<0)
1222 1223
 				{