Browse code

dispatcher: latency statistics

Julien Chavanton authored on 11/09/2017 22:15:49
Showing 5 changed files
... ...
@@ -31,6 +31,7 @@
31 31
 #include <string.h>
32 32
 #include <stdlib.h>
33 33
 #include <time.h>
34
+#include <math.h>
34 35
 
35 36
 #include "../../core/ut.h"
36 37
 #include "../../core/trim.h"
... ...
@@ -85,6 +86,8 @@ static int *_ds_ping_active = NULL;
85 85
 
86 86
 extern int ds_force_dst;
87 87
 extern str ds_event_callback;
88
+extern int ds_ping_latency_stats;
89
+extern float ds_latency_estimator_alpha;
88 90
 
89 91
 static db_func_t ds_dbf;
90 92
 static db1_con_t *ds_db_handle = NULL;
... ...
@@ -2271,6 +2274,85 @@ int ds_mark_dst(struct sip_msg *msg, int state)
2271 2271
 	return (ret == 0) ? 1 : -1;
2272 2272
 }
2273 2273
 
2274
+
2275
+static inline void latency_stats_update(ds_latency_stats_t *latency_stats, int latency) {
2276
+	float current_average, current_q;
2277
+	/* after 2^21 smaples, ~24 days at 1s interval, the average becomes weighted moving average */
2278
+	if (latency_stats->count < 2097152)
2279
+		latency_stats->count++;
2280
+	if (latency_stats->count == 1) {
2281
+		latency_stats->stdev = 0.0f;
2282
+		latency_stats->last_q = 0.0f;
2283
+		latency_stats->max = latency;
2284
+		latency_stats->min = latency;
2285
+		latency_stats->average = latency;
2286
+		latency_stats->estimate = latency;
2287
+	}
2288
+	if (latency_stats->min > latency)
2289
+		latency_stats->min = latency;
2290
+	if (latency_stats->max < latency)
2291
+		latency_stats->max = latency;
2292
+
2293
+	/* standard deviation of the average/weighted moving average */
2294
+	if (latency_stats->count > 1) {
2295
+		current_average = latency_stats->average + (latency - latency_stats->average) / latency_stats->count;
2296
+		current_q = latency_stats->last_q + (latency - latency_stats->average)*(latency - current_average);
2297
+		latency_stats->average = current_average;
2298
+		latency_stats->last_q = current_q;
2299
+		latency_stats->stdev = sqrt(current_q/(latency_stats->count-1));
2300
+	}
2301
+	/* exponentialy weighted moving average */
2302
+	if (latency_stats->count < 10) {
2303
+		latency_stats->estimate = latency_stats->average;
2304
+	} else {
2305
+		latency_stats->estimate = latency_stats->estimate*ds_latency_estimator_alpha
2306
+		                          + latency*(1-ds_latency_estimator_alpha);
2307
+	}
2308
+}
2309
+
2310
+int ds_update_latency(int group, str *address, int code)
2311
+{
2312
+	int i = 0;
2313
+	int state = 0;
2314
+	ds_set_t *idx = NULL;
2315
+
2316
+	if(_ds_list == NULL || _ds_list_nr <= 0) {
2317
+		LM_ERR("the list is null\n");
2318
+		return -1;
2319
+	}
2320
+
2321
+	/* get the index of the set */
2322
+	if(ds_get_index(group, *crt_idx, &idx) != 0) {
2323
+		LM_ERR("destination set [%d] not found\n", group);
2324
+		return -1;
2325
+	}
2326
+
2327
+	while(i < idx->nr) {
2328
+		if(idx->dlist[i].uri.len == address->len
2329
+				&& strncasecmp(idx->dlist[i].uri.s, address->s, address->len)
2330
+						   == 0) {
2331
+
2332
+			/* destination address found */
2333
+			state = idx->dlist[i].flags;
2334
+			ds_latency_stats_t *latency_stats = &idx->dlist[i].latency_stats;
2335
+			if (code == 408 && latency_stats->timeout < UINT32_MAX) {
2336
+				latency_stats->timeout++;
2337
+			} else {
2338
+				struct timeval now;
2339
+				gettimeofday(&now, NULL);
2340
+				int latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
2341
+			            + (now.tv_usec - latency_stats->start.tv_usec)/1000;
2342
+				latency_stats_update(latency_stats, latency_ms);
2343
+				LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]\n", latency_stats->count, latency_ms,
2344
+					 latency_stats->average, address->len, address->s, code);
2345
+			}
2346
+		}
2347
+		i++;
2348
+	}
2349
+	return state;
2350
+}
2351
+
2352
+
2274 2353
 /**
2275 2354
  * Get state for given destination
2276 2355
  */
... ...
@@ -2735,6 +2817,8 @@ static void ds_options_callback(
2735 2735
 	uri.len = t->to.len - 8;
2736 2736
 	LM_DBG("OPTIONS-Request was finished with code %d (to %.*s, group %d)\n",
2737 2737
 			ps->code, uri.len, uri.s, group);
2738
+	if (ds_ping_latency_stats)
2739
+		ds_update_latency(group, &uri, ps->code);
2738 2740
 	/* ps->code contains the result-code of the request.
2739 2741
 	 *
2740 2742
 	 * We accept both a "200 OK" or the configured reply as a valid response */
... ...
@@ -2805,6 +2889,9 @@ void ds_ping_set(ds_set_t *node)
2805 2805
 					  && ds_default_socket.len > 0) {
2806 2806
 				uac_r.ssock = &ds_default_socket;
2807 2807
 			}
2808
+
2809
+			gettimeofday(&node->dlist[j].latency_stats.start, NULL);
2810
+
2808 2811
 			if(tmb.t_request(&uac_r, &node->dlist[j].uri, &node->dlist[j].uri,
2809 2812
 					   &ds_ping_from, &ds_outbound_proxy)
2810 2813
 					< 0) {
... ...
@@ -156,12 +156,25 @@ typedef struct _ds_attrs {
156 156
 	int rweight;
157 157
 } ds_attrs_t;
158 158
 
159
+typedef struct _ds_latency_stats {
160
+	struct timeval start;
161
+	int min;
162
+	int max;
163
+	float average;  // weigthed average, estimate of the last few weeks
164
+	float stdev;    // last standard deviation
165
+	float estimate; // short term estimate, EWMA exponential weighted moving average
166
+	float last_q;   // q for N-1
167
+	int32_t count;
168
+	uint32_t timeout;
169
+} ds_latency_stats_t;
170
+
159 171
 typedef struct _ds_dest {
160 172
 	str uri;
161 173
 	int flags;
162 174
 	int priority;
163 175
 	int dload;
164 176
 	ds_attrs_t attrs;
177
+	ds_latency_stats_t latency_stats;
165 178
 	struct socket_info * sock;
166 179
 	struct ip_addr ip_address; 	/*!< IP-Address of the entry */
167 180
 	unsigned short int port; 	/*!< Port of the URI */
... ...
@@ -103,6 +103,9 @@ int inactive_threshold = 1; /* number of replied requests, before a destination
103 103
 str ds_ping_method = str_init("OPTIONS");
104 104
 str ds_ping_from   = str_init("sip:dispatcher@localhost");
105 105
 static int ds_ping_interval = 0;
106
+int ds_ping_latency_stats = 0;
107
+int ds_latency_estimator_alpha_i = 900;
108
+float ds_latency_estimator_alpha = 0.9f;
106 109
 int ds_probing_mode  = DS_PROBE_NONE;
107 110
 
108 111
 static str ds_ping_reply_codes_str= STR_NULL;
... ...
@@ -242,6 +245,8 @@ static param_export_t params[]={
242 242
 	{"ds_ping_method",     PARAM_STR, &ds_ping_method},
243 243
 	{"ds_ping_from",       PARAM_STR, &ds_ping_from},
244 244
 	{"ds_ping_interval",   INT_PARAM, &ds_ping_interval},
245
+	{"ds_ping_latency_stats", INT_PARAM, &ds_ping_latency_stats},
246
+	{"ds_latency_estimator_alpha", INT_PARAM, &ds_latency_estimator_alpha_i},
245 247
 	{"ds_ping_reply_codes", PARAM_STR, &ds_ping_reply_codes_str},
246 248
 	{"ds_probing_mode",    INT_PARAM, &ds_probing_mode},
247 249
 	{"ds_hash_size",       INT_PARAM, &ds_hash_size},
... ...
@@ -527,7 +532,12 @@ static int mod_init(void)
527 527
 				return -1;
528 528
 		}
529 529
 	}
530
-
530
+	if (ds_latency_estimator_alpha_i > 0 && ds_latency_estimator_alpha_i < 1000) {
531
+		ds_latency_estimator_alpha = ds_latency_estimator_alpha_i/1000.0f;
532
+	} else {
533
+		LM_ERR("invalid ds_latency_estimator_alpha must be between 0 and 1000,"
534
+				" using default[%.3f]\n", ds_latency_estimator_alpha);
535
+	}
531 536
 	return 0;
532 537
 }
533 538
 
... ...
@@ -1184,6 +1194,7 @@ int ds_rpc_print_set(ds_set_t *node, rpc_t *rpc, void *ctx, void *rpc_handle)
1184 1184
 	void *sh;
1185 1185
 	void *vh;
1186 1186
 	void *wh;
1187
+	void *lh; // latency stats handle
1187 1188
 	int j;
1188 1189
 	char c[3];
1189 1190
 	str data = STR_NULL;
... ...
@@ -1248,6 +1259,21 @@ int ds_rpc_print_set(ds_set_t *node, rpc_t *rpc, void *ctx, void *rpc_handle)
1248 1248
 				return -1;
1249 1249
 			}
1250 1250
 		}
1251
+		if (ds_ping_latency_stats) {
1252
+			if(rpc->struct_add(vh, "{", "LATENCY", &lh) < 0) {
1253
+				rpc->fault(ctx, 500, "Internal error creating dest");
1254
+				return -1;
1255
+			}
1256
+			if (rpc->struct_add(lh, "fffdd", "AVG", node->dlist[j].latency_stats.average,
1257
+					  "STD", node->dlist[j].latency_stats.stdev,
1258
+					  "EST", node->dlist[j].latency_stats.estimate,
1259
+					  "MAX", node->dlist[j].latency_stats.max,
1260
+					  "TIMEOUT", node->dlist[j].latency_stats.timeout)
1261
+					< 0) {
1262
+				rpc->fault(ctx, 500, "Internal error creating dest struct");
1263
+				return -1;
1264
+			}
1265
+		}
1251 1266
 	}
1252 1267
 
1253 1268
 	return 0;
... ...
@@ -54,6 +54,11 @@
54 54
                 <email>martingil.luis@gmail.com</email>
55 55
                 </address>
56 56
             </editor>
57
+            <editor>
58
+                <firstname>Julien</firstname>
59
+                <surname>Chavanton</surname>
60
+                <email>jchavanton@gmail.com</email>
61
+            </editor>
57 62
 	</authorgroup>
58 63
 	<copyright>
59 64
 	    <year>2004</year>
... ...
@@ -75,6 +80,10 @@
75 75
             <year>2015</year>
76 76
             <holder>Alessandro Arrichiello, Hewlett Packard</holder>
77 77
         </copyright>
78
+	<copyright>
79
+            <year>2017</year>
80
+            <holder>Julien chavanton, Flowroute</holder>
81
+        </copyright>
78 82
    </bookinfo>
79 83
     <toc></toc>
80 84
     
... ...
@@ -1,7 +1,7 @@
1 1
 <?xml version="1.0" encoding='ISO-8859-1'?>
2 2
 <!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
3 3
 "http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
4
-
4
+ 
5 5
 <!-- Include general documentation entities -->
6 6
 <!ENTITY % docentities SYSTEM "../../../../doc/docbook/entities.xml">
7 7
 %docentities;
... ...
@@ -706,6 +706,79 @@ modparam("dispatcher", "force_dst", 1)
706 706
  		</example>
707 707
 	</section>
708 708
 
709
+ 	<section id="dispatcher.p.ds_ping_latency_stats">
710
+ 		<title><varname>ds_ping_latency_stats</varname> (int)</title>
711
+ 		<para>
712
+		Enable latency measurement when pinging nodes
713
+		</para>
714
+
715
+		<itemizedlist>
716
+		<listitem>
717
+			<para>If set to 0, disable latency measurement.</para>
718
+		</listitem>
719
+		<listitem>
720
+			<para>If set to 1, enable latency measurement.
721
+			</para>
722
+		</listitem>
723
+		</itemizedlist>
724
+ 		<para>
725
+ 		<emphasis>
726
+ 			Default value is <quote>0</quote>.
727
+ 		</emphasis>
728
+ 		</para>
729
+ 		<example>
730
+ 		<title>accessing the metrics</title>
731
+ <programlisting format="linespecific">
732
+# using the command :
733
+kamcmd dispatcher.list
734
+ ...
735
+DEST: {
736
+	URI: sip:1.2.3.4
737
+	FLAGS: AX
738
+	PRIORITY: 9
739
+	LATENCY: {
740
+		AVG: 24.250000 # weigthed moving average for the last few weeks
741
+		STD: 1.035000  # standard deviation of AVG
742
+		EST: 25.000000 # short term estimate, see parameter: ds_latency_estimator_alpha
743
+		MAX: 26        # maximun value seen
744
+		TIMEOUT: 0     # count of ping timeouts
745
+	}
746
+}
747
+ ...
748
+ </programlisting>
749
+ 		</example>
750
+ 		<example>
751
+ 		<title>Set the <quote>ds_ping_latency_stats</quote> parameter</title>
752
+ <programlisting format="linespecific">
753
+ ...
754
+ modparam("dispatcher", "ds_ping_latency_stats", 1)
755
+ ...
756
+ </programlisting>
757
+ 		</example>
758
+	</section>
759
+ 	<section id="dispatcher.p.ds_latency_estimator_alpha">
760
+ 		<title><varname>ds_latency_estimator_alpha</varname> (int)</title>
761
+		<para>
762
+		The value to be used to control the memory of the estimator EWMA "exponential weighted moving average" or
763
+		"the speed at which the older samples are dampened"
764
+		a goog explanation can be found here : http://www.itl.nist.gov/div898/handbook/pmc/section3/pmc324.htm
765
+		Because Kamailio doesn't support float parameter types, the value in the parameter is divided by 1000 and stored as float.
766
+		For example, if you want to set the alpha to be 0.75, use value 750 here.
767
+ 		</para>
768
+ 		<para>
769
+ 		<emphasis>
770
+			Default value is <quote>900 => 0.9</quote>.
771
+ 		</emphasis>
772
+ 		</para>
773
+ 		<example>
774
+ 		<title>Set the <quote>ds_hash_size</quote> parameter</title>
775
+ <programlisting format="linespecific">
776
+ ...
777
+ modparam("dispatcher", "ds_latency_estimator_alpha", 900)
778
+ ...
779
+ </programlisting>
780
+ 		</example>
781
+	</section>
709 782
  	<section id="dispatcher.p.ds_hash_size">
710 783
  		<title><varname>ds_hash_size</varname> (int)</title>
711 784
  		<para>