Browse code

- for tcp read processes, reuse the read fd for sending - keep a tcp send fd cache (experimental) (performance improvement)

Andrei Pelinescu-Onciul authored on 27/11/2007 21:05:32
Showing 3 changed files
... ...
@@ -128,6 +128,7 @@ struct tcp_connection{
128 128
 	gen_lock_t write_lock;
129 129
 	int id; /* id (unique!) used to retrieve a specific connection when
130 130
 	           reply-ing*/
131
+	int reader_pid; /* pid of the active reader process */
131 132
 	struct receive_info rcv; /* src & dst ip, ports, proto a.s.o*/
132 133
 	struct tcp_req req; /* request data */
133 134
 	atomic_t refcnt;
... ...
@@ -84,6 +84,7 @@
84 84
  *  2007-11-22  always add the connection & clear the coresponding flags before
85 85
  *               io_watch_add-ing its fd - it's safer this way (andrei)
86 86
  *  2007-11-26  improved tcp timers: switched to local_timer (andrei)
87
+ *  2007-11-27  added send fd cache and reader fd reuse (andrei)
87 88
  */
88 89
 
89 90
 
... ...
@@ -176,6 +177,23 @@
176 177
 enum fd_types { F_NONE, F_SOCKINFO /* a tcp_listen fd */,
177 178
 				F_TCPCONN, F_TCPCHILD, F_PROC };
178 179
 
180
+
181
+#define TCP_FD_CACHE
182
+
183
+#ifdef TCP_FD_CACHE
184
+
185
+#define TCP_FD_CACHE_SIZE 8
186
+
187
+struct fd_cache_entry{
188
+	struct tcp_connection* con;
189
+	int id;
190
+	int fd;
191
+};
192
+
193
+
194
+static struct fd_cache_entry fd_cache[TCP_FD_CACHE_SIZE];
195
+#endif /* TCP_FD_CACHE */
196
+
179 197
 static int is_tcp_main=0;
180 198
 
181 199
 int tcp_accept_aliases=0; /* by default don't accept aliases */
... ...
@@ -971,6 +989,48 @@ error:
971 989
 
972 990
 
973 991
 
992
+#ifdef TCP_FD_CACHE
993
+
994
+static void tcp_fd_cache_init()
995
+{
996
+	int r;
997
+	for (r=0; r<TCP_FD_CACHE_SIZE; r++)
998
+		fd_cache[r].fd=-1;
999
+}
1000
+
1001
+
1002
+inline static struct fd_cache_entry* tcp_fd_cache_get(struct tcp_connection *c)
1003
+{
1004
+	int h;
1005
+	
1006
+	h=c->id%TCP_FD_CACHE_SIZE;
1007
+	if ((fd_cache[h].fd>0) && (fd_cache[h].id==c->id) && (fd_cache[h].con==c))
1008
+		return &fd_cache[h];
1009
+	return 0;
1010
+}
1011
+
1012
+
1013
+inline static void tcp_fd_cache_rm(struct fd_cache_entry* e)
1014
+{
1015
+	e->fd=-1;
1016
+}
1017
+
1018
+
1019
+inline static void tcp_fd_cache_add(struct tcp_connection *c, int fd)
1020
+{
1021
+	int h;
1022
+	
1023
+	h=c->id%TCP_FD_CACHE_SIZE;
1024
+	if (fd_cache[h].fd>0)
1025
+		close(fd_cache[h].fd);
1026
+	fd_cache[h].fd=fd;
1027
+	fd_cache[h].id=c->id;
1028
+	fd_cache[h].con=c;
1029
+}
1030
+
1031
+#endif /* TCP_FD_CACHE */
1032
+
1033
+
974 1034
 /* finds a tcpconn & sends on it
975 1035
  * uses the dst members to, proto (TCP|TLS) and id and tries to send
976 1036
  *  from the "from" address (if non null and id==0)
... ...
@@ -986,21 +1046,27 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
986 1046
 	int fd;
987 1047
 	long response[2];
988 1048
 	int n;
1049
+	int do_close_fd;
1050
+#ifdef TCP_FD_CACHE
1051
+	struct fd_cache_entry* fd_cache_e;
989 1052
 	
1053
+	fd_cache_e=0;
1054
+#endif /* TCP_FD_CACHE */
1055
+	do_close_fd=1; /* close the fd on exit */
990 1056
 	port=su_getport(&dst->to);
991
-	if (port){
1057
+	if (likely(port)){
992 1058
 		su2ip_addr(&ip, &dst->to);
993 1059
 		c=tcpconn_get(dst->id, &ip, port, from, tcp_con_lifetime); 
994
-	}else if (dst->id){
1060
+	}else if (likely(dst->id)){
995 1061
 		c=tcpconn_get(dst->id, 0, 0, 0, tcp_con_lifetime);
996 1062
 	}else{
997 1063
 		LOG(L_CRIT, "BUG: tcp_send called with null id & to\n");
998 1064
 		return -1;
999 1065
 	}
1000 1066
 	
1001
-	if (dst->id){
1002
-		if (c==0) {
1003
-			if (port){
1067
+	if (likely(dst->id)){
1068
+		if (unlikely(c==0)) {
1069
+			if (likely(port)){
1004 1070
 				/* try again w/o id */
1005 1071
 				c=tcpconn_get(0, &ip, port, from, tcp_con_lifetime);
1006 1072
 				goto no_id;
... ...
@@ -1012,10 +1078,10 @@ int tcp_send(struct dest_info* dst, union sockaddr_union* from,
1012 1078
 		}else goto get_fd;
1013 1079
 	}
1014 1080
 no_id:
1015
-		if (c==0){
1081
+		if (unlikely(c==0)){
1016 1082
 			DBG("tcp_send: no open tcp connection found, opening new one\n");
1017 1083
 			/* create tcp connection */
1018
-			if (from==0){
1084
+			if (likely(from==0)){
1019 1085
 				/* check to see if we have to use a specific source addr. */
1020 1086
 				switch (dst->to.s.sa_family) {
1021 1087
 					case AF_INET:
... ...
@@ -1031,7 +1097,7 @@ no_id:
1031 1097
 						break;
1032 1098
 				}
1033 1099
 			}
1034
-			if ((c=tcpconn_connect(&dst->to, from, dst->proto))==0){
1100
+			if (unlikely((c=tcpconn_connect(&dst->to, from, dst->proto))==0)){
1035 1101
 				LOG(L_ERR, "ERROR: tcp_send: connect failed\n");
1036 1102
 				return -1;
1037 1103
 			}
... ...
@@ -1042,7 +1108,7 @@ no_id:
1042 1108
 			response[0]=(long)c;
1043 1109
 			response[1]=CONN_NEW;
1044 1110
 			n=send_fd(unix_tcp_sock, response, sizeof(response), c->s);
1045
-			if (n<=0){
1111
+			if (unlikely(n<=0)){
1046 1112
 				LOG(L_ERR, "BUG: tcp_send: failed send_fd: %s (%d)\n",
1047 1113
 						strerror(errno), errno);
1048 1114
 				n=-1;
... ...
@@ -1051,14 +1117,27 @@ no_id:
1051 1117
 			goto send_it;
1052 1118
 		}
1053 1119
 get_fd:
1054
-			/* todo: see if this is not the same process holding
1055
-			 *  c  and if so send directly on c->fd */
1120
+		/* check if this is not the same reader process holding
1121
+		 *  c  and if so send directly on c->fd */
1122
+		if (c->reader_pid==my_pid()){
1123
+			WARN("tcp_send: FIXME: send from reader (%d (%d)), reusing fd\n",
1124
+					my_pid(), process_no);
1125
+			fd=c->fd;
1126
+			do_close_fd=0; /* don't close the fd on exit, it's in use */
1127
+#ifdef TCP_FD_CACHE
1128
+		}else if (likely((fd_cache_e=tcp_fd_cache_get(c))!=0)){
1129
+			fd=fd_cache_e->fd;
1130
+			do_close_fd=0;
1131
+			WARN("tcp_send: FIXME: found fd in cache ( %d, %p, %d)\n",
1132
+					fd, c, fd_cache_e->id);
1133
+#endif /* TCP_FD_CACHE */
1134
+		}else{
1056 1135
 			DBG("tcp_send: tcp connection found (%p), acquiring fd\n", c);
1057 1136
 			/* get the fd */
1058 1137
 			response[0]=(long)c;
1059 1138
 			response[1]=CONN_GET_FD;
1060 1139
 			n=send_all(unix_tcp_sock, response, sizeof(response));
1061
-			if (n<=0){
1140
+			if (unlikely(n<=0)){
1062 1141
 				LOG(L_ERR, "BUG: tcp_send: failed to get fd(write):%s (%d)\n",
1063 1142
 						strerror(errno), errno);
1064 1143
 				n=-1;
... ...
@@ -1066,13 +1145,13 @@ get_fd:
1066 1145
 			}
1067 1146
 			DBG("tcp_send, c= %p, n=%d\n", c, n);
1068 1147
 			n=receive_fd(unix_tcp_sock, &tmp, sizeof(tmp), &fd, MSG_WAITALL);
1069
-			if (n<=0){
1148
+			if (unlikely(n<=0)){
1070 1149
 				LOG(L_ERR, "BUG: tcp_send: failed to get fd(receive_fd):"
1071 1150
 							" %s (%d)\n", strerror(errno), errno);
1072 1151
 				n=-1;
1073 1152
 				goto release_c;
1074 1153
 			}
1075
-			if (c!=tmp){
1154
+			if (unlikely(c!=tmp)){
1076 1155
 				LOG(L_CRIT, "BUG: tcp_send: get_fd: got different connection:"
1077 1156
 						"  %p (id= %d, refcnt=%d state=%d) != "
1078 1157
 						"  %p (n=%d)\n",
... ...
@@ -1083,7 +1162,7 @@ get_fd:
1083 1162
 				goto end;
1084 1163
 			}
1085 1164
 			DBG("tcp_send: after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
1086
-		
1165
+		}
1087 1166
 	
1088 1167
 	
1089 1168
 send_it:
... ...
@@ -1099,7 +1178,7 @@ send_it:
1099 1178
 	lock_release(&c->write_lock);
1100 1179
 	DBG("tcp_send: after write: c= %p n=%d fd=%d\n",c, n, fd);
1101 1180
 	DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
1102
-	if (n<0){
1181
+	if (unlikely(n<0)){
1103 1182
 		LOG(L_ERR, "ERROR: tcp_send: failed to send\n");
1104 1183
 		/* error on the connection , mark it as bad and set 0 timeout */
1105 1184
 		c->state=S_CONN_BAD;
... ...
@@ -1115,11 +1194,25 @@ send_it:
1115 1194
 		}
1116 1195
 		/* CONN_ERROR will auto-dec refcnt => we must not call tcpconn_put 
1117 1196
 		 * if it succeeds */
1118
-		close(fd);
1197
+#ifdef TCP_FD_CACHE
1198
+		if (unlikely(fd_cache_e)){
1199
+			LOG(L_ERR, "ERROR: tcp_send: error on cached fd, removing from the"
1200
+					"cache (%d, %p, %d)\n", 
1201
+					fd, fd_cache_e->con, fd_cache_e->id);
1202
+			tcp_fd_cache_rm(fd_cache_e);
1203
+			close(fd);
1204
+		}else
1205
+#endif /* TCP_FD_CACHE */
1206
+		if (do_close_fd) close(fd);
1119 1207
 		return n; /* error return, no tcpconn_put */
1120 1208
 	}
1121 1209
 end:
1122
-	close(fd);
1210
+#ifdef TCP_FD_CACHE
1211
+	if (unlikely(fd_cache_e==0)){
1212
+		tcp_fd_cache_add(c, fd);
1213
+	}else
1214
+#endif /* TCP_FD_CACHE */
1215
+	if (do_close_fd) close(fd);
1123 1216
 release_c:
1124 1217
 	tcpconn_put(c); /* release c (lock; dec refcnt; unlock) */
1125 1218
 	return n;
... ...
@@ -1248,6 +1341,9 @@ static void tcpconn_destroy(struct tcp_connection* tcpconn)
1248 1341
 			tls_close(tcpconn, fd);
1249 1342
 #endif
1250 1343
 		_tcpconn_free(tcpconn);
1344
+#ifdef TCP_FD_CACHE
1345
+		shutdown(fd, SHUT_RDWR);
1346
+#endif /* TCP_FD_CACHE */
1251 1347
 		close(fd);
1252 1348
 		(*tcp_connections_no)--;
1253 1349
 	}else{
... ...
@@ -1958,6 +2054,9 @@ static ticks_t tcpconn_main_timeout(ticks_t t, struct timer_ln* tl, void* data)
1958 2054
 						tls_close(c, fd);
1959 2055
 #endif /* USE_TLS */
1960 2056
 					_tcpconn_free(c);
2057
+#ifdef TCP_FD_CACHE
2058
+					shutdown(fd, SHUT_RDWR);
2059
+#endif /* TCP_FD_CACHE */
1961 2060
 					close(fd);
1962 2061
 				}
1963 2062
 				(*tcp_connections_no)--; /* modified only in tcp_main
... ...
@@ -2024,6 +2123,9 @@ static inline void tcpconn_destroy_all()
2024 2123
 #endif
2025 2124
 				_tcpconn_rm(c);
2026 2125
 				if (fd>0) {
2126
+#ifdef TCP_FD_CACHE
2127
+					shutdown(fd, SHUT_RDWR);
2128
+#endif /* TCP_FD_CACHE */
2027 2129
 					close(fd);
2028 2130
 				}
2029 2131
 				(*tcp_connections_no)--;
... ...
@@ -2059,6 +2161,15 @@ void tcp_main_loop()
2059 2161
 		goto error;
2060 2162
 	/* init: start watching all the fds*/
2061 2163
 	
2164
+	/* init local timer */
2165
+	if (init_local_timer(&tcp_main_ltimer, get_ticks_raw())!=0){
2166
+		LOG(L_ERR, "ERROR: init_tcp: failed to init local timer\n");
2167
+		goto error;
2168
+	}
2169
+#ifdef TCP_FD_CACHE
2170
+	tcp_fd_cache_init();
2171
+#endif /* TCP_FD_CACHE */
2172
+	
2062 2173
 	/* add all the sockets we listen on for connections */
2063 2174
 	for (si=tcp_listen; si; si=si->next){
2064 2175
 		if ((si->proto==PROTO_TCP) &&(si->socket!=-1)){
... ...
@@ -2312,11 +2423,6 @@ int init_tcp()
2312 2423
 					poll_method_name(tcp_poll_method));
2313 2424
 	}
2314 2425
 	
2315
-	if (init_local_timer(&tcp_main_ltimer, get_ticks_raw())!=0){
2316
-		LOG(L_ERR, "ERROR: init_tcp: failed to init local timer\n");
2317
-		goto error;
2318
-	}
2319
-	
2320 2426
 	return 0;
2321 2427
 error:
2322 2428
 	/* clean-up */
... ...
@@ -64,6 +64,7 @@
64 64
 #include "timer.h"
65 65
 #include "local_timer.h"
66 66
 #include "ut.h"
67
+#include "pt.h"
67 68
 #ifdef CORE_TLS
68 69
 #include "tls/tls_server.h"
69 70
 #else
... ...
@@ -671,6 +672,7 @@ void release_tcpconn(struct tcp_connection* c, long state, int unix_sock)
671 672
 				c, state, c->fd, c->id);
672 673
 		DBG(" extra_data %p\n", c->extra_data);
673 674
 		/* release req & signal the parent */
675
+		c->reader_pid=0; /* reset it */
674 676
 		if (c->fd!=-1) close(c->fd);
675 677
 		/* errno==EINTR, EWOULDBLOCK a.s.o todo */
676 678
 		response[0]=(long)c;
... ...
@@ -755,6 +757,7 @@ again:
755 757
 									"no fd read\n");
756 758
 				goto con_error;
757 759
 			}
760
+			con->reader_pid=my_pid();
758 761
 			if (unlikely(con==tcp_conn_lst)){
759 762
 				LOG(L_CRIT, "BUG: tcp_receive: handle_io: duplicate"
760 763
 							" connection received: %p, id %d, fd %d, refcnt %d"