Browse code

Merge pull request #2662 from fgaisnon/master

support subscribe dialog (topos + topos_redis)

Daniel-Constantin Mierla authored on 09/03/2021 09:32:10 • GitHub committed on 09/03/2021 09:32:10
Showing 8 changed files
... ...
@@ -484,7 +484,7 @@ static int ht_rm_items(sip_msg_t* msg, str* hname, str* op, str *val,
484 484
 		case 2:
485 485
 			if(strncmp(op->s, "re", 2)==0) {
486 486
 				isval.s = *val;
487
-				if (ht_dmq_replicate_action(HT_DMQ_RM_CELL_RE, &ht->name, NULL,
487
+				if ((ht->dmqreplicate > 0) && ht_dmq_replicate_action(HT_DMQ_RM_CELL_RE, &ht->name, NULL,
488 488
 							AVP_VAL_STR, &isval, mkey)!=0) {
489 489
 					LM_ERR("dmq relication failed (op %d)\n", mkey);
490 490
 				}
... ...
@@ -494,7 +494,7 @@ static int ht_rm_items(sip_msg_t* msg, str* hname, str* op, str *val,
494 494
 				return 1;
495 495
 			} else if(strncmp(op->s, "sw", 2)==0) {
496 496
 				isval.s = *val;
497
-				if (ht_dmq_replicate_action(HT_DMQ_RM_CELL_SW, &ht->name, NULL,
497
+				if ((ht->dmqreplicate > 0) &&ht_dmq_replicate_action(HT_DMQ_RM_CELL_SW, &ht->name, NULL,
498 498
 							AVP_VAL_STR, &isval, mkey)!=0) {
499 499
 					LM_ERR("dmq relication failed (op %d)\n", mkey);
500 500
 				}
... ...
@@ -23,11 +23,20 @@
23 23
 		<surname>Mierla</surname>
24 24
 		<email>miconda@gmail.com</email>
25 25
 	    </editor>
26
+            <editor>
27
+                <firstname>Frederic</firstname>
28
+                <surname>Gaisnon</surname>
29
+                <email>frederic.gaisnon@gmail.com</email>
30
+            </editor>
26 31
 	</authorgroup>
27 32
 	<copyright>
28 33
 	    <year>2016</year>
29 34
 	    <holder>&fhg;</holder>
30 35
 	</copyright>
36
+        <copyright>
37
+            <year>2021</year>
38
+            <holder>MomentTech</holder>
39
+        </copyright>
31 40
     </bookinfo>
32 41
     <toc></toc>
33 42
     
... ...
@@ -30,8 +30,8 @@
30 30
 		a dialog -- record_route() must be used for them as well, the
31 31
 		headers are not going to be in the messages sent to the network, they
32 32
 		are needed to know local addresses used to communicate with each side.
33
-		At this moment it is not designed to work for presence (SUBSCRIBE-based)
34
-		dialogs. The REGISTER and PUBLISH requests are skipped from processing
33
+                This module is designed to work for presence (SUBSCRIBE-based) dialogs too.
34
+		The REGISTER and PUBLISH requests are skipped from processing
35 35
 		by this module, expected to be terminated on a local SIP server.
36 36
 	</para>
37 37
 	</section>
... ...
@@ -199,6 +199,9 @@ modparam("topos", "branch_expire", 300)
199 199
 			after the initial call setup on re-INVITEs or other in-dialog
200 200
 			messages. So set a large enough value (according your longest call
201 201
 			duration) to prevent problems in re-writing messages.
202
+			This key is only relevant for INVITE dialog. 
203
+                        SUBSCRIBE dialog records lifetime are based on value set in expires
204
+                        header. Moreover each re-SUBSCRIBEs update the dialog timestamp.
202 205
 		</para>
203 206
 		<para>
204 207
 		<emphasis>
... ...
@@ -905,6 +905,11 @@ int tps_request_received(sip_msg_t *msg, int dialog)
905 905
 				goto error;
906 906
 			}
907 907
 		}
908
+		if((get_cseq(msg)->method_id)&(METHOD_SUBSCRIBE)) {
909
+			if(tps_storage_update_dialog(msg, &mtsd, &stsd, TPS_DBU_CONTACT|TPS_DBU_TIME)<0) {
910
+				goto error;
911
+			}
912
+		}
908 913
 	}
909 914
 	return 0;
910 915
 
... ...
@@ -41,6 +41,7 @@
41 41
 #include "../../core/parser/contact/parse_contact.h"
42 42
 #include "../../core/parser/parse_from.h"
43 43
 #include "../../core/parser/parse_to.h"
44
+#include "../../core/parser/parse_expires.h"
44 45
 
45 46
 #include "../../lib/srdb1/db.h"
46 47
 #include "../../core/utils/sruid.h"
... ...
@@ -336,8 +337,11 @@ int tps_storage_fill_contact(sip_msg_t *msg, tps_data_t *td, str *uuid, int dir,
336 337
 				td->cp += pv_val.rs.len;
337 338
 			}
338 339
 		}
339
-		*td->cp = '@';
340
-		td->cp++;
340
+
341
+		if (!((ctmode == 1) && (dir==TPS_DIR_DOWNSTREAM) && (curi.user.len <= 0))) {
342
+			*td->cp = '@';
343
+			td->cp++;
344
+		}
341 345
 
342 346
 		if (_tps_contact_host.len) { // using configured hostname in the contact header
343 347
 			memcpy(td->cp, _tps_contact_host.s, _tps_contact_host.len);
... ...
@@ -465,8 +469,8 @@ int tps_storage_link_msg(sip_msg_t *msg, tps_data_t *td, int dir)
465 469
 
466 470
 	/* extract the contact address */
467 471
 	if(parse_headers(msg, HDR_CONTACT_F, 0)<0 || msg->contact==NULL) {
468
-		if(td->s_method_id != METHOD_INVITE) {
469
-			/* no mandatory contact unless is INVITE - done */
472
+		if((td->s_method_id != METHOD_INVITE) && (td->s_method_id != METHOD_SUBSCRIBE)){
473
+			/* no mandatory contact unless is INVITE or SUBSCRIBE - done */
470 474
 			return 0;
471 475
 		}
472 476
 		if(msg->first_line.type==SIP_REPLY) {
... ...
@@ -504,6 +508,13 @@ int tps_storage_link_msg(sip_msg_t *msg, tps_data_t *td, int dir)
504 508
 		}
505 509
 	}
506 510
 
511
+	if  (td->s_method_id == METHOD_SUBSCRIBE) {
512
+		if(msg->expires && (msg->expires->body.len > 0) && (msg->expires->parsed || (parse_expires(msg->expires) >= 0))) {
513
+			td->expires = ((exp_body_t *)msg->expires->parsed)->val;
514
+		}
515
+	}
516
+
517
+
507 518
 	LM_DBG("downstream: %s - acontact: [%.*s] - bcontact: [%.*s]\n",
508 519
 			(dir==TPS_DIR_DOWNSTREAM)?"yes":"no",
509 520
 			td->a_contact.len, (td->a_contact.len>0)?td->a_contact.s:"",
... ...
@@ -1036,6 +1047,8 @@ int tps_db_load_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
1036 1047
 	db_key_t db_cols[TPS_NR_KEYS];
1037 1048
 	db1_res_t* db_res = NULL;
1038 1049
 	str sinv = str_init("INVITE");
1050
+	str ssub = str_init("SUBSCRIBE");
1051
+	int bInviteDlg = 1;
1039 1052
 	int nr_keys;
1040 1053
 	int nr_cols;
1041 1054
 	int n;
... ...
@@ -1047,6 +1060,10 @@ int tps_db_load_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
1047 1060
 	nr_keys = 0;
1048 1061
 	nr_cols = 0;
1049 1062
 
1063
+	if((get_cseq(msg)->method_id == METHOD_SUBSCRIBE) || ((get_cseq(msg)->method_id == METHOD_NOTIFY) && (msg->event->len > 0))) {
1064
+		bInviteDlg = 0;
1065
+	}
1066
+
1050 1067
 	if(mode==0) {
1051 1068
 		/* load same transaction using Via branch */
1052 1069
 		db_keys[nr_keys]=&tt_col_x_vbranch;
... ...
@@ -1075,7 +1092,7 @@ int tps_db_load_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
1075 1092
 		db_ops[nr_keys]=OP_EQ;
1076 1093
 		db_vals[nr_keys].type = DB1_STR;
1077 1094
 		db_vals[nr_keys].nul = 0;
1078
-		db_vals[nr_keys].val.str_val = sinv;
1095
+		db_vals[nr_keys].val.str_val = bInviteDlg ? sinv : ssub;
1079 1096
 		nr_keys++;
1080 1097
 	}
1081 1098
 
... ...
@@ -1407,7 +1424,7 @@ int tps_storage_update_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
1407 1424
 	if(msg==NULL || md==NULL || sd==NULL)
1408 1425
 		return -1;
1409 1426
 
1410
-	if(md->s_method_id != METHOD_INVITE) {
1427
+	if((md->s_method_id != METHOD_INVITE) && (md->s_method_id != METHOD_SUBSCRIBE)) {
1411 1428
 		return 0;
1412 1429
 	}
1413 1430
 
... ...
@@ -1514,6 +1531,14 @@ int tps_db_update_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
1514 1531
 			}
1515 1532
 		}
1516 1533
 	}
1534
+	if ((mode & TPS_DBU_TIME) && ((sd->b_tag.len > 0)
1535
+			&& ((md->direction == TPS_DIR_UPSTREAM) && (msg->first_line.type==SIP_REQUEST))
1536
+			&& (msg->first_line.u.request.method_value == METHOD_SUBSCRIBE))) {
1537
+		db_ucols[nr_ucols] = &td_col_rectime;
1538
+		db_uvals[nr_ucols].type = DB1_DATETIME;
1539
+		db_uvals[nr_ucols].val.time_val = time(NULL);
1540
+		nr_ucols++;
1541
+	}
1517 1542
 
1518 1543
 	if(nr_ucols==0) {
1519 1544
 		return 0;
... ...
@@ -1543,7 +1568,7 @@ int tps_storage_update_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
1543 1568
 	if(msg==NULL || md==NULL || sd==NULL)
1544 1569
 		return -1;
1545 1570
 
1546
-	if(md->s_method_id != METHOD_INVITE) {
1571
+	if((md->s_method_id != METHOD_INVITE) && (md->s_method_id != METHOD_SUBSCRIBE)) {
1547 1572
 		return 0;
1548 1573
 	}
1549 1574
 	if(msg->first_line.type==SIP_REPLY) {
... ...
@@ -1575,7 +1600,7 @@ int tps_db_end_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd)
1575 1600
 	if(msg==NULL || md==NULL || sd==NULL || _tps_db_handle==NULL)
1576 1601
 		return -1;
1577 1602
 
1578
-	if(md->s_method_id != METHOD_BYE) {
1603
+	if((md->s_method_id != METHOD_BYE) && !((md->s_method_id == METHOD_SUBSCRIBE) && (md->expires == 0))) {
1579 1604
 		return 0;
1580 1605
 	}
1581 1606
 
... ...
@@ -41,6 +41,7 @@
41 41
 #define TPS_DBU_RPLATTRS	(1<<1)
42 42
 #define TPS_DBU_ARR		(1<<2)
43 43
 #define TPS_DBU_BRR		(1<<3)
44
+#define TPS_DBU_TIME		(1<<4)
44 45
 #define TPS_DBU_ALL		(0xffffffff)
45 46
 
46 47
 #define TPS_DATA_SIZE	8192
... ...
@@ -79,6 +80,7 @@ typedef struct tps_data {
79 80
 	int32_t iflags;
80 81
 	int32_t direction;
81 82
 	uint32_t s_method_id;
83
+	int32_t expires;
82 84
 } tps_data_t;
83 85
 
84 86
 int tps_storage_dialog_find(sip_msg_t *msg, tps_data_t *td);
... ...
@@ -23,6 +23,11 @@
23 23
 		<surname>Mierla</surname>
24 24
 		<email>miconda@gmail.com</email>
25 25
 	    </editor>
26
+	    <editor>
27
+		<firstname>Frederic</firstname>
28
+		<surname>Gaisnon</surname>
29
+		<email>frederic.gaisnon@gmail.com</email>
30
+	    </editor>
26 31
 	</authorgroup>
27 32
 	<copyright>
28 33
 	    <year>2017</year>
... ...
@@ -32,6 +37,10 @@
32 37
 	    <year>2017</year>
33 38
 	    <holder>flowroute.com</holder>
34 39
 	</copyright>
40
+	<copyright>
41
+	    <year>2021</year>
42
+	    <holder>MomentTech</holder>
43
+	</copyright>
35 44
     </bookinfo>
36 45
     <toc></toc>
37 46
 
... ...
@@ -265,7 +265,12 @@ int tps_redis_insert_dialog(tps_data_t *td)
265 265
 	argvlen[argc] = rkey.len;
266 266
 	argc++;
267 267
 
268
-	lval = (unsigned long)_tps_api.get_dialog_expire();
268
+	if(td->s_method.len==9 && strncmp(td->s_method.s, "SUBSCRIBE", 9)==0) {
269
+		lval = (unsigned long)td->expires;
270
+	} else {
271
+		lval = (unsigned long)_tps_api.get_dialog_expire();
272
+  }
273
+
269 274
 	if(lval==0) {
270 275
 		return 0;
271 276
 	}
... ...
@@ -297,7 +302,7 @@ int tps_redis_clean_dialogs(void)
297 302
 /**
298 303
  *
299 304
  */
300
-int tps_redis_insert_invite_branch(tps_data_t *td)
305
+int tps_redis_insert_initial_method_branch(tps_data_t *td)
301 306
 {
302 307
 	char* argv[TPS_REDIS_NR_KEYS];
303 308
 	size_t argvlen[TPS_REDIS_NR_KEYS];
... ...
@@ -328,8 +333,9 @@ int tps_redis_insert_invite_branch(tps_data_t *td)
328 333
 
329 334
 	rp = _tps_redis_cbuf;
330 335
 	rkey.len = snprintf(rp, TPS_REDIS_DATA_SIZE-128,
331
-					"%.*sINVITE:%.*s:%.*s",
336
+					"%.*s%.*s:%.*s:%.*s",
332 337
 					_tps_redis_bprefix.len, _tps_redis_bprefix.s,
338
+					td->s_method.len, td->s_method.s,
333 339
 					td->a_callid.len, td->a_callid.s,
334 340
 					td->b_tag.len, td->b_tag.s);
335 341
 	if(rkey.len<0 || rkey.len>=TPS_REDIS_DATA_SIZE-128) {
... ...
@@ -360,8 +366,8 @@ int tps_redis_insert_invite_branch(tps_data_t *td)
360 366
 		}
361 367
 		return -1;
362 368
 	}
363
-	LM_DBG("inserting invite branch record for [%.*s] with argc %d\n",
364
-			rkey.len, rkey.s, argc);
369
+	LM_DBG("inserting %.*s branch record for [%.*s] with argc %d\n",
370
+			td->s_method.len, td->s_method.s,rkey.len, rkey.s, argc);
365 371
 
366 372
 	freeReplyObject(rrpl);
367 373
 
... ...
@@ -552,7 +558,7 @@ int tps_redis_clean_branches(void)
552 558
 /**
553 559
  *
554 560
  */
555
-int tps_redis_load_invite_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd)
561
+int tps_redis_load_initial_method_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd)
556 562
 {
557 563
 	char* argv[TPS_REDIS_NR_KEYS];
558 564
 	size_t argvlen[TPS_REDIS_NR_KEYS];
... ...
@@ -588,8 +594,9 @@ int tps_redis_load_invite_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd)
588 594
 	rp = _tps_redis_cbuf;
589 595
 
590 596
 	rkey.len = snprintf(rp, TPS_REDIS_DATA_SIZE,
591
-					"%.*sINVITE:%.*s:%.*s",
597
+					"%.*s%.*s:%.*s:%.*s",
592 598
 					_tps_redis_bprefix.len, _tps_redis_bprefix.s,
599
+					md->s_method.len, md->s_method.s,
593 600
 					md->a_callid.len, md->a_callid.s,
594 601
 					md->b_tag.len, md->b_tag.s);
595 602
 	if(rkey.len<0 || rkey.len>=TPS_REDIS_DATA_SIZE) {
... ...
@@ -733,9 +740,9 @@ int tps_redis_load_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
733 740
 		/* load same transaction using Via branch */
734 741
 		xvbranch1 = &md->x_vbranch1;
735 742
 	} else {
736
-		/* load corresponding INVITE transaction using call-id + to-tag */
737
-		if(tps_redis_load_invite_branch(msg, md, &id)<0) {
738
-			LM_ERR("failed to load the INVITE branch value\n");
743
+		/* load corresponding INVITE or SUBSCRIBE transaction using call-id + to-tag */
744
+		if(tps_redis_load_initial_method_branch(msg, md, &id)<0) {
745
+			LM_ERR("failed to load the %.*s branch value\n", md->s_method.len, md->s_method.s);
739 746
 			return -1;
740 747
 		}
741 748
 		xvbranch1 = &id.x_vbranch1;
... ...
@@ -1122,11 +1129,18 @@ int tps_redis_update_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
1122 1129
 	}
1123 1130
 
1124 1131
 	if(md->s_method.len==6 && strncmp(md->s_method.s, "INVITE", 6)==0) {
1125
-		if(tps_redis_insert_invite_branch(md)<0) {
1132
+		if(tps_redis_insert_initial_method_branch(md)<0) {
1126 1133
 			LM_ERR("failed to insert INVITE extra branch data\n");
1127 1134
 			return -1;
1128 1135
 		}
1129 1136
 	}
1137
+	if(md->s_method.len==9 && strncmp(md->s_method.s, "SUBSCRIBE", 9)==0) {
1138
+		if(tps_redis_insert_initial_method_branch(md)<0) {
1139
+			LM_ERR("failed to insert SUBSCRIBE extra branch data\n");
1140
+			return -1;
1141
+		}
1142
+	}
1143
+
1130 1144
 	rsrv = _tps_redis_api.get_server(&_topos_redis_serverid);
1131 1145
 	if(rsrv==NULL) {
1132 1146
 		LM_ERR("cannot find redis server [%.*s]\n",
... ...
@@ -1209,6 +1223,7 @@ int tps_redis_update_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
1209 1223
 	redisc_server_t *rsrv = NULL;
1210 1224
 	redisReply *rrpl = NULL;
1211 1225
 	int32_t liflags;
1226
+	unsigned long lval = 0;
1212 1227
 
1213 1228
 	if(sd->a_uuid.len<=0 && sd->b_uuid.len<=0) {
1214 1229
 		LM_INFO("no uuid for this message\n");
... ...
@@ -1297,6 +1312,11 @@ int tps_redis_update_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
1297 1312
 		}
1298 1313
 	}
1299 1314
 
1315
+	if (mode & TPS_DBU_TIME) {
1316
+		lval = (unsigned long)time(NULL);
1317
+		TPS_REDIS_SET_ARGN(lval, rp, &rval, argc, &td_key_rectime, argv, argvlen);
1318
+	}
1319
+
1300 1320
 	if(argc<=2) {
1301 1321
 		return 0;
1302 1322
 	}
... ...
@@ -1313,6 +1333,37 @@ int tps_redis_update_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd,
1313 1333
 			rkey.len, rkey.s, argc);
1314 1334
 	freeReplyObject(rrpl);
1315 1335
 
1336
+	if (mode & TPS_DBU_TIME) {
1337
+		/* reset expire for the key */
1338
+		argc = 0;
1339
+
1340
+		argv[argc]    = "EXPIRE";
1341
+		argvlen[argc] = 6;
1342
+		argc++;
1343
+
1344
+		argv[argc]    = rkey.s;
1345
+		argvlen[argc] = rkey.len;
1346
+		argc++;
1347
+
1348
+		lval = (unsigned long)md->expires;
1349
+		if(lval==0) {
1350
+			return 0;
1351
+		}
1352
+		TPS_REDIS_SET_ARGNV(lval, rp, &rval, argc, argv, argvlen);
1353
+
1354
+		rrpl = _tps_redis_api.exec_argv(rsrv, argc, (const char **)argv, argvlen);
1355
+		if(rrpl==NULL) {
1356
+			LM_ERR("failed to execute expire redis command\n");
1357
+			if(rsrv->ctxRedis->err) {
1358
+				LM_ERR("redis error: %s\n", rsrv->ctxRedis->errstr);
1359
+			}
1360
+			return -1;
1361
+		}
1362
+		LM_DBG("expire %lu set on dialog record for [%.*s] with argc %d\n", lval,
1363
+			rkey.len, rkey.s, argc);
1364
+		freeReplyObject(rrpl);
1365
+	}
1366
+
1316 1367
 	return 0;
1317 1368
 }
1318 1369
 
... ...
@@ -1333,7 +1384,7 @@ int tps_redis_end_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd)
1333 1384
 	int32_t liflags;
1334 1385
 	unsigned long lval = 0;
1335 1386
 
1336
-	if(md->s_method_id != METHOD_BYE) {
1387
+	if((md->s_method_id != METHOD_BYE) && !((md->s_method_id == METHOD_SUBSCRIBE) && (md->expires == 0))) {
1337 1388
 		return 0;
1338 1389
 	}
1339 1390