Browse code

modules_k/rls: Use database row/table locking where supported in DB only mode

- Under load there are lots of DB deadlocks when using
(start|end)_transaction() with multiple presence processes and/or
servers.
- Without using (start|end)_transaction(), multiple processes/servers
overwrite each other's changes.
- Using row locking (where possible) and table locking (where
required) fixes these problems.
- IMPORTANT NOTE: DB-only, multi-process/multi-server presence will
only work properly under high load when using a database driver that
supports transactions and locking (currently just db_postgres).

Peter Dunkley authored on 21/08/2012 14:41:27
Showing 4 changed files
... ...
@@ -101,10 +101,10 @@ int send_full_notify(subs_t* subs, xmlNodePtr rl_node, str* rl_uri,
101 101
 	int len_est;
102 102
 	res_param_t param;
103 103
 	int resource_added = 0; /* Flag to indicate that we have added at least one resource */
104
-	multipart_body = NULL;
104
+	multipart_body=NULL;
105
+	db_query_f query_fn = rlpres_dbf.query_lock ? rlpres_dbf.query_lock : rlpres_dbf.query;
105 106
 
106 107
 	LM_DBG("start\n");
107
-	/* query in alfabetical order */
108 108
 	
109 109
 	if(CONSTR_RLSUBS_DID(subs, &rlsubs_did)<0)
110 110
 	{
... ...
@@ -136,15 +136,15 @@ int send_full_notify(subs_t* subs, xmlNodePtr rl_node, str* rl_uri,
136 136
 
137 137
 	if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction)
138 138
 	{
139
-		if (rlpres_dbf.start_transaction(rlpres_db) < 0)
139
+		if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0)
140 140
 		{
141 141
 			LM_ERR("in start_transaction\n");
142 142
 			goto error;
143 143
 		}
144 144
 	}
145 145
 
146
-	if(rlpres_dbf.query(rlpres_db, query_cols, 0, query_vals, result_cols,
147
-					1, n_result_cols, &str_resource_uri_col, &result )< 0)
146
+	if(query_fn(rlpres_db, query_cols, 0, query_vals, result_cols,
147
+					1, n_result_cols, NULL, &result )< 0)
148 148
 	{
149 149
 		LM_ERR("in sql query\n");
150 150
 		goto error;
... ...
@@ -203,6 +203,15 @@ static void send_notifies(db1_res_t *result, int did_col, int resource_uri_col,
203 203
 		ERR_MEM(PKG_MEM_STR);
204 204
 	}
205 205
 
206
+	if (dbmode == RLS_DB_ONLY && rls_dbf.start_transaction)
207
+	{
208
+		if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0)
209
+		{
210
+			LM_ERR("in start_transaction\n");
211
+			goto error;
212
+		}
213
+	}
214
+
206 215
 	LM_DBG("found %d records with updated state\n", result->n);
207 216
 	for(i= 0; i< result->n; i++)
208 217
 	{
... ...
@@ -420,9 +429,17 @@ static void send_notifies(db1_res_t *result, int did_col, int resource_uri_col,
420 429
 		dialog= NULL;
421 430
 	}
422 431
 
423
-	
424
-error:
425 432
 done:
433
+	if (dbmode == RLS_DB_ONLY && rls_dbf.end_transaction)
434
+	{
435
+		if (rls_dbf.end_transaction(rls_db) < 0)
436
+		{
437
+			LM_ERR("in end_transaction\n");
438
+			goto error;
439
+		}
440
+	}
441
+
442
+error:
426 443
 	if(bstr.s)
427 444
 		pkg_free(bstr.s);
428 445
 
... ...
@@ -430,6 +447,13 @@ done:
430 447
 		pkg_free(buf);
431 448
 	if(dialog)
432 449
 		pkg_free(dialog);
450
+
451
+	if (dbmode == RLS_DB_ONLY && rls_dbf.abort_transaction)
452
+	{
453
+		if (rls_dbf.abort_transaction(rls_db) < 0)
454
+			LM_ERR("in abort_transaction\n");
455
+	}
456
+
433 457
 	return;
434 458
 }
435 459
 
... ...
@@ -769,7 +793,7 @@ int rls_handle_notify(struct sip_msg* msg, char* c1, char* c2)
769 793
 
770 794
 	if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction)
771 795
 	{
772
-		if (rlpres_dbf.start_transaction(rlpres_db) < 0)
796
+		if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0)
773 797
 		{
774 798
 			LM_ERR("in start_transaction\n");
775 799
 			goto error;
... ...
@@ -883,6 +907,7 @@ static void timer_send_full_state_notifies(int round)
883 907
 	xmlDocPtr doc = NULL;
884 908
 	xmlNodePtr service_node = NULL;
885 909
 	int now = (int)time(NULL);
910
+	db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
886 911
 
887 912
 	query_cols[0] = &str_updated_col;
888 913
 	query_vals[0].type = DB1_INT;
... ...
@@ -925,7 +950,7 @@ static void timer_send_full_state_notifies(int round)
925 950
 
926 951
 	if (dbmode == RLS_DB_ONLY && rls_dbf.start_transaction)
927 952
 	{
928
-		if (rls_dbf.start_transaction(rls_db) < 0)
953
+		if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0)
929 954
 		{
930 955
 			LM_ERR("in start_transaction\n");
931 956
 			goto done;
... ...
@@ -933,7 +958,7 @@ static void timer_send_full_state_notifies(int round)
933 958
 	}
934 959
 
935 960
 	/* Step 1: Find rls_watchers that require full-state notification */
936
-	if (rls_dbf.query(rls_db, query_cols, 0, query_vals, result_cols,
961
+	if (query_fn(rls_db, query_cols, 0, query_vals, result_cols,
937 962
 				1, n_result_cols, 0, &result) < 0)
938 963
 	{
939 964
 		LM_ERR("in sql query\n");
... ...
@@ -1051,6 +1076,7 @@ static void timer_send_update_notifies(int round)
1051 1076
 		pres_state_col, content_type_col;
1052 1077
 	int n_result_cols= 0;
1053 1078
 	db1_res_t *result= NULL;
1079
+	db_query_f query_fn = rlpres_dbf.query_lock ? rlpres_dbf.query_lock : rlpres_dbf.query;
1054 1080
 
1055 1081
 	query_cols[0]= &str_updated_col;
1056 1082
 	query_vals[0].type = DB1_INT;
... ...
@@ -1080,14 +1106,14 @@ static void timer_send_update_notifies(int round)
1080 1106
 
1081 1107
 	if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction)
1082 1108
 	{
1083
-		if (rlpres_dbf.start_transaction(rlpres_db) < 0)
1109
+		if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0)
1084 1110
 		{
1085 1111
 			LM_ERR("in start_transaction\n");
1086 1112
 			goto done;
1087 1113
 		}
1088 1114
 	}
1089 1115
 
1090
-	if(rlpres_dbf.query(rlpres_db, query_cols, 0, query_vals, result_cols,
1116
+	if(query_fn(rlpres_db, query_cols, 0, query_vals, result_cols,
1091 1117
 					1, n_result_cols, &str_rlsubs_did_col, &result)< 0)
1092 1118
 	{
1093 1119
 		LM_ERR("in sql query\n");
... ...
@@ -124,6 +124,7 @@ int delete_expired_subs_rlsdb( void )
124 124
 	int i;
125 125
 	subs_t subs;
126 126
 	str rlsubs_did = {0, 0};
127
+	db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
127 128
 
128 129
 	if(rls_db == NULL)
129 130
 	{
... ...
@@ -148,7 +149,16 @@ int delete_expired_subs_rlsdb( void )
148 149
 	result_cols[r_to_tag_col=n_result_cols++] = &str_to_tag_col;
149 150
 	result_cols[r_from_tag_col=n_result_cols++] = &str_from_tag_col;
150 151
 
151
-	if(rls_dbf.query(rls_db, query_cols, query_ops, query_vals, result_cols, 
152
+	if (rls_dbf.start_transaction)
153
+	{
154
+		if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0)
155
+		{
156
+			LM_ERR("in start_transaction\n");
157
+			goto error;
158
+		}
159
+	}
160
+
161
+	if(query_fn(rls_db, query_cols, query_ops, query_vals, result_cols, 
152 162
 				n_query_cols, n_result_cols, 0, &result )< 0)
153 163
 	{
154 164
 		LM_ERR("Can't query db\n");
... ...
@@ -213,11 +223,28 @@ int delete_expired_subs_rlsdb( void )
213 223
 	}
214 224
 
215 225
 	rls_dbf.free_result(rls_db, result);
226
+
227
+	if (rls_dbf.end_transaction)
228
+	{
229
+		if (rls_dbf.end_transaction(rls_db) < 0)
230
+		{
231
+			LM_ERR("in end_transaction\n");
232
+			goto error;
233
+		}
234
+	}
235
+
216 236
 	return 1;
217 237
 
218 238
 error:
219 239
 	if (result) rls_dbf.free_result(rls_db, result);
220 240
 	if (rlsubs_did.s) pkg_free(rlsubs_did.s);
241
+
242
+	if (rls_dbf.abort_transaction)
243
+	{
244
+		if (rls_dbf.abort_transaction(rls_db) < 0)
245
+			LM_ERR("in abort_transaction\n");
246
+	}
247
+
221 248
 	return -1;
222 249
 }
223 250
 
... ...
@@ -718,7 +745,8 @@ int get_dialog_subscribe_rlsdb(subs_t *subs)
718 745
 	int nr_rows;
719 746
 	int r_remote_cseq, r_local_cseq, r_version;
720 747
 	char *r_pres_uri, *r_record_route;
721
-
748
+	db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
749
+	
722 750
 	if(rls_db == NULL)
723 751
 	{
724 752
 		LM_ERR("null database connection\n");
... ...
@@ -761,7 +789,7 @@ int get_dialog_subscribe_rlsdb(subs_t *subs)
761 789
 	result_cols[version_col = n_result_cols++] = &str_version_col;
762 790
 	result_cols[rroute_col = n_result_cols++] = &str_record_route_col;
763 791
 
764
-	if(rls_dbf.query(rls_db, query_cols, 0, query_vals, result_cols, 
792
+	if(query_fn(rls_db, query_cols, 0, query_vals, result_cols, 
765 793
 			n_query_cols, n_result_cols, 0, &result )< 0)
766 794
 	{
767 795
 		LM_ERR("Can't query db\n");
... ...
@@ -865,6 +893,7 @@ subs_t *get_dialog_notify_rlsdb(str callid, str to_tag, str from_tag)
865 893
 	subs_t *dest;
866 894
 	event_t parsed_event;
867 895
 	str ev_sname;
896
+	db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
868 897
 
869 898
 	if(rls_db == NULL)
870 899
 	{
... ...
@@ -919,7 +948,7 @@ subs_t *get_dialog_notify_rlsdb(str callid, str to_tag, str from_tag)
919 948
 	result_cols[r_version_col=n_result_cols++] = &str_version_col;
920 949
 	result_cols[r_expires_col=n_result_cols++] = &str_expires_col;
921 950
 
922
-	if(rls_dbf.query(rls_db, query_cols, 0, query_vals, result_cols, 
951
+	if(query_fn(rls_db, query_cols, 0, query_vals, result_cols, 
923 952
 				n_query_cols, n_result_cols, 0, &result )< 0)
924 953
 	{
925 954
 		LM_ERR("Can't query db\n");
... ...
@@ -639,6 +639,15 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma
639 639
 		/* search if a stored dialog */
640 640
 		if ( dbmode == RLS_DB_ONLY )
641 641
 		{
642
+			if (rls_dbf.start_transaction)
643
+			{
644
+				if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0)
645
+				{
646
+					LM_ERR("in start_transaction\n");
647
+					goto error;
648
+				}
649
+			}
650
+
642 651
 			rt = get_dialog_subscribe_rlsdb(&subs);
643 652
 
644 653
 			if (rt <= 0)
... ...
@@ -646,6 +655,16 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma
646 655
 				LM_DBG("subscription dialog not found for <%.*s@%.*s>\n",
647 656
 						subs.watcher_user.len, subs.watcher_user.s,
648 657
 						subs.watcher_domain.len, subs.watcher_domain.s);
658
+
659
+				if (rls_dbf.end_transaction)
660
+				{
661
+					if (rls_dbf.end_transaction(rls_db) < 0)
662
+					{
663
+						LM_ERR("in end_transaction\n");
664
+						goto error;
665
+					}
666
+				}
667
+
649 668
 				goto forpresence;
650 669
 			}
651 670
 			else if(rt>=400)
... ...
@@ -657,6 +676,16 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma
657 676
 					LM_ERR("while sending reply\n");
658 677
 					goto error;
659 678
 				}
679
+
680
+				if (rls_dbf.end_transaction)
681
+				{
682
+					if (rls_dbf.end_transaction(rls_db) < 0)
683
+					{
684
+						LM_ERR("in end_transaction\n");
685
+						goto error;
686
+					}
687
+				}
688
+
660 689
 				ret = 0;
661 690
 				goto stop;
662 691
 			}
... ...
@@ -670,6 +699,15 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma
670 699
 				LM_ERR("while updating resource list subscription\n");
671 700
 				goto error;
672 701
 			}
702
+
703
+			if (rls_dbf.end_transaction)
704
+			{
705
+				if (rls_dbf.end_transaction(rls_db) < 0)
706
+				{
707
+					LM_ERR("in end_transaction\n");
708
+					goto error;
709
+				}
710
+			}
673 711
 		}
674 712
 		else
675 713
 		{
... ...
@@ -795,6 +833,12 @@ error:
795 833
 	if (rlsubs_did.s != NULL)
796 834
 		pkg_free(rlsubs_did.s);
797 835
 
836
+	if (rls_dbf.abort_transaction)
837
+	{
838
+		if (rls_dbf.abort_transaction(rls_db) < 0)
839
+			LM_ERR("in abort_transaction\n");
840
+	}
841
+	
798 842
 	return err_ret;
799 843
 }
800 844