Browse code

- tcp: close connection immediately if the write buf. times out (timeout fixes) + some cleanups

Andrei Pelinescu-Onciul authored on 12/12/2007 19:11:25
Showing 2 changed files
... ...
@@ -135,6 +135,7 @@ struct tcp_conn_alias{
135 135
 	struct tcp_wbuffer_queue{
136 136
 		struct tcp_wbuffer* first;
137 137
 		struct tcp_wbuffer* last;
138
+		ticks_t wr_timeout; /* write timeout*/
138 139
 		unsigned int queued; /* total size */
139 140
 		unsigned int offset; /* offset in the first wbuffer were data
140 141
 								starts */
... ...
@@ -167,7 +168,6 @@ struct tcp_connection{
167 167
 	struct tcp_conn_alias con_aliases[TCP_CON_MAX_ALIASES];
168 168
 	int aliases; /* aliases number, at least 1 */
169 169
 #ifdef TCP_BUF_WRITE
170
-	ticks_t last_write; /* time when the last write took place */
171 170
 	struct tcp_wbuffer_queue wbuf_q;
172 171
 #endif
173 172
 };
... ...
@@ -88,6 +88,7 @@
88 88
  *  2007-11-28  added support for TCP_DEFER_ACCEPT, KEEPALIVE, KEEPINTVL,
89 89
  *               KEEPCNT, QUICKACK, SYNCNT, LINGER2 (andrei)
90 90
  *  2007-12-04  support for queueing write requests (andrei)
91
+ *  2007-12-12  destroy connection asap on wbuf. timeout (andrei)
91 92
  */
92 93
 
93 94
 
... ...
@@ -558,7 +559,14 @@ inline static int _tcpconn_write_nb(int fd, struct tcp_connection* c,
558 558
 #ifdef TCP_BUF_WRITE
559 559
 
560 560
 
561
-inline static int wbufq_add(struct  tcp_connection* c, char* data, 
561
+/* unsafe version */
562
+#define _wbufq_empty(con) ((con)->wbuf_q.first==0)
563
+/* unsafe version */
564
+#define _wbufq_non_empty(con) ((con)->wbuf_q.first!=0)
565
+
566
+
567
+/* unsafe version, call while holding the connection write lock */
568
+inline static int _wbufq_add(struct  tcp_connection* c, char* data, 
562 569
 							unsigned int size)
563 570
 {
564 571
 	struct tcp_wbuffer_queue* q;
... ...
@@ -573,11 +581,11 @@ inline static int wbufq_add(struct  tcp_connection* c, char* data,
573 573
 	if (unlikely(	((q->queued+size)>tcp_options.tcpconn_wq_max) ||
574 574
 					((*tcp_total_wq+size)>tcp_options.tcp_wq_max) ||
575 575
 					(q->first &&
576
-					TICKS_GT(t, c->last_write+tcp_options.tcp_wq_timeout)) )){
576
+					TICKS_LT(q->wr_timeout, t)) )){
577 577
 		LOG(L_ERR, "ERROR: wbufq_add(%d bytes): write queue full or timeout "
578 578
 					" (%d, total %d, last write %d s ago)\n",
579 579
 					size, q->queued, *tcp_total_wq,
580
-					TICKS_TO_S(t-c->last_write));
580
+					TICKS_TO_S(t-q->wr_timeout-tcp_options.tcp_wq_timeout));
581 581
 		goto error;
582 582
 	}
583 583
 	
... ...
@@ -592,7 +600,7 @@ inline static int wbufq_add(struct  tcp_connection* c, char* data,
592 592
 		q->first=wb;
593 593
 		q->last_used=0;
594 594
 		q->offset=0;
595
-		c->last_write=get_ticks_raw(); /* start with the crt. time */
595
+		q->wr_timeout=get_ticks_raw()+tcp_options.tcp_wq_timeout;
596 596
 	}else{
597 597
 		wb=q->last;
598 598
 	}
... ...
@@ -626,7 +634,8 @@ error:
626 626
 
627 627
 
628 628
 
629
-inline static void wbufq_destroy( struct  tcp_wbuffer_queue* q)
629
+/* unsafe version, call while holding the connection write lock */
630
+inline static void _wbufq_destroy( struct  tcp_wbuffer_queue* q)
630 631
 {
631 632
 	struct tcp_wbuffer* wb;
632 633
 	struct tcp_wbuffer* next_wb;
... ...
@@ -650,7 +659,7 @@ inline static void wbufq_destroy( struct  tcp_wbuffer_queue* q)
650 650
 
651 651
 
652 652
 
653
-/* tries to empty the queue
653
+/* tries to empty the queue  (safe version, c->write_lock must not be hold)
654 654
  * returns -1 on error, bytes written on success (>=0) 
655 655
  * if the whole queue is emptied => sets *empty*/
656 656
 inline static int wbufq_run(int fd, struct tcp_connection* c, int* empty)
... ...
@@ -688,7 +697,7 @@ inline static int wbufq_run(int fd, struct tcp_connection* c, int* empty)
688 688
 				atomic_add_int((int*)tcp_total_wq, -n);
689 689
 				break;
690 690
 			}
691
-			c->last_write=t;
691
+			q->wr_timeout=t+tcp_options.tcp_wq_timeout;
692 692
 			c->state=S_CONN_OK;
693 693
 		}else{
694 694
 			if (n<0){
... ...
@@ -1018,8 +1027,8 @@ static inline void _tcpconn_detach(struct tcp_connection *c)
1018 1018
 static inline void _tcpconn_free(struct tcp_connection* c)
1019 1019
 {
1020 1020
 #ifdef TCP_BUF_WRITE
1021
-	if (unlikely(c->wbuf_q.first))
1022
-		wbufq_destroy(&c->wbuf_q);
1021
+	if (unlikely(_wbufq_non_empty(c)))
1022
+		_wbufq_destroy(&c->wbuf_q);
1023 1023
 #endif
1024 1024
 	lock_destroy(&c->write_lock);
1025 1025
 #ifdef USE_TLS
... ...
@@ -1412,11 +1421,11 @@ no_id:
1412 1412
 get_fd:
1413 1413
 #ifdef TCP_BUF_WRITE
1414 1414
 		/* if data is already queued, we don't need the fd any more */
1415
-		if (unlikely(tcp_options.tcp_buf_write && c->wbuf_q.first)){
1415
+		if (unlikely(tcp_options.tcp_buf_write && _wbufq_non_empty(c))){
1416 1416
 			lock_get(&c->write_lock);
1417
-				if (likely(c->wbuf_q.first)){
1417
+				if (likely(_wbufq_non_empty(c))){
1418 1418
 					do_close_fd=0;
1419
-					if (unlikely(wbufq_add(c, buf, len)<0)){
1419
+					if (unlikely(_wbufq_add(c, buf, len)<0)){
1420 1420
 						lock_release(&c->write_lock);
1421 1421
 						n=-1;
1422 1422
 						goto error;
... ...
@@ -1483,8 +1492,8 @@ send_it:
1483 1483
 	lock_get(&c->write_lock);
1484 1484
 #ifdef TCP_BUF_WRITE
1485 1485
 	if (likely(tcp_options.tcp_buf_write)){
1486
-		if (c->wbuf_q.first){
1487
-			if (unlikely(wbufq_add(c, buf, len)<0)){
1486
+		if (_wbufq_non_empty(c)){
1487
+			if (unlikely(_wbufq_add(c, buf, len)<0)){
1488 1488
 				lock_release(&c->write_lock);
1489 1489
 				n=-1;
1490 1490
 				goto error;
... ...
@@ -1514,8 +1523,8 @@ send_it:
1514 1514
 		if (tcp_options.tcp_buf_write && 
1515 1515
 				(errno==EAGAIN || errno==EWOULDBLOCK)){
1516 1516
 			lock_get(&c->write_lock);
1517
-			enable_write_watch=(c->wbuf_q.first==0);
1518
-			if (unlikely(wbufq_add(c, buf, len)<0)){
1517
+			enable_write_watch=_wbufq_empty(c);
1518
+			if (unlikely(_wbufq_add(c, buf, len)<0)){
1519 1519
 				lock_release(&c->write_lock);
1520 1520
 				n=-1;
1521 1521
 				goto error;
... ...
@@ -1567,7 +1576,6 @@ error:
1567 1567
 	if (likely(tcp_options.tcp_buf_write)){
1568 1568
 		if (unlikely(c->state==S_CONN_CONNECT))
1569 1569
 			c->state=S_CONN_OK;
1570
-		c->last_write=get_ticks_raw();
1571 1570
 	}
1572 1571
 #endif /* TCP_BUF_WRITE */
1573 1572
 end:
... ...
@@ -1747,11 +1755,11 @@ static void tcpconn_destroy(struct tcp_connection* tcpconn)
1747 1747
 		LOG(L_CRIT, "tcpconn_destroy: possible BUG: flags = %0x\n",
1748 1748
 					tcpconn->flags);
1749 1749
 	}
1750
-	if (unlikely(tcpconn->wbuf_q.first)){
1750
+	if (unlikely(_wbufq_non_empty(tcpconn))){
1751 1751
 		lock_get(&tcpconn->write_lock);
1752 1752
 			/* check again, while holding the lock */
1753
-			if (likely(tcpconn->wbuf_q.first))
1754
-				wbufq_destroy(&tcpconn->wbuf_q);
1753
+			if (likely(_wbufq_non_empty(tcpconn)))
1754
+				_wbufq_destroy(&tcpconn->wbuf_q);
1755 1755
 		lock_release(&tcpconn->write_lock);
1756 1756
 	}
1757 1757
 #endif /* TCP_BUF_WRITE */
... ...
@@ -1986,6 +1994,7 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
1986 1986
 	int bytes;
1987 1987
 	int n;
1988 1988
 	ticks_t t;
1989
+	ticks_t crt_timeout;
1989 1990
 	
1990 1991
 	if (unlikely(tcp_c->unix_sock<=0)){
1991 1992
 		/* (we can't have a fd==0, 0 is never closed )*/
... ...
@@ -2059,11 +2068,31 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
2059 2059
 			t=get_ticks_raw();
2060 2060
 			tcpconn->timeout=t+tcp_con_lifetime;
2061 2061
 			tcpconn_put(tcpconn);
2062
+			crt_timeout=tcp_con_lifetime;
2063
+#ifdef TCP_BUF_WRITE
2064
+			if (unlikely(tcp_options.tcp_buf_write && 
2065
+							_wbufq_non_empty(tcpconn) )){
2066
+				if (unlikely(TICKS_LE(t, tcpconn->wbuf_q.wr_timeout))){
2067
+					DBG("handle_tcp_child: wr. timeout on CONN_RELEASE for %p "
2068
+							"refcnt= %d\n", tcpconn,
2069
+							atomic_get(&tcpconn->refcnt));
2070
+					/* timeout */
2071
+					if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
2072
+						io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
2073
+						tcpconn->flags&=~F_CONN_WRITE_W;
2074
+					}
2075
+					tcpconn_destroy(tcpconn); /* closes also the fd */
2076
+					break;
2077
+				}else{
2078
+					crt_timeout=MIN_unsigned(tcp_con_lifetime,
2079
+											tcpconn->wbuf_q.wr_timeout-t);
2080
+				}
2081
+			}
2082
+#endif /* TCP_BUF_WRITE */
2062 2083
 			/* re-activate the timer */
2063 2084
 			tcpconn->timer.f=tcpconn_main_timeout;
2064 2085
 			local_timer_reinit(&tcpconn->timer);
2065
-			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
2066
-								tcp_con_lifetime, t);
2086
+			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, crt_timeout, t);
2067 2087
 			/* must be after the de-ref*/
2068 2088
 			tcpconn->flags&=~(F_CONN_REMOVED|F_CONN_READER);
2069 2089
 #ifdef TCP_BUF_WRITE
... ...
@@ -2234,7 +2263,8 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
2234 2234
 			/* update the timeout*/
2235 2235
 			t=get_ticks_raw();
2236 2236
 			tcpconn->timeout=t+tcp_con_lifetime;
2237
-			/* activate the timer (already properly init. in tcpconn_new() */
2237
+			/* activate the timer (already properly init. in tcpconn_new())
2238
+			 * no need for */
2238 2239
 			local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
2239 2240
 								tcp_con_lifetime, t);
2240 2241
 			tcpconn->flags&=~F_CONN_REMOVED;
... ...
@@ -2279,6 +2309,20 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
2279 2279
 					}
2280 2280
 				}
2281 2281
 				tcpconn->flags|=F_CONN_WRITE_W;
2282
+				t=get_ticks_raw();
2283
+				if (likely(!(tcpconn->flags & F_CONN_READER) && 
2284
+					(TICKS_LT(tcpconn->wbuf_q.wr_timeout, tcpconn->timeout)) &&
2285
+						TICKS_LT(t, tcpconn->wbuf_q.wr_timeout) )){
2286
+					/* _wbufq_nonempty() is guaranteed here */
2287
+					/* update the timer */
2288
+					local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
2289
+					local_timer_reinit(&tcpconn->timer);
2290
+					local_timer_add(&tcp_main_ltimer, &tcpconn->timer,
2291
+										tcpconn->wbuf_q.wr_timeout-t, t);
2292
+					DBG("tcp_main: handle_ser_child: CONN_QUEUED_WRITE; %p "
2293
+							"timeout adjusted to %d s\n", tcpconn, 
2294
+							TICKS_TO_S(tcpconn->wbuf_q.wr_timeout-t));
2295
+				}
2282 2296
 			}else{
2283 2297
 				LOG(L_WARN, "tcp_main: hanlder_ser_child: connection %p"
2284 2298
 							" already watched for write\n", tcpconn);
... ...
@@ -2621,10 +2665,28 @@ static ticks_t tcpconn_main_timeout(ticks_t t, struct timer_ln* tl, void* data)
2621 2621
 	c=(struct tcp_connection*)data; 
2622 2622
 	/* or (struct tcp...*)(tl-offset(c->timer)) */
2623 2623
 	
2624
+#ifdef TCP_BUF_WRITE
2625
+	DBG( "tcp_main: entering timer for %p (ticks=%d, timeout=%d (%d s), "
2626
+			"wr_timeout=%d (%d s)), write queue: %d bytes\n",
2627
+			c, t, c->timeout, TICKS_TO_S(c->timeout-t),
2628
+			c->wbuf_q.wr_timeout, TICKS_TO_S(c->wbuf_q.wr_timeout-t),
2629
+			c->wbuf_q.queued);
2630
+	
2631
+	if (TICKS_LT(t, c->timeout) && 
2632
+			(!tcp_options.tcp_buf_write | _wbufq_empty(c) |
2633
+				TICKS_LT(t, c->wbuf_q.wr_timeout)) ){
2634
+		if (unlikely(tcp_options.tcp_buf_write && _wbufq_non_empty(c)))
2635
+			return (ticks_t)MIN_unsigned(c->timeout-t, c->wbuf_q.wr_timeout-t);
2636
+		else
2637
+			return (ticks_t)(c->timeout - t);
2638
+	}
2639
+#else /* ! TCP_BUF_WRITE */
2624 2640
 	if (TICKS_LT(t, c->timeout)){
2625 2641
 		/* timeout extended, exit */
2626 2642
 		return (ticks_t)(c->timeout - t);
2627 2643
 	}
2644
+#endif /* TCP_BUF_WRITE */
2645
+	DBG("tcp_main: timeout for %p\n", c);
2628 2646
 	if (likely(atomic_get(&c->refcnt)==0)){
2629 2647
 		TCPCONN_LOCK;
2630 2648
 			/* check again to avoid races with tcp_send() */