Browse code

io_wait: fix: check for EV_ERROR for kqueue()

Re-enabled and enhanced the check for EV_ERROR when using kqueue
(*bsd). This is needed to workaround errors reported by kqueue
when trying to delete (EV_DELETE) an already closed FD (this
can happen in the tcp code, where we try to avoid applying
immediately changes in the set of watched FDs and instead
collect them and apply them after all the current kqueue
events are processed => in some corner case situations it's
possible to try to delete the FD from kqueue after the fd
was close()'ed).
This fix will ignore EV_ERROR with data == EBADF. All the other
errors will result in a POLLERR flag for the callback.

It fixes crashes with *bsd under tcp stress tests (lots of very
short lived connections).

Andrei Pelinescu-Onciul authored on 17/06/2010 14:15:10 • Andrei Pelinescu - Onciul committed on 17/06/2010 14:28:02
Showing 1 changed files
... ...
@@ -45,6 +45,7 @@
45 45
  *  2007-11-29  support for write (POLLOUT); added io_watch_chg() (andrei)
46 46
  *  2008-02-04  POLLRDHUP & EPOLLRDHUP support (automatically enabled if POLLIN
47 47
  *               is set) (andrei)
48
+ *  2010-06-17  re-enabled & enhanced the EV_ERROR for kqueue (andrei)
48 49
  */
49 50
 
50 51
 
... ...
@@ -1097,31 +1098,55 @@ again:
1097 1098
 					r, n, h->kq_array[r].ident, (long)h->kq_array[r].udata,
1098 1099
 					h->kq_array[r].flags);
1099 1100
 #endif
1100
-#if 0
1101
-			if (unlikely(h->kq_array[r].flags & EV_ERROR)){
1102
-				/* error in changes: we ignore it, it can be caused by
1103
-				   trying to remove an already closed fd: race between
1104
-				   adding something to the changes array, close() and
1105
-				   applying the changes */
1106
-				LOG(L_INFO, "INFO: io_wait_loop_kqueue: kevent error on "
1107
-							"fd %ld: %s [%ld]\n", h->kq_array[r].ident,
1101
+			if (unlikely((h->kq_array[r].flags & EV_ERROR) &&
1102
+							(h->kq_array[r].data == EBADF ||
1103
+							 h->kq_array[r].udata == 0))){
1104
+				/* error in changes: we ignore it if it has to do with a
1105
+				   bad fd or update==0. It can be caused by trying to remove an
1106
+				   already closed fd: race between adding something to the
1107
+				   changes array, close() and applying the changes.
1108
+				   E.g. for ser tcp: tcp_main sends a fd to child fore reading
1109
+				    => deletes it from the watched fds => the changes array
1110
+					will contain an EV_DELETE for it. Before the changes
1111
+					are applied (they are at the end of the main io_wait loop,
1112
+					after all the fd events were processed), a CON_ERR sent
1113
+					to tcp_main by a sender (send fail) is processed and causes
1114
+					the fd to be closed. When the changes are applied =>
1115
+					error for the EV_DELETE attempt of a closed fd.
1116
+				*/
1117
+				/*
1118
+					example EV_ERROR for trying to delete a read watched fd,
1119
+					that was already closed:
1120
+					{
1121
+						ident = 63,  [fd]
1122
+						filter = -1, [EVFILT_READ]
1123
+						flags = 16384, [EV_ERROR]
1124
+						fflags = 0,
1125
+						data = 9, [errno = EBADF]
1126
+						udata = 0x0
1127
+					}
1128
+				*/
1129
+				if (h->kq_array[r].data != EBADF)
1130
+					LOG(L_INFO, "INFO: io_wait_loop_kqueue: kevent error on "
1131
+							"fd %ld: %s [%ld]\n", (long)h->kq_array[r].ident,
1108 1132
 							strerror(h->kq_array[r].data),
1109 1133
 							(long)h->kq_array[r].data);
1110
-			}else{ 
1111
-#endif
1134
+			}else{
1112 1135
 				fm=(struct fd_map*)h->kq_array[r].udata;
1113 1136
 				if (likely(h->kq_array[r].filter==EVFILT_READ)){
1114
-					revents=POLLIN | 
1115
-						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP);
1137
+					revents=POLLIN |
1138
+						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
1139
+						(((int)!(h->kq_array[r].flags & EV_ERROR)-1)&POLLERR);
1116 1140
 					while(fm->type && (fm->events & revents) && 
1117 1141
 							(handle_io(fm, revents, -1)>0) && repeat);
1118 1142
 				}else if (h->kq_array[r].filter==EVFILT_WRITE){
1119
-					revents=POLLOUT | 
1120
-						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP);
1143
+					revents=POLLOUT |
1144
+						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
1145
+						(((int)!(h->kq_array[r].flags & EV_ERROR)-1)&POLLERR);
1121 1146
 					while(fm->type && (fm->events & revents) && 
1122 1147
 							(handle_io(fm, revents, -1)>0) && repeat);
1123 1148
 				}
1124
-			/*} */
1149
+			}
1125 1150
 		}
1126 1151
 error:
1127 1152
 	return n;