Browse code

core: draft support for TCP transport (WIP)

Raphael Coeffic authored on 22/08/2013 07:16:33
Showing 17 changed files
... ...
@@ -277,46 +277,15 @@ void RegisterDialog::fixUacContactHosts(const AmSipRequest& req,
277 277
     if(contact_hiding) {
278 278
       uac_contacts[i].uri_user = encodeUsername(uac_contacts[i],
279 279
 						req,cp,ctx);
280
-
281
-      list<sip_avp*> uri_params;
282
-      const string& old_params = uac_contacts[i].uri_param;
283
-      const char* c = old_params.c_str();
284
-
285
-      if(parse_gen_params(&uri_params,&c,old_params.length(),0) < 0) {
286
-
287
-	DBG("could not parse Contact URI parameters: '%s'",
288
-	    uac_contacts[i].uri_param.c_str());
289
-	free_gen_params(&uri_params);
290
-	continue;
291
-      }
292
-
293
-      // Suppress transport parameter
294
-      // hack to suppress transport=tcp
295
-      string new_params;
296
-      for(list<sip_avp*>::iterator p_it = uri_params.begin();
297
-	  p_it != uri_params.end(); p_it++) {
298
-
299
-	DBG("parsed");
300
-	if( ((*p_it)->name.len == (sizeof("transport")-1)) &&
301
-	    !memcmp((*p_it)->name.s,"transport",sizeof("transport")-1) ) {
302
-	  continue;
303
-	}
304
-
305
-	if(!new_params.empty()) new_params += ";";
306
-	new_params += c2stlstr((*p_it)->name);
307
-	if((*p_it)->value.len) {
308
-	  new_params += "=" + c2stlstr((*p_it)->value);
309
-	}
310
-      }
311
-
312
-      free_gen_params(&uri_params);
313
-      uac_contacts[i].uri_param = new_params;
314 280
     }
315 281
     else if(!reg_caching) {
316 282
       cp.fix_reg_contact(ctx,req,uac_contacts[i]);
317 283
       continue;
318 284
     }
319 285
 
286
+    // remove 'transport' param from Contact
287
+    removeTransport(uac_contacts[i]);
288
+
320 289
     // patch host & port
321 290
     uac_contacts[i].uri_host = AmConfig::SIP_Ifs[oif].getIP();
322 291
 
... ...
@@ -325,11 +294,49 @@ void RegisterDialog::fixUacContactHosts(const AmSipRequest& req,
325 294
     else
326 295
       uac_contacts[i].uri_port = int2str(AmConfig::SIP_Ifs[oif].LocalPort);
327 296
       
328
-    DBG("Patching host and port for Contact-HF: host='%s';port='%s'",
297
+    DBG("Patching host, port and transport for Contact-HF: host='%s';port='%s'",
329 298
 	uac_contacts[i].uri_host.c_str(),uac_contacts[i].uri_port.c_str());
330 299
   }
331 300
 }
332 301
 
302
+int RegisterDialog::removeTransport(AmUriParser& uri)
303
+{
304
+  list<sip_avp*> uri_params;
305
+  string old_params = uri.uri_param;
306
+  const char* c = old_params.c_str();
307
+
308
+  if(parse_gen_params(&uri_params,&c,old_params.length(),0) < 0) {
309
+
310
+    DBG("could not parse Contact URI parameters: '%s'",
311
+	uri.uri_param.c_str());
312
+    free_gen_params(&uri_params);
313
+    return -1;
314
+  }
315
+
316
+  // Suppress transport parameter
317
+  // hack to suppress transport=tcp
318
+  string new_params;
319
+  for(list<sip_avp*>::iterator p_it = uri_params.begin();
320
+      p_it != uri_params.end(); p_it++) {
321
+
322
+    DBG("parsed");
323
+    if( ((*p_it)->name.len == (sizeof("transport")-1)) &&
324
+	!memcmp((*p_it)->name.s,"transport",sizeof("transport")-1) ) {
325
+      continue;
326
+    }
327
+
328
+    if(!new_params.empty()) new_params += ";";
329
+    new_params += c2stlstr((*p_it)->name);
330
+    if((*p_it)->value.len) {
331
+      new_params += "=" + c2stlstr((*p_it)->value);
332
+    }
333
+  }
334
+
335
+  free_gen_params(&uri_params);
336
+  uri.uri_param = new_params;
337
+  return 0;
338
+}
339
+
333 340
 int RegisterDialog::initAor(const AmSipRequest& req)
334 341
 {
335 342
   AmUriParser from_parser;
... ...
@@ -55,6 +55,9 @@ class RegisterDialog
55 55
   // so that getOutboundIf() can be called
56 56
   void fixUacContactHosts(const AmSipRequest& req, const SBCCallProfile& cp);
57 57
 
58
+  // remove the transport parameter from a URI
59
+  int removeTransport(AmUriParser& uri);
60
+
58 61
 public:
59 62
   RegisterDialog(SBCCallProfile &profile, vector<AmDynInvoke*> &cc_modules);
60 63
   ~RegisterDialog();
... ...
@@ -713,6 +713,7 @@ int AmConfig::insert_SIP_interface(const SIP_interface& intf)
713 713
 
714 714
 	return -1;
715 715
       }
716
+      //FIXME: what happens now? shouldn't we insert the interface????
716 717
     }
717 718
   }
718 719
 
... ...
@@ -46,6 +46,7 @@
46 46
 #include "sip/msg_hdrs.h"
47 47
 #include "sip/udp_trsp.h"
48 48
 #include "sip/ip_util.h"
49
+#include "sip/tcp_trsp.h"
49 50
 
50 51
 #include "log.h"
51 52
 
... ...
@@ -60,6 +61,105 @@
60 61
 bool _SipCtrlInterface::log_parsed_messages = true;
61 62
 int _SipCtrlInterface::udp_rcvbuf = -1;
62 63
 
64
+int _SipCtrlInterface::alloc_udp_structs()
65
+{
66
+    udp_sockets = new udp_trsp_socket*[ AmConfig::SIP_Ifs.size() ];
67
+    udp_servers = new udp_trsp* [ AmConfig::SIPServerThreads
68
+				  * AmConfig::SIP_Ifs.size() ];
69
+
70
+    if(udp_sockets && udp_servers)
71
+	return 0;
72
+
73
+    return -1;
74
+}
75
+
76
+int _SipCtrlInterface::init_udp_servers(int if_num)
77
+{
78
+    udp_trsp_socket* udp_socket = 
79
+	new udp_trsp_socket(if_num,AmConfig::SIP_Ifs[if_num].SigSockOpts
80
+			    | (AmConfig::ForceOutboundIf ? 
81
+			       trsp_socket::force_outbound_if : 0)
82
+			    | (AmConfig::UseRawSockets ?
83
+			       trsp_socket::use_raw_sockets : 0),
84
+			    AmConfig::SIP_Ifs[if_num].NetIfIdx);
85
+	
86
+    if(!AmConfig::SIP_Ifs[if_num].PublicIP.empty()) {
87
+	udp_socket->set_public_ip(AmConfig::SIP_Ifs[if_num].PublicIP);
88
+    }
89
+
90
+    if(udp_socket->bind(AmConfig::SIP_Ifs[if_num].LocalIP,
91
+			AmConfig::SIP_Ifs[if_num].LocalPort) < 0){
92
+
93
+	ERROR("Could not bind SIP/UDP socket to %s:%i",
94
+	      AmConfig::SIP_Ifs[if_num].LocalIP.c_str(),
95
+	      AmConfig::SIP_Ifs[if_num].LocalPort);
96
+
97
+	delete udp_socket;
98
+	return -1;
99
+    }
100
+
101
+    if(udp_rcvbuf > 0) {
102
+	udp_socket->set_recvbuf_size(udp_rcvbuf);
103
+    }
104
+
105
+    trans_layer::instance()->register_transport(udp_socket);
106
+    udp_sockets[if_num] = udp_socket;
107
+    inc_ref(udp_socket);
108
+    nr_udp_sockets++;
109
+
110
+    for(int j=0; j<AmConfig::SIPServerThreads;j++){
111
+	udp_servers[if_num * AmConfig::SIPServerThreads + j] = 
112
+	    new udp_trsp(udp_socket);
113
+	nr_udp_servers++;
114
+    }
115
+
116
+    return 0;
117
+}
118
+
119
+int _SipCtrlInterface::alloc_tcp_structs()
120
+{
121
+    tcp_sockets = new tcp_server_socket*[ AmConfig::SIP_Ifs.size() ];
122
+    tcp_servers = new tcp_trsp* [ AmConfig::SIP_Ifs.size() ];
123
+
124
+    if(tcp_sockets && tcp_servers)
125
+	return 0;
126
+
127
+    return -1;
128
+}
129
+
130
+int _SipCtrlInterface::init_tcp_servers(int if_num)
131
+{
132
+    tcp_server_socket* tcp_socket = new tcp_server_socket(if_num);
133
+    // if(!AmConfig::SIP_Ifs[if_num].PublicIP.empty()) {
134
+    // 	tcp_socket->set_public_ip(AmConfig::SIP_Ifs[if_num].PublicIP);
135
+    // }
136
+
137
+    if(tcp_socket->bind(AmConfig::SIP_Ifs[if_num].LocalIP,
138
+			AmConfig::SIP_Ifs[if_num].LocalPort) < 0){
139
+
140
+	ERROR("Could not bind SIP/TCP socket to %s:%i",
141
+	      AmConfig::SIP_Ifs[if_num].LocalIP.c_str(),
142
+	      AmConfig::SIP_Ifs[if_num].LocalPort);
143
+
144
+	delete tcp_socket;
145
+	return -1;
146
+    }
147
+
148
+    // if(udp_rcvbuf > 0) {
149
+    // 	udp_socket->set_recvbuf_size(udp_rcvbuf);
150
+    // }
151
+
152
+    trans_layer::instance()->register_transport(tcp_socket);
153
+    tcp_sockets[if_num] = tcp_socket;
154
+    inc_ref(tcp_socket);
155
+    nr_tcp_sockets++;
156
+
157
+    tcp_servers[if_num] = new tcp_trsp(tcp_socket);
158
+    nr_tcp_servers++;
159
+
160
+    return 0;
161
+}
162
+
63 163
 int _SipCtrlInterface::load()
64 164
 {
65 165
     if (!AmConfig::OutboundProxy.empty()) {
... ...
@@ -119,48 +219,27 @@ int _SipCtrlInterface::load()
119 219
 	DBG("assuming SIP default settings.\n");
120 220
     }
121 221
 
122
-    udp_sockets = new udp_trsp_socket*[AmConfig::SIP_Ifs.size()];
123
-    udp_servers = new udp_trsp*[AmConfig::SIPServerThreads * AmConfig::SIP_Ifs.size()];
222
+    if(alloc_udp_structs() < 0) {
223
+	ERROR("no enough memory to alloc UDP structs");
224
+	return -1;
225
+    }
124 226
 
125
-    // Init transport instances
227
+    // Init UDP transport instances
126 228
     for(unsigned int i=0; i<AmConfig::SIP_Ifs.size();i++) {
127
-
128
-	udp_trsp_socket* udp_socket = 
129
-	    new udp_trsp_socket(i,AmConfig::SIP_Ifs[i].SigSockOpts
130
-				| (AmConfig::ForceOutboundIf ? 
131
-				   trsp_socket::force_outbound_if : 0)
132
-				| (AmConfig::UseRawSockets ?
133
-				   trsp_socket::use_raw_sockets : 0),
134
-				AmConfig::SIP_Ifs[i].NetIfIdx);
135
-	
136
-	if(!AmConfig::SIP_Ifs[i].PublicIP.empty()) {
137
-	    udp_socket->set_public_ip(AmConfig::SIP_Ifs[i].PublicIP);
138
-	}
139
-
140
-	if(udp_socket->bind(AmConfig::SIP_Ifs[i].LocalIP,
141
-			    AmConfig::SIP_Ifs[i].LocalPort) < 0){
142
-
143
-	    ERROR("Could not bind SIP/UDP socket to %s:%i",
144
-		  AmConfig::SIP_Ifs[i].LocalIP.c_str(),
145
-		  AmConfig::SIP_Ifs[i].LocalPort);
146
-
147
-	    delete udp_socket;
229
+	if(init_udp_servers(i) < 0) {
148 230
 	    return -1;
149 231
 	}
232
+    }
150 233
 
151
-	if(udp_rcvbuf > 0) {
152
-	    udp_socket->set_recvbuf_size(udp_rcvbuf);
153
-	}
154
-
155
-	trans_layer::instance()->register_transport(udp_socket);
156
-	udp_sockets[i] = udp_socket;
157
-	inc_ref(udp_socket);
158
-	nr_udp_sockets++;
234
+    if(alloc_tcp_structs() < 0) {
235
+	ERROR("no enough memory to alloc TCP structs");
236
+	return -1;
237
+    }
159 238
 
160
-	for(int j=0; j<AmConfig::SIPServerThreads;j++){
161
-	    udp_servers[i*AmConfig::SIPServerThreads + j] = 
162
-		new udp_trsp(udp_socket);
163
-	    nr_udp_servers++;
239
+    // Init TCP transport instances
240
+    for(unsigned int i=0; i<AmConfig::SIP_Ifs.size();i++) {
241
+	if(init_tcp_servers(i) < 0) {
242
+	    return -1;
164 243
 	}
165 244
     }
166 245
 
... ...
@@ -168,8 +247,11 @@ int _SipCtrlInterface::load()
168 247
 }
169 248
 
170 249
 _SipCtrlInterface::_SipCtrlInterface()
171
-    : stopped(false), udp_servers(NULL), udp_sockets(NULL),
172
-      nr_udp_sockets(0), nr_udp_servers(0)
250
+    : stopped(false),
251
+      udp_servers(NULL), udp_sockets(NULL),
252
+      nr_udp_sockets(0), nr_udp_servers(0),
253
+      tcp_servers(NULL), tcp_sockets(NULL),
254
+      nr_tcp_sockets(0), nr_tcp_servers(0)
173 255
 {
174 256
     trans_layer::instance()->register_ua(this);
175 257
 }
... ...
@@ -300,6 +382,12 @@ int _SipCtrlInterface::run()
300 382
 	}
301 383
     }
302 384
 
385
+    if (NULL != tcp_servers) {
386
+	for(int i=0; i<nr_tcp_servers;i++){
387
+	    tcp_servers[i]->start();
388
+	}
389
+    }
390
+
303 391
     while (!stopped.get()) {
304 392
         stopped.wait_for();
305 393
     }
... ...
@@ -329,11 +417,22 @@ void _SipCtrlInterface::cleanup()
329 417
 	nr_udp_servers = 0;
330 418
     }
331 419
 
420
+    if (NULL != tcp_servers) {
421
+	for(int i=0; i<nr_tcp_servers;i++){
422
+	    tcp_servers[i]->stop();
423
+	    tcp_servers[i]->join();
424
+	    delete tcp_servers[i];
425
+	}
426
+
427
+	delete [] tcp_servers;
428
+	tcp_servers = NULL;
429
+	nr_tcp_servers = 0;
430
+    }
431
+
332 432
     trans_layer::instance()->clear_transports();
333 433
 
334 434
     if (NULL != udp_sockets) {
335 435
 	for(int i=0; i<nr_udp_sockets;i++){
336
-	    //delete udp_sockets[i];
337 436
 	    DBG("dec_ref(%p)",udp_sockets[i]);
338 437
 	    dec_ref(udp_sockets[i]);
339 438
 	}
... ...
@@ -342,6 +441,17 @@ void _SipCtrlInterface::cleanup()
342 441
 	udp_sockets = NULL;
343 442
 	nr_udp_sockets = 0;
344 443
     }
444
+
445
+    if (NULL != tcp_sockets) {
446
+	for(int i=0; i<nr_tcp_sockets;i++){
447
+	    DBG("dec_ref(%p)",tcp_sockets[i]);
448
+	    dec_ref(tcp_sockets[i]);
449
+	}
450
+
451
+	delete [] tcp_sockets;
452
+	tcp_sockets = NULL;
453
+	nr_tcp_sockets = 0;
454
+    }
345 455
 }
346 456
 
347 457
 int _SipCtrlInterface::send(const AmSipReply &rep, const string& dialog_id,
... ...
@@ -48,6 +48,9 @@ class trans_ticket;
48 48
 class udp_trsp_socket;
49 49
 class udp_trsp;
50 50
 
51
+class tcp_server_socket;
52
+class tcp_trsp;
53
+
51 54
 class _SipCtrlInterface:
52 55
     public sip_ua
53 56
 {
... ...
@@ -67,6 +70,18 @@ class _SipCtrlInterface:
67 70
     unsigned short    nr_udp_servers;
68 71
     udp_trsp**        udp_servers;
69 72
 
73
+    unsigned short    nr_tcp_sockets;
74
+    tcp_server_socket** tcp_sockets;
75
+
76
+    unsigned short    nr_tcp_servers;
77
+    tcp_trsp**        tcp_servers;
78
+
79
+    int alloc_udp_structs();
80
+    int init_udp_servers(int if_num);
81
+
82
+    int alloc_tcp_structs();
83
+    int init_tcp_servers(int if_num);
84
+
70 85
 public:
71 86
 
72 87
     static string outbound_host;
... ...
@@ -583,7 +583,9 @@ int main(int argc, char* argv[])
583 583
   AmRtpReceiver::instance()->start();
584 584
 
585 585
   INFO("Starting SIP stack (control interface)\n");
586
-  sip_ctrl.load();
586
+  if(sip_ctrl.load()) {
587
+    goto error;
588
+  }
587 589
   
588 590
   INFO("Loading plug-ins\n");
589 591
   AmPlugIn::instance()->init();
... ...
@@ -113,8 +113,7 @@ using std::list;
113 113
 enum {
114 114
     ST_CR=100,
115 115
     ST_LF,
116
-    ST_CRLF,
117
-    ST_EoL_WSP // [CR] LF WSP
116
+    ST_CRLF
118 117
 };
119 118
 
120 119
 #define case_CR_LF \
... ...
@@ -80,6 +80,8 @@ struct sip_header
80 80
     ~sip_header();
81 81
 };
82 82
 
83
+int parse_header_type(sip_header* h);
84
+
83 85
 int parse_headers(list<sip_header*>& hdrs, char** c, char* end);
84 86
 void free_headers(list<sip_header*>& hdrs);
85 87
 
... ...
@@ -237,7 +237,7 @@ int parse_method(int* method, const char* beg, int len)
237 237
 }
238 238
 
239 239
 
240
-static int parse_first_line(sip_msg* msg, char** c)
240
+static int parse_first_line(sip_msg* msg, char** c, char* end)
241 241
 {
242 242
     enum {
243 243
 	FL_METH=0,
... ...
@@ -269,7 +269,7 @@ static int parse_first_line(sip_msg* msg, char** c)
269 269
 
270 270
     bool is_request=false;
271 271
 
272
-    for(;**c;(*c)++){
272
+    for(;(*c < end) && **c;(*c)++){
273 273
 
274 274
 	switch(st){
275 275
 
... ...
@@ -334,6 +334,9 @@ static int parse_first_line(sip_msg* msg, char** c)
334 334
 		    st = FL_SIPVER_SP;
335 335
 		}
336 336
 	    }
337
+	    else {
338
+	      st = FL_ERR;
339
+	    }
337 340
 	    break;
338 341
 
339 342
 	case FL_METH:
... ...
@@ -520,15 +523,16 @@ int parse_headers(sip_msg* msg, char** c, char* end)
520 523
 int parse_sip_msg(sip_msg* msg, char*& err_msg)
521 524
 {
522 525
     char* c = msg->buf;
526
+    char* end = msg->buf + msg->len;
523 527
 
524
-    int err = parse_first_line(msg,&c);
528
+    int err = parse_first_line(msg,&c,end);
525 529
 
526 530
     if(err) {
527 531
 	err_msg = (char*)"Could not parse first line";
528 532
 	return MALFORMED_FLINE;
529 533
     }
530 534
 
531
-    err = parse_headers(msg,&c,c+msg->len);
535
+    err = parse_headers(msg,&c,end);
532 536
 
533 537
     if(!err){
534 538
 	msg->body.set(c,msg->len - (c - msg->buf));
535 539
new file mode 100644
... ...
@@ -0,0 +1,290 @@
1
+#include "sip_parser_async.h"
2
+#include "parse_common.h"
3
+#include "log.h"
4
+
5
+#include "AmUtils.h"
6
+
7
+#include <string.h>
8
+
9
+#include <string>
10
+using std::string;
11
+
12
+static int skip_line_async(parser_state* pst, char* end)
13
+{
14
+  char*& c = pst->c;
15
+  int& st = pst->st;
16
+  int& saved_st = pst->saved_st;
17
+
18
+  for(; (c < end) && *c; c++){
19
+
20
+    switch(st){
21
+
22
+    case 0/* START */:
23
+      switch(*c) {
24
+      case_CR_LF;
25
+      default: break;
26
+      }
27
+      break;
28
+
29
+    case_ST_CR(*c);
30
+
31
+    case ST_LF:
32
+    case ST_CRLF:
33
+      return 0;
34
+
35
+    default:
36
+      DBG("Bad state! st=%i\n",st);
37
+      return -99;
38
+    }
39
+  }
40
+
41
+  return UNEXPECTED_EOT;
42
+}
43
+
44
+static int parse_header_async(sip_header* hdr, parser_state* pst, char* end)
45
+{
46
+    //
47
+    // Header states
48
+    //
49
+    enum {
50
+	H_NAME=0,
51
+	H_HCOLON,
52
+	H_VALUE_SWS,
53
+	H_VALUE,
54
+    };
55
+
56
+    int& st = pst->st;
57
+    int& saved_st = pst->saved_st;
58
+
59
+    char** c = &(pst->c);
60
+    char*& begin = pst->beg;
61
+
62
+    for(;**c && (*c < end);(*c)++){
63
+
64
+	switch(st){
65
+
66
+	case H_NAME:
67
+	    switch(**c){
68
+
69
+	    case_CR_LF;
70
+
71
+	    case HCOLON:
72
+		st = H_VALUE_SWS;
73
+		hdr->name.set(begin,*c-begin);
74
+		break;
75
+
76
+	    case SP:
77
+	    case HTAB:
78
+		st = H_HCOLON;
79
+		hdr->name.set(begin,*c-begin);
80
+		break;
81
+	    }
82
+	    break;
83
+
84
+	case H_VALUE_SWS:
85
+	    switch(**c){
86
+
87
+	    case_CR_LF;
88
+
89
+	    case SP:
90
+	    case HTAB:
91
+		break;
92
+
93
+	    default:
94
+		st = H_VALUE;
95
+		begin = *c;
96
+		break;
97
+		
98
+	    };
99
+	    break;
100
+
101
+	case H_VALUE:
102
+	    switch(**c){
103
+		case_CR_LF;
104
+	    };
105
+	    break;
106
+
107
+	case H_HCOLON:
108
+	    switch(**c){
109
+	    case HCOLON:
110
+		st = H_VALUE_SWS;
111
+		break;
112
+
113
+	    case SP:
114
+	    case HTAB:
115
+		break;
116
+
117
+	    default:
118
+		DBG("Missing ':' after header name\n");
119
+		return MALFORMED_SIP_MSG;
120
+	    }
121
+	    break;
122
+
123
+	case_ST_CR(**c);
124
+
125
+	case ST_LF:
126
+	case ST_CRLF:
127
+	    switch(saved_st){
128
+
129
+	    case H_NAME:
130
+		if((*c-(st==ST_CRLF?2:1))-begin == 0){
131
+		    //DBG("Detected end of headers\n");
132
+		    return 0;
133
+		}
134
+ 		DBG("Illegal CR or LF in header name: <%.*s>\n",
135
+		    (int)(*c-begin),begin);
136
+ 		return MALFORMED_SIP_MSG;
137
+
138
+	    case H_VALUE_SWS:
139
+		if(!IS_WSP(**c)){
140
+		    DBG("Malformed header: <%.*s>\n",(int)(*c-begin),begin);
141
+		    return MALFORMED_SIP_MSG;
142
+		}
143
+		break;
144
+
145
+	    case H_VALUE:
146
+		if(!IS_WSP(**c)){
147
+		    hdr->value.set(begin,(*c-(st==ST_CRLF?2:1))-begin);
148
+
149
+		    //DBG("hdr: \"%.*s: %.*s\"\n",
150
+		    //     hdr->name.len,hdr->name.s,
151
+		    //     hdr->value.len,hdr->value.s);
152
+		    return 0;
153
+		}
154
+		break;
155
+
156
+	    default:
157
+		DBG("Oooops! st=%i\n",saved_st);
158
+		break;
159
+	    }
160
+
161
+	    st = saved_st;
162
+	    break;
163
+	}
164
+    }
165
+
166
+    switch(st){
167
+
168
+    case H_NAME:
169
+    case H_VALUE:
170
+	return UNEXPECTED_EOT;
171
+
172
+    case ST_LF:
173
+    case ST_CRLF:
174
+	switch(saved_st){
175
+	    
176
+	case H_NAME:
177
+	    if((*c-(st==ST_CRLF?2:1))-begin == 0){
178
+		//DBG("Detected end of headers\n");
179
+		return 0;
180
+	    }
181
+	    DBG("Illegal CR or LF in header name\n");
182
+	    return MALFORMED_SIP_MSG;
183
+	}
184
+	break;
185
+    }
186
+    
187
+    DBG("Incomplete header (st=%i;saved_st=%i)\n",st,saved_st);
188
+    return UNEXPECTED_EOT;
189
+}
190
+
191
+int parse_headers_async(parser_state* pst, char* end)
192
+{
193
+  char*& c = pst->c;
194
+  int& st = pst->st;
195
+  int& saved_st = pst->saved_st;
196
+  sip_header* hdr = &(pst->hdr);
197
+
198
+  while(c < end) {
199
+
200
+    int err = parse_header_async(hdr, pst, end);
201
+    if(err) return err;
202
+
203
+    if(hdr->name.len && hdr->value.len) {
204
+      int type = parse_header_type(hdr);
205
+      if(type == sip_header::H_CONTENT_LENGTH)
206
+	str2int(c2stlstr(hdr->value),pst->content_len);
207
+    }
208
+
209
+    if(!hdr->name.len && !hdr->value.len) {
210
+      // end-of-headers
211
+      return 0;
212
+    }
213
+
214
+    // reset header struct
215
+    memset(hdr,0,sizeof(sip_header));
216
+    st = 0;
217
+    saved_st = 0;
218
+    pst->beg = c;
219
+  }
220
+
221
+  return UNEXPECTED_EOT;
222
+}
223
+
224
+int skip_sip_msg_async(parser_state* pst, char* end)
225
+{
226
+  enum {
227
+    ST_FL=0,
228
+    ST_HDRS,
229
+    ST_BODY
230
+  };
231
+
232
+  int err=0;
233
+
234
+  char*& c = pst->c;
235
+  int& stage = pst->stage;
236
+  int& st = pst->st;
237
+  int& saved_st = pst->saved_st;
238
+
239
+  while(c < end) {
240
+
241
+    switch(stage) {
242
+    case ST_FL:
243
+      err = skip_line_async(pst,end);
244
+      break;
245
+
246
+    case ST_HDRS:
247
+      err = parse_headers_async(pst,end);
248
+      break;
249
+
250
+    case ST_BODY:
251
+      if(!pst->content_len)
252
+	return 0;
253
+      if(pst->content_len > end-c)
254
+	return UNEXPECTED_EOT;
255
+      else
256
+	return 0;
257
+      break;
258
+
259
+    default:
260
+      ERROR("unkown state!!!");
261
+      return -1;
262
+    }
263
+
264
+    if(!err) {
265
+      switch(stage) {
266
+      case ST_FL:
267
+	stage = ST_HDRS;
268
+	pst->reset_hdr_parser();
269
+	break;
270
+      case ST_HDRS:
271
+	if(!pst->hdr.name.len && !pst->hdr.value.len) {
272
+	  // End-of-Header found
273
+	  stage = ST_BODY;
274
+	  continue;
275
+	}
276
+	else {
277
+	  // End-of-one-Header
278
+	  pst->reset_hdr_parser();
279
+	  continue;
280
+	}
281
+	break;
282
+      }
283
+    }
284
+    else {
285
+      return err;
286
+    }
287
+  }
288
+
289
+  return err;
290
+}
0 291
new file mode 100644
... ...
@@ -0,0 +1,40 @@
1
+#ifndef _sip_parser_async_h_
2
+#define _sip_parser_async_h_
3
+
4
+#include "parse_header.h"
5
+
6
+struct parser_state
7
+{
8
+  char* orig_buf;
9
+  char* c; // cursor
10
+  char* beg; // last marker for field start
11
+
12
+  int stage;
13
+  int st; // parser state (within stage)
14
+  int saved_st; // saved parser state (within stage)
15
+  sip_header hdr; // temporary header struct
16
+  
17
+  int content_len; // detected body content-length
18
+
19
+  parser_state()
20
+    : orig_buf(NULL),c(NULL),beg(NULL),
21
+      stage(0),st(0),saved_st(0),
22
+      content_len(0)
23
+  {}
24
+
25
+  void reset(char* buf) {
26
+    c = orig_buf = buf;
27
+    reset_hdr_parser();
28
+    stage = content_len = 0;
29
+  }
30
+
31
+  void reset_hdr_parser() {
32
+    memset(&hdr,0,sizeof(sip_header));
33
+    st = saved_st = 0;
34
+    beg = c;
35
+  }
36
+};
37
+
38
+int skip_sip_msg_async(parser_state* pst, char* end);
39
+
40
+#endif
0 41
new file mode 100644
... ...
@@ -0,0 +1,594 @@
1
+#include "tcp_trsp.h"
2
+#include "ip_util.h"
3
+#include "parse_common.h"
4
+#include "sip_parser.h"
5
+#include "trans_layer.h"
6
+
7
+#include "AmUtils.h"
8
+
9
+#include <netdb.h>
10
+#include <event2/event.h>
11
+#include <string.h>
12
+#include <fcntl.h>
13
+#include <sys/ioctl.h>
14
+
15
+
16
+void tcp_trsp_socket::on_sock_read(int fd, short ev, void* arg)
17
+{
18
+  if(ev & EV_READ){
19
+    ((tcp_trsp_socket*)arg)->on_read(ev);
20
+  }
21
+}
22
+
23
+void tcp_trsp_socket::on_sock_write(int fd, short ev, void* arg)
24
+{
25
+  if(ev & EV_WRITE){
26
+    ((tcp_trsp_socket*)arg)->on_write(ev);
27
+  }
28
+}
29
+
30
+tcp_trsp_socket::tcp_trsp_socket(tcp_server_socket* server_sock,
31
+				 int sd, const sockaddr_storage* sa,
32
+				 struct event_base* evbase)
33
+  : trsp_socket(server_sock->get_if(),0,0,sd),
34
+    server_sock(server_sock), closed(false), connected(false),
35
+    input_len(0), evbase(evbase),
36
+    read_ev(NULL), write_ev(NULL)
37
+{
38
+  // local address
39
+  ip = server_sock->get_ip();
40
+  port = server_sock->get_port();
41
+  server_sock->copy_addr_to(&addr);
42
+
43
+  // peer address
44
+  memcpy(&peer_addr,sa,sizeof(sockaddr_storage));
45
+
46
+  char host[NI_MAXHOST] = "";
47
+  peer_ip = am_inet_ntop(&peer_addr,host,NI_MAXHOST);
48
+  peer_port = am_get_port(&peer_addr);
49
+
50
+  // async parser state
51
+  pst.reset((char*)input_buf);
52
+
53
+  if(sd > 0) {
54
+    connected = true;
55
+    add_read_event();
56
+  }
57
+}
58
+
59
+void tcp_trsp_socket::add_read_event()
60
+{
61
+  if(!read_ev) {
62
+    read_ev = event_new(evbase, sd, EV_READ|EV_PERSIST,
63
+			tcp_trsp_socket::on_sock_read,
64
+			(void *)this);
65
+
66
+    // TODO: add connection idle-timeout here
67
+    struct timeval idle_timer;
68
+    idle_timer.tv_sec = 10;
69
+    idle_timer.tv_usec = 0;
70
+
71
+    event_add(read_ev, &idle_timer);
72
+  }
73
+}
74
+
75
+void tcp_trsp_socket::add_write_event(struct timeval* timeout)
76
+{
77
+  if(!write_ev) {
78
+    write_ev = event_new(evbase, sd, EV_WRITE,
79
+			 tcp_trsp_socket::on_sock_write,
80
+			 (void *)this);
81
+  }
82
+
83
+  event_add(write_ev, timeout);
84
+}
85
+
86
+tcp_trsp_socket::~tcp_trsp_socket()
87
+{
88
+  DBG("********* connection destructor ***********");
89
+}
90
+
91
+void tcp_trsp_socket::copy_peer_addr(sockaddr_storage* sa)
92
+{
93
+  memcpy(sa,&peer_addr,sizeof(sockaddr_storage));
94
+}
95
+
96
+tcp_trsp_socket::msg_buf::msg_buf(const sockaddr_storage* sa, const char* msg, 
97
+				  const int msg_len)
98
+  : msg_len(msg_len)
99
+{
100
+  memcpy(&addr,sa,sizeof(sockaddr_storage));
101
+  cursor = this->msg = new char[msg_len];
102
+  memcpy(this->msg,msg,msg_len);
103
+}
104
+
105
+tcp_trsp_socket::msg_buf::~msg_buf()
106
+{
107
+  delete [] msg;
108
+}
109
+
110
+int tcp_trsp_socket::on_connect(short ev)
111
+{
112
+  DBG("************ on_connect() ***********");
113
+
114
+  if(ev & EV_TIMEOUT) {
115
+    DBG("********** connection timeout on sd=%i ************\n",sd);
116
+    close();
117
+    return -1;
118
+  }
119
+
120
+  socklen_t len = sizeof(int);
121
+  int error = 0;
122
+  if(getsockopt(sd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
123
+    ERROR("getsockopt: %s",strerror(errno));
124
+    return -1;
125
+  }
126
+
127
+  if(error != 0) {
128
+    DBG("*********** connection error (sd=%i): %s *********",
129
+	sd,strerror(error));
130
+    close();
131
+    return -1;
132
+  }
133
+
134
+  connected = true;
135
+  add_read_event();
136
+
137
+  return 0;
138
+}
139
+
140
+int tcp_trsp_socket::connect()
141
+{
142
+  if(sd > 0) {
143
+    ERROR("pending connection request: close first.");
144
+    return -1;
145
+  }
146
+
147
+  if((sd = socket(peer_addr.ss_family,SOCK_STREAM,0)) == -1){
148
+    ERROR("socket: %s\n",strerror(errno));
149
+    return -1;
150
+  } 
151
+
152
+  int true_opt = 1;
153
+  if(ioctl(sd, FIONBIO , &true_opt) == -1) {
154
+    ERROR("could not make new connection non-blocking: %s\n",strerror(errno));
155
+    ::close(sd);
156
+    return -1;
157
+  }
158
+
159
+  DBG("connecting to %s:%i...",
160
+      am_inet_ntop(&peer_addr).c_str(),
161
+      am_get_port(&peer_addr));
162
+
163
+  int ret = ::connect(sd, (const struct sockaddr*)&peer_addr, 
164
+		      SA_len(&peer_addr));
165
+  if(ret < 0) {
166
+    if(errno != EINPROGRESS && errno != EALREADY) {
167
+      ERROR("could not connect: %s",strerror(errno));
168
+      ::close(sd);
169
+      return -1;
170
+    }
171
+
172
+    struct timeval connect_timeout;
173
+    connect_timeout.tv_sec = 0;
174
+    connect_timeout.tv_usec = 200000; /* 200ms */
175
+    add_write_event(&connect_timeout);
176
+    DBG("connect event added...");
177
+  }
178
+  else {
179
+    // connect succeeded immediatly
180
+    connected = true;
181
+    add_read_event();
182
+  }
183
+  
184
+  return 0;
185
+}
186
+
187
+int tcp_trsp_socket::send(const sockaddr_storage* sa, const char* msg, 
188
+			  const int msg_len)
189
+{
190
+  AmLock _l(sock_mut);
191
+
192
+  if((sd < 0) && (connect() < 0)){
193
+    return -1;
194
+  }
195
+
196
+  // async send
197
+  // TODO: do we need a sync-send as well???
198
+  //   (for ex., when sending errors from recv-thread?)
199
+  DBG("stacking message on send-queue");
200
+  send_q.push_back(new msg_buf(sa,msg,msg_len));
201
+
202
+  DBG("add write event");
203
+  add_write_event();
204
+    
205
+  return 0;
206
+}
207
+
208
+void tcp_trsp_socket::close()
209
+{
210
+  // TODO:
211
+  // - remove the socket object from 
212
+  //   server mapping and from memory.
213
+
214
+  closed = true;
215
+  DBG("********* closing connection ***********");
216
+
217
+  if(read_ev) {
218
+    event_free(read_ev);
219
+    read_ev = NULL;
220
+  }
221
+
222
+  if(write_ev) {
223
+    event_free(write_ev);
224
+    write_ev = NULL;
225
+  }
226
+
227
+  ::close(sd);
228
+  sd = -1;
229
+
230
+  server_sock->remove_connection(this);
231
+}
232
+
233
+void tcp_trsp_socket::on_read(short ev)
234
+{
235
+  if(ev & EV_TIMEOUT) {
236
+    DBG("************ idle timeout **********");
237
+    return;
238
+  }
239
+
240
+  AmLock _l(sock_mut);
241
+  DBG("on_read (connected = %i)",connected);
242
+
243
+  char* old_cursor = (char*)get_input();
244
+
245
+  int bytes = ::read(sd,get_input(),get_input_free_space());
246
+  if(bytes < 0) {
247
+    switch(errno) {
248
+    case EAGAIN:
249
+      return; // nothing to read
250
+
251
+    case ECONNRESET:
252
+    case ENOTCONN:
253
+      DBG("connection has been closed (sd=%i)",sd);
254
+      close();
255
+      return;
256
+
257
+    case ETIMEDOUT:
258
+      DBG("transmission timeout (sd=%i)",sd);
259
+      close();
260
+      return;
261
+
262
+    default:
263
+      DBG("unknown error (%i): %s",errno,strerror(errno));
264
+      close();
265
+      return;
266
+    }
267
+  }
268
+  else if(bytes == 0) {
269
+    // connection closed
270
+    DBG("connection has been closed (sd=%i)",sd);
271
+    close();
272
+    return;
273
+  }
274
+
275
+  input_len += bytes;
276
+
277
+  DBG("received: <%.*s>",bytes,old_cursor);
278
+
279
+  // ... and parse it
280
+  if(parse_input() < 0) {
281
+    DBG("Error while parsing input: closing connection!");
282
+    close();
283
+  }
284
+}
285
+
286
+int tcp_trsp_socket::parse_input()
287
+{
288
+  int err = skip_sip_msg_async(&pst, (char*)(input_buf+input_len));
289
+  if(err) {
290
+
291
+    if((err == UNEXPECTED_EOT) &&
292
+       get_input_free_space()) {
293
+
294
+      return 0;
295
+    }
296
+
297
+    if(!get_input_free_space()) {
298
+      DBG("message way too big! should drop connection...");
299
+    }
300
+
301
+    //TODO: drop connection
302
+    // close connection? let's see...
303
+    ERROR("parsing error %i",err);
304
+
305
+    pst.reset((char*)input_buf);
306
+    reset_input();
307
+
308
+    return -1;
309
+  }
310
+
311
+  int msg_len = pst.c - (char*)input_buf + pst.content_len;
312
+  DBG("received msg:\n%.*s",msg_len,input_buf);
313
+
314
+  sip_msg* s_msg = new sip_msg((const char*)input_buf,msg_len);
315
+  pst.reset((char*)input_buf);
316
+  reset_input();
317
+
318
+  copy_peer_addr(&s_msg->remote_ip);
319
+  copy_addr_to(&s_msg->local_ip);
320
+
321
+  s_msg->local_socket = this;
322
+  inc_ref(this);
323
+
324
+  // pass message to the parser / transaction layer
325
+  trans_layer::instance()->received_msg(s_msg);
326
+
327
+  return 0;
328
+}
329
+
330
+void tcp_trsp_socket::on_write(short ev)
331
+{
332
+  AmLock _l(sock_mut);
333
+  DBG("on_write (connected = %i)",connected);
334
+
335
+  if(!connected && (on_connect(ev) != 0)) {
336
+    return;
337
+  }
338
+
339
+  while(!send_q.empty()) {
340
+
341
+    msg_buf* msg = send_q.front();
342
+    if(!msg || !msg->bytes_left()) {
343
+      send_q.pop_front();
344
+      delete msg;
345
+      continue;
346
+    }
347
+
348
+    // send msg
349
+    int bytes = write(sd,msg->cursor,msg->bytes_left());
350
+    if(bytes < 0) {
351
+      DBG("error on write: %i",bytes);
352
+      switch(errno){
353
+      case EINTR:
354
+      case EAGAIN: // would block
355
+	break;
356
+
357
+      default: // unforseen error: close connection
358
+	ERROR("unforseen error: close connection (%i/%s)",
359
+	      errno,strerror(errno));
360
+	close();
361
+	break;
362
+      }
363
+
364
+      return;
365
+    }
366
+
367
+    DBG("bytes written: <%.*s>",bytes,msg->cursor);
368
+
369
+    if(bytes < msg->bytes_left()) {
370
+      msg->cursor += bytes;
371
+      return;
372
+    }
373
+
374
+    send_q.pop_front();
375
+    delete msg;
376
+  }
377
+}
378
+
379
+tcp_server_socket::tcp_server_socket(unsigned short if_num)
380
+  : trsp_socket(if_num,0),
381
+    evbase(NULL)
382
+{
383
+}
384
+
385
+int tcp_server_socket::bind(const string& bind_ip, unsigned short bind_port)
386
+{
387
+  if(sd){
388
+    WARN("re-binding socket\n");
389
+    close(sd);
390
+  }
391
+
392
+  if(am_inet_pton(bind_ip.c_str(),&addr) == 0){
393
+	
394
+    ERROR("am_inet_pton(%s): %s\n",bind_ip.c_str(),strerror(errno));
395
+    return -1;
396
+  }
397
+    
398
+  if( ((addr.ss_family == AF_INET) && 
399
+       (SAv4(&addr)->sin_addr.s_addr == INADDR_ANY)) ||
400
+      ((addr.ss_family == AF_INET6) && 
401
+       IN6_IS_ADDR_UNSPECIFIED(&SAv6(&addr)->sin6_addr)) ){
402
+
403
+    ERROR("Sorry, we cannot bind to 'ANY' address\n");
404
+    return -1;
405
+  }
406
+
407
+  am_set_port(&addr,bind_port);
408
+
409
+  if((sd = socket(addr.ss_family,SOCK_STREAM,0)) == -1){
410
+    ERROR("socket: %s\n",strerror(errno));
411
+    return -1;
412
+  } 
413
+
414
+  int true_opt = 1;
415
+  if(setsockopt(sd, SOL_SOCKET, SO_REUSEADDR,
416
+		(void*)&true_opt, sizeof (true_opt)) == -1) {
417
+    
418
+    ERROR("%s\n",strerror(errno));
419
+    close(sd);
420
+    return -1;
421
+  }
422
+
423
+  if(ioctl(sd, FIONBIO , &true_opt) == -1) {
424
+    ERROR("setting non-blocking: %s\n",strerror(errno));
425
+    close(sd);
426
+    return -1;
427
+  }
428
+
429
+  if(::bind(sd,(const struct sockaddr*)&addr,SA_len(&addr)) < 0) {
430
+
431
+    ERROR("bind: %s\n",strerror(errno));
432
+    close(sd);
433
+    return -1;
434
+  }
435
+
436
+  if(::listen(sd, 16) < 0) {
437
+    ERROR("listen: %s\n",strerror(errno));
438
+    close(sd);
439
+    return -1;
440
+  }
441
+
442
+  port = bind_port;
443
+  ip   = bind_ip;
444
+
445
+  DBG("TCP transport bound to %s/%i\n",ip.c_str(),port);
446
+
447
+  return 0;
448
+}
449
+
450
+static void on_accept(int fd, short ev, void* arg)
451
+{
452
+  tcp_server_socket* trsp = (tcp_server_socket*)arg;
453
+  trsp->on_accept(fd,ev);
454
+}
455
+
456
+void tcp_server_socket::add_event(struct event_base *evbase)
457
+{
458
+  this->evbase = evbase;
459
+
460
+  if(!ev_accept) {
461
+    ev_accept = event_new(evbase, sd, EV_READ|EV_PERSIST,
462
+			  ::on_accept, (void *)this);
463
+    event_add(ev_accept, NULL); // no timeout
464
+  }
465
+}
466
+
467
+void tcp_server_socket::add_connection(tcp_trsp_socket* client_sock)
468
+{
469
+  string conn_id = client_sock->get_peer_ip()
470
+    + ":" + int2str(client_sock->get_peer_port());
471
+
472
+  DBG("new TCP connection from %s:%u",
473
+      client_sock->get_peer_ip().c_str(),
474
+      client_sock->get_peer_port());
475
+
476
+  connections_mut.lock();
477
+
478
+  map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(conn_id);
479
+  if(sock_it != connections.end()) {
480
+    dec_ref(sock_it->second);
481
+    sock_it->second = client_sock;
482
+  }
483
+  else {
484
+    connections[conn_id] = client_sock;
485
+  }
486
+  inc_ref(client_sock);
487
+
488
+  connections_mut.unlock();
489
+}
490
+
491
+void tcp_server_socket::remove_connection(tcp_trsp_socket* client_sock)
492
+{
493
+  string conn_id = client_sock->get_peer_ip()
494
+    + ":" + int2str(client_sock->get_peer_port());
495
+
496
+  DBG("removing TCP connection from %s:%u",
497
+      client_sock->get_peer_ip().c_str(), client_sock->get_peer_port());
498
+
499
+  connections_mut.lock();
500
+
501
+  map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(conn_id);
502
+  if(sock_it != connections.end()) {
503
+    dec_ref(sock_it->second);
504
+    connections.erase(sock_it);
505
+  }
506
+
507
+  connections_mut.unlock();
508
+}
509
+
510
+void tcp_server_socket::on_accept(int sd, short ev)
511
+{
512
+  sockaddr_storage src_addr;
513
+  socklen_t        src_addr_len = sizeof(sockaddr_storage);
514
+
515
+  int connection_sd = accept(sd,(sockaddr*)&src_addr,&src_addr_len);
516
+  if(connection_sd < 0) {
517
+    WARN("error while accepting connection");
518
+    return;
519
+  }
520
+
521
+  int true_opt = 1;
522
+  if(ioctl(connection_sd, FIONBIO , &true_opt) == -1) {
523
+    ERROR("could not make new connection non-blocking: %s\n",strerror(errno));
524
+    close(connection_sd);
525
+    return;
526
+  }
527
+
528
+  // in case of thread pooling, do following in worker thread
529
+  tcp_trsp_socket* client_sock =
530
+    new tcp_trsp_socket(this,connection_sd,&src_addr,evbase); 
531
+  add_connection(client_sock);
532
+}
533
+
534
+int tcp_server_socket::send(const sockaddr_storage* sa, const char* msg, const int msg_len)
535
+{
536
+  char host_buf[NI_MAXHOST];
537
+  string dest = am_inet_ntop(sa,host_buf,NI_MAXHOST);
538
+  dest += ":" + int2str(am_get_port(sa));
539
+
540
+  int ret=0;
541
+  connections_mut.lock();
542
+  map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(dest);
543
+  if(sock_it != connections.end()) {
544
+    ret = sock_it->second->send(sa,msg,msg_len);
545
+  }
546
+  else {
547
+    tcp_trsp_socket* new_sock = new tcp_trsp_socket(this,-1,sa,evbase);
548
+    connections[dest] = new_sock;
549
+    inc_ref(new_sock);
550
+    ret = new_sock->send(sa,msg,msg_len);
551
+  }
552
+  
553
+  connections_mut.unlock();
554
+
555
+  return ret;
556
+}
557
+
558
+/** @see trsp_socket */
559
+
560
+tcp_trsp::tcp_trsp(tcp_server_socket* sock)
561
+    : transport(sock)
562
+{
563
+  evbase = event_base_new();
564
+  sock->add_event(evbase);
565
+}
566
+
567
+tcp_trsp::~tcp_trsp()
568
+{
569
+  if(evbase) {
570
+    event_base_free(evbase);
571
+  }
572
+}
573
+
574
+/** @see AmThread */
575
+void tcp_trsp::run()
576
+{
577
+  int server_sd = sock->get_sd();
578
+  if(server_sd <= 0){
579
+    ERROR("Transport instance not bound\n");
580
+    return;
581
+  }
582
+
583
+  INFO("Started SIP server TCP transport on %s:%i\n",
584
+       sock->get_ip(),sock->get_port());
585
+
586
+  /* Start the event loop. */
587
+  event_base_dispatch(evbase);
588
+}
589
+
590
+/** @see AmThread */
591
+void tcp_trsp::on_stop()
592
+{
593
+}
594
+
0 595
new file mode 100644
... ...
@@ -0,0 +1,177 @@
1
+#ifndef _tcp_trsp_h_
2
+#define _tcp_trsp_h_
3
+
4
+#include "transport.h"
5
+#include "sip_parser_async.h"
6
+
7
+/**
8
+ * Maximum message length for TCP
9
+ * not including terminating '\0'
10
+ */
11
+#define MAX_TCP_MSGLEN 65535
12
+
13
+#include <sys/socket.h>
14
+#include <event2/event.h>
15
+
16
+#include <map>
17
+#include <deque>
18
+#include <string>
19
+using std::map;
20
+using std::deque;
21
+using std::string;
22
+
23
+class tcp_server_socket;