Epoll based multipoller

This is a multipoller based on epoll rather than poll.

Note that this implementation is aimed at correctness rather than
performance, although it should immediately have better scalability to
large numbers of FDs, both due to epoll's O(1) sized API and due to not
needing to wake up polling threads to do interest set changes.

One notable difference here is that we directly attach a wakeup fd
rather than using the freelisting kick mechanism that the poll() based
implementations use, because modifying the epoll set to use a different
kick fd each time isn't free.
pull/475/head
David Klempner 10 years ago
parent a81196c361
commit baced4def0
  1. 5
      Makefile
  2. 1
      build.json
  3. 2
      include/grpc/support/port_platform.h
  4. 199
      src/core/iomgr/pollset_multipoller_with_epoll.c
  5. 6
      src/core/iomgr/pollset_multipoller_with_poll_posix.c
  6. 11
      src/core/iomgr/pollset_posix.c
  7. 1
      src/core/iomgr/pollset_posix.h
  8. 1
      test/core/iomgr/tcp_client_posix_test.c
  9. 2
      vsprojects/vs2013/grpc.vcxproj
  10. 3
      vsprojects/vs2013/grpc.vcxproj.filters
  11. 2
      vsprojects/vs2013/grpc_unsecure.vcxproj
  12. 3
      vsprojects/vs2013/grpc_unsecure.vcxproj.filters

@ -1889,6 +1889,7 @@ LIBGRPC_SRC = \
src/core/iomgr/iomgr_posix.c \
src/core/iomgr/pollset_kick.c \
src/core/iomgr/pollset_multipoller_with_poll_posix.c \
src/core/iomgr/pollset_multipoller_with_epoll.c \
src/core/iomgr/pollset_posix.c \
src/core/iomgr/pollset_windows.c \
src/core/iomgr/resolve_address.c \
@ -2018,6 +2019,7 @@ src/core/iomgr/iomgr.c: $(OPENSSL_DEP)
src/core/iomgr/iomgr_posix.c: $(OPENSSL_DEP)
src/core/iomgr/pollset_kick.c: $(OPENSSL_DEP)
src/core/iomgr/pollset_multipoller_with_poll_posix.c: $(OPENSSL_DEP)
src/core/iomgr/pollset_multipoller_with_epoll.c: $(OPENSSL_DEP)
src/core/iomgr/pollset_posix.c: $(OPENSSL_DEP)
src/core/iomgr/pollset_windows.c: $(OPENSSL_DEP)
src/core/iomgr/resolve_address.c: $(OPENSSL_DEP)
@ -2169,6 +2171,7 @@ objs/$(CONFIG)/src/core/iomgr/iomgr.o:
objs/$(CONFIG)/src/core/iomgr/iomgr_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_kick.o:
objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_poll_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_epoll.o:
objs/$(CONFIG)/src/core/iomgr/pollset_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_windows.o:
objs/$(CONFIG)/src/core/iomgr/resolve_address.o:
@ -2404,6 +2407,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/iomgr/iomgr_posix.c \
src/core/iomgr/pollset_kick.c \
src/core/iomgr/pollset_multipoller_with_poll_posix.c \
src/core/iomgr/pollset_multipoller_with_epoll.c \
src/core/iomgr/pollset_posix.c \
src/core/iomgr/pollset_windows.c \
src/core/iomgr/resolve_address.c \
@ -2538,6 +2542,7 @@ objs/$(CONFIG)/src/core/iomgr/iomgr.o:
objs/$(CONFIG)/src/core/iomgr/iomgr_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_kick.o:
objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_poll_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_epoll.o:
objs/$(CONFIG)/src/core/iomgr/pollset_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_windows.o:
objs/$(CONFIG)/src/core/iomgr/resolve_address.o:

@ -134,6 +134,7 @@
"src/core/iomgr/iomgr_posix.c",
"src/core/iomgr/pollset_kick.c",
"src/core/iomgr/pollset_multipoller_with_poll_posix.c",
"src/core/iomgr/pollset_multipoller_with_epoll.c",
"src/core/iomgr/pollset_posix.c",
"src/core/iomgr/pollset_windows.c",
"src/core/iomgr/resolve_address.c",

@ -71,7 +71,7 @@
#define GPR_CPU_LINUX 1
#define GPR_GCC_ATOMIC 1
#define GPR_LINUX 1
#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
#define GPR_POSIX_MULTIPOLL_WITH_EPOLL 1
#define GPR_POSIX_WAKEUP_FD 1
#define GPR_LINUX_EVENTFD 1
#define GPR_POSIX_SOCKET 1

@ -0,0 +1,199 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc/support/port_platform.h>
#ifdef GPR_POSIX_MULTIPOLL_WITH_EPOLL
#include <errno.h>
#include <string.h>
#include <sys/epoll.h>
#include "src/core/iomgr/fd_posix.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
typedef struct {
int epoll_fd;
grpc_wakeup_fd_info wakeup_fd;
} pollset_hdr;
static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd *fd) {
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.data.ptr = fd;
err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
if (err < 0) {
/* FDs may be added to a pollset multiple times, so EEXIST is normal. */
if (errno != EEXIST) {
gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
strerror(errno));
}
}
}
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
grpc_fd *fd) {
pollset_hdr *h = pollset->data.ptr;
int err;
/* Note that this can race with concurrent poll, but that should be fine since
* at worst it creates a spurious read event on a reused grpc_fd object. */
err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
if (err < 0) {
gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd->fd,
strerror(errno));
}
}
/* TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS 1000
static int multipoll_with_epoll_pollset_maybe_work(
grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int ep_rv;
pollset_hdr *h = pollset->data.ptr;
int timeout_ms;
/* If you want to ignore epoll's ability to sanely handle parallel pollers,
* for a more apples-to-apples performance comparison with poll, add a
* if (pollset->counter == 0) { return 0 }
* here.
*/
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
timeout_ms = -1;
} else {
timeout_ms = gpr_time_to_millis(gpr_time_sub(deadline, now));
if (timeout_ms <= 0) {
return 1;
}
}
pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
do {
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
if (ep_rv < 0) {
if (errno != EINTR) {
gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
}
} else {
int i;
for (i = 0; i < ep_rv; ++i) {
if (ep_ev[i].data.ptr == 0) {
grpc_wakeup_fd_consume_wakeup(&h->wakeup_fd);
} else {
grpc_fd *fd = ep_ev[i].data.ptr;
/* TODO(klempner): We might want to consider making err and pri
* separate events */
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write = ep_ev[i].events & EPOLLOUT;
if (read || cancel) {
grpc_fd_become_readable(fd, allow_synchronous_callback);
}
if (write || cancel) {
grpc_fd_become_writable(fd, allow_synchronous_callback);
}
}
}
}
timeout_ms = 0;
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
gpr_mu_lock(&pollset->mu);
pollset->counter -= 1;
/* TODO(klempner): This should be a signal and not a broadcast, althoughit
* probably doesn't matter because */
gpr_cv_broadcast(&pollset->cv);
return 1;
}
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
gpr_free(h);
}
static void epoll_kick(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
grpc_wakeup_fd_wakeup(&h->wakeup_fd);
}
static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd,
multipoll_with_epoll_pollset_maybe_work, epoll_kick,
multipoll_with_epoll_pollset_destroy};
void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
struct epoll_event ev;
int err;
pollset->vtable = &multipoll_with_epoll_pollset;
pollset->data.ptr = h;
h->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (h->epoll_fd < 0) {
/* TODO(klempner): Fall back to poll here, especially on ENOSYS */
gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
abort();
}
for (i = 0; i < nfds; i++) {
if (grpc_fd_is_orphaned(fds[i])) {
/* This should not happen, remove before merging upstream because this is
* better fixed in unary poller */
grpc_fd_unref(fds[i]);
} else {
multipoll_with_epoll_pollset_add_fd(pollset, fds[i]);
}
}
grpc_wakeup_fd_create(&h->wakeup_fd);
ev.events = EPOLLIN;
ev.data.ptr = 0;
err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
GRPC_WAKEUP_FD_GET_READ_FD(&h->wakeup_fd), &ev);
if (err < 0) {
gpr_log(GPR_ERROR, "Wakeup fd epoll_ctl failed: %s", strerror(errno));
abort();
}
}
#endif /* GPR_POSIX_MULTIPOLL_WITH_EPOLL */

@ -200,6 +200,10 @@ static int multipoll_with_poll_pollset_maybe_work(
return 1;
}
static void multipoll_with_poll_pollset_kick(grpc_pollset *p) {
grpc_pollset_kick_kick(&p->kick_state);
}
static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
size_t i;
pollset_hdr *h = pollset->data.ptr;
@ -219,7 +223,7 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
static const grpc_pollset_vtable multipoll_with_poll_pollset = {
multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd,
multipoll_with_poll_pollset_maybe_work,
multipoll_with_poll_pollset_maybe_work, multipoll_with_poll_pollset_kick,
multipoll_with_poll_pollset_destroy};
void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,

@ -76,7 +76,7 @@ static void backup_poller(void *p) {
void grpc_pollset_kick(grpc_pollset *p) {
if (p->counter) {
grpc_pollset_kick_kick(&p->kick_state);
p->vtable->kick(p);
}
}
@ -84,6 +84,10 @@ void grpc_pollset_force_kick(grpc_pollset *p) {
grpc_pollset_kick_kick(&p->kick_state);
}
static void kick_using_pollset_kick(grpc_pollset *p) {
grpc_pollset_kick_kick(&p->kick_state);
}
/* global state management */
grpc_pollset *grpc_backup_pollset(void) { return &g_backup_pollset; }
@ -186,7 +190,7 @@ static void empty_pollset_destroy(grpc_pollset *pollset) {}
static const grpc_pollset_vtable empty_pollset = {
empty_pollset_add_fd, empty_pollset_del_fd, empty_pollset_maybe_work,
empty_pollset_destroy};
kick_using_pollset_kick, empty_pollset_destroy};
static void become_empty_pollset(grpc_pollset *pollset) {
pollset->vtable = &empty_pollset;
@ -289,7 +293,8 @@ static void unary_poll_pollset_destroy(grpc_pollset *pollset) {
static const grpc_pollset_vtable unary_poll_pollset = {
unary_poll_pollset_add_fd, unary_poll_pollset_del_fd,
unary_poll_pollset_maybe_work, unary_poll_pollset_destroy};
unary_poll_pollset_maybe_work, kick_using_pollset_kick,
unary_poll_pollset_destroy};
static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd) {
pollset->vtable = &unary_poll_pollset;

@ -66,6 +66,7 @@ struct grpc_pollset_vtable {
void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
gpr_timespec now, int allow_synchronous_callback);
void (*kick)(grpc_pollset *pollset);
void (*destroy)(grpc_pollset *pollset);
};

@ -171,6 +171,7 @@ void test_times_out(void) {
int main(void) {
grpc_iomgr_init();
test_succeeds();
gpr_log(GPR_ERROR, "End of first test");
test_fails();
test_times_out();
grpc_iomgr_shutdown();

@ -261,6 +261,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_epoll.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_windows.c">

@ -124,6 +124,9 @@
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_epoll.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_posix.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>

@ -261,6 +261,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_epoll.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_windows.c">

@ -85,6 +85,9 @@
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_epoll.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_posix.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>

Loading…
Cancel
Save