Browse code

io_wait: kqueue: use the entire array during too many errors fallback

Minor fix/optimization: if there are too many errors in the
changelist and the kevent() call has to be retried, use the entire
array (don't rely on the current watched fd number which will be
smaller then the array real size, since commit 996826).

(only kqueue using systems are affected by this fix: *bsd and
darwin)

Andrei Pelinescu-Onciul authored on 08/07/2010 13:18:52
Showing 1 changed files
... ...
@@ -1,6 +1,6 @@
1
-/* 
1
+/*
2 2
  * $Id$
3
- * 
3
+ *
4 4
  * Copyright (C) 2005 iptelorg GmbH
5 5
  *
6 6
  * Permission to use, copy, modify, and distribute this software for any
... ...
@@ -31,9 +31,9 @@
31 31
  *                 this assumption)
32 32
  *     local_malloc (defaults to pkg_malloc)
33 33
  *     local_free   (defaults to pkg_free)
34
- *  
34
+ *
35 35
  */
36
-/* 
36
+/*
37 37
  * History:
38 38
  * --------
39 39
  *  2005-06-13  created by andrei
... ...
@@ -79,8 +79,8 @@
79 79
 #endif
80 80
 #ifdef HAVE_SELECT
81 81
 /* needed on openbsd for select*/
82
-#include <sys/time.h> 
83
-#include <sys/types.h> 
82
+#include <sys/time.h>
83
+#include <sys/types.h>
84 84
 #include <unistd.h>
85 85
 /* needed according to POSIX for select*/
86 86
 #include <sys/select.h>
... ...
@@ -109,7 +109,7 @@ extern int _os_ver; /* os version number, needed to select bugs workarrounds */
109 109
 
110 110
 #if 0
111 111
 enum fd_types; /* this should be defined from the including file,
112
-				  see tcp_main.c for an example, 
112
+				  see tcp_main.c for an example,
113 113
 				  0 has a special meaning: not used/empty*/
114 114
 #endif
115 115
 
... ...
@@ -147,7 +147,7 @@ struct io_wait_handler{
147 147
 	enum poll_types poll_method;
148 148
 	int flags;
149 149
 	struct fd_map* fd_hash;
150
-	int fd_no; /*  current index used in fd_array and the passed size for 
150
+	int fd_no; /*  current index used in fd_array and the passed size for
151 151
 				   ep_array (for kq_array at least
152 152
 				    max(twice the size, kq_changes_size) should be
153 153
 				   be passed). */
... ...
@@ -222,7 +222,7 @@ static inline struct fd_map* hash_fd_map(	io_wait_h* h,
222 222
  *          events - combinations of POLLIN, POLLOUT, POLLERR & POLLHUP
223 223
  *          idx    - index in the fd_array (or -1 if not known)
224 224
  * return: -1 on error
225
- *          0 on EAGAIN or when by some other way it is known that no more 
225
+ *          0 on EAGAIN or when by some other way it is known that no more
226 226
  *            io events are queued on the fd (the receive buffer is empty).
227 227
  *            Usefull to detect when there are no more io events queued for
228 228
  *            sigio_rt, epoll_et, kqueue.
... ...
@@ -246,7 +246,7 @@ int handle_io(struct fd_map* fm, short events, int idx);
246 246
  *       and EVFILT_WRITE, EV_ADD for the same fd).
247 247
  * returns: -1 on error, 0 on success
248 248
  */
249
-static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag, 
249
+static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
250 250
 								void* data)
251 251
 {
252 252
 	int n;
... ...
@@ -424,7 +424,7 @@ inline static int io_watch_add(	io_wait_h* h,
424 424
 #ifdef HAVE_SIGIO_RT
425 425
 		case POLL_SIGIO_RT:
426 426
 			fd_array_setup(events);
427
-			/* re-set O_ASYNC might be needed, if not done from 
427
+			/* re-set O_ASYNC might be needed, if not done from
428 428
 			 * io_watch_del (or if somebody wants to add a fd which has
429 429
 			 * already O_ASYNC/F_SETSIG set on a duplicate)
430 430
 			 */
... ...
@@ -545,7 +545,7 @@ again_devpoll:
545 545
 		pf.events=events;
546 546
 check_io_again:
547 547
 		n=0;
548
-		while(e->type && ((n=poll(&pf, 1, 0))>0) && 
548
+		while(e->type && ((n=poll(&pf, 1, 0))>0) &&
549 549
 				(handle_io(e, pf.revents, idx)>0) &&
550 550
 				(pf.revents & (e->events|POLLERR|POLLHUP)));
551 551
 		if (unlikely(e->type && (n==-1))){
... ...
@@ -560,20 +560,20 @@ error:
560 560
 	if (e) unhash_fd_map(e);
561 561
 	return -1;
562 562
 #undef fd_array_setup
563
-#undef set_fd_flags 
563
+#undef set_fd_flags
564 564
 }
565 565
 
566 566
 
567 567
 
568 568
 #define IO_FD_CLOSING 16
569
-/* parameters:    h - handler 
569
+/* parameters:    h - handler
570 570
  *               fd - file descriptor
571 571
  *            index - index in the fd_array if known, -1 if not
572 572
  *                    (if index==-1 fd_array will be searched for the
573
- *                     corresponding fd* entry -- slower but unavoidable in 
573
+ *                     corresponding fd* entry -- slower but unavoidable in
574 574
  *                     some cases). index is not used (no fd_array) for epoll,
575 575
  *                     /dev/poll and kqueue
576
- *            flags - optimization flags, e.g. IO_FD_CLOSING, the fd was 
576
+ *            flags - optimization flags, e.g. IO_FD_CLOSING, the fd was
577 577
  *                    or will shortly be closed, in some cases we can avoid
578 578
  *                    extra remove operations (e.g.: epoll, kqueue, sigio)
579 579
  * returns 0 if ok, -1 on error */
... ...
@@ -642,7 +642,7 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
642 642
 				FD_CLR(fd, &h->master_wset);
643 643
 			if (unlikely(h->max_fd_select && (h->max_fd_select==fd)))
644 644
 				/* we don't know the prev. max, so we just decrement it */
645
-				h->max_fd_select--; 
645
+				h->max_fd_select--;
646 646
 			fix_fd_array;
647 647
 			break;
648 648
 #endif
... ...
@@ -656,17 +656,17 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
656 656
 			 */
657 657
 			/*if (!(flags & IO_FD_CLOSING)){*/
658 658
 				/* reset ASYNC */
659
-				fd_flags=fcntl(fd, F_GETFL); 
660
-				if (unlikely(fd_flags==-1)){ 
661
-					LOG(L_ERR, "ERROR: io_watch_del: fnctl: GETFL failed:" 
662
-							" %s [%d]\n", strerror(errno), errno); 
663
-					goto error; 
664
-				} 
665
-				if (unlikely(fcntl(fd, F_SETFL, fd_flags&(~O_ASYNC))==-1)){ 
666
-					LOG(L_ERR, "ERROR: io_watch_del: fnctl: SETFL" 
667
-								" failed: %s [%d]\n", strerror(errno), errno); 
668
-					goto error; 
669
-				} 
659
+				fd_flags=fcntl(fd, F_GETFL);
660
+				if (unlikely(fd_flags==-1)){
661
+					LOG(L_ERR, "ERROR: io_watch_del: fnctl: GETFL failed:"
662
+							" %s [%d]\n", strerror(errno), errno);
663
+					goto error;
664
+				}
665
+				if (unlikely(fcntl(fd, F_SETFL, fd_flags&(~O_ASYNC))==-1)){
666
+					LOG(L_ERR, "ERROR: io_watch_del: fnctl: SETFL"
667
+								" failed: %s [%d]\n", strerror(errno), errno);
668
+					goto error;
669
+				}
670 670
 			fix_fd_array; /* only on success */
671 671
 			break;
672 672
 #endif
... ...
@@ -676,7 +676,7 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
676 676
 			/* epoll doesn't seem to automatically remove sockets,
677 677
 			 * if the socket is a duplicate/moved and the original
678 678
 			 * is still open. The fd is removed from the epoll set
679
-			 * only when the original (and all the  copies?) is/are 
679
+			 * only when the original (and all the  copies?) is/are
680 680
 			 * closed. This is probably a bug in epoll. --andrei */
681 681
 #ifdef EPOLL_NO_CLOSE_BUG
682 682
 			if (!(flags & IO_FD_CLOSING)){
... ...
@@ -726,7 +726,7 @@ again_devpoll:
726 726
 				if (write(h->dpoll_fd, &pfd, sizeof(pfd))==-1){
727 727
 					if (errno==EINTR) goto again_devpoll;
728 728
 					LOG(L_ERR, "ERROR: io_watch_del: removing fd from "
729
-								"/dev/poll failed: %s [%d]\n", 
729
+								"/dev/poll failed: %s [%d]\n",
730 730
 								strerror(errno), errno);
731 731
 					goto error;
732 732
 				}
... ...
@@ -734,7 +734,7 @@ again_devpoll:
734 734
 #endif
735 735
 		default:
736 736
 			LOG(L_CRIT, "BUG: io_watch_del: no support for poll method "
737
-					" %s (%d)\n", poll_method_str[h->poll_method], 
737
+					" %s (%d)\n", poll_method_str[h->poll_method],
738 738
 					h->poll_method);
739 739
 			goto error;
740 740
 	}
... ...
@@ -748,12 +748,12 @@ error:
748 748
 
749 749
 
750 750
 
751
-/* parameters:    h - handler 
751
+/* parameters:    h - handler
752 752
  *               fd - file descriptor
753 753
  *           events - new events to watch for
754 754
  *              idx - index in the fd_array if known, -1 if not
755 755
  *                    (if index==-1 fd_array will be searched for the
756
- *                     corresponding fd* entry -- slower but unavoidable in 
756
+ *                     corresponding fd* entry -- slower but unavoidable in
757 757
  *                     some cases). index is not used (no fd_array) for epoll,
758 758
  *                     /dev/poll and kqueue
759 759
  * returns 0 if ok, -1 on error */
... ...
@@ -911,7 +911,7 @@ again_devpoll1:
911 911
 				if (unlikely(write(h->dpoll_fd, &pfd, sizeof(pfd))==-1)){
912 912
 					if (errno==EINTR) goto again_devpoll1;
913 913
 					LOG(L_ERR, "ERROR: io_watch_chg: removing fd from "
914
-								"/dev/poll failed: %s [%d]\n", 
914
+								"/dev/poll failed: %s [%d]\n",
915 915
 								strerror(errno), errno);
916 916
 					goto error;
917 917
 				}
... ...
@@ -921,7 +921,7 @@ again_devpoll2:
921 921
 				if (unlikely(write(h->dpoll_fd, &pfd, sizeof(pfd))==-1)){
922 922
 					if (errno==EINTR) goto again_devpoll2;
923 923
 					LOG(L_ERR, "ERROR: io_watch_chg: re-adding fd to "
924
-								"/dev/poll failed: %s [%d]\n", 
924
+								"/dev/poll failed: %s [%d]\n",
925 925
 								strerror(errno), errno);
926 926
 					/* error re-adding the fd => mark it as removed/unhash */
927 927
 					unhash_fd_map(e);
... ...
@@ -931,7 +931,7 @@ again_devpoll2:
931 931
 #endif
932 932
 		default:
933 933
 			LOG(L_CRIT, "BUG: io_watch_chg: no support for poll method "
934
-					" %s (%d)\n", poll_method_str[h->poll_method], 
934
+					" %s (%d)\n", poll_method_str[h->poll_method],
935 935
 					h->poll_method);
936 936
 			goto error;
937 937
 	}
... ...
@@ -944,7 +944,7 @@ error:
944 944
 
945 945
 
946 946
 
947
-/* io_wait_loop_x style function 
947
+/* io_wait_loop_x style function.
948 948
  * wait for io using poll()
949 949
  * params: h      - io_wait handle
950 950
  *         t      - timeout in s
... ...
@@ -985,11 +985,11 @@ again:
985 985
 				/* repeat handle_io if repeat, fd still watched (not deleted
986 986
 				 *  inside handle_io), handle_io returns that there's still
987 987
 				 *  IO and the fd is still watched for the triggering event */
988
-				while(fm->type && 
988
+				while(fm->type &&
989 989
 						(handle_io(fm, h->fd_array[r].revents, r) > 0) &&
990 990
 						repeat && ((fm->events|POLLERR|POLLHUP) &
991 991
 													h->fd_array[r].revents));
992
-				r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd) 
992
+				r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd)
993 993
 										  array shifting */
994 994
 			}
995 995
 		}
... ...
@@ -1034,9 +1034,9 @@ again:
1034 1034
 			if (unlikely(revents)){
1035 1035
 				h->crt_fd_array_idx=r;
1036 1036
 				fm=get_fd_map(h, h->fd_array[r].fd);
1037
-				while(fm->type && (fm->events & revents) && 
1037
+				while(fm->type && (fm->events & revents) &&
1038 1038
 						(handle_io(fm, revents, r)>0) && repeat);
1039
-				r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd) 
1039
+				r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd)
1040 1040
 										  array shifting */
1041 1041
 				n--;
1042 1042
 			}
... ...
@@ -1060,7 +1060,7 @@ again:
1060 1060
 			if (errno==EINTR) goto again; /* signal, ignore it */
1061 1061
 			else{
1062 1062
 				LOG(L_ERR, "ERROR:io_wait_loop_epoll: "
1063
-						"epoll_wait(%d, %p, %d, %d): %s [%d]\n", 
1063
+						"epoll_wait(%d, %p, %d, %d): %s [%d]\n",
1064 1064
 						h->epfd, h->ep_array, h->fd_no, t*1000,
1065 1065
 						strerror(errno), errno);
1066 1066
 				goto error;
... ...
@@ -1086,7 +1086,7 @@ again:
1086 1086
 					;
1087 1087
 			if (likely(revents)){
1088 1088
 				fm=(struct fd_map*)h->ep_array[r].data.ptr;
1089
-				while(fm->type && ((fm->events|POLLERR|POLLHUP) & revents) && 
1089
+				while(fm->type && ((fm->events|POLLERR|POLLHUP) & revents) &&
1090 1090
 						(handle_io(fm, revents, -1)>0) && repeat);
1091 1091
 			}else{
1092 1092
 				LOG(L_ERR, "ERROR:io_wait_loop_epoll: unexpected event %x"
... ...
@@ -1140,7 +1140,8 @@ again:
1140 1140
 			orig_changes -= apply_changes;
1141 1141
 			memmove(&h->kq_changes[0], &h->kq_changes[apply_changes],
1142 1142
 									sizeof(h->kq_changes[0])*h->kq_nchanges);
1143
-			apply_changes = orig_changes<h->fd_no ? orig_changes : h->fd_no;
1143
+			apply_changes = (orig_changes < h->kq_array_size) ? orig_changes :
1144
+								h->kq_array_size;
1144 1145
 		} else {
1145 1146
 			orig_changes = 0;
1146 1147
 			apply_changes = 0;
... ...
@@ -1171,7 +1172,7 @@ again:
1171 1171
 					watched fd changes will be applied the fd will be valid
1172 1172
 					(so no EBADF), but it's not already watch => ENOENT.
1173 1173
 					We report a BUG for the other errors (there's nothing
1174
-					constructive we can do if we get an error we don't know 
1174
+					constructive we can do if we get an error we don't know
1175 1175
 					how to handle), but apart from that we ignore it in the
1176 1176
 					idea that it is better apply the rest of the changes,
1177 1177
 					rather then dropping all of them.
... ...
@@ -1203,21 +1204,21 @@ again:
1203 1203
 						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
1204 1204
 						(((int)!((h->kq_array[r].flags & EV_EOF) &&
1205 1205
 								 	h->kq_array[r].fflags != 0) - 1)&POLLERR);
1206
-					while(fm->type && (fm->events & revents) && 
1206
+					while(fm->type && (fm->events & revents) &&
1207 1207
 							(handle_io(fm, revents, -1)>0) && repeat);
1208 1208
 				}else if (h->kq_array[r].filter==EVFILT_WRITE){
1209 1209
 					revents=POLLOUT |
1210 1210
 						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
1211 1211
 						(((int)!((h->kq_array[r].flags & EV_EOF) &&
1212 1212
 								 	h->kq_array[r].fflags != 0) - 1)&POLLERR);
1213
-					while(fm->type && (fm->events & revents) && 
1213
+					while(fm->type && (fm->events & revents) &&
1214 1214
 							(handle_io(fm, revents, -1)>0) && repeat);
1215 1215
 				}else{
1216 1216
 					BUG("io_wait_loop_kqueue: unknown filter: kqueue: event "
1217 1217
 							"%d/%d: fd=%d, filter=%d, flags=0x%x, fflags=0x%x,"
1218 1218
 							" data=%lx, udata=%lx\n",
1219 1219
 					r, n, h->kq_array[r].ident, h->kq_array[r].filter,
1220
-					h->kq_array[r].flags, h->kq_array[r].fflags, 
1220
+					h->kq_array[r].flags, h->kq_array[r].fflags,
1221 1221
 					(long)h->kq_array[r].data, (long)h->kq_array[r].udata);
1222 1222
 				}
1223 1223
 			}
... ...
@@ -1306,14 +1307,14 @@ again:
1306 1306
 			 *  POLLIN|POLLRDNORM|POLLMSG (=POLL_MSG),
1307 1307
 			 *  POLLERR (=POLL_ERR),
1308 1308
 			 *  POLLPRI|POLLRDBAND (=POLL_PRI),
1309
-			 *  POLLHUP|POLLERR (=POLL_HUP) 
1309
+			 *  POLLHUP|POLLERR (=POLL_HUP)
1310 1310
 			 *  [linux 2.6.22 fs/fcntl.c:447]
1311 1311
 			 */
1312 1312
 #ifdef EXTRA_DEBUG
1313 1313
 			DBG("io_wait_loop_sigio_rt: siginfo: signal=%d (%d),"
1314 1314
 					" si_code=%d, si_band=0x%x,"
1315 1315
 					" si_fd=%d\n",
1316
-					siginfo.si_signo, n, siginfo.si_code, 
1316
+					siginfo.si_signo, n, siginfo.si_code,
1317 1317
 					(unsigned)sigio_band,
1318 1318
 					sigio_fd);
1319 1319
 #endif
... ...
@@ -1326,7 +1327,7 @@ again:
1326 1326
 				/* fix revents==POLLPRI case */
1327 1327
 				revents |= (!(revents & POLLPRI)-1) & POLLIN;
1328 1328
 				/* we can have queued signals generated by fds not watched
1329
-			 	 * any more, or by fds in transition, to a child 
1329
+			 	 * any more, or by fds in transition, to a child
1330 1330
 				 * => ignore them */
1331 1331
 				if (fm->type && ((fm->events|POLLERR|POLLHUP) & revents))
1332 1332
 					handle_io(fm, revents, -1);
... ...
@@ -1342,7 +1343,7 @@ again:
1342 1342
 			}
1343 1343
 		}
1344 1344
 	}else{
1345
-		/* signal queue overflow 
1345
+		/* signal queue overflow
1346 1346
 		 * TODO: increase signal queue size: 2.4x /proc/.., 2.6x -rlimits */
1347 1347
 		LOG(L_WARN, "WARNING: io_wait_loop_sigio_rt: signal queue overflowed"
1348 1348
 					"- falling back to poll\n");