Browse code

sip: try next target on TCP immediate errors

TODO: handle also the async case (msg stacked on send queue).

Raphael Coeffic authored on 19/11/2013 12:32:03
Showing 2 changed files
... ...
@@ -105,9 +105,44 @@ void _trans_layer::clear_transports()
105 105
     transports.clear();
106 106
 }
107 107
 
108
+int _trans_layer::set_trsp_socket(sip_msg* msg, const cstring& next_trsp,
109
+				  int out_interface)
110
+{
111
+    if((out_interface < 0)
112
+       || ((unsigned int)out_interface >= transports.size())) {
113
+
114
+	out_interface = find_outbound_if(&msg->remote_ip);
115
+	if(out_interface < 0) {
116
+	    DBG("could not find any suitable outbound interface");
117
+	    return -1;
118
+	}
119
+    }
120
+
121
+    if(transports[out_interface].empty()) {
122
+	ERROR("no transport for this interface");
123
+	return -1;
124
+    }
125
+
126
+    prot_collection::iterator prot_sock_it =
127
+	transports[out_interface].find(c2stlstr(next_trsp));
128
+
129
+    // if we couldn't find anything, take whatever is there...
130
+    if(prot_sock_it == transports[out_interface].end()) {
131
+	prot_sock_it = transports[out_interface].begin();
132
+    }
133
+
134
+    if(msg->local_socket) dec_ref(msg->local_socket);
135
+    msg->local_socket = prot_sock_it->second;
136
+    inc_ref(msg->local_socket);
137
+
138
+    return 0;
139
+}
140
+
108 141
 static int patch_contact_transport(sip_header* contact, const cstring& trsp,
109 142
 				   string& n_contact)
110 143
 {
144
+    DBG("contact: <%.*s>", contact->value.len, contact->value.s);
145
+
111 146
     list<cstring> contact_list;
112 147
     if(parse_nameaddr_list(contact_list, contact->value.s,
113 148
 			   contact->value.len) < 0) {
... ...
@@ -124,7 +159,7 @@ static int patch_contact_transport(sip_header* contact, const cstring& trsp,
124 159
 	sip_nameaddr na;
125 160
 	const char* c = ct_it->s;
126 161
 	if(parse_nameaddr_uri(&na,&c,ct_it->len) < 0) {
127
-	    DBG("Could not parse nameaddr & URI\n");
162
+	    DBG("Could not parse nameaddr & URI (%.*s)\n",ct_it->len,ct_it->s);
128 163
 	    return -1;
129 164
 	}
130 165
 
... ...
@@ -1075,8 +1110,9 @@ static int generate_and_parse_new_msg(sip_msg* msg, sip_msg*& p_msg)
1075 1110
     // patch Contact-HF transport parameter
1076 1111
     vector<string> contact_buffers(msg->contacts.size());
1077 1112
     vector<string>::iterator contact_buf_it = contact_buffers.begin();
1078
-
1079 1113
     list<sip_header*> n_contacts;
1114
+    
1115
+    //TODO: patch copies of the Contact-HF instead of the original HFs
1080 1116
     for(list<sip_header*>::iterator contact_it = msg->contacts.begin();
1081 1117
 	contact_it != msg->contacts.end(); contact_it++, contact_buf_it++) {
1082 1118
 	
... ...
@@ -1128,7 +1164,7 @@ static int generate_and_parse_new_msg(sip_msg* msg, sip_msg*& p_msg)
1128 1164
  	c += msg->body.len;
1129 1165
     }
1130 1166
     *c++ = '\0';
1131
- 
1167
+
1132 1168
     // and parse it
1133 1169
     char* err_msg=0;
1134 1170
     if(parse_sip_msg(p_msg,err_msg)){
... ...
@@ -1147,7 +1183,7 @@ static int generate_and_parse_new_msg(sip_msg* msg, sip_msg*& p_msg)
1147 1183
     return 0;
1148 1184
 }
1149 1185
  
1150
-int _trans_layer::send_request(sip_msg* msg, trans_ticket* tt, 
1186
+int _trans_layer::send_request(sip_msg* msg, trans_ticket* tt,
1151 1187
 			       const cstring& dialog_id,
1152 1188
 			       const cstring& _next_hop, 
1153 1189
 			       int out_interface, unsigned int flags,
... ...
@@ -1173,7 +1209,8 @@ int _trans_layer::send_request(sip_msg* msg, trans_ticket* tt,
1173 1209
 
1174 1210
 	res = parse_next_hop(_next_hop,dest_list);
1175 1211
 	if(res || dest_list.empty()) {
1176
-	    DBG("parse_next_hop %.*s failed (%i)\n",_next_hop.len, _next_hop.s, res);
1212
+	    DBG("parse_next_hop %.*s failed (%i)\n",
1213
+		_next_hop.len, _next_hop.s, res);
1177 1214
 	    return res;
1178 1215
 	}
1179 1216
     }
... ...
@@ -1186,78 +1223,18 @@ int _trans_layer::send_request(sip_msg* msg, trans_ticket* tt,
1186 1223
 	dest_list.push_back(dest);
1187 1224
     }
1188 1225
 
1189
-    string uri_buffer; // must have the same scope as 'msg'
1190
-    prepare_strict_routing(msg,uri_buffer);
1191
-
1192 1226
     auto_ptr<sip_target_set> targets(new sip_target_set());
1193
-    res = resolve_targets(dest_list,targets.get());
1227
+    res = resolver::instance()->resolve_targets(dest_list,targets.get());
1194 1228
     if(res < 0){
1195 1229
 	DBG("resolve_targets failed\n");
1196 1230
 	return res;
1197 1231
     }
1198 1232
 
1199 1233
     targets->debug();
1200
-
1201
-    cstring next_trsp;
1202 1234
     targets->reset_iterator();
1203
-    do {
1204
-	if(!targets->has_next()) {
1205
-	    DBG("next_ip(): no more destinations! reply 500");
1206
-	    sip_msg err;
1207
-	    set_err_reply_from_req(&err,msg,500,
1208
-				   "No destination available");
1209
-	    ua->handle_sip_reply(c2stlstr(dialog_id),&err);
1210
-	    return 0;
1211
-	}
1212
-
1213
-	targets->copy_next(&msg->remote_ip,&next_trsp);
1214
-	targets->next();
1215
-
1216
-	//TODO: take care of port manipulation in SRV case
1217
-	//      and the likes
1218
-	//
1219
-	// //If a SRV record is involved, the port number
1220
-	// // should have been set by h_dns.next_ip(...).
1221
-	// if(!am_get_port(&sa)){
1222
-	//     //Else, we copy the old port number
1223
-	//     am_set_port(&sa,am_get_port(&msg->remote_ip));
1224
-	// }
1225
-    } while(!(flags & TR_FLAG_DISABLE_BL) &&
1226
-	    tr_blacklist::instance()->exist(&msg->remote_ip));
1227
-
1228
-    if((out_interface < 0) 
1229
-       || ((unsigned int)out_interface >= transports.size())) {
1230
-
1231
-	out_interface = find_outbound_if(&msg->remote_ip);
1232
-	if(out_interface < 0) {
1233
-	    DBG("could not find any suitable outbound interface");
1234
-	    return -1;
1235
-	}
1236
-    }
1237
-
1238
-    if(transports[out_interface].empty()) {
1239
-	ERROR("no transport for this interface");
1240
-	return -1;
1241
-    }
1242
-
1243
-    // set default transport to UDP
1244
-    if(!next_trsp.len)
1245
-	next_trsp = cstring("udp");
1246
-
1247
-    prot_collection::iterator prot_sock_it =
1248
-	transports[out_interface].find(c2stlstr(next_trsp));
1249
-
1250
-    // if we couldn't find anything, take whatever is there...
1251
-    if(prot_sock_it == transports[out_interface].end()) {
1252
-	prot_sock_it = transports[out_interface].begin();
1253
-    }
1254 1235
 
1255
-    if(msg->local_socket) dec_ref(msg->local_socket);
1256
-    msg->local_socket = prot_sock_it->second;
1257
-    inc_ref(msg->local_socket);
1258
-
1259
-    tt->_bucket = 0;
1260
-    tt->_t = 0;
1236
+    string uri_buffer; // must have the same scope as 'msg'
1237
+    prepare_strict_routing(msg,uri_buffer);
1261 1238
 
1262 1239
     if(!msg->u.request->ruri_str.len ||
1263 1240
        !msg->u.request->method_str.len) {
... ...
@@ -1271,30 +1248,52 @@ int _trans_layer::send_request(sip_msg* msg, trans_ticket* tt,
1271 1248
 	    msg->u.request->ruri_str.s);
1272 1249
     }
1273 1250
 
1251
+    int err = 0;
1274 1252
     string ruri; // buffer needs to be @ function scope
1253
+    cstring next_trsp;
1254
+    sip_msg* p_msg=NULL;
1255
+
1256
+    tt->_bucket = 0;
1257
+    tt->_t = 0;
1258
+
1259
+ try_next_dest:
1260
+    if(targets->get_next(&msg->remote_ip,next_trsp,flags) < 0) {
1261
+	DBG("next_ip(): no more destinations! reply 500");
1262
+	sip_msg err;
1263
+	set_err_reply_from_req(&err,msg,500,
1264
+			       "No destination available");
1265
+	ua->handle_sip_reply(c2stlstr(dialog_id),&err);
1266
+	return 0;
1267
+    }
1268
+
1269
+    if(set_trsp_socket(msg,next_trsp,out_interface) < 0)
1270
+	return -1;
1271
+
1275 1272
     if((flags & TR_FLAG_NEXT_HOP_RURI) &&
1276 1273
        (patch_ruri_with_remote_ip(ruri,msg) < 0)) {
1277 1274
  	return -1;
1278 1275
     }
1279 1276
 
1280 1277
     // generate new msg and parse it
1281
-    sip_msg* p_msg=NULL;
1282
-    int err = generate_and_parse_new_msg(msg,p_msg);
1278
+    err = generate_and_parse_new_msg(msg,p_msg);
1283 1279
     if(err != 0) { return err; }
1284 1280
 
1285 1281
     DBG("Sending to %s:%i <%.*s...>\n",
1286 1282
 	get_addr_str(&p_msg->remote_ip).c_str(),
1287 1283
 	ntohs(((sockaddr_in*)&p_msg->remote_ip)->sin_port),
1288
-	50 /* preview - instead of p_msg->len */,p_msg->buf);
1284
+	p_msg->len,p_msg->buf);
1289 1285
 
1290 1286
     tt->_bucket = get_trans_bucket(p_msg->callid->value,
1291 1287
 				   get_cseq(p_msg)->num_str);
1292 1288
     tt->_bucket->lock();
1293 1289
     
1294
-    int send_err = p_msg->send(flags);
1295
-    if(send_err < 0){
1290
+    err = p_msg->send(flags);
1291
+    if(err < 0){
1296 1292
 	ERROR("Error from transport layer\n");
1297 1293
 	delete p_msg;
1294
+	p_msg = NULL;
1295
+	tt->_bucket->unlock();
1296
+	goto try_next_dest;
1298 1297
     }
1299 1298
     else {
1300 1299
         stats.inc_sent_requests();
... ...
@@ -1304,12 +1303,12 @@ int _trans_layer::send_request(sip_msg* msg, trans_ticket* tt,
1304 1303
 	int method = p_msg->u.request->method;
1305 1304
 
1306 1305
 	DBG("update_uac_request tt->_t =%p\n", tt->_t);
1307
-	send_err = update_uac_request(tt->_bucket,tt->_t,p_msg);
1308
-	if(send_err < 0){
1306
+	err = update_uac_request(tt->_bucket,tt->_t,p_msg);
1307
+	if(err < 0){
1309 1308
 	    DBG("Could not update UAC state for request\n");
1310 1309
 	    delete p_msg;
1311 1310
 	    tt->_bucket->unlock();
1312
-	    return send_err;
1311
+	    return err;
1313 1312
 	}
1314 1313
 
1315 1314
 	if(tt->_t) {
... ...
@@ -1362,7 +1361,7 @@ int _trans_layer::send_request(sip_msg* msg, trans_ticket* tt,
1362 1361
 
1363 1362
     tt->_bucket->unlock();
1364 1363
     
1365
-    return send_err;
1364
+    return err;
1366 1365
 }
1367 1366
 
1368 1367
 int _trans_layer::cancel(trans_ticket* tt, const cstring& dialog_id,
... ...
@@ -2556,34 +2555,12 @@ int _trans_layer::try_next_ip(trans_bucket* bucket, sip_trans* tr,
2556 2555
     cstring next_trsp;
2557 2556
     sockaddr_storage sa;
2558 2557
 
2559
-    do {
2560
-	memset(&sa,0,sizeof(sockaddr_storage));
2561
-    
2562
-	// get the next ip
2563
-	if(!tr->targets->has_next()){
2564
-	    DBG("no more destinations!");
2565
-	    return -1;
2566
-	}
2567
-
2568
-	tr->targets->copy_next(&sa,&next_trsp);
2569
-	tr->targets->next();
2570
-
2571
-	//TODO: take care of port manipulation in SRV case
2572
-	//      and the likes
2573
-	//
2574
-	// //If a SRV record is involved, the port number
2575
-	// // should have been set by h_dns.next_ip(...).
2576
-	// if(!am_get_port(&sa)){
2577
-	//     //Else, we copy the old port number
2578
-	//     am_set_port(&sa,am_get_port(&tr->msg->remote_ip));
2579
-	// }
2580
-    } while(!(tr->flags & TR_FLAG_DISABLE_BL) &&
2581
-	    tr_blacklist::instance()->exist(&sa));
2582
-
2583
-    //TODO: support transport changes
2584
-    // -> potentially find another socket
2585
-    //    if next_trsp is different from the 
2586
-    //    current socket's transport
2558
+ try_next_dest:
2559
+    // get the next ip
2560
+    if(tr->targets->get_next(&sa,next_trsp,tr->flags) < 0){
2561
+	DBG("no more destinations!");
2562
+	return -1;
2563
+    }
2587 2564
 
2588 2565
     if(use_new_trans) {
2589 2566
 	string   n_uri;
... ...
@@ -2603,10 +2580,14 @@ int _trans_layer::try_next_ip(trans_bucket* bucket, sip_trans* tr,
2603 2580
 	// backup R-URI before possible update
2604 2581
 	old_uri = tr->msg->u.request->ruri_str;
2605 2582
 
2583
+	int out_interface = tmp_msg.local_socket->get_if();
2584
+	tmp_msg.local_socket = NULL;
2585
+	if(set_trsp_socket(&tmp_msg,next_trsp,out_interface) < 0)
2586
+	    return -1;
2587
+
2606 2588
 	if(n_tr->flags & TR_FLAG_NEXT_HOP_RURI) {
2607 2589
 	    // patch R-URI, generate& parse new message
2608 2590
 	    if(patch_ruri_with_remote_ip(n_uri, &tmp_msg)) {
2609
-		// TODO: error handling!
2610 2591
 		ERROR("could not patch R-URI with new destination");
2611 2592
 		tmp_msg.release();
2612 2593
 		return -1;
... ...
@@ -2615,7 +2596,6 @@ int _trans_layer::try_next_ip(trans_bucket* bucket, sip_trans* tr,
2615 2596
 
2616 2597
 	sip_msg* p_msg=NULL;
2617 2598
 	if(generate_and_parse_new_msg(&tmp_msg,p_msg)) {
2618
-	    // TODO: error handling!
2619 2599
 	    ERROR("could not generate&parse new message");
2620 2600
 	    tmp_msg.release();
2621 2601
 	    return -1;
... ...
@@ -2650,6 +2630,11 @@ int _trans_layer::try_next_ip(trans_bucket* bucket, sip_trans* tr,
2650 2630
 	// copy the new address back
2651 2631
 	memcpy(&tr->msg->remote_ip,&sa,sizeof(sockaddr_storage));
2652 2632
 
2633
+	trsp_socket* old_sock = tr->msg->local_socket;
2634
+	int out_interface = old_sock->get_if();
2635
+	if(set_trsp_socket(tr->msg,next_trsp,out_interface) < 0)
2636
+	    return -1;
2637
+
2653 2638
 	if(tr->flags & TR_FLAG_NEXT_HOP_RURI) {
2654 2639
 	    string   n_uri;
2655 2640
 	    sip_msg* p_msg=NULL;
... ...
@@ -2664,17 +2649,35 @@ int _trans_layer::try_next_ip(trans_bucket* bucket, sip_trans* tr,
2664 2649
 	    delete tr->msg;
2665 2650
 	    tr->msg = p_msg;
2666 2651
 	}
2652
+	else if(old_sock != tr->msg->local_socket) {
2653
+	    string   n_uri;
2654
+	    sip_msg* p_msg=NULL;
2655
+
2656
+	    // patch R-URI, generate & parse new message
2657
+	    if(generate_and_parse_new_msg(tr->msg,p_msg)) {
2658
+		return -1;
2659
+	    }
2660
+
2661
+	    delete tr->msg;
2662
+	    tr->msg = p_msg;
2663
+	}
2667 2664
 	else {
2668 2665
 	    // only create new branch tag
2666
+	    // -> patched directly in the msg's buffer
2669 2667
 	    compute_branch((char*)(tr->msg->via_p1->branch.s+MAGIC_BRANCH_LEN),
2670 2668
 			   tr->msg->callid->value,tr->msg->cseq->value);
2671 2669
 	}
2672 2670
     }
2673 2671
    
2674
-    stats.inc_sent_requests();
2675 2672
 
2676 2673
     // and re-send
2677
-    tr->msg->send(tr->flags);
2674
+    if(tr->msg->send(tr->flags) < 0) {
2675
+	ERROR("Error from transport layer\n");
2676
+	use_new_trans = false;
2677
+	goto try_next_dest;
2678
+    }
2679
+
2680
+    stats.inc_sent_requests();
2678 2681
     
2679 2682
     if(tr->logger) {
2680 2683
 	sockaddr_storage src_ip;
... ...
@@ -253,7 +253,12 @@ protected:
253 253
 		     unsigned short* next_port, cstring* next_trsp);
254 254
 
255 255
     /**
256
+     * Fills the local_socket attribute using the given
257
+     * transport and interface. If out_interface == -1,
258
+     * we will try hard to find an interface based on msg->remote_ip.
256 259
      */
260
+    int set_trsp_socket(sip_msg* msg, const cstring& next_trsp,
261
+			int out_interface);
257 262
 
258 263
     sip_trans* copy_uac_trans(sip_trans* tr);
259 264