Browse code

rtp_media_server: bridging refactoring

support bridging after answer, play, etc.

Julien Chavanton authored on 11/03/2019 16:39:40
Showing 6 changed files
... ...
@@ -99,6 +99,8 @@ void rms_media_destroy(call_leg_media_t *m)
99 99
 
100 100
 int create_call_leg_media(call_leg_media_t *m)
101 101
 {
102
+//	if (m->ms_factory) return 0;
103
+	if (m) rms_stop_media(m);
102 104
 	m->ms_factory = rms_create_factory();
103 105
 	// create caller RTP session
104 106
 	LM_INFO("RTP session [%s:%d]<>[%s:%d]\n", m->local_ip.s, m->local_port,
... ...
@@ -175,6 +177,9 @@ int rms_start_media(call_leg_media_t *m, char *file_name)
175 177
 	MSConnectionHelper h;
176 178
 	int channels = 1;
177 179
 	int file_sample_rate = 8000;
180
+
181
+	if (m) rms_stop_media(m);
182
+
178 183
 	m->ms_ticker = rms_create_ticker(NULL);
179 184
 	if(!m->ms_ticker)
180 185
 		goto error;
... ...
@@ -118,18 +118,6 @@ void rms_sdp_info_init(rms_sdp_info_t *sdp_info)
118 118
 	memset(sdp_info, 0, sizeof(rms_sdp_info_t));
119 119
 }
120 120
 
121
-int rms_sdp_info_clone(rms_sdp_info_t *dst, rms_sdp_info_t *src)
122
-{
123
-	rms_sdp_info_init(dst);
124
-	if(!rms_str_dup(&dst->remote_ip, &src->remote_ip, 1))
125
-		return 0;
126
-	if(!rms_str_dup(&dst->payloads, &src->payloads, 1))
127
-		return 0;
128
-	if(!rms_str_dup(&dst->new_body, &src->new_body, 1))
129
-		return 0;
130
-	return 1;
131
-}
132
-
133 121
 void rms_sdp_info_free(rms_sdp_info_t *sdp_info)
134 122
 {
135 123
 	if(sdp_info->remote_ip.s) {
... ...
@@ -1,5 +1,5 @@
1 1
 /*
2
- * Copyright (C) 2017-2018 Julien Chavanton jchavanton@gmail.com
2
+ * Copyright (C) 2017-2019 Julien Chavanton jchavanton@gmail.com
3 3
  *
4 4
  * This file is part of Kamailio, a free SIP server.
5 5
  *
... ...
@@ -20,7 +20,7 @@
20 20
 
21 21
 #include "rtp_media_server.h"
22 22
 extern rms_session_info_t *rms_session_list;
23
-
23
+extern int in_rms_process;
24 24
 
25 25
 static void rms_action_free(rms_session_info_t *si)
26 26
 {
... ...
@@ -94,16 +94,24 @@ rms_session_info_t *rms_session_search_sync(struct sip_msg *msg)
94 94
 
95 95
 void rms_session_add(rms_session_info_t *si)
96 96
 {
97
-	lock(&session_list_mutex);
98
-	clist_append(rms_session_list, si, next, prev);
99
-	unlock(&session_list_mutex);
97
+	if (in_rms_process) {
98
+		clist_append(rms_session_list, si, next, prev);
99
+	} else {
100
+		lock(&session_list_mutex);
101
+		clist_append(rms_session_list, si, next, prev);
102
+		unlock(&session_list_mutex);
103
+	}
100 104
 }
101 105
 
102 106
 void rms_session_rm(rms_session_info_t *si)
103 107
 {
104
-	lock(&session_list_mutex);
105
-	clist_rm(si, next, prev);
106
-	unlock(&session_list_mutex);
108
+	if (in_rms_process) {
109
+		clist_append(rms_session_list, si, next, prev);
110
+	} else {
111
+		lock(&session_list_mutex);
112
+		clist_rm(si, next, prev);
113
+		unlock(&session_list_mutex);
114
+	}
107 115
 }
108 116
 
109 117
 int rms_session_free(rms_session_info_t *si)
... ...
@@ -112,10 +120,7 @@ int rms_session_free(rms_session_info_t *si)
112 120
 	rms_sdp_info_free(&si->sdp_info_offer);
113 121
 	rms_sdp_info_free(&si->sdp_info_answer);
114 122
 	if(si->media.pt) {
115
-		// payload_type_destroy(si->media.pt);
116
-		shm_free(
117
-				si->media
118
-						.pt); // TODO: should be destroyed in  compatible way from MS manager process
123
+		shm_free(si->media.pt); // TODO: should be destroyed in  compatible way from MS manager process
119 124
 		si->media.pt = NULL;
120 125
 	}
121 126
 	if(si->callid.s) {
... ...
@@ -152,6 +157,36 @@ int rms_check_msg(struct sip_msg *msg)
152 157
 	return 1;
153 158
 }
154 159
 
160
+rms_session_info_t *rms_session_new_bleg(struct sip_msg *msg)
161
+{
162
+	if(!rms_check_msg(msg))
163
+		return NULL;
164
+	rms_session_info_t *si = shm_malloc(sizeof(rms_session_info_t));
165
+	if(!si) {
166
+		LM_ERR("can not allocate session info !\n");
167
+		goto error;
168
+	}
169
+	memset(si, 0, sizeof(rms_session_info_t));
170
+
171
+	if(!rms_str_dup(&si->callid, &msg->callid->body, 1)) {
172
+		LM_ERR("can not get callid .\n");
173
+		goto error;
174
+	}
175
+	if(!rms_str_dup(&si->remote_uri, &msg->from->body, 1))
176
+		goto error;
177
+	str ip;
178
+	ip.s = ip_addr2a(&msg->rcv.dst_ip);
179
+	ip.len = strlen(ip.s);
180
+	if(!rms_str_dup(&si->local_ip, &ip, 1))
181
+		goto error;
182
+	clist_init(&si->action, next, prev);
183
+	return si;
184
+error:
185
+	LM_ERR("can not create session.\n");
186
+	rms_session_free(si);
187
+	return NULL;
188
+}
189
+
155 190
 rms_session_info_t *rms_session_new(struct sip_msg *msg)
156 191
 {
157 192
 	struct hdr_field *hdr = NULL;
... ...
@@ -65,6 +65,7 @@ void rms_session_add(rms_session_info_t *si);
65 65
 void rms_session_rm(rms_session_info_t *si);
66 66
 int rms_session_free(rms_session_info_t *si);
67 67
 rms_session_info_t *rms_session_new(struct sip_msg *msg);
68
+rms_session_info_t *rms_session_new_bleg(struct sip_msg *msg);
68 69
 int rms_sessions_dump_f(struct sip_msg *msg, char *param1, char *param2);
69 70
 rms_session_info_t *rms_get_session_list(void);
70 71
 
... ...
@@ -29,14 +29,13 @@ static void mod_destroy(void);
29 29
 static int child_init(int);
30 30
 str playback_fn = {0, 0};
31 31
 str log_fn = {0, 0};
32
-
33 32
 static char *rms_bridge_default_route = "rms:bridged";
34 33
 static char *rms_answer_default_route = "rms:start";
35 34
 
35
+int in_rms_process;
36
+rms_t *rms;
36 37
 
37
-static rms_t rms;
38
-
39
-static rms_session_info_t *rms_session_create_leg(rms_session_info_t *si);
38
+static rms_session_info_t *rms_session_create_leg(rms_session_info_t *si, struct sip_msg *msg);
40 39
 static int fixup_rms_action_play(void **param, int param_no);
41 40
 static int fixup_rms_bridge(void **param, int param_no);
42 41
 static int fixup_rms_answer(void **param, int param_no);
... ...
@@ -51,9 +50,8 @@ static int rms_session_check_f(struct sip_msg *);
51 50
 static int rms_hangup_f(struct sip_msg *);
52 51
 static int rms_bridge_f(struct sip_msg *, char *, char *);
53 52
 
54
-static int rms_update_call_sdp(struct sip_msg *msg,
55
-		const rms_session_info_t *si, call_leg_media_t *m,
56
-		rms_sdp_info_t *sdp_info);
53
+static int rms_update_media_sockets(struct sip_msg *msg,
54
+		rms_session_info_t *si, rms_sdp_info_t *sdp_info);
57 55
 
58 56
 static cmd_export_t cmds[] = {
59 57
 		{"rms_answer", (cmd_function)rms_answer_f, 1, fixup_rms_answer, 0, EVENT_ROUTE},
... ...
@@ -166,10 +164,12 @@ static int fixup_rms_action_play(void **param, int param_no)
166 164
 static int mod_init(void)
167 165
 {
168 166
 	LM_INFO("RTP media server module init\n");
169
-	rms.udp_start_port = 50000;
167
+
168
+	rms = shm_malloc(sizeof(rms_t));
169
+	rms->udp_start_port = 50000;
170 170
 	LM_INFO("RTP media server module init\n");
171
-	rms.udp_end_port = 60000;
172
-	rms.udp_last_port = 50000 + rand() % 10000;
171
+	rms->udp_end_port = 60000;
172
+	rms->udp_last_port = 50000 + rand() % 10000;
173 173
 	rms_media_init();
174 174
 
175 175
 	if(!init_rms_session_list()) {
... ...
@@ -233,8 +233,13 @@ static rms_session_info_t *rms_stop(rms_session_info_t *si)
233 233
 static rms_session_info_t *rms_session_action_check(rms_session_info_t *si)
234 234
 {
235 235
 	rms_action_t *a;
236
+		if (!si)
237
+			LM_ERR("session  NULL\n");
236 238
 	clist_foreach(&si->action, a, next)
237 239
 	{
240
+		if (!a)
241
+			LM_ERR("session action NULL\n");
242
+
238 243
 		if(a->type == RMS_HANGUP) {
239 244
 			LM_INFO("session action RMS_HANGUP [%s]\n", si->callid.s);
240 245
 			rms_hangup_call(si);
... ...
@@ -243,8 +248,7 @@ static rms_session_info_t *rms_session_action_check(rms_session_info_t *si)
243 248
 			a->type = RMS_STOP;
244 249
 			return si;
245 250
 		} else if(a->type == RMS_BRIDGING) {
246
-			LM_INFO("session action RMS_BRIDGING [%s][%p]\n", si->callid.s,
247
-					a->cell->uas.request);
251
+			LM_INFO("session action RMS_BRIDGING [%s]\n", si->callid.s);
248 252
 			rms_bridging_call(si, a);
249 253
 			a->type = RMS_NONE;
250 254
 			shm_free(a->param.s);
... ...
@@ -282,11 +286,16 @@ static rms_session_info_t *rms_session_action_check(rms_session_info_t *si)
282 286
 			}
283 287
 			return si;
284 288
 		} else if(a->type == RMS_START) {
289
+			LM_INFO("session action RMS_START\n");
285 290
 			create_call_leg_media(&si->media);
286 291
 			LM_INFO("session action RMS_START [%s]\n", si->callid.s);
292
+			rms_action_t *tmp = a->prev;
293
+			clist_rm(a, next, prev);
287 294
 			rms_start_media(&si->media, a->param.s);
288 295
 			run_action_route(si, a->route.s);
289
-			a->type = RMS_NONE;
296
+			shm_free(a);
297
+			a = tmp;
298
+			LM_INFO("session action RMS_START[done]\n");
290 299
 			return si;
291 300
 		}
292 301
 	}
... ...
@@ -300,12 +309,14 @@ static rms_session_info_t *rms_session_action_check(rms_session_info_t *si)
300 309
  */
301 310
 static void rms_session_manage_loop()
302 311
 {
312
+	in_rms_process = 1;
303 313
 	while(1) {
304 314
 		lock(&session_list_mutex);
305 315
 		rms_session_info_t *si;
306 316
 		clist_foreach(rms_session_list, si, next)
307 317
 		{
308 318
 			si = rms_session_action_check(si);
319
+			//LM_INFO("next ... si[%p]\n", si);
309 320
 		}
310 321
 		unlock(&session_list_mutex);
311 322
 		usleep(10000);
... ...
@@ -354,6 +365,9 @@ static int parse_from(struct sip_msg *msg, rms_session_info_t *si)
354 365
 static int rms_sip_reply(
355 366
 		struct cell *cell, rms_session_info_t *si, int code, char *_reason)
356 367
 {
368
+	if (si->state == RMS_ST_CONNECTED) {
369
+		return 1;
370
+	}
357 371
 	str reason = str_init(_reason);
358 372
 	if(si->remote_tag.len == 0) {
359 373
 		LM_ERR("can not find from tag\n");
... ...
@@ -381,6 +395,9 @@ static int rms_answer_call(
381 395
 	char buffer[128];
382 396
 	str reason = str_init("OK");
383 397
 	str contact_hdr;
398
+	if (si->state == RMS_ST_CONNECTED) {
399
+		return 1;
400
+	}
384 401
 
385 402
 	LM_INFO("[%s][%d]\n", sdp_info->new_body.s, sdp_info->udp_local_port);
386 403
 
... ...
@@ -518,7 +535,7 @@ static void bridge_cb(struct cell *ptrans, int ntype, struct tmcb_params *pcbp)
518 535
 		goto error;
519 536
 	}
520 537
 	si->media.pt = rms_sdp_check_payload(sdp_info);
521
-	rms_update_call_sdp(pcbp->rpl, si, &si->media, sdp_info);
538
+	rms_update_media_sockets(pcbp->rpl, si, &si->sdp_info_answer);
522 539
 	LM_INFO("[%p][%s:%d]\n", si, sdp_info->local_ip.s,
523 540
 			sdp_info->udp_local_port);
524 541
 	a->type = RMS_BRIDGED;
... ...
@@ -589,7 +606,7 @@ static int rms_bridging_call(rms_session_info_t *si, rms_action_t *a)
589 606
 	}
590 607
 	dialog->rem_target.s = param_uri->s;
591 608
 	dialog->rem_target.len = param_uri->len - 1;
592
-	rms_sdp_info_t *sdp_info = &si->sdp_info_offer;
609
+	rms_sdp_info_t *sdp_info = &si->bridged_si->sdp_info_offer;
593 610
 
594 611
 	set_uac_req(&uac_r, &method_invite, &headers, &sdp_info->new_body, dialog,
595 612
 			TMCB_LOCAL_COMPLETED | TMCB_LOCAL_RESPONSE_IN | TMCB_ON_FAILURE, bridge_cb, a);
... ...
@@ -647,11 +664,11 @@ static int rms_hangup_call(rms_session_info_t *si)
647 664
 /*
648 665
  * Create a new session info that will be used for bridging
649 666
  */
650
-static rms_session_info_t *rms_session_create_leg(rms_session_info_t *si)
667
+static rms_session_info_t *rms_session_create_leg(rms_session_info_t *si, struct sip_msg *msg)
651 668
 {
652 669
 	if(!si)
653 670
 		return NULL;
654
-	si->bridged_si = shm_malloc(sizeof(rms_session_info_t));
671
+	si->bridged_si = rms_session_new_bleg(msg);
655 672
 	if(!si->bridged_si) {
656 673
 		LM_ERR("can not allocate session info !\n");
657 674
 		goto error;
... ...
@@ -667,8 +684,8 @@ static rms_session_info_t *rms_session_create_leg(rms_session_info_t *si)
667 684
 	if(!rms_str_dup(&si->bridged_si->local_ip, &si->local_ip, 1))
668 685
 		goto error;
669 686
 
670
-	si->bridged_si->sdp_info_offer.remote_port = 0;
671
-	si->bridged_si->sdp_info_offer.udp_local_port = 0;
687
+	rms_update_media_sockets(msg, si->bridged_si, &si->bridged_si->sdp_info_offer);
688
+	rms_sdp_prepare_new_body(&si->bridged_si->sdp_info_offer, si->media.pt->type);
672 689
 	clist_init(&si->bridged_si->action, next, prev);
673 690
 	return si->bridged_si;
674 691
 error:
... ...
@@ -680,17 +697,19 @@ error:
680 697
 static int rms_get_udp_port(void)
681 698
 {
682 699
 	// RTP UDP port
683
-	rms.udp_last_port += 3;
684
-	if(rms.udp_last_port > rms.udp_end_port)
685
-		rms.udp_last_port = rms.udp_start_port;
686
-	LM_INFO("port[%d]\n", rms.udp_last_port);
687
-	return rms.udp_last_port;
700
+	rms->udp_last_port += 3;
701
+	if(rms->udp_last_port > rms->udp_end_port)
702
+		rms->udp_last_port = rms->udp_start_port;
703
+	LM_INFO("port[%d]\n", rms->udp_last_port);
704
+	return rms->udp_last_port;
688 705
 }
689 706
 
690
-static int rms_update_call_sdp(struct sip_msg *msg,
691
-		const rms_session_info_t *si, call_leg_media_t *m,
692
-		rms_sdp_info_t *sdp_info)
707
+
708
+
709
+// update media IP and port
710
+static int rms_update_media_sockets(struct sip_msg *msg, rms_session_info_t *si, rms_sdp_info_t *sdp_info)
693 711
 {
712
+	call_leg_media_t *m = &si->media;
694 713
 	if(!m->local_port)
695 714
 		m->local_port = rms_get_udp_port();
696 715
 	sdp_info->udp_local_port = m->local_port;
... ...
@@ -703,9 +722,9 @@ static int rms_update_call_sdp(struct sip_msg *msg,
703 722
 	m->remote_ip.len = sdp_info->remote_ip.len;
704 723
 	m->si = si;
705 724
 
706
-	LM_INFO("remote_socket[%s:%d] local_socket[%s:%d] pt[%s]\n",
725
+	LM_INFO("remote_socket[%s:%d] local_socket[%s:%d]\n",
707 726
 			sdp_info->remote_ip.s, sdp_info->remote_port, m->local_ip.s,
708
-			m->local_port, si->media.pt->mime_type);
727
+			m->local_port);
709 728
 	return 1;
710 729
 }
711 730
 
... ...
@@ -810,7 +829,7 @@ static int rms_bridge_f(struct sip_msg *msg, char *_target, char *_route)
810 829
 		if (si->state == RMS_ST_CONNECTED) {
811 830
 			LM_INFO("already connected, bridging\n");
812 831
 		} else {
813
-			LM_ERR("bridging an existing session that is not connected.\n");
832
+			LM_ERR("Can not bridge an existing call leg that is not connected.\n");
814 833
 			return -1;
815 834
 		}
816 835
 	} else {
... ...
@@ -821,8 +840,10 @@ static int rms_bridge_f(struct sip_msg *msg, char *_target, char *_route)
821 840
 		si = rms_session_new(msg);
822 841
 		if(!si)
823 842
 			return -1;
843
+		si->local_port = msg->rcv.dst_port;
824 844
 	}
825 845
 
846
+	// parameter 1 : target URI
826 847
 	if(get_str_fparam(&target, msg, (gparam_p)_target) != 0) {
827 848
 		if (si->state != RMS_ST_CONNECTED) {
828 849
 			LM_ERR("rms_bridge: missing target\n");
... ...
@@ -830,6 +851,7 @@ static int rms_bridge_f(struct sip_msg *msg, char *_target, char *_route)
830 851
 		}
831 852
 		return -1;
832 853
 	}
854
+	// parameter 2 : route call-back
833 855
 	if(get_str_fparam(&route, msg, (gparam_p)_route) != 0) {
834 856
 		route.len = strlen(rms_bridge_default_route);
835 857
 		route.s = rms_bridge_default_route;
... ...
@@ -837,31 +859,28 @@ static int rms_bridge_f(struct sip_msg *msg, char *_target, char *_route)
837 859
 
838 860
 	LM_NOTICE("rms_bridge[%s][%d]\n", target.s, target.len);
839 861
 
840
-	str to_tag;
841
-	{
862
+	if (si->state == RMS_ST_DEFAULT) {
863
+		str to_tag;
842 864
 		parse_from(msg, si);
843 865
 		tmb.t_get_reply_totag(msg, &to_tag);
844 866
 		rms_str_dup(&si->local_tag, &to_tag, 1);
845 867
 		LM_INFO("local_uri[%s]local_tag[%s]\n", si->local_uri.s,
846 868
 				si->local_tag.s);
847
-		//
848
-		rms_sdp_info_t *sdp_info = &si->sdp_info_offer;
849
-		rms_update_call_sdp(msg, si, &si->media, sdp_info);
850
-		// create b_leg
851
-		si->bridged_si = rms_session_create_leg(si);
852
-		if(!si->bridged_si) {
853
-			LM_ERR("can not create session b_leg !\n");
854
-			goto error;
855
-		}
856
-		si->bridged_si->media.local_port = rms_get_udp_port();
857
-		sdp_info->udp_local_port = si->bridged_si->media.local_port;
858
-		rms_sdp_prepare_new_body(sdp_info, si->media.pt->type);
859
-		LM_NOTICE("payload[%d]\n", si->media.pt->type);
869
+		rms_update_media_sockets(msg, si, &si->sdp_info_offer);
860 870
 	}
871
+	// Prepare the body of the SDP offer for the current Payload type
872
+	// Both call legs will have the same offer.
873
+	LM_NOTICE("payload[%d]\n", si->media.pt->type);
874
+	rms_sdp_prepare_new_body(&si->sdp_info_offer, si->media.pt->type);
861 875
 
876
+	// create b_leg
877
+	si->bridged_si = rms_session_create_leg(si, msg);
862 878
 
879
+	if(!si->bridged_si) {
880
+		LM_ERR("can not create session b_leg !\n");
881
+		goto error;
882
+	}
863 883
 
864
-	si->local_port = msg->rcv.dst_port;
865 884
 	rms_action_t *a = rms_action_new(RMS_BRIDGING);
866 885
 	if(!a)
867 886
 		return -1;
... ...
@@ -881,19 +900,18 @@ static int rms_bridge_f(struct sip_msg *msg, char *_target, char *_route)
881 900
 			LM_ERR("t_suspend() failed\n");
882 901
 			goto error;
883 902
 		}
903
+		LM_INFO("transaction request[%p]\n", a->cell->uas.request);
904
+	} else {
905
+		a->cell = NULL;
884 906
 	}
885
-
886
-	LM_INFO("transaction request[%p]\n", a->cell->uas.request);
907
+	LM_INFO("adding action\n");
887 908
 	rms_action_add(si, a);
888 909
 
889
-	if(!rms_sdp_info_clone(
890
-			   &si->bridged_si->sdp_info_offer, &si->sdp_info_offer)) {
891
-		LM_ERR("rms_sdp_info_clone\n");
892
-		goto error;
893
-	}
910
+	LM_INFO("adding b_leg session\n");
894 911
 	rms_session_add(si->bridged_si);
895 912
 	LM_INFO("si_1[%p]si_2[%p]\n", si, si->bridged_si);
896
-	rms_session_add(si);
913
+	if (si->state != RMS_ST_CONNECTED)
914
+		rms_session_add(si);
897 915
 	return 0;
898 916
 error:
899 917
 	rms_session_rm(si);
... ...
@@ -1081,19 +1099,16 @@ static int rms_answer_f(struct sip_msg *msg, char * _route)
1081 1099
 	if(!si)
1082 1100
 		return -1;
1083 1101
 	rms_session_add(si);
1084
-	rms_sdp_info_t *sdp_info = &si->sdp_info_offer;
1085
-	rms_update_call_sdp(msg, si, &si->media, sdp_info);
1086
-
1087
-	//
1102
+	rms_update_media_sockets(msg, si, &si->sdp_info_offer);
1088 1103
 	parse_from(msg, si);
1089 1104
 	tmb.t_get_reply_totag(msg, &to_tag);
1090 1105
 	rms_str_dup(&si->local_tag, &to_tag, 1);
1091 1106
 	LM_INFO("local_uri[%s]local_tag[%s]\n", si->local_uri.s, si->local_tag.s);
1092
-	if(!rms_sdp_prepare_new_body(sdp_info, si->media.pt->type)) {
1107
+	if(!rms_sdp_prepare_new_body(&si->sdp_info_offer, si->media.pt->type)) {
1093 1108
 		LM_ERR("error preparing SDP body\n");
1094 1109
 		goto error;
1095 1110
 	}
1096
-	//
1111
+
1097 1112
 	si->local_port = msg->rcv.dst_port;
1098 1113
 	if(rms_answer_call(NULL, si, &si->sdp_info_offer) < 1) {
1099 1114
 		goto error;
... ...
@@ -41,6 +41,8 @@
41 41
 
42 42
 ser_lock_t session_list_mutex;
43 43
 
44
+
45
+
44 46
 typedef struct rms
45 47
 {
46 48
 	int udp_start_port;