Browse code

db_postgres: add new module vars "timeout" and "tcp_keepalive"

The "timeout" option enables a query timeout as well as a connection
timeout, set to the specified number of seconds.

The "tcp_keepalive" option enables the kernel's TCP keepalive packets
on the PGSQL socket and sets the keepalive timer to the specified number
of seconds. Only supported if the platform supports the TCP_KEEPIDLE
socket option.

Richard Fuchs authored on 16/12/2014 14:10:10
Showing 7 changed files
... ...
@@ -24,12 +24,16 @@ Greg Fausak
24 24
         3. Parameters
25 25
 
26 26
               3.1. retries (integer)
27
+              3.2. timeout (integer)
28
+              3.3. tcp_keepalive (integer)
27 29
 
28 30
         4. Functions
29 31
 
30 32
    List of Examples
31 33
 
32 34
    1.1. Set retries parameter
35
+   1.2. Set timeout parameter
36
+   1.3. Set tcp_keepalive parameter
33 37
 
34 38
 Chapter 1. Admin Guide
35 39
 
... ...
@@ -44,6 +48,8 @@ Chapter 1. Admin Guide
44 44
    3. Parameters
45 45
 
46 46
         3.1. retries (integer)
47
+        3.2. timeout (integer)
48
+        3.3. tcp_keepalive (integer)
47 49
 
48 50
    4. Functions
49 51
 
... ...
@@ -73,6 +79,8 @@ Chapter 1. Admin Guide
73 73
 3. Parameters
74 74
 
75 75
    3.1. retries (integer)
76
+   3.2. timeout (integer)
77
+   3.3. tcp_keepalive (integer)
76 78
 
77 79
 3.1. retries (integer)
78 80
 
... ...
@@ -88,6 +96,37 @@ Chapter 1. Admin Guide
88 88
 modparam("db_postgres", "retries", 3)
89 89
 ...
90 90
 
91
+3.2. timeout (integer)
92
+
93
+   Setting this variable to any value larger than zero (which is the
94
+   default value) enables both a connection timeout and a query timeout.
95
+   If a connection attempt or a query takes longer than this many seconds,
96
+   the operation will be aborted and an error will be returned.
97
+
98
+   Note that this timeout is applied to each underlying operation (i.e.
99
+   for each connection attempt), so depending on circumstances and on the
100
+   value of the “retries” variable, a single query from the SIP proxy's
101
+   point of view can take longer than the “timeout”.
102
+
103
+   Example 1.2. Set timeout parameter
104
+...
105
+modparam("db_postgres", "timeout", 10)
106
+...
107
+
108
+3.3. tcp_keepalive (integer)
109
+
110
+   Enable the TCP keepalive timer and set the number of seconds the
111
+   connection must be idle before to start sending keepalive packets.
112
+   Defaults to zero, which disables TCP keepalive packets.
113
+
114
+   Only supported on platforms which understand and support the
115
+   “TCP_KEEPIDLE” socket option.
116
+
117
+   Example 1.3. Set tcp_keepalive parameter
118
+...
119
+modparam("db_postgres", "tcp_keepalive", 600)
120
+...
121
+
91 122
 4. Functions
92 123
 
93 124
    NONE
... ...
@@ -81,6 +81,55 @@ modparam("db_postgres", "retries", 3)
81 81
 </programlisting>
82 82
 		</example>
83 83
 	</section>
84
+
85
+	<section>
86
+		<title><varname>timeout</varname> (integer)</title>
87
+		<para>
88
+			Setting this variable to any value larger than zero (which is the
89
+			default value) enables both a connection timeout and a query
90
+			timeout. If a connection attempt or a query takes longer than this
91
+			many seconds, the operation will be aborted and an error will be
92
+			returned.
93
+		</para>
94
+		<para>
95
+			Note that this timeout is applied to each underlying operation
96
+			(i.e. for each connection attempt), so depending on circumstances
97
+			and on the value of the <quote>retries</quote> variable, a single
98
+			query from the &sip; proxy's point of view can take longer than the
99
+			<quote>timeout</quote>.
100
+		</para>
101
+		<example>
102
+		<title>Set <varname>timeout</varname> parameter</title>
103
+		<programlisting format="linespecific">
104
+...
105
+modparam("db_postgres", "timeout", 10)
106
+...
107
+</programlisting>
108
+		</example>
109
+	</section>
110
+
111
+	<section>
112
+		<title><varname>tcp_keepalive</varname> (integer)</title>
113
+		<para>
114
+			Enable the TCP keepalive timer and set the number of seconds the
115
+			connection must be idle before to start sending keepalive packets.
116
+			Defaults to zero, which disables TCP keepalive packets.
117
+		</para>
118
+		<para>
119
+		<emphasis>
120
+			Only supported on platforms which understand and support the
121
+			<quote>TCP_KEEPIDLE</quote> socket option.
122
+		</emphasis>
123
+		</para>
124
+		<example>
125
+		<title>Set <varname>tcp_keepalive</varname> parameter</title>
126
+		<programlisting format="linespecific">
127
+...
128
+modparam("db_postgres", "tcp_keepalive", 600)
129
+...
130
+</programlisting>
131
+		</example>
132
+	</section>
84 133
 	</section>
85 134
 
86 135
 	<section>
... ...
@@ -167,6 +167,10 @@ static int db_postgres_submit_query(const db1_con_t* _con, const str* _s)
167 167
 	int i, retries;
168 168
 	ExecStatusType pqresult;
169 169
 	PGresult *res = NULL;
170
+	int sock, ret;
171
+	fd_set fds;
172
+	time_t max_time;
173
+	struct timeval wait_time;
170 174
 
171 175
 	if(! _con || !_s || !_s->s)
172 176
 	{
... ...
@@ -217,6 +221,44 @@ static int db_postgres_submit_query(const db1_con_t* _con, const str* _s)
217 217
 		/* exec the query */
218 218
 
219 219
 		if (PQsendQuery(CON_CONNECTION(_con), s)) {
220
+			if (pg_timeout <= 0)
221
+				goto do_read;
222
+
223
+			max_time = time(NULL) + pg_timeout;
224
+
225
+			while (1) {
226
+				sock = PQsocket(CON_CONNECTION(_con));
227
+				FD_ZERO(&fds);
228
+				FD_SET(sock, &fds);
229
+
230
+				wait_time.tv_usec = 0;
231
+				wait_time.tv_sec = max_time - time(NULL);
232
+				if (wait_time.tv_sec <= 0 || wait_time.tv_sec > 0xffffff)
233
+					goto timeout;
234
+
235
+				ret = select(sock + 1, &fds, NULL, NULL, &wait_time);
236
+				if (ret < 0) {
237
+					if (errno == EINTR)
238
+						continue;
239
+					LM_WARN("select() error\n");
240
+					goto reset;
241
+				}
242
+				if (!ret) {
243
+timeout:
244
+					LM_WARN("timeout waiting for postgres reply\n");
245
+					goto reset;
246
+				}
247
+
248
+				if (!PQconsumeInput(CON_CONNECTION(_con))) {
249
+					LM_WARN("error reading data from postgres server: %s\n",
250
+							PQerrorMessage(CON_CONNECTION(_con)));
251
+					goto reset;
252
+				}
253
+				if (!PQisBusy(CON_CONNECTION(_con)))
254
+					break;
255
+			}
256
+
257
+do_read:
220 258
 			/* Get the result of the query */
221 259
 			while ((res = PQgetResult(CON_CONNECTION(_con))) != NULL) {
222 260
 				db_postgres_free_query(_con);
... ...
@@ -239,6 +281,7 @@ static int db_postgres_submit_query(const db1_con_t* _con, const str* _s)
239 239
 				PQerrorMessage(CON_CONNECTION(_con)));
240 240
 		if(PQstatus(CON_CONNECTION(_con))!=CONNECTION_OK)
241 241
 		{
242
+reset:
242 243
 			LM_DBG("reseting the connection to postgress server\n");
243 244
 			PQreset(CON_CONNECTION(_con));
244 245
 		}
... ...
@@ -26,11 +26,14 @@
26 26
  */
27 27
 
28 28
 #include "km_pg_con.h"
29
+#include "pg_mod.h"
29 30
 #include "../../mem/mem.h"
30 31
 #include "../../dprint.h"
31 32
 #include "../../ut.h"
32 33
 #include <string.h>
33 34
 #include <time.h>
35
+#include <netinet/in.h>
36
+#include <netinet/tcp.h>
34 37
 
35 38
 
36 39
 /*!
... ...
@@ -45,6 +48,9 @@ struct pg_con* db_postgres_new_connection(struct db_id* id)
45 45
 {
46 46
 	struct pg_con* ptr;
47 47
 	char *ports;
48
+	int i = 0;
49
+	const char *keywords[10], *values[10];
50
+	char to[16];
48 51
 
49 52
 	LM_DBG("db_id = %p\n", id);
50 53
  
... ...
@@ -66,6 +72,8 @@ struct pg_con* db_postgres_new_connection(struct db_id* id)
66 66
 
67 67
 	if (id->port) {
68 68
 		ports = int2str(id->port, 0);
69
+		keywords[i] = "port";
70
+		values[i++] = ports;
69 71
 		LM_DBG("opening connection: postgres://xxxx:xxxx@%s:%d/%s\n", ZSW(id->host),
70 72
 			id->port, ZSW(id->database));
71 73
 	} else {
... ...
@@ -74,8 +82,24 @@ struct pg_con* db_postgres_new_connection(struct db_id* id)
74 74
 			ZSW(id->database));
75 75
 	}
76 76
 
77
- 	ptr->con = PQsetdbLogin(id->host, ports, NULL, NULL, id->database, id->username, id->password);
78
-	LM_DBG("PQsetdbLogin(%p)\n", ptr->con);
77
+	keywords[i] = "host";
78
+	values[i++] = id->host;
79
+	keywords[i] = "dbname";
80
+	values[i++] = id->database;
81
+	keywords[i] = "user";
82
+	values[i++] = id->username;
83
+	keywords[i] = "password";
84
+	values[i++] = id->password;
85
+	if (pg_timeout > 0) {
86
+		snprintf(to, sizeof(to)-1, "%d", pg_timeout + 3);
87
+		keywords[i] = "connect_timeout";
88
+		values[i++] = to;
89
+	}
90
+
91
+	keywords[i] = values[i] = NULL;
92
+
93
+	ptr->con = PQconnectdbParams(keywords, values, 1);
94
+	LM_DBG("PQconnectdbParams(%p)\n", ptr->con);
79 95
 
80 96
 	if( (ptr->con == 0) || (PQstatus(ptr->con) != CONNECTION_OK) )
81 97
 	{
... ...
@@ -88,6 +112,14 @@ struct pg_con* db_postgres_new_connection(struct db_id* id)
88 88
 	ptr->timestamp = time(0);
89 89
 	ptr->id = id;
90 90
 
91
+#if defined(SO_KEEPALIVE) && defined(TCP_KEEPIDLE)
92
+	if (pg_keepalive) {
93
+		i = 1;
94
+		setsockopt(PQsocket(ptr->con), SOL_SOCKET, SO_KEEPALIVE, &i, sizeof(i));
95
+		setsockopt(PQsocket(ptr->con), IPPROTO_TCP, TCP_KEEPIDLE, &pg_keepalive, sizeof(pg_keepalive));
96
+	}
97
+#endif
98
+
91 99
 	return ptr;
92 100
 
93 101
  err:
... ...
@@ -39,6 +39,7 @@
39 39
 #include "pg_con.h"
40 40
 #include "pg_uri.h"
41 41
 #include "pg_sql.h"
42
+#include "pg_mod.h"
42 43
 
43 44
 #include "../../mem/mem.h"
44 45
 #include "../../dprint.h"
... ...
@@ -47,6 +48,7 @@
47 47
 #include <stdlib.h>
48 48
 #include <string.h>
49 49
 #include <netinet/in.h>
50
+#include <netinet/tcp.h>
50 51
 #include <time.h>
51 52
 
52 53
 
... ...
@@ -237,7 +239,9 @@ int pg_con_connect(db_con_t* con)
237 237
 	struct pg_con* pcon;
238 238
 	struct pg_uri* puri;
239 239
 	char* port_str;
240
-	int ret;
240
+	int ret, i = 0;
241
+	const char *keywords[10], *values[10];
242
+	char to[16];
241 243
 	
242 244
 	pcon = DB_GET_PAYLOAD(con);
243 245
 	puri = DB_GET_PAYLOAD(con->uri);
... ...
@@ -251,6 +255,8 @@ int pg_con_connect(db_con_t* con)
251 251
 
252 252
 	if (puri->port > 0) {
253 253
 		port_str = int2str(puri->port, 0);
254
+		keywords[i] = "port";
255
+		values[i++] = port_str;
254 256
 	} else {
255 257
 		port_str = NULL;
256 258
 	}
... ...
@@ -260,12 +266,26 @@ int pg_con_connect(db_con_t* con)
260 260
 		pcon->con = NULL;
261 261
 	}
262 262
 
263
-	pcon->con = PQsetdbLogin(puri->host, port_str,
264
-							 NULL, NULL, puri->database,
265
-							 puri->username, puri->password);
263
+	keywords[i] = "host";
264
+	values[i++] = puri->host;
265
+	keywords[i] = "dbname";
266
+	values[i++] = puri->database;
267
+	keywords[i] = "user";
268
+	values[i++] = puri->username;
269
+	keywords[i] = "password";
270
+	values[i++] = puri->password;
271
+	if (pg_timeout > 0) {
272
+		snprintf(to, sizeof(to)-1, "%d", pg_timeout + 3);
273
+		keywords[i] = "connect_timeout";
274
+		values[i++] = to;
275
+	}
276
+
277
+	keywords[i] = values[i] = NULL;
278
+
279
+	pcon->con = PQconnectdbParams(keywords, values, 1);
266 280
 	
267 281
 	if (pcon->con == NULL) {
268
-		ERR("postgres: PQsetdbLogin ran out of memory\n");
282
+		ERR("postgres: PQconnectdbParams ran out of memory\n");
269 283
 		goto error;
270 284
 	}
271 285
 	
... ...
@@ -285,6 +305,14 @@ int pg_con_connect(db_con_t* con)
285 285
 	    PQprotocolVersion(pcon->con), 0 );
286 286
 #endif
287 287
 
288
+#if defined(SO_KEEPALIVE) && defined(TCP_KEEPIDLE)
289
+	if (pg_keepalive) {
290
+		i = 1;
291
+		setsockopt(PQsocket(pcon->con), SOL_SOCKET, SO_KEEPALIVE, &i, sizeof(i));
292
+		setsockopt(PQsocket(pcon->con), IPPROTO_TCP, TCP_KEEPIDLE, &pg_keepalive, sizeof(pg_keepalive));
293
+	}
294
+#endif
295
+
288 296
 	ret = timestamp_format(pcon->con);
289 297
 	if (ret == 1 || ret == -1) {
290 298
 		/* Assume INT8 representation if detection fails */
... ...
@@ -61,6 +61,8 @@ int pg_retries = 2;  /* How many times should the module try re-execute failed c
61 61
 					  * 0 disables reconnecting */
62 62
 
63 63
 int pg_lockset = 4;
64
+int pg_timeout = 0; /* default = no timeout */
65
+int pg_keepalive = 0;
64 66
 
65 67
 /*
66 68
  * Postgres module interface
... ...
@@ -92,6 +94,8 @@ static cmd_export_t cmds[] = {
92 92
 static param_export_t params[] = {
93 93
 	{"retries",         PARAM_INT, &pg_retries },
94 94
 	{"lockset",         PARAM_INT, &pg_lockset },
95
+	{"timeout",         PARAM_INT, &pg_timeout },
96
+	{"tcp_keepalive",   PARAM_INT, &pg_keepalive },
95 97
 	{0, 0, 0}
96 98
 };
97 99
 
... ...
@@ -41,6 +41,8 @@
41 41
  */
42 42
 
43 43
 extern int pg_retries;
44
+extern int pg_timeout;
45
+extern int pg_keepalive;
44 46
 
45 47
 /** @} */
46 48