Browse code

modules/websocket: Filled in MI commands to dump WebSocket connection details and Close a WebSocket

Peter Dunkley authored on 17/06/2012 20:31:29
Showing 2 changed files
... ...
@@ -21,7 +21,9 @@
21 21
  *
22 22
  */
23 23
 
24
+#include <limits.h>
24 25
 #include "../../tcp_conn.h"
26
+#include "../../tcp_server.h"
25 27
 #include "../../lib/kcore/kstats_wrapper.h"
26 28
 #include "../../lib/kmi/tree.h"
27 29
 #include "ws_frame.h"
... ...
@@ -83,6 +85,8 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame)
83 85
 	int mask_start, j;
84 86
 	char *buf = frame->tcpinfo->buf;
85 87
 
88
+	LM_INFO("decoding WebSocket frame\n");
89
+
86 90
 	/* Decode and validate first 9 bits */
87 91
 	if (len < 2)
88 92
 	{
... ...
@@ -200,11 +204,104 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame)
200 204
 
201 205
 static int encode_and_send_ws_frame(ws_frame_t *frame)
202 206
 {
203
-	/* TODO: convert ws_frame_t into a binary WebSocket frame and send over
204
-	   TCP/TLS */
207
+	int pos = 0, extended_length;
208
+	unsigned int frame_length;
209
+	char *send_buf;
210
+	struct dest_info dst;
211
+
212
+	LM_INFO("encoding WebSocket frame\n");
213
+
214
+	/* Validate the first byte */
215
+	if (!frame->fin)
216
+	{
217
+		LM_ERR("WebSocket fragmentation not supported in the sip "
218
+			"sub-protocol\n");
219
+		return -1;
220
+	}
221
+
222
+	if (frame->rsv1 || frame->rsv2 || frame->rsv3)
223
+	{
224
+		LM_ERR("WebSocket reserved fields with non-zero values\n");
225
+		return -1;
226
+	}
205 227
 
228
+	switch(frame->opcode)
229
+	{
230
+	case OPCODE_TEXT_FRAME:
231
+	case OPCODE_BINARY_FRAME:
232
+		LM_INFO("supported non-control frame: 0x%x\n",
233
+			(unsigned char) frame->opcode);
234
+		break;
235
+	case OPCODE_CLOSE:
236
+	case OPCODE_PING:
237
+	case OPCODE_PONG:
238
+		LM_INFO("supported control frame: 0x%x\n",
239
+			(unsigned char) frame->opcode);
240
+		break;
241
+	default:
242
+		LM_ERR("unsupported opcode: 0x%x\n",
243
+			(unsigned char) frame->opcode);
244
+		return -1;
245
+	}
246
+
247
+	/* validate the second byte */
248
+	if (frame->mask)
249
+	{
250
+		LM_ERR("this is a server - all messages sent will be "
251
+			"unmasked\n");
252
+		return -1;
253
+	}
254
+
255
+	if (frame->payload_len < 126) extended_length = 0;
256
+	else if (frame->payload_len <= USHRT_MAX ) extended_length = 2;
257
+	else if (frame->payload_len <= UINT_MAX) extended_length = 4;
258
+	else
259
+	{
260
+		LM_ERR("Kamailio only supports WebSocket frames with payload "
261
+			"<= %u\n", UINT_MAX);
262
+		return -1;
263
+	}
264
+
265
+	/* Allocate send buffer and build frame */
266
+	frame_length = frame->payload_len + extended_length + 2;
267
+	if ((send_buf = pkg_malloc(sizeof(unsigned char) * frame_length))
268
+			== NULL)
269
+	{
270
+		LM_ERR("allocating send buffer from pkg memory\n");
271
+		return -1;
272
+	}
273
+	memset(send_buf, 0, frame_length);
274
+	send_buf[pos++] = 0x80 | (frame->opcode & 0xff);
275
+	if (extended_length == 0)
276
+		send_buf[pos++] = (frame->payload_len & 0xff);
277
+	else if (extended_length == 2)
278
+	{
279
+		send_buf[pos++] = 126;
280
+		send_buf[pos++] = (frame->payload_len & 0xff00) >> 8;
281
+		send_buf[pos++] = (frame->payload_len & 0x00ff) >> 0;
282
+	}
283
+	else
284
+	{
285
+		send_buf[pos++] = 127;
286
+		send_buf[pos++] = (frame->payload_len & 0xff000000) >> 24;
287
+		send_buf[pos++] = (frame->payload_len & 0x00ff0000) >> 16;
288
+		send_buf[pos++] = (frame->payload_len & 0x0000ff00) >> 8;
289
+		send_buf[pos++] = (frame->payload_len & 0x000000ff) >> 0;
290
+	}
291
+	memcpy(&send_buf[pos], frame->payload_data, frame->payload_len);
292
+
293
+	init_dst_from_rcv(&dst, &frame->tcpinfo->con->rcv);
294
+	if (tcp_send(&dst, NULL, send_buf, frame_length) < 0)
295
+	{
296
+		LM_ERR("sending WebSocket frame\n");
297
+		pkg_free(send_buf);
298
+		update_stat(ws_failed_connections, 1);
299
+		return -1;
300
+	}
301
+	
206 302
 	update_stat(ws_transmitted_frames, 1);
207 303
 
304
+	pkg_free(send_buf);
208 305
 	return 0;
209 306
 }
210 307
 
... ...
@@ -245,18 +342,12 @@ static int handle_close(ws_frame_t *frame)
245 342
 
246 343
 static int handle_ping(ws_frame_t *frame)
247 344
 {
248
-	ws_frame_t ws_frame;
249
-
250 345
 	LM_INFO("Received Ping\n");
251 346
 
252
-	memset(&ws_frame, 0, sizeof(ws_frame_t));
253
-	ws_frame.fin = 1;
254
-	ws_frame.opcode = OPCODE_PONG;
255
-	ws_frame.payload_len = frame->payload_len;
256
-	ws_frame.payload_data =  frame->payload_data;
257
-	ws_frame.tcpinfo = frame->tcpinfo;
347
+	frame->opcode = OPCODE_PONG;
348
+	frame->mask = 0;
258 349
 
259
-	encode_and_send_ws_frame(&ws_frame);
350
+	encode_and_send_ws_frame(frame);
260 351
 
261 352
 	return 0;
262 353
 }
... ...
@@ -329,7 +420,69 @@ int ws_frame_received(void *data)
329 420
 
330 421
 struct mi_root *ws_mi_close(struct mi_root *cmd, void *param)
331 422
 {
332
-	/* TODO Close specified or all connections */
423
+	unsigned int id;
424
+	struct mi_node *node = NULL;
425
+	ws_frame_t frame;
426
+	tcp_event_info_t tcpinfo;
427
+	short int code = 1000;
428
+	str reason = str_init("Normal Closure");
429
+	char *data;
430
+
431
+	node = cmd->node.kids;
432
+	if (node == NULL)
433
+		return 0;
434
+	if (node->value.s == NULL || node->value.len == 0)
435
+	{
436
+		LM_ERR("empty connection ID parameter\n");
437
+		return init_mi_tree(400, "Empty connection ID parameter", 29);
438
+	}
439
+	if (str2int(&node->value, &id) < 0)
440
+	{
441
+		LM_ERR("converting string to int\n");
442
+		return 0;
443
+	}
444
+	if (node->next != NULL)
445
+	{
446
+		LM_ERR("too many parameters\n");
447
+		return init_mi_tree(400, "Too many parameters", 19);
448
+	}
449
+
450
+	if ((tcpinfo.con = tcpconn_get(id, 0, 0, 0, 0)) == NULL)
451
+	{
452
+		LM_ERR("bad connection ID parameter\n");
453
+		return init_mi_tree(400, "Bad connection ID parameter", 27);
454
+	}
455
+
456
+	if ((data = pkg_malloc(sizeof(char) * (reason.len + 2))) == NULL)
457
+	{
458
+		LM_ERR("allocating pkg memory\n");
459
+		return 0;
460
+	}
461
+
462
+	data[0] = (code & 0xff00) >> 8;
463
+	data[1] = (code & 0x00ff) >> 0;
464
+	memcpy(&data[2], reason.s, reason.len);
465
+
466
+	memset(&frame, 0, sizeof(frame));
467
+	frame.fin = 1;
468
+	frame.opcode = OPCODE_CLOSE;
469
+	frame.payload_len = reason.len + 2;
470
+	frame.payload_data = data;
471
+	frame.tcpinfo = &tcpinfo;
472
+
473
+	if (encode_and_send_ws_frame(&frame) < 0)
474
+	{
475
+		LM_ERR("sending WebSocket close\n");
476
+		pkg_free(data);
477
+		return init_mi_tree(500,"Sending WebSocket close", 23);
478
+	}
479
+
480
+	/* TODO: cleanly close TCP/TLS connection */
481
+
482
+	update_stat(ws_local_closed_connections, 1);
483
+	update_stat(ws_current_connections, -1);
484
+
485
+	pkg_free(data);
333 486
 	return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
334 487
 }
335 488
 
... ...
@@ -23,8 +23,10 @@
23 23
 
24 24
 #include "../../dprint.h"
25 25
 #include "../../events.h"
26
+#include "../../ip_addr.h"
26 27
 #include "../../locking.h"
27 28
 #include "../../sr_module.h"
29
+#include "../../tcp_conn.h"
28 30
 #include "../../lib/kcore/kstats_wrapper.h"
29 31
 #include "../../lib/kmi/mi.h"
30 32
 #include "../../lib/kmi/tree.h"
... ...
@@ -35,6 +37,9 @@
35 37
 
36 38
 MODULE_VERSION
37 39
 
40
+extern gen_lock_t *tcpconn_lock;
41
+extern struct tcp_connection **tcpconn_id_hash;
42
+
38 43
 static int mod_init(void);
39 44
 static void destroy(void);
40 45
 
... ...
@@ -172,6 +177,47 @@ static void destroy(void)
172 177
 
173 178
 static struct mi_root *mi_dump(struct mi_root *cmd, void *param)
174 179
 {
175
-	/* TODO: output all open websocket connections */
180
+	int h, connections = 0;
181
+	char *src_proto, *dst_proto;
182
+	char src_ip[IP6_MAX_STR_SIZE + 1], dst_ip[IP6_MAX_STR_SIZE + 1];
183
+	struct tcp_connection *c;
184
+
185
+	TCPCONN_LOCK;
186
+	for (h = 0; h < TCP_ID_HASH_SIZE; h++)
187
+	{
188
+		c = tcpconn_id_hash[h];
189
+		while(c)
190
+		{
191
+			if (c->flags & F_CONN_WS)
192
+			{
193
+				src_proto = (c->rcv.proto== PROTO_TCP)
194
+						? "tcp" : "tls";
195
+				memset(src_ip, 0, IP6_MAX_STR_SIZE + 1);
196
+				ip_addr2sbuf(&c->rcv.src_ip, src_ip,
197
+						IP6_MAX_STR_SIZE);
198
+
199
+				dst_proto = (c->rcv.proto == PROTO_TCP)
200
+						? "tcp" : "tls";
201
+				memset(dst_ip, 0, IP6_MAX_STR_SIZE + 1);
202
+				ip_addr2sbuf(&c->rcv.dst_ip, src_ip,
203
+						IP6_MAX_STR_SIZE);
204
+
205
+				LM_ERR("id - %d, "
206
+					"src - %s:%s:%hu, "
207
+					"dst - %s:%s:%hu\n",
208
+					c->id,
209
+					src_proto, src_ip, c->rcv.src_port,
210
+					dst_proto, dst_ip, c->rcv.dst_port);
211
+
212
+				connections++;
213
+			}
214
+
215
+			c = c->id_next;
216
+		}
217
+	}
218
+	TCPCONN_UNLOCK;
219
+
220
+	LM_ERR("%d WebSocket connections found\n", connections);
221
+
176 222
 	return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
177 223
 }