Browse code

db_text: implement fetch and memory constraints

when dealing with large db_text files, pkg_memory is not suitable for
operating the database.

implementing fetch allows modules like presence & registrar & usrloc
to query large tables without constraints on pkg_memory.

creates tmp tables in shared memory for query results

(cherry picked from commit 18c64d2c9ff1527655055f75aa22e7d68c307874)

Conflicts:
src/modules/db_text/db_text.c
src/modules/db_text/dbt_api.c
src/modules/db_text/dbt_base.c
src/modules/db_text/dbt_lib.c

lazedo authored on 18/01/2017 10:00:22 • Luis Azedo committed on 18/01/2017 10:30:32
Showing 10 changed files
... ...
@@ -38,6 +38,7 @@ static int mod_init(void);
38 38
 static void destroy(void);
39 39
 
40 40
 #define DEFAULT_DB_TEXT_READ_BUFFER_SIZE 16384
41
+#define DEFAULT_MAX_RESULT_ROWS 100000;
41 42
 
42 43
 /*
43 44
  * Module parameter variables
... ...
@@ -45,6 +46,7 @@ static void destroy(void);
45 46
 int db_mode = 0;  /* Database usage mode: 0 = cache, 1 = no cache */
46 47
 int empty_string = 0;  /* Treat empty string as "" = 0, 1 = NULL */
47 48
 int _db_text_read_buffer_size = DEFAULT_DB_TEXT_READ_BUFFER_SIZE;
49
+int _db_text_max_result_rows = DEFAULT_MAX_RESULT_ROWS;
48 50
 
49 51
 int dbt_bind_api(db_func_t *dbb);
50 52
 
... ...
@@ -64,6 +66,7 @@ static param_export_t params[] = {
64 66
 	{"db_mode", INT_PARAM, &db_mode},
65 67
 	{"emptystring", INT_PARAM, &empty_string},
66 68
 	{"file_buffer_size", INT_PARAM, &_db_text_read_buffer_size},
69
+	{"max_result_rows", INT_PARAM, &_db_text_max_result_rows},
67 70
 	{0, 0, 0}
68 71
 };
69 72
 
... ...
@@ -108,7 +111,7 @@ static int mod_init(void)
108 111
 static void destroy(void)
109 112
 {
110 113
 	LM_DBG("destroy ...\n");
111
-	dbt_cache_print(0);
114
+	dbt_cache_print2(0, 0);
112 115
 	dbt_cache_destroy();
113 116
 }
114 117
 
... ...
@@ -125,6 +128,7 @@ int dbt_bind_api(db_func_t *dbb)
125 128
 	dbb->init        = dbt_init;
126 129
 	dbb->close       = dbt_close;
127 130
 	dbb->query       = (db_query_f)dbt_query;
131
+	dbb->fetch_result = (db_fetch_result_f) dbt_fetch_result;
128 132
 	dbb->free_result = dbt_free_result;
129 133
 	dbb->insert      = (db_insert_f)dbt_insert;
130 134
 	dbb->delete      = (db_delete_f)dbt_delete;
... ...
@@ -132,8 +136,7 @@ int dbt_bind_api(db_func_t *dbb)
132 136
 	dbb->replace     = (db_replace_f)dbt_replace;
133 137
 	dbb->affected_rows = (db_affected_rows_f) dbt_affected_rows;
134 138
 	dbb->raw_query   = (db_raw_query_f) dbt_raw_query;
135
-	dbb->cap         = DB_CAP_ALL | DB_CAP_AFFECTED_ROWS | DB_CAP_RAW_QUERY
136
-							| DB_CAP_REPLACE;
139
+	dbb->cap         = DB_CAP_ALL | DB_CAP_AFFECTED_ROWS | DB_CAP_RAW_QUERY | DB_CAP_REPLACE | DB_CAP_FETCH;
137 140
 
138 141
 	return 0;
139 142
 }
... ...
@@ -56,6 +56,10 @@ int dbt_free_result(db1_con_t* _h, db1_res_t* _r);
56 56
 int dbt_query(db1_con_t* _h, db_key_t* _k, db_op_t* _op, db_val_t* _v,
57 57
 			db_key_t* _c, int _n, int _nc, db_key_t _o, db1_res_t** _r);
58 58
 
59
+/*
60
+ * fetch result
61
+ */
62
+int dbt_fetch_result(db1_con_t* _h, db1_res_t** _r, const int nrows);
59 63
 
60 64
 /*
61 65
  * Raw SQL query
... ...
@@ -39,7 +39,7 @@ int dbt_use_table(db1_con_t* _h, const str* _t)
39 39
 /*
40 40
  * Get and convert columns from a result
41 41
  */
42
-static int dbt_get_columns(db1_res_t* _r, dbt_result_p _dres)
42
+static int dbt_get_columns(db1_res_t* _r, dbt_table_p _dres)
43 43
 {
44 44
 	int col;
45 45
 
... ...
@@ -73,10 +73,10 @@ static int dbt_get_columns(db1_res_t* _r, dbt_result_p _dres)
73 73
 		LM_DBG("allocate %d bytes for RES_NAMES[%d] at %p\n",
74 74
 				(int)sizeof(str), col,
75 75
 				RES_NAMES(_r)[col]);
76
-		RES_NAMES(_r)[col]->s = _dres->colv[col].name.s;
77
-		RES_NAMES(_r)[col]->len = _dres->colv[col].name.len;
76
+		RES_NAMES(_r)[col]->s = _dres->colv[col]->name.s;
77
+		RES_NAMES(_r)[col]->len = _dres->colv[col]->name.len;
78 78
 
79
-		switch(_dres->colv[col].type)
79
+		switch(_dres->colv[col]->type)
80 80
 		{
81 81
 			case DB1_STR:
82 82
 			case DB1_STRING:
... ...
@@ -84,12 +84,12 @@ static int dbt_get_columns(db1_res_t* _r, dbt_result_p _dres)
84 84
 			case DB1_INT:
85 85
 			case DB1_DATETIME:
86 86
 			case DB1_DOUBLE:
87
-				RES_TYPES(_r)[col] = _dres->colv[col].type;
87
+				RES_TYPES(_r)[col] = _dres->colv[col]->type;
88 88
 			break;
89 89
 			default:
90 90
 				LM_WARN("unhandled data type column (%.*s) type id (%d), "
91 91
 						"use STR as default\n", RES_NAMES(_r)[col]->len,
92
-						RES_NAMES(_r)[col]->s, _dres->colv[col].type);
92
+						RES_NAMES(_r)[col]->s, _dres->colv[col]->type);
93 93
 				RES_TYPES(_r)[col] = DB1_STR;
94 94
 			break;
95 95
 		}
... ...
@@ -173,7 +173,7 @@ static int dbt_convert_row(db1_res_t* _res, db_row_t* _r, dbt_row_p _r1)
173 173
 			break;
174 174
 
175 175
 			default:
176
-				LM_ERR("val type [%d] not supported\n", RES_TYPES(_res)[i]);
176
+				LM_ERR("val type [%d] for column %i not supported\n", RES_TYPES(_res)[i], i);
177 177
 				return -1;
178 178
 		}
179 179
 	}
... ...
@@ -184,25 +184,31 @@ static int dbt_convert_row(db1_res_t* _res, db_row_t* _r, dbt_row_p _r1)
184 184
 /*
185 185
  * Convert rows from internal to db API representation
186 186
  */
187
-static int dbt_convert_rows(db1_res_t* _r, dbt_result_p _dres)
187
+static int dbt_convert_rows(db1_res_t* _r, dbt_table_p _dres, int offset, int nrows)
188 188
 {
189
-	int row;
189
+	int row = 0, c = 0;
190 190
 	dbt_row_p _rp = NULL;
191 191
 	if (!_r || !_dres) {
192 192
 		LM_ERR("invalid parameter\n");
193 193
 		return -1;
194 194
 	}
195
-	RES_ROW_N(_r) = _dres->nrrows;
196
-	if (!RES_ROW_N(_r)) {
195
+
196
+	if (nrows == 0) {
197 197
 		return 0;
198 198
 	}
199
+
199 200
 	if (db_allocate_rows(_r) < 0) {
200 201
 		LM_ERR("could not allocate rows\n");
201 202
 		return -2;
202 203
 	}
203
-	row = 0;
204
+
204 205
 	_rp = _dres->rows;
205
-	while(_rp) {
206
+	while(_rp && c < offset) {
207
+		c++;
208
+		_rp = _rp->next;
209
+	}
210
+
211
+	while(_rp && row < nrows) {
206 212
 		if (dbt_convert_row(_r, &(RES_ROWS(_r)[row]), _rp) < 0) {
207 213
 			LM_ERR("failed to convert row #%d\n", row);
208 214
 			RES_ROW_N(_r) = row;
... ...
@@ -212,36 +218,76 @@ static int dbt_convert_rows(db1_res_t* _r, dbt_result_p _dres)
212 218
 		row++;
213 219
 		_rp = _rp->next;
214 220
 	}
221
+	RES_ROW_N(_r) = row;
222
+	RES_LAST_ROW(_r) = c + row;
215 223
 	return 0;
216 224
 }
217 225
 
226
+static int dbt_convert_all_rows(db1_res_t* _r, dbt_table_p _dres)
227
+{
228
+	if (!_r || !_dres) {
229
+		LM_ERR("invalid parameter\n");
230
+		return -1;
231
+	}
232
+	RES_ROW_N(_r) = _dres->nrrows;
233
+	return dbt_convert_rows(_r, _dres, 0, _dres->nrrows);
234
+}
235
+
236
+
218 237
 
219 238
 /*
220 239
  * Fill the structure with data from database
221 240
  */
222
-static int dbt_convert_result(db1_res_t* _r, dbt_result_p _dres)
241
+//static int dbt_convert_result(db1_res_t* _r, dbt_table_p _dres)
242
+//{
243
+//	if (!_r || !_dres) {
244
+//		LM_ERR("invalid parameter\n");
245
+//		return -1;
246
+//	}
247
+//	if (dbt_get_columns(_r, _dres) < 0) {
248
+//		LM_ERR("failed to get column names\n");
249
+//		return -2;
250
+//	}
251
+//
252
+//	if (dbt_convert_all_rows(_r, _dres) < 0) {
253
+//		LM_ERR("failed to convert rows\n");
254
+//		db_free_columns(_r);
255
+//		return -3;
256
+//	}
257
+//	return 0;
258
+//}
259
+
260
+/*
261
+ * Retrieve result set
262
+ */
263
+int dbt_get_result(db1_res_t** _r, dbt_table_p _dres)
223 264
 {
224
-	if (!_r || !_dres) {
225
-		LM_ERR("invalid parameter\n");
226
-		return -1;
265
+	int res = dbt_init_result(_r, _dres);
266
+	if ( res != 0) {
267
+		return res;
227 268
 	}
228
-	if (dbt_get_columns(_r, _dres) < 0) {
229
-		LM_ERR("failed to get column names\n");
230
-		return -2;
269
+
270
+	if (dbt_convert_all_rows(*_r, _dres) < 0) {
271
+		LM_ERR("failed to convert rows\n");
272
+		db_free_columns(*_r);
273
+		return -3;
231 274
 	}
232 275
 
233
-	if (dbt_convert_rows(_r, _dres) < 0) {
276
+	return 0;
277
+}
278
+
279
+int dbt_get_next_result(db1_res_t** _r, int offset, int rows)
280
+{
281
+	dbt_table_p _dres = (dbt_table_p)(*_r)->ptr;
282
+	if (dbt_convert_rows(*_r, _dres, offset, rows) < 0) {
234 283
 		LM_ERR("failed to convert rows\n");
235
-		db_free_columns(_r);
284
+		db_free_columns(*_r);
236 285
 		return -3;
237 286
 	}
238 287
 	return 0;
239 288
 }
240 289
 
241
-/*
242
- * Retrieve result set
243
- */
244
-int dbt_get_result(db1_res_t** _r, dbt_result_p _dres)
290
+int dbt_init_result(db1_res_t** _r, dbt_table_p _dres)
245 291
 {
246 292
 	if ( !_r) {
247 293
 		LM_ERR("invalid parameter value\n");
... ...
@@ -262,13 +308,12 @@ int dbt_get_result(db1_res_t** _r, dbt_result_p _dres)
262 308
 		return -2;
263 309
 	}
264 310
 
265
-	if (dbt_convert_result(*_r, _dres) < 0)
266
-	{
267
-		LM_ERR("failed to convert result\n");
268
-		pkg_free(*_r);
269
-		return -4;
311
+	if (dbt_get_columns(*_r, _dres) < 0) {
312
+		LM_ERR("failed to get column names\n");
313
+		return -2;
270 314
 	}
271 315
 
316
+	RES_NUM_ROWS(*_r) = _dres->nrrows;
272 317
 	(*_r)->ptr = _dres;
273 318
 	return 0;
274 319
 }
... ...
@@ -34,7 +34,11 @@
34 34
 /*
35 35
  * Retrieve result set
36 36
  */
37
-int dbt_get_result(db1_res_t** _r, dbt_result_p _dres);
37
+//int dbt_get_result(db1_res_t** _r, dbt_result_p _dres);
38
+int dbt_get_result(db1_res_t** _r, dbt_table_p _dres);
39
+int dbt_init_result(db1_res_t** _r, dbt_table_p _dres);
40
+int dbt_get_next_result(db1_res_t** _r, int offset, int rows);
41
+
38 42
 
39 43
 int dbt_use_table(db1_con_t* _h, const str* _t);
40 44
 
... ...
@@ -131,7 +131,7 @@ int dbt_free_result(db1_con_t* _h, db1_res_t* _r)
131 131
 	if (!_r)
132 132
 		return 0;
133 133
 
134
-	if(dbt_result_free((dbt_result_p)_r->ptr) < 0)
134
+	if(dbt_result_free(_h, (dbt_table_p)_r->ptr) < 0)
135 135
 	{
136 136
 		LM_ERR("unable to free internal structure\n");
137 137
 	}
... ...
@@ -145,6 +145,7 @@ int dbt_free_result(db1_con_t* _h, db1_res_t* _r)
145 145
 	return 0;
146 146
 }
147 147
 
148
+static dbt_table_p last_temp_table = NULL; 
148 149
 
149 150
 /*
150 151
  * Query table for specified rows
... ...
@@ -162,9 +163,13 @@ int dbt_query(db1_con_t* _h, db_key_t* _k, db_op_t* _op, db_val_t* _v,
162 163
 			db_key_t* _c, int _n, int _nc, db_key_t _o, db1_res_t** _r)
163 164
 {
164 165
 	dbt_table_p _tbc = NULL;
166
+	dbt_table_p _tbc_temp = NULL;
165 167
 	dbt_row_p _drp = NULL;
166
-	dbt_result_p _dres = NULL;
168
+	dbt_row_p *_res = NULL;
169
+//	dbt_result_p _dres = NULL;
167 170
 	int result = 0;
171
+	int counter = 0;
172
+	int i=0;
168 173
 
169 174
 	int *lkey=NULL, *lres=NULL;
170 175
 
... ...
@@ -172,15 +177,16 @@ int dbt_query(db1_con_t* _h, db_key_t* _k, db_op_t* _op, db_val_t* _v,
172 177
 	char *_o_op=NULL;       /* operators for oder-by */
173 178
 	int _o_n;               /* no of elements in order-by */
174 179
 	int *_o_l=NULL;         /* column selection for order-by */
175
-	int _o_nc;              /* no of elements in _o_l but not lres */
180
+//	int _o_nc;              /* no of elements in _o_l but not lres */
176 181
 
177
-	if ((!_h) || (!_r) || !CON_TABLE(_h))
182
+	if(_r)
183
+		*_r = NULL;
184
+
185
+	if ((!_h) || !CON_TABLE(_h))
178 186
 	{
179 187
 		LM_ERR("invalid parameters\n");
180 188
 		return -1;
181 189
 	}
182
-	*_r = NULL;
183
-
184 190
 
185 191
 	if (_o)
186 192
 	{
... ...
@@ -188,11 +194,19 @@ int dbt_query(db1_con_t* _h, db_key_t* _k, db_op_t* _op, db_val_t* _v,
188 194
 			return -1;
189 195
 	}
190 196
 
197
+	_tbc_temp = dbt_db_get_temp_table(DBT_CON_CONNECTION(_h));
198
+	if(!_tbc_temp)
199
+	{
200
+		LM_ERR("unable to allocate temp table\n");
201
+		return -1;
202
+	}
203
+
191 204
 	/* lock database */
192 205
 	_tbc = dbt_db_get_table(DBT_CON_CONNECTION(_h), CON_TABLE(_h));
193 206
 	if(!_tbc)
194 207
 	{
195 208
 		LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
209
+		dbt_db_del_table(DBT_CON_CONNECTION(_h), &_tbc_temp->name, 0);
196 210
 		return -1;
197 211
 	}
198 212
 
... ...
@@ -220,72 +234,117 @@ int dbt_query(db1_con_t* _h, db_key_t* _k, db_op_t* _op, db_val_t* _v,
220 234
 		if (!_o_l)
221 235
 			goto error;
222 236
 		/* enlarge select-columns lres by all order-by columns, _o_nc is how many */
223
-		if (dbt_mangle_columnselection(&lres, &_nc, &_o_nc, _o_l, _o_n) < 0)
224
-			goto error;
237
+//		if (dbt_mangle_columnselection(&lres, &_nc, &_o_nc, _o_l, _o_n) < 0)
238
+//			goto error;
225 239
 	}
226 240
 
241
+/*
227 242
 	LM_DBG("new res with %d cols\n", _nc);
228 243
 	_dres = dbt_result_new(_tbc, lres, _nc);
229 244
 
230 245
 	if(!_dres)
231 246
 		goto error;
247
+*/
248
+
249
+		dbt_column_p pPrevCol = NULL;
250
+		_tbc_temp->colv = (dbt_column_p*) shm_malloc(_nc*sizeof(dbt_column_p));
251
+		for(i=0; i < _nc; i++) {
252
+			dbt_column_p pCol = dbt_column_new(_tbc->colv[ lres[i] ]->name.s, _tbc->colv[ lres[i] ]->name.len);
253
+			pCol->type = _tbc->colv[ lres[i] ]->type;
254
+			pCol->flag = _tbc->colv[ lres[i] ]->flag;
255
+			if(pPrevCol)
256
+			{
257
+				pCol->prev = pPrevCol;
258
+				pPrevCol->next = pCol;
259
+			}
260
+			else
261
+				_tbc_temp->cols = pCol;
262
+
263
+			_tbc_temp->colv[i] = pCol;
264
+			pPrevCol = pCol;
265
+			_tbc_temp->nrcols++;
266
+		}
267
+
268
+	_res = (dbt_row_p*) pkg_malloc(_db_text_max_result_rows * sizeof(dbt_row_p));
269
+	if(!_res) {
270
+		LM_ERR("no more space to allocate for query rows\n");
271
+		goto error;
272
+	}
273
+
232 274
 
233 275
 	_drp = _tbc->rows;
234
-	while(_drp)
276
+	while(_drp && counter < _db_text_max_result_rows)
235 277
 	{
236 278
 		if(dbt_row_match(_tbc, _drp, lkey, _op, _v, _n))
237 279
 		{
238
-			if(dbt_result_extract_fields(_tbc, _drp, lres, _dres))
239
-			{
240
-				LM_ERR("failed to extract result fields!\n");
241
-				goto clean;
242
-			}
280
+			_res[counter] = _drp;
281
+//			if(dbt_result_extract_fields(_tbc, _drp, lres, _dres))
282
+//			{
283
+//				LM_ERR("failed to extract result fields!\n");
284
+//				goto clean;
285
+//			}
286
+			counter++;
243 287
 		}
244 288
 		_drp = _drp->next;
245 289
 	}
246 290
 
247
-	dbt_table_update_flags(_tbc, DBT_TBFL_ZERO, DBT_FL_IGN, 1);
248
-
249
-	/* unlock database */
250
-	dbt_release_table(DBT_CON_CONNECTION(_h), CON_TABLE(_h));
251
-
252 291
 	if (_o_l)
253 292
 	{
254
-		if (_dres->nrrows > 1)
293
+		if (counter > 1)
255 294
 		{
256
-			if (dbt_sort_result(_dres, _o_l, _o_op, _o_n, lres, _nc) < 0)
257
-				goto error_nounlock;
295
+			if (dbt_sort_result_temp(_res, counter, _o_l, _o_op, _o_n) < 0)
296
+				goto error;
258 297
 		}
259 298
 
260 299
 		/* last but not least, remove surplus columns */
261
-		if (_o_nc)
262
-			dbt_project_result(_dres, _o_nc);
300
+//		if (_o_nc)
301
+//			dbt_project_result(_dres, _o_nc);
263 302
 	}
264 303
 
304
+	// copy results to temp table
305
+	_tbc_temp->rows = dbt_result_extract_results(_tbc, _res, counter, lres, _nc);
306
+	_tbc_temp->nrrows = (_tbc_temp->rows == NULL ? 0 : counter);
265 307
 
266
-	/* dbt_result_print(_dres); */
308
+	dbt_table_update_flags(_tbc, DBT_TBFL_ZERO, DBT_FL_IGN, 1);
309
+
310
+	/* unlock database */
311
+	dbt_release_table(DBT_CON_CONNECTION(_h), CON_TABLE(_h));
312
+
313
+// 	DBT_CON_TEMP_TABLE(_h) = _tbc_temp;
314
+	last_temp_table = _tbc_temp;
315
+//	dbt_release_table(DBT_CON_CONNECTION(_h), &_tbc_temp->name);
316
+
317
+//	dbt_result_print(_tbc_temp);
267 318
 
268 319
 	if(lkey)
269 320
 		pkg_free(lkey);
270 321
 	if(lres)
271 322
 		pkg_free(lres);
272 323
 	if(_o_k)
273
-		pkg_free(_o_k);
274
-	if(_o_op)
275
-		pkg_free(_o_op);
276
-	if(_o_l)
277
-		pkg_free(_o_l);
278
-
279
-	result = dbt_get_result(_r, _dres);
280
-	if(result != 0)
281
-		dbt_result_free(_dres);
324
+ 		pkg_free(_o_k);
325
+ 	if(_o_op)
326
+ 		pkg_free(_o_op);
327
+ 	if(_o_l)
328
+ 		pkg_free(_o_l);
329
+ 	if(_res)
330
+ 		pkg_free(_res);
331
+
332
+ 	if(_r) {
333
+ 		result = dbt_get_result(_r, _tbc_temp);
334
+// 		dbt_db_del_table(DBT_CON_CONNECTION(_h), &_tbc_temp->name, 1);
335
+		if(result != 0)
336
+ 			dbt_result_free(_h, _tbc_temp);
337
+ 	}
282 338
 
283 339
 	return result;
284 340
 
285 341
 error:
286
-	/* unlock database */
287
-	dbt_release_table(DBT_CON_CONNECTION(_h), CON_TABLE(_h));
288
-error_nounlock:
342
+    /* unlock database */
343
+    dbt_release_table(DBT_CON_CONNECTION(_h), CON_TABLE(_h));
344
+    /* delete temp table */
345
+    dbt_db_del_table(DBT_CON_CONNECTION(_h), &_tbc_temp->name, 1);
346
+	if(_res)
347
+		pkg_free(_res);
289 348
 	if(lkey)
290 349
 		pkg_free(lkey);
291 350
 	if(lres)
... ...
@@ -296,14 +355,12 @@ error_nounlock:
296 355
 		pkg_free(_o_op);
297 356
 	if(_o_l)
298 357
 		pkg_free(_o_l);
299
-	if(_dres)
300
-		dbt_result_free(_dres);
301 358
 	LM_ERR("failed to query the table!\n");
302 359
 
303 360
 	return -1;
304 361
 
362
+/*
305 363
 clean:
306
-	/* unlock database */
307 364
 	dbt_release_table(DBT_CON_CONNECTION(_h), CON_TABLE(_h));
308 365
 	if(lkey)
309 366
 		pkg_free(lkey);
... ...
@@ -315,10 +372,54 @@ clean:
315 372
 		pkg_free(_o_op);
316 373
 	if(_o_l)
317 374
 		pkg_free(_o_l);
318
-	if(_dres)
319
-		dbt_result_free(_dres);
320 375
 
321 376
 	return -1;
377
+*/
378
+}
379
+
380
+
381
+int dbt_fetch_result(db1_con_t* _h, db1_res_t** _r, const int nrows)
382
+{
383
+	int rows;
384
+
385
+	if (!_h || !_r || nrows < 0) {
386
+		LM_ERR("Invalid parameter value\n");
387
+		return -1;
388
+	}
389
+
390
+	/* exit if the fetch count is zero */
391
+	if (nrows == 0) {
392
+		dbt_free_result(_h, *_r);
393
+		*_r = 0;
394
+		return 0;
395
+	}
396
+
397
+	if(*_r==0) {
398
+		/* Allocate a new result structure */
399
+		dbt_init_result(_r, last_temp_table);
400
+	} else {
401
+		/* free old rows */
402
+		if(RES_ROWS(*_r)!=0)
403
+			db_free_rows(*_r);
404
+		RES_ROWS(*_r) = 0;
405
+		RES_ROW_N(*_r) = 0;
406
+	}
407
+
408
+	/* determine the number of rows remaining to be processed */
409
+	rows = RES_NUM_ROWS(*_r) - RES_LAST_ROW(*_r);
410
+
411
+	/* If there aren't any more rows left to process, exit */
412
+	if(rows<=0)
413
+		return 0;
414
+
415
+	/* if the fetch count is less than the remaining rows to process                 */
416
+	/* set the number of rows to process (during this call) equal to the fetch count */
417
+	if(nrows < rows)
418
+		rows = nrows;
419
+	
420
+	RES_ROW_N(*_r) = rows;
421
+
422
+	return dbt_get_next_result(_r, RES_LAST_ROW(*_r), rows);
322 423
 }
323 424
 
324 425
 /*
... ...
@@ -40,6 +40,8 @@ static gen_lock_t *_dbt_cachesem = NULL;
40 40
 
41 41
 static dbt_tbl_cachel_p _dbt_cachetbl = NULL;
42 42
 
43
+extern int is_main;
44
+
43 45
 #define DBT_CACHETBL_SIZE	16
44 46
 
45 47
 /**
... ...
@@ -265,6 +267,7 @@ dbt_table_p dbt_db_get_table(dbt_cache_p _dc, const str *_s)
265 267
 	hash = core_hash(&_dc->name, _s, DBT_CACHETBL_SIZE);
266 268
 	hashidx = hash % DBT_CACHETBL_SIZE;
267 269
 
270
+	if(!is_main)
268 271
 	lock_get(&_dbt_cachetbl[hashidx].sem);
269 272
 
270 273
 	_tbc = _dbt_cachetbl[hashidx].dtp;
... ...
@@ -380,7 +383,7 @@ int dbt_cache_destroy(void)
380 383
 /**
381 384
  *
382 385
  */
383
-int dbt_cache_print(int _f)
386
+int dbt_cache_print2(int _f, int _lock)
384 387
 {
385 388
 	int i;
386 389
 	dbt_table_p _tbc;
... ...
@@ -390,10 +393,12 @@ int dbt_cache_print(int _f)
390 393
 
391 394
 	for(i=0; i< DBT_CACHETBL_SIZE; i++)
392 395
 	{
393
-		lock_get(&_dbt_cachetbl[i].sem);
396
+		if(_lock)
397
+			lock_get(&_dbt_cachetbl[i].sem);
394 398
 		_tbc = _dbt_cachetbl[i].dtp;
395 399
 		while(_tbc)
396 400
 		{
401
+			if(! (_tbc->flag & DBT_TBFL_TEMP)) {
397 402
 			if(_f)
398 403
 				fprintf(stdout, "\n--- Database [%.*s]\n", _tbc->dbname.len,
399 404
 								_tbc->dbname.s);
... ...
@@ -412,14 +417,21 @@ int dbt_cache_print(int _f)
412 417
 					dbt_table_update_flags(_tbc,DBT_TBFL_MODI, DBT_FL_UNSET, 0);
413 418
 				}
414 419
 			}
420
+			}
415 421
 			_tbc = _tbc->next;
416 422
 		}
417
-		lock_release(&_dbt_cachetbl[i].sem);
423
+		if(_lock)
424
+			lock_release(&_dbt_cachetbl[i].sem);
418 425
 	}
419 426
 
420 427
 	return 0;
421 428
 }
422 429
 
430
+int dbt_cache_print(int _f)
431
+{
432
+	return dbt_cache_print2(_f, !is_main);
433
+}	
434
+
423 435
 int dbt_is_neq_type(db_type_t _t0, db_type_t _t1)
424 436
 {
425 437
 	// LM_DBG("t0=%d t1=%d!\n", _t0, _t1);
... ...
@@ -461,3 +473,47 @@ int dbt_is_neq_type(db_type_t _t0, db_type_t _t1)
461 473
 	return 1;
462 474
 }
463 475
 
476
+static int tmp_table_number = 0;
477
+
478
+dbt_table_p dbt_db_get_temp_table(dbt_cache_p _dc)
479
+{
480
+	dbt_table_p _tbc = NULL;
481
+	str _s;
482
+	char buf[30];
483
+	int hash;
484
+	int hashidx;
485
+
486
+
487
+	if(!_dbt_cachetbl || !_dc) {
488
+		LM_ERR("invalid parameter\n");
489
+		return NULL;
490
+	}
491
+
492
+	sprintf(buf, "tmp-%i-%i", my_pid(), ++tmp_table_number);
493
+	_s.s = buf;
494
+	_s.len = strlen(buf);
495
+
496
+	hash = core_hash(&_dc->name, &_s, DBT_CACHETBL_SIZE);
497
+	hashidx = hash % DBT_CACHETBL_SIZE;
498
+
499
+	lock_get(&_dbt_cachetbl[hashidx].sem);
500
+
501
+	_tbc = _dbt_cachetbl[hashidx].dtp;
502
+
503
+
504
+
505
+	_tbc = dbt_table_new(&_s, &(_dc->name), NULL);
506
+
507
+	_tbc->hash = hash;
508
+	_tbc->next = _dbt_cachetbl[hashidx].dtp;
509
+	if(_dbt_cachetbl[hashidx].dtp)
510
+		_dbt_cachetbl[hashidx].dtp->prev = _tbc;
511
+
512
+	_dbt_cachetbl[hashidx].dtp = _tbc;
513
+
514
+	dbt_table_update_flags(_tbc, DBT_TBFL_TEMP, DBT_FL_SET, 0);
515
+
516
+
517
+	lock_release(&_dbt_cachetbl[hashidx].sem);
518
+	return _tbc;
519
+}
... ...
@@ -35,6 +35,7 @@
35 35
 
36 36
 #define DBT_TBFL_ZERO	0
37 37
 #define DBT_TBFL_MODI	1
38
+#define DBT_TBFL_TEMP	2
38 39
 
39 40
 #define DBT_FL_IGN		-1
40 41
 #define DBT_FL_SET		0
... ...
@@ -50,6 +51,7 @@
50 51
 extern int db_mode; /* Database usage mode: 0 = no cache, 1 = cache */
51 52
 extern int empty_string; /* If TRUE, an empty string is an empty string, otherwise NULL */
52 53
 extern int _db_text_read_buffer_size; /* size of the buffer to allocate when reading file */
54
+extern int _db_text_max_result_rows; /* max result rows */
53 55
 
54 56
 typedef db_val_t dbt_val_t, *dbt_val_p;
55 57
 
... ...
@@ -109,6 +111,7 @@ typedef struct _dbt_cache
109 111
 int dbt_init_cache(void);
110 112
 int dbt_cache_destroy(void);
111 113
 int dbt_cache_print(int);
114
+int dbt_cache_print2(int, int);
112 115
 
113 116
 dbt_cache_p dbt_cache_get_db(str*);
114 117
 int dbt_cache_check_db(str*);
... ...
@@ -121,12 +124,13 @@ int dbt_cache_free(dbt_cache_p);
121 124
 dbt_column_p dbt_column_new(char*, int);
122 125
 dbt_row_p dbt_row_new(int);
123 126
 dbt_table_p dbt_table_new(const str*, const str*, const char*);
127
+dbt_table_p dbt_db_get_temp_table(dbt_cache_p _dc);
124 128
 
125 129
 int dbt_row_free(dbt_table_p, dbt_row_p);
126 130
 int dbt_column_free(dbt_column_p);
127 131
 int dbt_table_free_rows(dbt_table_p);
128 132
 int dbt_table_free(dbt_table_p);
129
-
133
+int dbt_db_del_table(dbt_cache_p _dc, const str *_s, int sync);
130 134
 
131 135
 int dbt_row_set_val(dbt_row_p, dbt_val_p, int, int);
132 136
 int dbt_row_update_val(dbt_row_p, dbt_val_p, int, int);
... ...
@@ -92,7 +92,7 @@ clean:
92 92
 	return NULL;
93 93
 }
94 94
 
95
-int dbt_result_free(dbt_result_p _dres)
95
+int _dbt_result_free(dbt_result_p _dres)
96 96
 {
97 97
 	dbt_row_p _rp=NULL, _rp0=NULL;
98 98
 	int i;
... ...
@@ -134,6 +134,22 @@ int dbt_result_free(dbt_result_p _dres)
134 134
 	return 0;
135 135
 }
136 136
 
137
+int dbt_result_free(db1_con_t* _h, dbt_table_p _dres)
138
+{
139
+	if ((!_h))
140
+	{
141
+		LM_ERR("invalid parameter value\n");
142
+		return -1;
143
+	}
144
+
145
+	if (!_dres)
146
+		return 0;
147
+
148
+	dbt_db_del_table(DBT_CON_CONNECTION(_h), &_dres->name, 1);
149
+
150
+	return 0;
151
+}
152
+
137 153
 int dbt_result_add_row(dbt_result_p _dres, dbt_row_p _drp)
138 154
 {
139 155
 	if(!_dres || !_drp)
... ...
@@ -316,45 +332,48 @@ clean:
316 332
 	return -1;
317 333
 }
318 334
 
319
-int dbt_result_print(dbt_result_p _dres)
335
+int dbt_result_print(dbt_table_p _dres)
320 336
 {
321
-#if 0
322 337
 	int i;
323
-	FILE *fout = stdout;
338
+	FILE *fout = stderr;
324 339
 	dbt_row_p rowp = NULL;
325 340
 	char *p;
326 341
 
327
-	if(!_dres || _dres->nrcols<=0)
342
+	if(!_dres || _dres->nrcols<=0) {
343
+		LM_INFO("NO PRINT\n");
328 344
 		return -1;
345
+	}
329 346
 
330 347
 	fprintf(fout, "\nContent of result\n");
331 348
 
332 349
 	for(i=0; i<_dres->nrcols; i++)
333 350
 	{
334
-		switch(_dres->colv[i].type)
351
+		switch(_dres->colv[i]->type)
335 352
 		{
336 353
 			case DB1_INT:
337
-				fprintf(fout, "%.*s(int", _dres->colv[i].name.len,
338
-								_dres->colv[i].name.s);
339
-				if(_dres->colv[i].flag & DBT_FLAG_NULL)
354
+				fprintf(fout, "%.*s(int", _dres->colv[i]->name.len,
355
+								_dres->colv[i]->name.s);
356
+				if(_dres->colv[i]->flag & DBT_FLAG_NULL)
340 357
 					fprintf(fout, ",null");
341 358
 				fprintf(fout, ") ");
342 359
 			break;
343 360
 			case DB1_DOUBLE:
344
-				fprintf(fout, "%.*s(double", _dres->colv[i].name.len,
345
-							_dres->colv[i].name.s);
346
-				if(_dres->colv[i].flag & DBT_FLAG_NULL)
361
+				fprintf(fout, "%.*s(double", _dres->colv[i]->name.len,
362
+							_dres->colv[i]->name.s);
363
+				if(_dres->colv[i]->flag & DBT_FLAG_NULL)
347 364
 					fprintf(fout, ",null");
348 365
 				fprintf(fout, ") ");
349 366
 			break;
350 367
 			case DB1_STR:
351
-				fprintf(fout, "%.*s(str", _dres->colv[i].name.len,
352
-						_dres->colv[i].name.s);
353
-				if(_dres->colv[i].flag & DBT_FLAG_NULL)
368
+			case DB1_STRING:
369
+				fprintf(fout, "%.*s(str", _dres->colv[i]->name.len,
370
+						_dres->colv[i]->name.s);
371
+				if(_dres->colv[i]->flag & DBT_FLAG_NULL)
354 372
 					fprintf(fout, ",null");
355 373
 				fprintf(fout, ") ");
356 374
 			break;
357 375
 			default:
376
+				LM_INFO("TYPE NOT HANDLED %i\n", _dres->colv[i]->type);
358 377
 				return -1;
359 378
 		}
360 379
 	}
... ...
@@ -364,7 +383,7 @@ int dbt_result_print(dbt_result_p _dres)
364 383
 	{
365 384
 		for(i=0; i<_dres->nrcols; i++)
366 385
 		{
367
-			switch(_dres->colv[i].type)
386
+			switch(_dres->colv[i]->type)
368 387
 			{
369 388
 				case DB1_INT:
370 389
 					if(rowp->fields[i].nul)
... ...
@@ -381,6 +400,7 @@ int dbt_result_print(dbt_result_p _dres)
381 400
 								rowp->fields[i].val.double_val);
382 401
 				break;
383 402
 				case DB1_STR:
403
+				case DB1_STRING:
384 404
 					fprintf(fout, "\"");
385 405
 					if(!rowp->fields[i].nul)
386 406
 					{
... ...
@@ -423,7 +443,6 @@ int dbt_result_print(dbt_result_p _dres)
423 443
 		fprintf(fout, "\n");
424 444
 		rowp = rowp->next;
425 445
 	}
426
-#endif
427 446
 
428 447
 	return 0;
429 448
 }
... ...
@@ -526,6 +545,29 @@ dbt_row_p dbt_result_new_row(dbt_result_p _dres)
526 545
 	return _drp;
527 546
 }
528 547
 
548
+//dbt_row_p dbt_result_new_rows(dbt_row_p* _res, int rows, int cols)
549
+//{
550
+//	dbt_row_p _drp = NULL;
551
+//	if(!_dres || _dres->nrcols<=0)
552
+//		return NULL;
553
+//
554
+//	_drp = (dbt_row_p)shm_malloc(sizeof(dbt_row_t) * rows);
555
+//	if(!_drp)
556
+//		return NULL;
557
+//	memset(_drp, 0, sizeof(dbt_row_t));
558
+//	_drp->fields = (dbt_val_p)shm_malloc(_dres->nrcols*sizeof(dbt_val_t));
559
+//	if(!_drp->fields)
560
+//	{
561
+//		shm_free(_drp);
562
+//		return NULL;
563
+//	}
564
+//	memset(_drp->fields, 0, _dres->nrcols*sizeof(dbt_val_t));
565
+//
566
+//	_drp->next = _drp->prev = NULL;
567
+//
568
+//	return _drp;
569
+//}
570
+
529 571
 
530 572
 /* The _o clause to query is not really a db_key_t, it is SQL (str).
531 573
  * db_mysql and db_postgres simply paste it into SQL, we need to parse it. */
... ...
@@ -824,3 +866,129 @@ void dbt_project_result(dbt_result_p _dres, int _o_nc)
824 866
 	_dres->nrcols -= _o_nc;
825 867
 }
826 868
 
869
+/* comparison function for qsort */
870
+int dbt_qsort_compare_temp(const void *_a, const void *_b)
871
+{
872
+	int _i, _j, _r;
873
+
874
+	for (_i=0; _i<dbt_sort_o_n; _i++)
875
+	{
876
+		_j = dbt_sort_o_l[_i];
877
+		_r = dbt_cmp_val(&(*(dbt_row_p *)_a)->fields[_j], &(*(dbt_row_p *)_b)->fields[_j]);
878
+		if (_r == 0)
879
+			continue; /* no result yet, compare next column */
880
+		if (_r == +1 || _r == -1)
881
+			return (dbt_sort_o_op[_i] == '<') ? _r : -_r; /* ASC OR DESC */
882
+		/* error */
883
+		longjmp(dbt_sort_jmpenv, _r);
884
+	}
885
+
886
+	/* no result after comparing all columns, same */
887
+	return 0;
888
+}
889
+
890
+int dbt_sort_result_temp(dbt_row_p *_res, int count, int *_o_l, char *_o_op, int _o_n)
891
+{
892
+	int _i;
893
+
894
+	/* set globals */
895
+	dbt_sort_o_l = _o_l;
896
+	dbt_sort_o_op = _o_op;
897
+	dbt_sort_o_n = _o_n;
898
+	_i = setjmp(dbt_sort_jmpenv);  /* exception handling */
899
+	if (_i)
900
+	{
901
+		/* error occured during qsort */
902
+		LM_ERR("qsort aborted\n");
903
+		return _i;
904
+	}
905
+
906
+	qsort(_res, count, sizeof(dbt_row_p), &dbt_qsort_compare_temp);
907
+
908
+	return 0;
909
+}
910
+
911
+dbt_row_p dbt_result_extract_results(dbt_table_p _dtp, dbt_row_p* pRows, int _nrows, int* _lres, int _ncols)
912
+{
913
+	dbt_row_p pRow=NULL;
914
+	dbt_row_p pTopRow=NULL;
915
+	dbt_row_p pPrvRow=NULL;
916
+	int i, n, r;
917
+
918
+	if(!_dtp || !pRows || _ncols<=0)
919
+		return NULL;
920
+
921
+	for(r=0; r < _nrows; r++) {
922
+		pRow = dbt_row_new(_ncols);
923
+
924
+		for(i=0; i<_ncols; i++)
925
+		{
926
+			n = _lres[i];
927
+			pRow->fields[i].nul = pRows[r]->fields[n].nul;
928
+			if(pRow->fields[i].nul)
929
+			{
930
+				memset(&(pRow->fields[i].val), 0, sizeof(pRow->fields[i].val));
931
+				continue;
932
+			}
933
+
934
+			switch(_dtp->colv[n]->type)
935
+			{
936
+				case DB1_INT:
937
+				case DB1_DATETIME:
938
+				case DB1_BITMAP:
939
+					pRow->fields[i].type = _dtp->colv[n]->type;
940
+					pRow->fields[i].val.int_val = pRows[r]->fields[n].val.int_val;
941
+				break;
942
+				case DB1_DOUBLE:
943
+					pRow->fields[i].type = DB1_DOUBLE;
944
+					pRow->fields[i].val.double_val=pRows[r]->fields[n].val.double_val;
945
+				break;
946
+				case DB1_STRING:
947
+				case DB1_STR:
948
+				case DB1_BLOB:
949
+					pRow->fields[i].type = _dtp->colv[n]->type;
950
+					pRow->fields[i].val.str_val.len =
951
+							pRows[r]->fields[n].val.str_val.len;
952
+					pRow->fields[i].val.str_val.s =(char*)shm_malloc(sizeof(char)*
953
+							(pRows[r]->fields[n].val.str_val.len+1));
954
+					if(!pRow->fields[i].val.str_val.s)
955
+						goto clean;
956
+					memcpy(pRow->fields[i].val.str_val.s,
957
+							pRows[r]->fields[n].val.str_val.s,
958
+							pRows[r]->fields[n].val.str_val.len);
959
+					pRow->fields[i].val.str_val.s[pRows[r]->fields[n].val.str_val.len]=0;
960
+				break;
961
+				default:
962
+					goto clean;
963
+			}
964
+		}
965
+
966
+		if(pTopRow == NULL) {
967
+			pTopRow = pRow;
968
+		} else {
969
+			pRow->prev = pPrvRow;
970
+			pPrvRow->next = pRow;
971
+		}
972
+		pPrvRow = pRow;
973
+	}
974
+
975
+	return pTopRow;
976
+
977
+clean:
978
+	LM_DBG("make clean!\n");
979
+	while(i>=0)
980
+	{
981
+		if((pRow->fields[i].type == DB1_STRING
982
+					|| pRow->fields[i].type == DB1_STR
983
+					|| pRow->fields[i].type == DB1_BLOB)
984
+				&& !pRow->fields[i].nul
985
+				&& pRow->fields[i].val.str_val.s)
986
+			shm_free(pRow->fields[i].val.str_val.s);
987
+
988
+		i--;
989
+	}
990
+	shm_free(pRow->fields);
991
+	shm_free(pRow);
992
+
993
+	return pTopRow;
994
+}
... ...
@@ -34,6 +34,7 @@ typedef struct _dbt_result
34 34
 {
35 35
 	int nrcols;
36 36
 	int nrrows;
37
+	int last_row;
37 38
 	dbt_column_p colv;
38 39
 	dbt_row_p rows;
39 40
 } dbt_result_t, *dbt_result_p;
... ...
@@ -42,17 +43,22 @@ typedef struct _dbt_con
42 43
 {
43 44
 	dbt_cache_p con;
44 45
 	int affected;
46
+	dbt_table_p last_query;
45 47
 } dbt_con_t, *dbt_con_p;
46 48
 
47 49
 #define DBT_CON_CONNECTION(db_con) (((dbt_con_p)((db_con)->tail))->con)
50
+#define DBT_CON_TEMP_TABLE(db_con) (((dbt_con_p)((db_con)->tail))->last_query)
48 51
 
49 52
 dbt_result_p dbt_result_new(dbt_table_p, int*, int);
50
-int dbt_result_free(dbt_result_p);
53
+
54
+//int dbt_result_free(dbt_result_p);
55
+int dbt_result_free(db1_con_t* _h, dbt_table_p _dres);
56
+
51 57
 int dbt_row_match(dbt_table_p _dtp, dbt_row_p _drp, int* _lkey,
52 58
 				db_op_t* _op, db_val_t* _v, int _n);
53 59
 int dbt_result_extract_fields(dbt_table_p _dtp, dbt_row_p _drp,
54 60
 				int* lres, dbt_result_p _dres);
55
-int dbt_result_print(dbt_result_p _dres);
61
+int dbt_result_print(dbt_table_p _dres);
56 62
 
57 63
 int* dbt_get_refs(dbt_table_p, db_key_t*, int);
58 64
 int dbt_cmp_val(dbt_val_p _vp, db_val_t* _v);
... ...
@@ -63,5 +69,9 @@ int dbt_mangle_columnselection(int **_lres, int *_nc, int *_o_nc, int *_o_l, int
63 69
 int dbt_sort_result(dbt_result_p _dres, int *_o_l, char *_o_op, int _o_n, int *_lres, int _nc);
64 70
 void dbt_project_result(dbt_result_p _dres, int _o_nc);
65 71
 
72
+int dbt_qsort_compare_temp(const void *_a, const void *_b);
73
+int dbt_sort_result_temp(dbt_row_p *_res, int count, int *_o_l, char *_o_op, int _o_n);
74
+dbt_row_p dbt_result_extract_results(dbt_table_p _dtp, dbt_row_p* pRows, int _nrows, int* _lres, int _ncols);
75
+
66 76
 #endif
67 77
 
... ...
@@ -137,7 +137,7 @@ dbt_table_p dbt_table_new(const str *_tbname, const str *_dbname, const char *pa
137 137
 {
138 138
 	struct stat s;
139 139
 	dbt_table_p dtp = NULL;
140
-	if(!_tbname || !_dbname || !path)
140
+	if(!_tbname || !_dbname)
141 141
 		return NULL;
142 142
 
143 143
 	dtp = (dbt_table_p)shm_malloc(sizeof(dbt_table_t));
... ...
@@ -175,7 +175,7 @@ dbt_table_p dbt_table_new(const str *_tbname, const str *_dbname, const char *pa
175 175
 	dtp->nrrows = dtp->nrcols = dtp->auto_val = 0;
176 176
 	dtp->auto_col = -1;
177 177
 	dtp->mt = 0;
178
-	if(stat(path, &s) == 0)
178
+	if(path && stat(path, &s) == 0)
179 179
 	{
180 180
 		dtp->mt = s.st_mtime;
181 181
 		LM_DBG("mtime is %d\n", (int)s.st_mtime);