From baced4def008b3b03f89eef3f9d6da34e38b5f2a Mon Sep 17 00:00:00 2001 From: David Klempner Date: Tue, 10 Feb 2015 17:10:15 -0800 Subject: [PATCH] Epoll based multipoller This is a multipoller based on epoll rather than poll. Note that this implementation is aimed at correctness rather than performance, although it should immediately have better scalability to large numbers of FDs, both due to epoll's O(1) sized API and due to not needing to wake up polling threads to do interest set changes. One notable difference here is that we directly attach a wakeup fd rather than using the freelisting kick mechanism that the poll() based implementations use, because modifying the epoll set to use a different kick fd each time isn't free. --- Makefile | 5 + build.json | 1 + include/grpc/support/port_platform.h | 2 +- .../iomgr/pollset_multipoller_with_epoll.c | 199 ++++++++++++++++++ .../pollset_multipoller_with_poll_posix.c | 6 +- src/core/iomgr/pollset_posix.c | 11 +- src/core/iomgr/pollset_posix.h | 1 + test/core/iomgr/tcp_client_posix_test.c | 1 + vsprojects/vs2013/grpc.vcxproj | 2 + vsprojects/vs2013/grpc.vcxproj.filters | 3 + vsprojects/vs2013/grpc_unsecure.vcxproj | 2 + .../vs2013/grpc_unsecure.vcxproj.filters | 3 + 12 files changed, 231 insertions(+), 5 deletions(-) create mode 100644 src/core/iomgr/pollset_multipoller_with_epoll.c diff --git a/Makefile b/Makefile index c411c07e425..8e0d649288d 100644 --- a/Makefile +++ b/Makefile @@ -1889,6 +1889,7 @@ LIBGRPC_SRC = \ src/core/iomgr/iomgr_posix.c \ src/core/iomgr/pollset_kick.c \ src/core/iomgr/pollset_multipoller_with_poll_posix.c \ + src/core/iomgr/pollset_multipoller_with_epoll.c \ src/core/iomgr/pollset_posix.c \ src/core/iomgr/pollset_windows.c \ src/core/iomgr/resolve_address.c \ @@ -2018,6 +2019,7 @@ src/core/iomgr/iomgr.c: $(OPENSSL_DEP) src/core/iomgr/iomgr_posix.c: $(OPENSSL_DEP) src/core/iomgr/pollset_kick.c: $(OPENSSL_DEP) src/core/iomgr/pollset_multipoller_with_poll_posix.c: $(OPENSSL_DEP) +src/core/iomgr/pollset_multipoller_with_epoll.c: $(OPENSSL_DEP) src/core/iomgr/pollset_posix.c: $(OPENSSL_DEP) src/core/iomgr/pollset_windows.c: $(OPENSSL_DEP) src/core/iomgr/resolve_address.c: $(OPENSSL_DEP) @@ -2169,6 +2171,7 @@ objs/$(CONFIG)/src/core/iomgr/iomgr.o: objs/$(CONFIG)/src/core/iomgr/iomgr_posix.o: objs/$(CONFIG)/src/core/iomgr/pollset_kick.o: objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_poll_posix.o: +objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_epoll.o: objs/$(CONFIG)/src/core/iomgr/pollset_posix.o: objs/$(CONFIG)/src/core/iomgr/pollset_windows.o: objs/$(CONFIG)/src/core/iomgr/resolve_address.o: @@ -2404,6 +2407,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/iomgr/iomgr_posix.c \ src/core/iomgr/pollset_kick.c \ src/core/iomgr/pollset_multipoller_with_poll_posix.c \ + src/core/iomgr/pollset_multipoller_with_epoll.c \ src/core/iomgr/pollset_posix.c \ src/core/iomgr/pollset_windows.c \ src/core/iomgr/resolve_address.c \ @@ -2538,6 +2542,7 @@ objs/$(CONFIG)/src/core/iomgr/iomgr.o: objs/$(CONFIG)/src/core/iomgr/iomgr_posix.o: objs/$(CONFIG)/src/core/iomgr/pollset_kick.o: objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_poll_posix.o: +objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_epoll.o: objs/$(CONFIG)/src/core/iomgr/pollset_posix.o: objs/$(CONFIG)/src/core/iomgr/pollset_windows.o: objs/$(CONFIG)/src/core/iomgr/resolve_address.o: diff --git a/build.json b/build.json index c9a83f56c17..2681cf5064c 100644 --- a/build.json +++ b/build.json @@ -134,6 +134,7 @@ "src/core/iomgr/iomgr_posix.c", "src/core/iomgr/pollset_kick.c", "src/core/iomgr/pollset_multipoller_with_poll_posix.c", + "src/core/iomgr/pollset_multipoller_with_epoll.c", "src/core/iomgr/pollset_posix.c", "src/core/iomgr/pollset_windows.c", "src/core/iomgr/resolve_address.c", diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h index e99099c651c..69a2f9c6840 100644 --- a/include/grpc/support/port_platform.h +++ b/include/grpc/support/port_platform.h @@ -71,7 +71,7 @@ #define GPR_CPU_LINUX 1 #define GPR_GCC_ATOMIC 1 #define GPR_LINUX 1 -#define GPR_POSIX_MULTIPOLL_WITH_POLL 1 +#define GPR_POSIX_MULTIPOLL_WITH_EPOLL 1 #define GPR_POSIX_WAKEUP_FD 1 #define GPR_LINUX_EVENTFD 1 #define GPR_POSIX_SOCKET 1 diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c new file mode 100644 index 00000000000..14c038e7afa --- /dev/null +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -0,0 +1,199 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +#ifdef GPR_POSIX_MULTIPOLL_WITH_EPOLL + +#include +#include +#include + +#include "src/core/iomgr/fd_posix.h" +#include +#include + +typedef struct { + int epoll_fd; + grpc_wakeup_fd_info wakeup_fd; +} pollset_hdr; + +static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, + grpc_fd *fd) { + pollset_hdr *h = pollset->data.ptr; + struct epoll_event ev; + int err; + + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.data.ptr = fd; + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + if (err < 0) { + /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ + if (errno != EEXIST) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, + strerror(errno)); + } + } +} + +static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, + grpc_fd *fd) { + pollset_hdr *h = pollset->data.ptr; + int err; + /* Note that this can race with concurrent poll, but that should be fine since + * at worst it creates a spurious read event on a reused grpc_fd object. */ + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); + if (err < 0) { + gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd->fd, + strerror(errno)); + } +} + +/* TODO(klempner): We probably want to turn this down a bit */ +#define GRPC_EPOLL_MAX_EVENTS 1000 + +static int multipoll_with_epoll_pollset_maybe_work( + grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, + int allow_synchronous_callback) { + struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; + int ep_rv; + pollset_hdr *h = pollset->data.ptr; + int timeout_ms; + + /* If you want to ignore epoll's ability to sanely handle parallel pollers, + * for a more apples-to-apples performance comparison with poll, add a + * if (pollset->counter == 0) { return 0 } + * here. + */ + + if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + timeout_ms = -1; + } else { + timeout_ms = gpr_time_to_millis(gpr_time_sub(deadline, now)); + if (timeout_ms <= 0) { + return 1; + } + } + pollset->counter += 1; + gpr_mu_unlock(&pollset->mu); + + do { + ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms); + if (ep_rv < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); + } + } else { + int i; + for (i = 0; i < ep_rv; ++i) { + if (ep_ev[i].data.ptr == 0) { + grpc_wakeup_fd_consume_wakeup(&h->wakeup_fd); + } else { + grpc_fd *fd = ep_ev[i].data.ptr; + /* TODO(klempner): We might want to consider making err and pri + * separate events */ + int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); + int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write = ep_ev[i].events & EPOLLOUT; + if (read || cancel) { + grpc_fd_become_readable(fd, allow_synchronous_callback); + } + if (write || cancel) { + grpc_fd_become_writable(fd, allow_synchronous_callback); + } + } + } + } + timeout_ms = 0; + } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); + + gpr_mu_lock(&pollset->mu); + pollset->counter -= 1; + /* TODO(klempner): This should be a signal and not a broadcast, althoughit + * probably doesn't matter because */ + gpr_cv_broadcast(&pollset->cv); + return 1; +} + +static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { + pollset_hdr *h = pollset->data.ptr; + gpr_free(h); +} + +static void epoll_kick(grpc_pollset *pollset) { + pollset_hdr *h = pollset->data.ptr; + grpc_wakeup_fd_wakeup(&h->wakeup_fd); +} + +static const grpc_pollset_vtable multipoll_with_epoll_pollset = { + multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd, + multipoll_with_epoll_pollset_maybe_work, epoll_kick, + multipoll_with_epoll_pollset_destroy}; + +void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, + size_t nfds) { + size_t i; + pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); + struct epoll_event ev; + int err; + + pollset->vtable = &multipoll_with_epoll_pollset; + pollset->data.ptr = h; + h->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (h->epoll_fd < 0) { + /* TODO(klempner): Fall back to poll here, especially on ENOSYS */ + gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); + abort(); + } + for (i = 0; i < nfds; i++) { + if (grpc_fd_is_orphaned(fds[i])) { + /* This should not happen, remove before merging upstream because this is + * better fixed in unary poller */ + grpc_fd_unref(fds[i]); + } else { + multipoll_with_epoll_pollset_add_fd(pollset, fds[i]); + } + } + + grpc_wakeup_fd_create(&h->wakeup_fd); + ev.events = EPOLLIN; + ev.data.ptr = 0; + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, + GRPC_WAKEUP_FD_GET_READ_FD(&h->wakeup_fd), &ev); + if (err < 0) { + gpr_log(GPR_ERROR, "Wakeup fd epoll_ctl failed: %s", strerror(errno)); + abort(); + } +} + +#endif /* GPR_POSIX_MULTIPOLL_WITH_EPOLL */ diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 3244ae08db5..c136ee0b528 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -200,6 +200,10 @@ static int multipoll_with_poll_pollset_maybe_work( return 1; } +static void multipoll_with_poll_pollset_kick(grpc_pollset *p) { + grpc_pollset_kick_kick(&p->kick_state); +} + static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { size_t i; pollset_hdr *h = pollset->data.ptr; @@ -219,7 +223,7 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { static const grpc_pollset_vtable multipoll_with_poll_pollset = { multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, - multipoll_with_poll_pollset_maybe_work, + multipoll_with_poll_pollset_maybe_work, multipoll_with_poll_pollset_kick, multipoll_with_poll_pollset_destroy}; void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index b0404b870b5..ceedfbd51fc 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -76,7 +76,7 @@ static void backup_poller(void *p) { void grpc_pollset_kick(grpc_pollset *p) { if (p->counter) { - grpc_pollset_kick_kick(&p->kick_state); + p->vtable->kick(p); } } @@ -84,6 +84,10 @@ void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick_kick(&p->kick_state); } +static void kick_using_pollset_kick(grpc_pollset *p) { + grpc_pollset_kick_kick(&p->kick_state); +} + /* global state management */ grpc_pollset *grpc_backup_pollset(void) { return &g_backup_pollset; } @@ -186,7 +190,7 @@ static void empty_pollset_destroy(grpc_pollset *pollset) {} static const grpc_pollset_vtable empty_pollset = { empty_pollset_add_fd, empty_pollset_del_fd, empty_pollset_maybe_work, - empty_pollset_destroy}; + kick_using_pollset_kick, empty_pollset_destroy}; static void become_empty_pollset(grpc_pollset *pollset) { pollset->vtable = &empty_pollset; @@ -289,7 +293,8 @@ static void unary_poll_pollset_destroy(grpc_pollset *pollset) { static const grpc_pollset_vtable unary_poll_pollset = { unary_poll_pollset_add_fd, unary_poll_pollset_del_fd, - unary_poll_pollset_maybe_work, unary_poll_pollset_destroy}; + unary_poll_pollset_maybe_work, kick_using_pollset_kick, + unary_poll_pollset_destroy}; static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd) { pollset->vtable = &unary_poll_pollset; diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index cdcb9951675..b1a82fccfe7 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -66,6 +66,7 @@ struct grpc_pollset_vtable { void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd); int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, int allow_synchronous_callback); + void (*kick)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); }; diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 00b10f93648..78709f47fbc 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -171,6 +171,7 @@ void test_times_out(void) { int main(void) { grpc_iomgr_init(); test_succeeds(); + gpr_log(GPR_ERROR, "End of first test"); test_fails(); test_times_out(); grpc_iomgr_shutdown(); diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj index c6f2846e317..0050d8cec3b 100644 --- a/vsprojects/vs2013/grpc.vcxproj +++ b/vsprojects/vs2013/grpc.vcxproj @@ -261,6 +261,8 @@ + + diff --git a/vsprojects/vs2013/grpc.vcxproj.filters b/vsprojects/vs2013/grpc.vcxproj.filters index ce76dd8d2b9..26b4aadf147 100644 --- a/vsprojects/vs2013/grpc.vcxproj.filters +++ b/vsprojects/vs2013/grpc.vcxproj.filters @@ -124,6 +124,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj b/vsprojects/vs2013/grpc_unsecure.vcxproj index c6f2846e317..0050d8cec3b 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj @@ -261,6 +261,8 @@ + + diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj.filters b/vsprojects/vs2013/grpc_unsecure.vcxproj.filters index 0e942219d07..80525445f27 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj.filters @@ -85,6 +85,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr