Browse code

modules_k/rls: RLS full-state NOTIFY requests now sent by notifier process(es)

- Also modified the notifier processes so that they only run in
DB only mode. This is because the full-state handling in the
notifier processes relies on DB only mode.
- Leaving full-state NOTIFY generation outside of the notifier
processes didn't work because there was a row update race between the
notifier and non-notifier processes when full-state and non-
full-state NOTIFY requests were generated at the same time.
- This ensures that (with default options) you get at most one NOTIFY
(or set of NOTIFYs when splitting large NOTIFYs is enabled) per
5s per watcher from RLS.
- It also helps spread out the NOTIFY generation load more evenly
across time.

Peter Dunkley authored on 18/04/2012 16:29:32
Showing 7 changed files
... ...
@@ -352,6 +352,9 @@ modparam("rls", "waitn_time", 10)
352 352
    notifier_processes) of the pending updates will be sent each time a
353 353
    notifier process runs.
354 354
 
355
+   Separate notifier processes are only run when db_mode is 2 (DB only
356
+   mode).
357
+
355 358
    Default value is “10”.
356 359
 
357 360
    Example 1.11. Set notifier_poll_rate parameter
... ...
@@ -363,6 +366,9 @@ modparam("rls", "notifier_poll_rate", 20)
363 363
 
364 364
    The number of notifier processes that should be started.
365 365
 
366
+   Separate notifier processes are only run when db_mode is 2 (DB only
367
+   mode).
368
+
366 369
    Default value is “1”.
367 370
 
368 371
    Example 1.12. Set notifier_processes parameter
... ...
@@ -325,6 +325,10 @@ modparam("rls", "waitn_time", 10)
325 325
 		pending updates will be sent each time a notifier process runs.
326 326
 		</para>
327 327
 		<para>
328
+		Separate notifier processes are only run when db_mode is 2
329
+		(DB only mode).
330
+		</para>
331
+		<para>
328 332
 		<emphasis>Default value is <quote>10</quote>.
329 333
 		</emphasis>
330 334
 		</para>
... ...
@@ -344,6 +348,10 @@ modparam("rls", "notifier_poll_rate", 20)
344 344
 		The number of notifier processes that should be started.
345 345
 		</para>
346 346
 		<para>
347
+		Separate notifier processes are only run when db_mode is 2
348
+		(DB only mode).
349
+		</para>
350
+		<para>
347 351
 		<emphasis>Default value is <quote>1</quote>.
348 352
 		</emphasis>
349 353
 		</para>
... ...
@@ -713,10 +713,13 @@ int rls_handle_notify(struct sip_msg* msg, char* c1, char* c2)
713 713
 	query_cols[n_query_cols]= &str_updated_col;
714 714
 	query_vals[n_query_cols].type = DB1_INT;
715 715
 	query_vals[n_query_cols].nul = 0;
716
-	query_vals[n_query_cols].val.int_val=
717
-		core_hash(res_id, NULL,
716
+	if (dbmode == RLS_DB_ONLY)
717
+		query_vals[n_query_cols].val.int_val=
718
+			core_hash(res_id, NULL,
718 719
 				(waitn_time * rls_notifier_poll_rate
719 720
 					* rls_notifier_processes) - 1);
721
+	else
722
+		query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
720 723
 	n_query_cols++;
721 724
 		
722 725
 	query_cols[n_query_cols]= &str_auth_state_col;
... ...
@@ -830,8 +833,151 @@ error:
830 830
 	free_to_params(&TO);
831 831
 	return -1;
832 832
 }
833
-/* callid, from_tag, to_tag parameters must be allocated */
834
-void timer_send_notify(unsigned int ticks,void *param)
833
+
834
+#define EXTRACT_STRING(strng, chars)\
835
+			do {\
836
+			strng.s = (char *) chars;\
837
+			strng.len = strlen(strng.s);\
838
+			} while(0);
839
+
840
+static void timer_send_full_state_notifies(int round)
841
+{
842
+	db_key_t query_cols[1], result_cols[20], update_cols[1];
843
+	db_val_t query_vals[1], update_vals[1], *values;
844
+	db_row_t *rows;
845
+	db1_res_t *result = NULL;
846
+	int n_result_cols = 0, i;
847
+	subs_t sub;
848
+	str ev_sname;
849
+	event_t parsed_event;
850
+	xmlDocPtr doc = NULL;
851
+	xmlNodePtr service_node = NULL;
852
+
853
+	query_cols[0] = &str_updated_col;
854
+	query_vals[0].type = DB1_INT;
855
+	query_vals[0].nul = 0;
856
+	query_vals[0].val.int_val = round;
857
+
858
+	result_cols[n_result_cols++] = &str_presentity_uri_col;
859
+	result_cols[n_result_cols++] = &str_to_user_col;
860
+	result_cols[n_result_cols++] = &str_to_domain_col;
861
+	result_cols[n_result_cols++] = &str_watcher_username_col;
862
+	result_cols[n_result_cols++] = &str_watcher_domain_col;
863
+	result_cols[n_result_cols++] = &str_callid_col;
864
+	result_cols[n_result_cols++] = &str_to_tag_col;
865
+	result_cols[n_result_cols++] = &str_from_tag_col;
866
+	result_cols[n_result_cols++] = &str_socket_info_col;
867
+	result_cols[n_result_cols++] = &str_local_contact_col;
868
+	result_cols[n_result_cols++] = &str_contact_col;
869
+	result_cols[n_result_cols++] = &str_record_route_col;
870
+	result_cols[n_result_cols++] = &str_event_id_col;
871
+	result_cols[n_result_cols++] = &str_reason_col;
872
+	result_cols[n_result_cols++] = &str_event_col;
873
+	result_cols[n_result_cols++] = &str_local_cseq_col;
874
+	result_cols[n_result_cols++] = &str_remote_cseq_col;
875
+	result_cols[n_result_cols++] = &str_status_col;
876
+	result_cols[n_result_cols++] = &str_version_col;
877
+	result_cols[n_result_cols++] = &str_expires_col;
878
+
879
+	update_cols[0] = &str_updated_col;
880
+	update_vals[0].type = DB1_INT;
881
+	update_vals[0].nul = 0;
882
+	update_vals[0].val.int_val = NO_UPDATE_TYPE;
883
+
884
+	if (rls_dbf.use_table(rls_db, &rlsubs_table) < 0)
885
+	{
886
+		LM_ERR("use table failed\n");
887
+		goto done;
888
+	}
889
+
890
+	/* Step 1: Find rls_watchers that require full-state notification */
891
+	if (rls_dbf.query(rls_db, query_cols, 0, query_vals, result_cols,
892
+				1, n_result_cols, 0, &result) < 0)
893
+	{
894
+		LM_ERR("in sql query\n");
895
+		goto done;
896
+	}
897
+	if(result== NULL || result->n<= 0)
898
+		goto done;
899
+
900
+	/* Step 2: Reset the update flag so we do not full-state notify
901
+ 	   these watchers again */
902
+	if(rls_dbf.update(rls_db, query_cols, 0, query_vals, update_cols,
903
+					update_vals, 1, 1)< 0)
904
+	{
905
+		LM_ERR("in sql update\n");
906
+		goto done;
907
+	}
908
+
909
+	/* Step 3: Full-state notify each watcher we found */
910
+	rows = RES_ROWS(result);
911
+	for (i = 0; i < RES_ROW_N(result); i++)
912
+	{
913
+		memset(&sub, 0, sizeof(subs_t));
914
+		values = ROW_VALUES(&rows[i]);
915
+		EXTRACT_STRING(sub.pres_uri, VAL_STRING(&values[0]));
916
+		EXTRACT_STRING(sub.to_user, VAL_STRING(&values[1]));
917
+		EXTRACT_STRING(sub.to_domain, VAL_STRING(&values[2]));
918
+		EXTRACT_STRING(sub.from_user, VAL_STRING(&values[3]));
919
+		EXTRACT_STRING(sub.from_domain, VAL_STRING(&values[4]));
920
+		EXTRACT_STRING(sub.callid, VAL_STRING(&values[5]));
921
+		EXTRACT_STRING(sub.to_tag, VAL_STRING(&values[6]));
922
+		EXTRACT_STRING(sub.from_tag, VAL_STRING(&values[7]));
923
+		EXTRACT_STRING(sub.sockinfo_str, VAL_STRING(&values[8]));
924
+		EXTRACT_STRING(sub.local_contact, VAL_STRING(&values[9]));
925
+		EXTRACT_STRING(sub.contact, VAL_STRING(&values[10]));
926
+		EXTRACT_STRING(sub.record_route, VAL_STRING(&values[11]));
927
+		EXTRACT_STRING(sub.event_id, VAL_STRING(&values[12]));
928
+		EXTRACT_STRING(sub.reason, VAL_STRING(&values[13]));
929
+		EXTRACT_STRING(ev_sname, VAL_STRING(&values[14]));
930
+		sub.event = pres_contains_event(&ev_sname, &parsed_event);
931
+		if (sub.event == NULL)
932
+		{
933
+			LM_ERR("event not found and set to NULL\n");
934
+			goto done;
935
+		}
936
+
937
+		sub.local_cseq = VAL_INT(&values[15]);
938
+		sub.remote_cseq = VAL_INT(&values[16]);
939
+		sub.status = VAL_INT(&values[17]);
940
+		sub.version = VAL_INT(&values[18]);
941
+		sub.expires = VAL_INT(&values[19]) - (int)time(NULL);
942
+		if (sub.expires < 0) sub.expires = 0;
943
+		
944
+		if (rls_get_service_list(&sub.pres_uri, &sub.from_user,
945
+			&sub.from_domain, &service_node, &doc) < 0)
946
+		{
947
+			LM_ERR("failed getting resource list\n");
948
+			goto done;
949
+		}
950
+		if (doc == NULL)
951
+		{
952
+			LM_WARN("no document returned for uri <%.*s>\n",
953
+				sub.pres_uri.len, sub.pres_uri.s);
954
+			goto done;
955
+		}
956
+
957
+		if (send_full_notify(&sub, service_node, &sub.pres_uri, 0) < 0)
958
+		{
959
+			LM_ERR("failed sending full state notify\n");
960
+			goto done;
961
+		}
962
+
963
+		if (sub.expires == 0)
964
+			delete_rlsdb(&sub.callid, &sub.to_tag, &sub.from_tag);
965
+
966
+		xmlFreeDoc(doc);
967
+		doc = NULL;
968
+	}
969
+
970
+done:
971
+	if (result != NULL)
972
+		rls_dbf.free_result(rls_db, result);
973
+	if (doc != NULL)
974
+		xmlFreeDoc(doc);
975
+}
976
+
977
+static void timer_send_update_notifies(int round)
835 978
 {
836 979
 	db_key_t query_cols[1], update_cols[1], result_cols[6];
837 980
 	db_val_t query_vals[1], update_vals[1];
... ...
@@ -839,14 +985,11 @@ void timer_send_notify(unsigned int ticks,void *param)
839 839
 		pres_state_col, content_type_col;
840 840
 	int n_result_cols= 0;
841 841
 	db1_res_t *result= NULL;
842
-	int process_num = *((int *) param);
843 842
 
844 843
 	query_cols[0]= &str_updated_col;
845 844
 	query_vals[0].type = DB1_INT;
846 845
 	query_vals[0].nul = 0;
847
-	query_vals[0].val.int_val= subset + (waitn_time * rls_notifier_poll_rate
848
-						* process_num);
849
-	if (++subset > (waitn_time * rls_notifier_poll_rate) - 1) subset = 0;
846
+	query_vals[0].val.int_val= round;
850 847
 
851 848
 	result_cols[did_col= n_result_cols++]= &str_rlsubs_did_col;
852 849
 	result_cols[resource_uri_col= n_result_cols++]= &str_resource_uri_col;
... ...
@@ -892,6 +1035,22 @@ error:
892 892
 done:
893 893
 	if(result)
894 894
 		rlpres_dbf.free_result(rlpres_db, result);
895
+
896
+}
897
+
898
+void timer_send_notify(unsigned int ticks,void *param)
899
+{
900
+	if (dbmode == RLS_DB_ONLY)
901
+	{
902
+		int process_num = *((int *) param);
903
+		int round = subset + (waitn_time * rls_notifier_poll_rate * process_num);
904
+		if (++subset > (waitn_time * rls_notifier_poll_rate) - 1) subset = 0;
905
+
906
+		timer_send_full_state_notifies(round);
907
+		timer_send_update_notifies(round);
908
+	}
909
+	else
910
+		timer_send_update_notifies(UPDATED_TYPE);
895 911
 }
896 912
 
897 913
 
... ...
@@ -60,7 +60,7 @@
60 60
 MODULE_VERSION
61 61
 
62 62
 #define P_TABLE_VERSION 1
63
-#define W_TABLE_VERSION 1
63
+#define W_TABLE_VERSION 2
64 64
 #define X_TABLE_VERSION 4
65 65
 
66 66
 /** database connection */
... ...
@@ -548,14 +548,6 @@ static int mod_init(void)
548 548
 	if(rls_notifier_processes<= 0)
549 549
 		rls_notifier_processes= 1;
550 550
 
551
-	if ((rls_notifier_id = shm_malloc(sizeof(int) * rls_notifier_processes)) == NULL)
552
-	{
553
-		LM_ERR("allocating shared memory\n");
554
-		return -1;
555
-	}
556
-
557
-	register_basic_timers(rls_notifier_processes);
558
-
559 551
 	/* bind libxml wrapper functions */
560 552
 
561 553
 	if((bind_libxml=(bind_libxml_t)find_export("bind_libxml_api", 1, 0))== NULL)
... ...
@@ -646,6 +638,19 @@ static int mod_init(void)
646 646
 	if (rlpres_clean_period > 0)
647 647
 		register_timer(rls_presentity_clean, 0, rlpres_clean_period);
648 648
 
649
+	if(dbmode == RLS_DB_ONLY)
650
+	{
651
+		if ((rls_notifier_id = shm_malloc(sizeof(int) * rls_notifier_processes)) == NULL)
652
+		{
653
+			LM_ERR("allocating shared memory\n");
654
+			return -1;
655
+		}
656
+
657
+		register_basic_timers(rls_notifier_processes);
658
+	}
659
+	else
660
+		register_timer(timer_send_notify, 0, waitn_time);
661
+
649 662
 	if ((rls_update_subs_lock = lock_alloc()) == NULL)
650 663
 	{
651 664
 		LM_ERR("Failed to alloc rls_update_subs_lock\n");
... ...
@@ -668,7 +673,7 @@ static int child_init(int rank)
668 668
 	if (rank==PROC_INIT || rank==PROC_TCP_MAIN)
669 669
 		return 0; /* don't call child_init for main process more than once */
670 670
 
671
-	if (rank==PROC_MAIN)
671
+	if (rank==PROC_MAIN && dbmode == RLS_DB_ONLY)
672 672
 	{
673 673
 		int i;
674 674
 
... ...
@@ -164,6 +164,9 @@ extern int get_dialog_subscribe_rlsdb(subs_t *s);
164 164
 subs_t *get_dialog_notify_rlsdb(str callid, str to_tag, str from_tag);
165 165
 extern int update_all_subs_rlsdb(str *from_user, str *from_domain, str *evt);
166 166
 
167
+extern int rls_get_service_list(str *service_uri, str *user, str *domain,
168
+			xmlNodePtr *service_node, xmlDocPtr *rootdoc);
169
+
167 170
 extern str str_rlsubs_did_col;
168 171
 extern str str_resource_uri_col;
169 172
 extern str str_updated_col;
... ...
@@ -391,8 +391,8 @@ int update_dialog_subscribe_rlsdb(subs_t *subs)
391 391
 {
392 392
 	db_key_t query_cols[3];
393 393
 	db_val_t query_vals[3];
394
-	db_key_t data_cols[2];
395
-	db_val_t data_vals[2];
394
+	db_key_t data_cols[3];
395
+	db_val_t data_vals[3];
396 396
 	int n_query_cols = 0, n_data_cols=0;
397 397
 
398 398
 	if (subs==NULL) return(-1);
... ...
@@ -439,6 +439,12 @@ int update_dialog_subscribe_rlsdb(subs_t *subs)
439 439
 	data_vals[n_data_cols].val.int_val= subs->remote_cseq;
440 440
 	n_data_cols++;
441 441
 
442
+	data_cols[n_data_cols] = &str_updated_col;
443
+	data_vals[n_data_cols].type = DB1_INT;
444
+	data_vals[n_data_cols].nul = 0;
445
+	data_vals[n_data_cols].val.int_val= subs->updated;
446
+	n_data_cols++;
447
+
442 448
 	if(rls_dbf.update(rls_db, query_cols, 0, query_vals,
443 449
                     data_cols,data_vals,n_query_cols,n_data_cols) < 0)
444 450
 	{
... ...
@@ -454,8 +460,8 @@ int update_dialog_subscribe_rlsdb(subs_t *subs)
454 454
 int insert_rlsdb( subs_t *s )
455 455
 
456 456
 {
457
-	db_key_t data_cols[20];
458
-	db_val_t data_vals[20];
457
+	db_key_t data_cols[21];
458
+	db_val_t data_vals[21];
459 459
 	int n_data_cols = 0;
460 460
 
461 461
 	if (s==NULL) return(-1);
... ...
@@ -591,6 +597,12 @@ int insert_rlsdb( subs_t *s )
591 591
 	data_vals[n_data_cols].nul = 0;
592 592
 	data_vals[n_data_cols].val.int_val= s->version;
593 593
 	n_data_cols++;
594
+
595
+	data_cols[n_data_cols]=&str_updated_col;
596
+	data_vals[n_data_cols].type = DB1_INT;
597
+	data_vals[n_data_cols].nul = 0;
598
+	data_vals[n_data_cols].val.int_val= s->updated;
599
+	n_data_cols++;
594 600
 	
595 601
 	if(rls_dbf.insert(rls_db, data_cols, data_vals, n_data_cols) < 0)
596 602
 	{
... ...
@@ -426,6 +426,7 @@ int rls_handle_subscribe(struct sip_msg* msg, char* s1, char* s2)
426 426
 	param_t* ev_param = NULL;
427 427
 	str reason;
428 428
 	int rt;
429
+	str rlsubs_did = {0, 0};
429 430
 
430 431
 	memset(&subs, 0, sizeof(subs_t));
431 432
 
... ...
@@ -532,6 +533,13 @@ int rls_handle_subscribe(struct sip_msg* msg, char* s1, char* s2)
532 532
 	}
533 533
 
534 534
 	hash_code = core_hash(&subs.callid, &subs.to_tag, hash_size);
535
+	if (CONSTR_RLSUBS_DID(&subs, &rlsubs_did) < 0)
536
+	{
537
+		LM_ERR("cannot build rls subs did\n");
538
+		goto error;
539
+	}
540
+	subs.updated = core_hash(&rlsubs_did, NULL,
541
+		(waitn_time * rls_notifier_poll_rate * rls_notifier_processes) - 1);
535 542
 	
536 543
 	if(get_to(msg)->tag_value.s==NULL || get_to(msg)->tag_value.len==0)
537 544
 	{ /* initial Subscribe */
... ...
@@ -652,7 +660,6 @@ int rls_handle_subscribe(struct sip_msg* msg, char* s1, char* s2)
652 652
 				return 0;
653 653
 			}
654 654
 		}	
655
-
656 655
 		if(rls_get_service_list(&subs.pres_uri, &subs.from_user,
657 656
 					&subs.from_domain, &service_node, &doc)<0)
658 657
 		{
... ...
@@ -668,12 +675,16 @@ int rls_handle_subscribe(struct sip_msg* msg, char* s1, char* s2)
668 668
 		}
669 669
 	}
670 670
 
671
-	/* sending notify with full state */
672
-	if(send_full_notify(&subs, service_node, &subs.pres_uri, hash_code)<0)
671
+	if (dbmode != RLS_DB_ONLY)
673 672
 	{
674
-		LM_ERR("failed sending full state notify\n");
675
-		goto error;
673
+		/* sending notify with full state */
674
+		if(send_full_notify(&subs, service_node, &subs.pres_uri, hash_code)<0)
675
+		{
676
+			LM_ERR("failed sending full state notify\n");
677
+			goto error;
678
+		}
676 679
 	}
680
+
677 681
 	/* send subscribe requests for all in the list */
678 682
 	if(resource_subscriptions(&subs, service_node)< 0)
679 683
 	{
... ...
@@ -681,17 +692,8 @@ int rls_handle_subscribe(struct sip_msg* msg, char* s1, char* s2)
681 681
 		goto error;
682 682
 	}
683 683
 
684
-	if (dbmode==RLS_DB_ONLY)
685
-	{
686
-		if(subs.expires==0)
687
-		{
688
-			delete_rlsdb(&subs.callid, &subs.to_tag, &subs.from_tag);
689
-		}
690
-	}
691
-	else
692
-	{
684
+	if (dbmode !=RLS_DB_ONLY)
693 685
 		remove_expired_rlsubs(&subs, hash_code);
694
-	}
695 686
 
696 687
 done:
697 688
 	if(contact!=NULL)
... ...
@@ -707,11 +709,15 @@ done:
707 707
 		pkg_free(subs.record_route.s);
708 708
 	if(doc!=NULL)
709 709
 		xmlFreeDoc(doc);
710
+	if (rlsubs_did.s != NULL)
711
+		pkg_free(rlsubs_did.s);
710 712
 	return 1;
711 713
 
712 714
 forpresence:
713 715
 	if(subs.pres_uri.s!=NULL)
714 716
 		pkg_free(subs.pres_uri.s);
717
+	if (rlsubs_did.s != NULL)
718
+		pkg_free(rlsubs_did.s);
715 719
 	return to_presence_code;
716 720
 
717 721
 bad_event:
... ...
@@ -739,6 +745,10 @@ error:
739 739
 
740 740
 	if(doc!=NULL)
741 741
 		xmlFreeDoc(doc);
742
+
743
+	if (rlsubs_did.s != NULL)
744
+		pkg_free(rlsubs_did.s);
745
+
742 746
 	return err_ret;
743 747
 }
744 748