Browse code

io_wait: fix kqueue and too many errors in changelist

kevent() tries to return errors in the changelist back in the
supplied eventlist array. However if this is not large enough, the
whole kevent() syscall will fail.
Now if kevent() fails with EBADF the call will be retried with a
smaller set of changes, until the entire original changelist is
applied.
Fixes also kq_ev_change() flush mode: on error it will try to
apply the changes one-by-one.

(this affects only systems that have kqueue: *bsd and darwin)

Andrei Pelinescu-Onciul authored on 17/06/2010 16:43:14
Showing 1 changed files
... ...
@@ -247,6 +247,7 @@ static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
247 247
 								void* data)
248 248
 {
249 249
 	int n;
250
+	int r;
250 251
 	struct timespec tspec;
251 252
 
252 253
 	if (h->kq_nchanges>=h->kq_changes_size){
... ...
@@ -257,11 +258,36 @@ static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
257 257
 		tspec.tv_nsec=0;
258 258
 again:
259 259
 		n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges, 0, 0, &tspec);
260
-		if (n==-1){
261
-			if (errno==EINTR) goto again;
262
-			LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes "
260
+		if (unlikely(n == -1)){
261
+			if (likely(errno == EBADF)) {
262
+				/* one of the file descriptors is bad, probably already
263
+				   closed => try to apply changes one-by-one */
264
+				for (r = 0; r < h->kq_nchanges; r++) {
265
+retry2:
266
+					n = kevent(h->kq_fd, &h->kq_changes[r], 1, 0, 0, &tspec);
267
+					if (n==-1) {
268
+						if (errno == EBADF)
269
+							continue; /* skip over it */
270
+						if (errno == EINTR)
271
+							goto retry2;
272
+						LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes"
273
+									" failed: %s [%d]\n",
274
+										strerror(errno), errno);
275
+						/* shift the array */
276
+						memmove(&h->kq_changes[0], &h->kq_changes[r+1],
277
+									sizeof(h->kq_changes[0])*
278
+										(h->kq_nchanges-r-1));
279
+						h->kq_nchanges-=(r+1);
280
+						return -1;
281
+					}
282
+				}
283
+			} else if (errno == EINTR) goto again;
284
+			else {
285
+				LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes"
263 286
 						" failed: %s [%d]\n", strerror(errno), errno);
264
-			return -1;
287
+				h->kq_nchanges=0; /* reset changes array */
288
+				return -1;
289
+			}
265 290
 		}
266 291
 		h->kq_nchanges=0; /* changes array is empty */
267 292
 	}
... ...
@@ -1076,22 +1102,43 @@ inline static int io_wait_loop_kqueue(io_wait_h* h, int t, int repeat)
1076 1076
 	int n, r;
1077 1077
 	struct timespec tspec;
1078 1078
 	struct fd_map* fm;
1079
+	int orig_changes;
1080
+	int apply_changes;
1079 1081
 	int revents;
1080 1082
 	
1081 1083
 	tspec.tv_sec=t;
1082 1084
 	tspec.tv_nsec=0;
1085
+	orig_changes=h->kq_nchanges;
1086
+	apply_changes=orig_changes;
1087
+	do {
1083 1088
 again:
1084
-		n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges,  h->kq_array,
1089
+		n=kevent(h->kq_fd, h->kq_changes, apply_changes,  h->kq_array,
1085 1090
 					h->fd_no, &tspec);
1086 1091
 		if (unlikely(n==-1)){
1087 1092
 			if (errno==EINTR) goto again; /* signal, ignore it */
1088
-			else{
1093
+			else if (errno==EBADF) {
1094
+				/* some of the FDs in kq_changes are bad (already closed)
1095
+				   and there is not enough space in kq_array to return all
1096
+				   of them back */
1097
+				apply_changes = h->fd_no;
1098
+				goto again;
1099
+			}else{
1089 1100
 				LOG(L_ERR, "ERROR: io_wait_loop_kqueue: kevent:"
1090 1101
 						" %s [%d]\n", strerror(errno), errno);
1091 1102
 				goto error;
1092 1103
 			}
1093 1104
 		}
1094
-		h->kq_nchanges=0; /* reset changes array */
1105
+		/* remove applied changes */
1106
+		h->kq_nchanges -= apply_changes;
1107
+		if (unlikely(apply_changes < orig_changes)) {
1108
+			orig_changes -= apply_changes;
1109
+			memmove(&h->kq_changes[0], &h->kq_changes[apply_changes],
1110
+									sizeof(h->kq_changes[0])*h->kq_nchanges);
1111
+			apply_changes = orig_changes<h->fd_no ? orig_changes : h->fd_no;
1112
+		} else {
1113
+			orig_changes = 0;
1114
+			apply_changes = 0;
1115
+		}
1095 1116
 		for (r=0; r<n; r++){
1096 1117
 #ifdef EXTRA_DEBUG
1097 1118
 			DBG("DBG: kqueue: event %d/%d: fd=%d, udata=%lx, flags=0x%x\n",
... ...
@@ -1148,6 +1195,7 @@ again:
1148 1148
 				}
1149 1149
 			}
1150 1150
 		}
1151
+	} while(unlikely(orig_changes));
1151 1152
 error:
1152 1153
 	return n;
1153 1154
 }