Browse code

tcp: send_flags support

Support for SND_F_FORCE_CON_REUSE and SND_F_CON_CLOSE added to the
tcp code.

Andrei Pelinescu-Onciul authored on 15/09/2009 15:45:41
Showing 2 changed files
... ...
@@ -170,7 +170,8 @@ struct tcp_connection{
170 170
 	struct tcp_req req; /* request data */
171 171
 	atomic_t refcnt;
172 172
 	enum sip_protos type; /* PROTO_TCP or a protocol over it, e.g. TLS */
173
-	int flags; /* connection related flags */
173
+	unsigned short flags; /* connection related flags */
174
+	unsigned short send_flags; /* special send flags */
174 175
 	enum tcp_conn_states state; /* connection state */
175 176
 	void* extra_data; /* extra data associated to the connection, 0 for tcp*/
176 177
 	struct timer_ln timer;
... ...
@@ -190,6 +191,10 @@ struct tcp_connection{
190 190
 
191 191
 /* helper macros */
192 192
 
193
+#define tcpconn_set_send_flags(c, snd_flags) ((c)->send_flags|=(snd_flags))
194
+
195
+#define tcpconn_close_after_send(c)	((c)->send_flags & SND_F_CON_CLOSE)
196
+
193 197
 #define TCP_RCV_INFO(c) (&(c)->rcv)
194 198
 
195 199
 #define TCP_RCV_LADDR(r) (&((r).dst_ip))
... ...
@@ -100,7 +100,9 @@
100 100
  *  2009-02-26  direct blacklist support (andrei)
101 101
  *  2009-03-20  s/wq_timeout/send_timeout ; send_timeout is now in ticks
102 102
  *              (andrei)
103
- * 2009-04-09  tcp ev and tcp stats macros added (andrei)
103
+ *  2009-04-09  tcp ev and tcp stats macros added (andrei)
104
+ *  2009-09-15  support for force connection reuse and close after send
105
+ *               send flags (andrei)
104 106
  */
105 107
 
106 108
 
... ...
@@ -1762,18 +1764,24 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
1762 1762
 			if (likely(port)){
1763 1763
 				/* try again w/o id */
1764 1764
 				c=tcpconn_get(0, &ip, port, from, con_lifetime);
1765
-				goto no_id;
1766 1765
 			}else{
1767 1766
 				LOG(L_ERR, "ERROR: tcp_send: id %d not found, dropping\n",
1768 1767
 						dst->id);
1769 1768
 				return -1;
1770 1769
 			}
1771
-		}else goto get_fd;
1770
+		}
1772 1771
 	}
1773
-no_id:
1774
-		if (unlikely(c==0)){
1772
+/* no_id: */
1773
+		if (unlikely((c==0) || tcpconn_close_after_send(c))){
1774
+			if (unlikely(c)){
1775
+				/* can't use c if it's marked as close-after-send  =>
1776
+				   release it and try opening new one */
1777
+				tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
1778
+				c=0;
1779
+			}
1775 1780
 			/* check if connect() is disabled */
1776
-			if (cfg_get(tcp, tcp_cfg, no_connect))
1781
+			if (unlikely((dst->send_flags & SND_F_FORCE_CON_REUSE) ||
1782
+							cfg_get(tcp, tcp_cfg, no_connect)))
1777 1783
 				return -1;
1778 1784
 			DBG("tcp_send: no open tcp connection found, opening new one\n");
1779 1785
 			/* create tcp connection */
... ...
@@ -1814,6 +1822,7 @@ no_id:
1814 1814
 					return -1;
1815 1815
 				}
1816 1816
 				c->flags|=F_CONN_PENDING|F_CONN_FD_CLOSED;
1817
+				tcpconn_set_send_flags(c, dst->send_flags);
1817 1818
 				atomic_set(&c->refcnt, 2); /* ref from here and from main hash
1818 1819
 											 table */
1819 1820
 				/* add it to id hash and aliases */
... ...
@@ -1918,6 +1927,14 @@ no_id:
1918 1918
 				}
1919 1919
 				LOG(L_INFO, "tcp_send: quick connect for %p\n", c);
1920 1920
 				TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
1921
+				if (unlikely(dst->send_flags & SND_F_CON_CLOSE)){
1922
+					/* if close-after-send requested, don't bother
1923
+					   sending the fd back to tcp_main, try closing it
1924
+					   immediately (no other tcp_send should use it,
1925
+					   because it is marked as close-after-send before
1926
+					   being added to the hash */
1927
+					goto conn_wait_close;
1928
+				}
1921 1929
 				c->state=S_CONN_OK;
1922 1930
 				/* send to tcp_main */
1923 1931
 				response[0]=(long)c;
... ...
@@ -1938,6 +1955,7 @@ no_id:
1938 1938
 								su2a(&dst->to, sizeof(dst->to)));
1939 1939
 				return -1;
1940 1940
 			}
1941
+			tcpconn_set_send_flags(c, dst->send_flags);
1941 1942
 			if (likely(c->state==S_CONN_OK))
1942 1943
 				TCP_STATS_ESTABLISHED(S_CONN_CONNECT);
1943 1944
 			atomic_set(&c->refcnt, 2); /* ref. from here and it will also
... ...
@@ -1962,7 +1980,7 @@ no_id:
1962 1962
 			}
1963 1963
 			goto send_it;
1964 1964
 		}
1965
-get_fd:
1965
+/* get_fd: */
1966 1966
 #ifdef TCP_ASYNC
1967 1967
 		/* if data is already queued, we don't need the fd any more */
1968 1968
 		if (unlikely(cfg_get(tcp, tcp_cfg, async) &&
... ...
@@ -2048,6 +2066,8 @@ get_fd:
2048 2048
 send_it:
2049 2049
 	DBG("tcp_send: sending...\n");
2050 2050
 	lock_get(&c->write_lock);
2051
+	/* update connection send flags with the current ones */
2052
+	tcpconn_set_send_flags(c, dst->send_flags);
2051 2053
 #ifdef TCP_ASYNC
2052 2054
 	if (likely(cfg_get(tcp, tcp_cfg, async))){
2053 2055
 		if (_wbufq_non_empty(c)
... ...
@@ -2203,6 +2223,31 @@ error:
2203 2203
 			TCP_STATS_ESTABLISHED(c->state);
2204 2204
 			c->state=S_CONN_OK;
2205 2205
 	}
2206
+	if (unlikely(dst->send_flags & SND_F_CON_CLOSE)){
2207
+		/* close after write => send EOF request to tcp_main */
2208
+		c->state=S_CONN_BAD;
2209
+		c->timeout=get_ticks_raw();
2210
+		/* tell "main" it should drop this*/
2211
+		response[0]=(long)c;
2212
+		response[1]=CONN_EOF;
2213
+		if (send_all(unix_tcp_sock, response, sizeof(response))<=0){
2214
+			LOG(L_CRIT, "BUG: tcp_send: error return failed (write):%s (%d)\n",
2215
+					strerror(errno), errno);
2216
+			tcpconn_chld_put(c); /* deref. it manually */
2217
+			n=-1;
2218
+		}
2219
+		/* CONN_EOF will auto-dec refcnt => we must not call tcpconn_put 
2220
+		 * if it succeeds */
2221
+#ifdef TCP_FD_CACHE
2222
+		if (unlikely(fd_cache_e)){
2223
+			tcp_fd_cache_rm(fd_cache_e);
2224
+			fd_cache_e=0;
2225
+			close(fd);
2226
+		}else
2227
+#endif /* TCP_FD_CACHE */
2228
+		if (do_close_fd) close(fd);
2229
+		goto end_no_conn;
2230
+	}
2206 2231
 end:
2207 2232
 #ifdef TCP_FD_CACHE
2208 2233
 	if (unlikely((fd_cache_e==0) && use_fd_cache)){
... ...
@@ -2216,11 +2261,14 @@ end_no_conn:
2216 2216
 	return n;
2217 2217
 #ifdef TCP_CONNECT_WAIT
2218 2218
 conn_wait_error:
2219
-	/* connect or send failed on newly created connection which was not
2220
-	 * yet sent to tcp_main (but was already hashed) => don't send to main,
2221
-	 * unhash and destroy directly (if refcnt>2 it will be destroyed when the 
2222
-	 * last sender releases the connection (tcpconn_chld_put(c))) or when
2223
-	 * tcp_main receives a CONN_ERROR it*/
2219
+	n=-1;
2220
+conn_wait_close:
2221
+	/* connect or send failed or immediate close-after-send was requested on
2222
+	 * newly created connection which was not yet sent to tcp_main (but was
2223
+	 * already hashed) => don't send to main, unhash and destroy directly
2224
+	 * (if refcnt>2 it will be destroyed when the last sender releases the
2225
+	 * connection (tcpconn_chld_put(c))) or when tcp_main receives a
2226
+	 * CONN_ERROR it*/
2224 2227
 	c->state=S_CONN_BAD;
2225 2228
 	TCPCONN_LOCK;
2226 2229
 		if (c->flags & F_CONN_HASHED){
... ...
@@ -2234,7 +2282,7 @@ conn_wait_error:
2234 2234
 			TCPCONN_UNLOCK;
2235 2235
 	/* dec refcnt -> mark it for destruction */
2236 2236
 	tcpconn_chld_put(c);
2237
-	return -1;
2237
+	return n;
2238 2238
 #endif /* TCP_CONNET_WAIT */
2239 2239
 }
2240 2240
 
... ...
@@ -3025,11 +3073,12 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
3025 3025
 			LOG(L_ERR, "handle_ser_child: ERROR: received CON_ERROR for %p"
3026 3026
 					" (id %d), refcnt %d\n", 
3027 3027
 					tcpconn, tcpconn->id, atomic_get(&tcpconn->refcnt));
3028
+		case CONN_EOF: /* forced EOF after full send, due to send flags */
3028 3029
 #ifdef TCP_CONNECT_WAIT
3029 3030
 			/* if the connection is pending => it might be on the way of
3030 3031
 			 * reaching tcp_main (e.g. CONN_NEW_COMPLETE or 
3031 3032
 			 *  CONN_NEW_PENDING_WRITE) =>  it cannot be destroyed here */
3032
-			if ( !(tcpconn->flags & F_CONN_PENDING) && 
3033
+			if ( !(tcpconn->flags & F_CONN_PENDING) &&
3033 3034
 					tcpconn_try_unhash(tcpconn) )
3034 3035
 				tcpconn_put(tcpconn);
3035 3036
 #else /* ! TCP_CONNECT_WAIT */
... ...
@@ -3155,7 +3204,7 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
3155 3155
 					}
3156 3156
 				}
3157 3157
 			}else{
3158
-				LOG(L_WARN, "tcp_main: hanlder_ser_child: connection %p"
3158
+				LOG(L_WARN, "tcp_main: handler_ser_child: connection %p"
3159 3159
 							" already watched for write\n", tcpconn);
3160 3160
 			}
3161 3161
 			break;
... ...
@@ -3466,8 +3515,10 @@ inline static int handle_tcpconn_ev(struct tcp_connection* tcpconn, short ev,
3466 3466
 	empty_q=0; /* warning fix */
3467 3467
 	if (unlikely((ev & (POLLOUT|POLLERR|POLLHUP)) &&
3468 3468
 					(tcpconn->flags & F_CONN_WRITE_W))){
3469
-		if (unlikely((ev & (POLLERR|POLLHUP)) || 
3470
-					(wbufq_run(tcpconn->s, tcpconn, &empty_q)<0))){
3469
+		if (unlikely((ev & (POLLERR|POLLHUP)) ||
3470
+					(wbufq_run(tcpconn->s, tcpconn, &empty_q)<0) ||
3471
+					(empty_q && tcpconn_close_after_send(tcpconn))
3472
+			)){
3471 3473
 			if (unlikely(io_watch_del(&io_h, tcpconn->s, fd_i, 0)<0)){
3472 3474
 				LOG(L_ERR, "ERROR: handle_tcpconn_ev: io_watch_del(1) failed:"
3473 3475
 							" for %p, fd %d\n", tcpconn, tcpconn->s);