Browse code

rtpengine: add support for websocket protocol

Richard Fuchs authored on 24/03/2021 20:09:45
Showing 3 changed files
... ...
@@ -144,7 +144,7 @@
144 144
 		<title><varname>rtpengine_sock</varname> (string)</title>
145 145
 		<para>
146 146
 		Definition of socket(s) used to connect to (a set) &rtp; proxy. It may
147
-		specify a UNIX socket or an IPv4/IPv6 UDP socket.
147
+		specify an IPv4/IPv6 UDP socket or a websocket URI.
148 148
 		</para>
149 149
 		<para>
150 150
 		<emphasis>
... ...
@@ -157,6 +157,12 @@
157 157
 ...
158 158
 # single rtproxy
159 159
 modparam("rtpengine", "rtpengine_sock", "udp:localhost:12221")
160
+# IPv6 UDP
161
+modparam("rtpengine", "rtpengine_sock", "udp6:localhost:12221")
162
+# websocket
163
+modparam("rtpengine", "rtpengine_sock", "ws://localhost:12221/")
164
+# websocket with TLS
165
+modparam("rtpengine", "rtpengine_sock", "wss://localhost:12221/")
160 166
 # multiple rtproxies for LB with weights (missing weight defaults to 1)
161 167
 modparam("rtpengine", "rtpengine_sock",
162 168
 	"udp:localhost:12221=2 udp:localhost:12222=1")
... ...
@@ -2210,6 +2216,22 @@ modparam("rtpengine", "hash_algo", 2)
2210 2216
 		</example>
2211 2217
 	</section>
2212 2218
 
2219
+	<section id="rtpengine.p.wsapi">
2220
+		<title><varname>wsapi</varname> (string)</title>
2221
+		<para>
2222
+			Configure a backend API for websocket usage. Currently the only supported setting is
2223
+			<quote>lwsc</quote> (libwebsockets). If unset, then the websocket protocol cannot be used.
2224
+		</para>
2225
+		<example>
2226
+		<title>Set <varname>wsapi</varname> parameter</title>
2227
+<programlisting format="linespecific">
2228
+...
2229
+modparam("rtpengine", "wsapi", "lwsc")
2230
+...
2231
+</programlisting>
2232
+		</example>
2233
+	</section>
2234
+
2213 2235
 
2214 2236
 
2215 2237
 	</section>
... ...
@@ -80,6 +80,7 @@
80 80
 #include "../../core/char_msg_val.h"
81 81
 #include "../../modules/tm/tm_load.h"
82 82
 #include "../../modules/crypto/api.h"
83
+#include "../../modules/lwsc/api.h"
83 84
 #include "rtpengine.h"
84 85
 #include "rtpengine_funcs.h"
85 86
 #include "rtpengine_hash.h"
... ...
@@ -300,7 +301,8 @@ static pv_spec_t *media_duration_pvar = NULL;
300 301
 char* force_send_ip_str="";
301 302
 int force_send_ip_af = AF_UNSPEC;
302 303
 
303
-
304
+static str _rtpe_wsapi = STR_NULL;
305
+lwsc_api_t _rtpe_lwscb = {0};
304 306
 
305 307
 static enum hash_algo_t hash_algo = RTP_HASH_CALLID;
306 308
 
... ...
@@ -525,6 +527,8 @@ static param_export_t params[] = {
525 527
 	{"mos_average_roundtrip_leg_B_pv",  PARAM_STR, &side_B_mos_stats.average.roundtrip_leg_param },
526 528
 	{"mos_average_samples_B_pv",    PARAM_STR, &side_B_mos_stats.average.samples_param     },
527 529
 
530
+	{"wsapi",                       PARAM_STR, &_rtpe_wsapi    },
531
+
528 532
 	{0, 0, 0}
529 533
 };
530 534
 
... ...
@@ -985,7 +989,7 @@ int add_rtpengine_socks(struct rtpp_set *rtpp_list, char *rtpengine,
985 989
 			pnode->rn_recheck_ticks = ticks + get_ticks();
986 990
 		}
987 991
 		pnode->rn_weight = local_weight;
988
-		pnode->rn_umode = 0;
992
+		pnode->rn_umode = RNU_UNKNOWN;
989 993
 		pnode->rn_disabled = disabled;
990 994
 		pnode->rn_displayed = 1;
991 995
 		pnode->rn_url.s = shm_malloc(p2 - p1 + 1);
... ...
@@ -1002,17 +1006,21 @@ int add_rtpengine_socks(struct rtpp_set *rtpp_list, char *rtpengine,
1002 1006
 		/* Leave only address in rn_address */
1003 1007
 		pnode->rn_address = pnode->rn_url.s;
1004 1008
 		if (strncasecmp(pnode->rn_address, "udp:", 4) == 0) {
1005
-			pnode->rn_umode = 1;
1009
+			pnode->rn_umode = RNU_UDP;
1006 1010
 			pnode->rn_address += 4;
1007 1011
 		} else if (strncasecmp(pnode->rn_address, "udp6:", 5) == 0) {
1008
-			pnode->rn_umode = 6;
1012
+			pnode->rn_umode = RNU_UDP6;
1009 1013
 			pnode->rn_address += 5;
1010 1014
 		} else if (strncasecmp(pnode->rn_address, "unix:", 5) == 0) {
1011
-			pnode->rn_umode = 0;
1015
+			pnode->rn_umode = RNU_LOCAL;
1012 1016
 			pnode->rn_address += 5;
1017
+		} else if (strncasecmp(pnode->rn_address, "ws://", 5) == 0) {
1018
+			pnode->rn_umode = RNU_WS;
1019
+		} else if (strncasecmp(pnode->rn_address, "wss://", 6) == 0) {
1020
+			pnode->rn_umode = RNU_WSS;
1013 1021
 		} else {
1014 1022
 			lock_release(rtpp_no_lock);
1015
-			LM_WARN("Node address must start with 'udp:' or 'udp6:' or 'unix:'. Ignore '%s'.\n", pnode->rn_address);
1023
+			LM_WARN("Node address must start with 'udp:' or 'udp6:' or 'unix:' or 'ws://' or 'wss://'. Ignore '%s'.\n", pnode->rn_address);
1016 1024
 			shm_free(pnode->rn_url.s);
1017 1025
 			shm_free(pnode);
1018 1026
 
... ...
@@ -1023,26 +1031,42 @@ int add_rtpengine_socks(struct rtpp_set *rtpp_list, char *rtpengine,
1023 1031
 			}
1024 1032
 		}
1025 1033
 
1026
-		/* Check the rn_address is 'hostname:port' */
1027
-		/* Check the rn_address port is valid */
1028
-		if(pnode->rn_umode == 6) {
1029
-                        p1 = strstr(pnode->rn_address, "]:");
1030
-                        if(p1 != NULL) {
1031
-                                p1++;
1032
-                        }
1033
-                } else {
1034
-                        p1 = strchr(pnode->rn_address, ':');
1035
-                }
1036
-		if (p1 != NULL) {
1037
-			p1++;
1038
-		}
1034
+		if (pnode->rn_umode != RNU_WS && pnode->rn_umode != RNU_WSS) {
1035
+			/* Check the rn_address is 'hostname:port' */
1036
+			/* Check the rn_address port is valid */
1037
+			if(pnode->rn_umode == RNU_UDP6) {
1038
+				p1 = strstr(pnode->rn_address, "]:");
1039
+				if(p1 != NULL) {
1040
+					p1++;
1041
+				}
1042
+			} else {
1043
+				p1 = strchr(pnode->rn_address, ':');
1044
+			}
1045
+			if (p1 != NULL) {
1046
+				p1++;
1047
+			}
1039 1048
 
1040
-		if (p1 != NULL && p1[0] != '\0') {
1041
-			s1.s = p1;
1042
-			s1.len = strlen(p1);
1043
-			if (str2int(&s1, &port) < 0 || port > 0xFFFF) {
1049
+			if (p1 != NULL && p1[0] != '\0') {
1050
+				s1.s = p1;
1051
+				s1.len = strlen(p1);
1052
+				if (str2int(&s1, &port) < 0 || port > 0xFFFF) {
1053
+					lock_release(rtpp_no_lock);
1054
+					LM_WARN("Node address must end with a valid port number. Ignore '%s'.\n", pnode->rn_address);
1055
+					shm_free(pnode->rn_url.s);
1056
+					shm_free(pnode);
1057
+
1058
+					if (!isDB) {
1059
+						continue;
1060
+					} else {
1061
+						return 0;
1062
+					}
1063
+				}
1064
+			}
1065
+		} else {
1066
+			/* websocket */
1067
+			if (_rtpe_lwscb.loaded == 0) {
1044 1068
 				lock_release(rtpp_no_lock);
1045
-				LM_WARN("Node address must end with a valid port number. Ignore '%s'.\n", pnode->rn_address);
1069
+				LM_WARN("Websocket protocol requested, but no websocket API loaded. Ignore '%s'.\n", pnode->rn_address);
1046 1070
 				shm_free(pnode->rn_url.s);
1047 1071
 				shm_free(pnode);
1048 1072
 
... ...
@@ -1607,6 +1631,19 @@ mod_init(void)
1607 1631
 		return -1;
1608 1632
 	}
1609 1633
 
1634
+	if(_rtpe_wsapi.s!=NULL && _rtpe_wsapi.len==4
1635
+			&& strncasecmp(_rtpe_wsapi.s, "lwsc", 4)==0) {
1636
+		if(lwsc_load_api(&_rtpe_lwscb)) {
1637
+			LM_ERR("failed to load WS API: %s\n", _rtpe_wsapi.s);
1638
+			return -1;
1639
+		}
1640
+	} else {
1641
+		if(_rtpe_wsapi.s!=NULL && _rtpe_wsapi.len>0) {
1642
+			LM_ERR("unsupported WS API: %s\n", _rtpe_wsapi.s);
1643
+			return -1;
1644
+		}
1645
+	}
1646
+
1610 1647
 	/* initialize the list of set; mod_destroy does shm_free() if fail */
1611 1648
 	if (!rtpp_set_list) {
1612 1649
 		rtpp_set_list = shm_malloc(sizeof(struct rtpp_set_head));
... ...
@@ -1831,7 +1868,8 @@ static int build_rtpp_socks(int lmode, int rtest) {
1831 1868
 			char *hostname;
1832 1869
 			char *hp;
1833 1870
 
1834
-			if (pnode->rn_umode == 0) {
1871
+			if (pnode->rn_umode == RNU_LOCAL || pnode->rn_umode == RNU_WS
1872
+					|| pnode->rn_umode == RNU_WSS) {
1835 1873
 				rtpp_socks[pnode->idx] = -1;
1836 1874
 				goto rptest;
1837 1875
 			}
... ...
@@ -1861,7 +1899,7 @@ static int build_rtpp_socks(int lmode, int rtest) {
1861 1899
 			if (cp == NULL || *cp == '\0')
1862 1900
 				cp = CPORT;
1863 1901
 
1864
-			if(pnode->rn_umode == 6) {
1902
+			if(pnode->rn_umode == RNU_UDP6) {
1865 1903
 				hp = strrchr(hostname, ']');
1866 1904
 				if(hp != NULL)
1867 1905
 					*hp = '\0';
... ...
@@ -1875,7 +1913,7 @@ static int build_rtpp_socks(int lmode, int rtest) {
1875 1913
 
1876 1914
 			memset(&hints, 0, sizeof(hints));
1877 1915
 			hints.ai_flags = 0;
1878
-			hints.ai_family = (pnode->rn_umode == 6) ? AF_INET6 : AF_INET;
1916
+			hints.ai_family = (pnode->rn_umode == RNU_UDP6) ? AF_INET6 : AF_INET;
1879 1917
 			hints.ai_socktype = SOCK_DGRAM;
1880 1918
 			if ((n = getaddrinfo(hp, cp, &hints, &res)) != 0) {
1881 1919
 				LM_ERR("%s\n", gai_strerror(n));
... ...
@@ -1890,7 +1928,7 @@ static int build_rtpp_socks(int lmode, int rtest) {
1890 1928
 			}
1891 1929
 			pkg_free(hostname);
1892 1930
 
1893
-			rtpp_socks[pnode->idx] = socket((pnode->rn_umode == 6)
1931
+			rtpp_socks[pnode->idx] = socket((pnode->rn_umode == RNU_UDP6)
1894 1932
 				? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
1895 1933
 			if (rtpp_socks[pnode->idx] == -1) {
1896 1934
 				LM_ERR("can't create socket\n");
... ...
@@ -1912,7 +1950,7 @@ static int build_rtpp_socks(int lmode, int rtest) {
1912 1950
 
1913 1951
 			if((0 <= control_cmd_tos) && (control_cmd_tos < 256)) {
1914 1952
 				unsigned char tos = control_cmd_tos;
1915
-				if (pnode->rn_umode == 6) {
1953
+				if (pnode->rn_umode == RNU_UDP6) {
1916 1954
 					if(setsockopt(rtpp_socks[pnode->idx], IPPROTO_IPV6,
1917 1955
 							IPV6_TCLASS, &control_cmd_tos,
1918 1956
 							sizeof(control_cmd_tos)))
... ...
@@ -2865,10 +2903,12 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen)
2865 2903
 	int fd, len, i, vcnt;
2866 2904
 	int rtpengine_retr, rtpengine_tout_ms = 1000;
2867 2905
 	char *cp;
2868
-	static char buf[0x10000];
2906
+	static char buf[0x40000];
2869 2907
 	struct pollfd fds[1];
2870 2908
 	struct iovec *v;
2871 2909
 	str cmd = STR_NULL;
2910
+	const static str rtpe_proto = { "ng.rtpengine.com", 16 };
2911
+	str request, response;
2872 2912
 
2873 2913
 	v = bencode_iovec(dict, &vcnt, 1, 0);
2874 2914
 	if (!v) {
... ...
@@ -2878,7 +2918,9 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen)
2878 2918
 
2879 2919
 	len = 0;
2880 2920
 	cp = buf;
2881
-	if (node->rn_umode == 0) {
2921
+	rtpengine_tout_ms = cfg_get(rtpengine,rtpengine_cfg,rtpengine_tout_ms);
2922
+
2923
+	if (node->rn_umode == RNU_LOCAL) {
2882 2924
 		memset(&addr, 0, sizeof(addr));
2883 2925
 		addr.sun_family = AF_LOCAL;
2884 2926
 		strncpy(addr.sun_path, node->rn_address,
... ...
@@ -2914,7 +2956,55 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen)
2914 2956
 			LM_ERR("can't read reply from RTPEngine <%s>\n", node->rn_url.s);
2915 2957
 			goto badproxy;
2916 2958
 		}
2959
+	} else if (node->rn_umode == RNU_WS || node->rn_umode == RNU_WSS) {
2960
+		/* assemble full request string, flatten iovec */
2961
+		v[0].iov_base = gencookie();
2962
+		v[0].iov_len = strlen(v[0].iov_base);
2963
+		len = 0;
2964
+		for (i = 0; i <= vcnt; i++)
2965
+			len += v[i].iov_len;
2966
+		request.s = pkg_malloc(len + 1);
2967
+		if (!request.s) {
2968
+			LM_ERR("out of memory\n");
2969
+			goto badproxy;
2970
+		}
2971
+		len = 0;
2972
+		for (i = 0; i <= vcnt; i++) {
2973
+			memcpy(request.s + len, v[i].iov_base, v[i].iov_len);
2974
+			len += v[i].iov_len;
2975
+		}
2976
+		request.s[len] = '\0';
2977
+		request.len = len;
2978
+
2979
+		len = _rtpe_lwscb.request(&node->rn_url, (str *) &rtpe_proto, &request, &response,
2980
+				rtpengine_tout_ms * 1000);
2981
+
2982
+		if (len < 0) {
2983
+			LM_ERR("failed to do websocket request\n");
2984
+			goto badproxy;
2985
+		}
2986
+
2987
+		/* process/copy response; verify cookie */
2988
+		if (response.len < v[0].iov_len) {
2989
+			LM_ERR("empty or short websocket response\n");
2990
+			pkg_free(response.s);
2991
+			goto badproxy;
2992
+		}
2993
+		if (memcmp(response.s, v[0].iov_base, v[0].iov_len)) {
2994
+			LM_ERR("mismatched cookie in websocket response\n");
2995
+			pkg_free(response.s);
2996
+			goto badproxy;
2997
+		}
2998
+		len = response.len - v[0].iov_len;
2999
+		if (len >= sizeof(buf)) {
3000
+			LM_ERR("websocket response too large\n");
3001
+			pkg_free(response.s);
3002
+			goto badproxy;
3003
+		}
3004
+		memcpy(buf, response.s + v[0].iov_len, len);
3005
+		pkg_free(response.s);
2917 3006
 	} else {
3007
+		/* UDP or UDP6 */
2918 3008
 		fds[0].fd = rtpp_socks[node->idx];
2919 3009
 		fds[0].events = POLLIN;
2920 3010
 		fds[0].revents = 0;
... ...
@@ -2938,7 +3028,6 @@ send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen)
2938 3028
 					cmd.len, cmd.s, node->rn_url.s);
2939 3029
 				goto badproxy;
2940 3030
 			}
2941
-			rtpengine_tout_ms = cfg_get(rtpengine,rtpengine_cfg,rtpengine_tout_ms);
2942 3031
 			while ((poll(fds, 1, rtpengine_tout_ms) == 1) &&
2943 3032
 				(fds[0].revents & POLLIN) != 0) {
2944 3033
 				do {
... ...
@@ -54,7 +54,14 @@ enum rtpe_operation {
54 54
 struct rtpp_node {
55 55
 	unsigned int		idx;			/* overall index */
56 56
 	str			rn_url;			/* unparsed, deletable */
57
-	int			rn_umode;
57
+	enum {
58
+		RNU_UNKNOWN = -1,
59
+		RNU_LOCAL = 0,
60
+		RNU_UDP = 1,
61
+		RNU_UDP6 = 6,
62
+		RNU_WS = 2,
63
+		RNU_WSS = 3,
64
+	}			rn_umode;
58 65
 	char			*rn_address;		/* substring of rn_url */
59 66
 	int			rn_disabled;		/* found unaccessible? */
60 67
 	unsigned int		rn_weight;		/* for load balancing */