Add support for eventfd based kicking on linux.

This adds support for eventfd based kicking, with the skeleton of
support for runtime selection between eventfds and pipes.
pull/157/head
David Klempner 10 years ago
parent 85eea08ba8
commit dbb4f942d0
  1. 5
      Makefile
  2. 2
      build.json
  3. 1
      include/grpc/support/port_platform.h
  4. 4
      src/core/iomgr/pollset_kick.h
  5. 85
      src/core/iomgr/pollset_kick_eventfd.c
  6. 42
      src/core/iomgr/pollset_kick_eventfd.h
  7. 172
      src/core/iomgr/pollset_kick_posix.c
  8. 15
      src/core/iomgr/pollset_kick_posix.h
  9. 16
      test/core/iomgr/poll_kick_test.c
  10. 3
      vsprojects/vs2013/grpc.vcxproj
  11. 3
      vsprojects/vs2013/grpc_unsecure.vcxproj

@ -1354,6 +1354,7 @@ LIBGRPC_SRC = \
src/core/iomgr/iomgr.c \
src/core/iomgr/iomgr_posix.c \
src/core/iomgr/pollset_kick_posix.c \
src/core/iomgr/pollset_kick_eventfd.c \
src/core/iomgr/pollset_multipoller_with_poll_posix.c \
src/core/iomgr/pollset_posix.c \
src/core/iomgr/resolve_address_posix.c \
@ -1472,6 +1473,7 @@ src/core/iomgr/fd_posix.c: $(OPENSSL_DEP)
src/core/iomgr/iomgr.c: $(OPENSSL_DEP)
src/core/iomgr/iomgr_posix.c: $(OPENSSL_DEP)
src/core/iomgr/pollset_kick_posix.c: $(OPENSSL_DEP)
src/core/iomgr/pollset_kick_eventfd.c: $(OPENSSL_DEP)
src/core/iomgr/pollset_multipoller_with_poll_posix.c: $(OPENSSL_DEP)
src/core/iomgr/pollset_posix.c: $(OPENSSL_DEP)
src/core/iomgr/resolve_address_posix.c: $(OPENSSL_DEP)
@ -1607,6 +1609,7 @@ objs/$(CONFIG)/src/core/iomgr/fd_posix.o:
objs/$(CONFIG)/src/core/iomgr/iomgr.o:
objs/$(CONFIG)/src/core/iomgr/iomgr_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_kick_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_kick_eventfd.o:
objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_poll_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_posix.o:
objs/$(CONFIG)/src/core/iomgr/resolve_address_posix.o:
@ -1762,6 +1765,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/iomgr/iomgr.c \
src/core/iomgr/iomgr_posix.c \
src/core/iomgr/pollset_kick_posix.c \
src/core/iomgr/pollset_kick_eventfd.c \
src/core/iomgr/pollset_multipoller_with_poll_posix.c \
src/core/iomgr/pollset_posix.c \
src/core/iomgr/resolve_address_posix.c \
@ -1880,6 +1884,7 @@ objs/$(CONFIG)/src/core/iomgr/fd_posix.o:
objs/$(CONFIG)/src/core/iomgr/iomgr.o:
objs/$(CONFIG)/src/core/iomgr/iomgr_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_kick_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_kick_eventfd.o:
objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_poll_posix.o:
objs/$(CONFIG)/src/core/iomgr/pollset_posix.o:
objs/$(CONFIG)/src/core/iomgr/resolve_address_posix.o:

@ -48,6 +48,7 @@
"src/core/iomgr/pollset.h",
"src/core/iomgr/pollset_kick.h",
"src/core/iomgr/pollset_kick_posix.h",
"src/core/iomgr/pollset_kick_eventfd.h",
"src/core/iomgr/pollset_posix.h",
"src/core/iomgr/resolve_address.h",
"src/core/iomgr/sockaddr.h",
@ -124,6 +125,7 @@
"src/core/iomgr/iomgr.c",
"src/core/iomgr/iomgr_posix.c",
"src/core/iomgr/pollset_kick_posix.c",
"src/core/iomgr/pollset_kick_eventfd.c",
"src/core/iomgr/pollset_multipoller_with_poll_posix.c",
"src/core/iomgr/pollset_posix.c",
"src/core/iomgr/resolve_address_posix.c",

@ -68,6 +68,7 @@
#define GPR_GCC_ATOMIC 1
#define GPR_LINUX 1
#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
#define GPR_LINUX_EVENTFD 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
#define GPR_POSIX_STRING 1

@ -48,6 +48,10 @@
void grpc_pollset_kick_global_init(void);
void grpc_pollset_kick_global_destroy(void);
/* Guarantees a pure posix implementation rather than a specialized one, if
* applicable. Intended for testing. */
void grpc_pollset_kick_global_init_posix(void);
void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state);
void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state);

@ -0,0 +1,85 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/iomgr/pollset_kick_eventfd.h"
#ifdef GPR_LINUX_EVENTFD
#include <errno.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/log.h>
static void eventfd_create(grpc_kick_fd_info *fd_info) {
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
/* TODO(klempner): Handle failure more gracefully */
GPR_ASSERT(efd >= 0);
fd_info->read_fd = efd;
fd_info->write_fd = -1;
}
static void eventfd_consume(grpc_kick_fd_info *fd_info) {
eventfd_t value;
int err;
do {
err = eventfd_read(fd_info->read_fd, &value);
} while (err < 0 && errno == EINTR);
}
static void eventfd_kick(grpc_kick_fd_info *fd_info) {
int err;
do {
err = eventfd_write(fd_info->read_fd, 1);
} while (err < 0 && errno == EINTR);
}
static void eventfd_destroy(grpc_kick_fd_info *fd_info) {
close(fd_info->read_fd);
}
static const grpc_pollset_kick_vtable eventfd_kick_vtable = {
eventfd_create, eventfd_consume, eventfd_kick, eventfd_destroy
};
const grpc_pollset_kick_vtable *grpc_pollset_kick_eventfd_init(void) {
/* TODO(klempner): Check that eventfd works */
return &eventfd_kick_vtable;
}
#else /* GPR_LINUX_EVENTFD not defined */
const grpc_pollset_kick_vtable *grpc_pollset_kick_eventfd_init(void) {
return NULL;
}
#endif /* GPR_LINUX_EVENTFD */

@ -0,0 +1,42 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_KICK_EVENTFD_H_
#define __GRPC_INTERNAL_IOMGR_POLLSET_KICK_EVENTFD_H_
#include "src/core/iomgr/pollset_kick_posix.h"
/* Tries to enable eventfd support, returns a kick vtable if successful. */
const grpc_pollset_kick_vtable *grpc_pollset_kick_eventfd_init(void);
#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_EVENTFD_H_ */

@ -37,6 +37,7 @@
#include <string.h>
#include <unistd.h>
#include "src/core/iomgr/pollset_kick_eventfd.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -46,82 +47,58 @@
#define GRPC_MAX_CACHED_PIPES 50
#define GRPC_PIPE_LOW_WATERMARK 25
typedef struct grpc_kick_pipe_info {
int pipe_read_fd;
int pipe_write_fd;
struct grpc_kick_pipe_info *next;
} grpc_kick_pipe_info;
static grpc_kick_pipe_info *pipe_freelist = NULL;
static int pipe_freelist_count = 0;
static gpr_mu pipe_freelist_mu;
static grpc_kick_pipe_info *allocate_pipe(void) {
grpc_kick_pipe_info *info;
gpr_mu_lock(&pipe_freelist_mu);
if (pipe_freelist != NULL) {
info = pipe_freelist;
pipe_freelist = pipe_freelist->next;
--pipe_freelist_count;
static grpc_kick_fd_info *fd_freelist = NULL;
static int fd_freelist_count = 0;
static gpr_mu fd_freelist_mu;
static const grpc_pollset_kick_vtable *kick_vtable = NULL;
static grpc_kick_fd_info *allocate_pipe(void) {
grpc_kick_fd_info *info;
gpr_mu_lock(&fd_freelist_mu);
if (fd_freelist != NULL) {
info = fd_freelist;
fd_freelist = fd_freelist->next;
--fd_freelist_count;
} else {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
GPR_ASSERT(0 == pipe(pipefd));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1));
info = gpr_malloc(sizeof(*info));
info->pipe_read_fd = pipefd[0];
info->pipe_write_fd = pipefd[1];
kick_vtable->create(info);
info->next = NULL;
}
gpr_mu_unlock(&pipe_freelist_mu);
gpr_mu_unlock(&fd_freelist_mu);
return info;
}
static void destroy_pipe(void) {
/* assumes pipe_freelist_mu is held */
grpc_kick_pipe_info *current = pipe_freelist;
pipe_freelist = pipe_freelist->next;
pipe_freelist_count--;
close(current->pipe_read_fd);
close(current->pipe_write_fd);
/* assumes fd_freelist_mu is held */
grpc_kick_fd_info *current = fd_freelist;
fd_freelist = fd_freelist->next;
fd_freelist_count--;
kick_vtable->destroy(current);
gpr_free(current);
}
static void free_pipe(grpc_kick_pipe_info *pipe_info) {
gpr_mu_lock(&pipe_freelist_mu);
pipe_info->next = pipe_freelist;
pipe_freelist = pipe_info;
pipe_freelist_count++;
if (pipe_freelist_count > GRPC_MAX_CACHED_PIPES) {
while (pipe_freelist_count > GRPC_PIPE_LOW_WATERMARK) {
static void free_pipe(grpc_kick_fd_info *fd_info) {
gpr_mu_lock(&fd_freelist_mu);
fd_info->next = fd_freelist;
fd_freelist = fd_info;
fd_freelist_count++;
if (fd_freelist_count > GRPC_MAX_CACHED_PIPES) {
while (fd_freelist_count > GRPC_PIPE_LOW_WATERMARK) {
destroy_pipe();
}
}
gpr_mu_unlock(&pipe_freelist_mu);
}
void grpc_pollset_kick_global_init() {
pipe_freelist = NULL;
gpr_mu_init(&pipe_freelist_mu);
}
void grpc_pollset_kick_global_destroy() {
while (pipe_freelist != NULL) {
destroy_pipe();
}
gpr_mu_destroy(&pipe_freelist_mu);
gpr_mu_unlock(&fd_freelist_mu);
}
void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) {
gpr_mu_init(&kick_state->mu);
kick_state->kicked = 0;
kick_state->pipe_info = NULL;
kick_state->fd_info = NULL;
}
void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) {
gpr_mu_destroy(&kick_state->mu);
GPR_ASSERT(kick_state->pipe_info == NULL);
GPR_ASSERT(kick_state->fd_info == NULL);
}
int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) {
@ -131,17 +108,48 @@ int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) {
gpr_mu_unlock(&kick_state->mu);
return -1;
}
kick_state->pipe_info = allocate_pipe();
kick_state->fd_info = allocate_pipe();
gpr_mu_unlock(&kick_state->mu);
return kick_state->pipe_info->pipe_read_fd;
return kick_state->fd_info->read_fd;
}
void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state) {
kick_vtable->consume(kick_state->fd_info);
}
void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state) {
gpr_mu_lock(&kick_state->mu);
free_pipe(kick_state->fd_info);
kick_state->fd_info = NULL;
gpr_mu_unlock(&kick_state->mu);
}
void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
gpr_mu_lock(&kick_state->mu);
if (kick_state->fd_info != NULL) {
kick_vtable->kick(kick_state->fd_info);
} else {
kick_state->kicked = 1;
}
gpr_mu_unlock(&kick_state->mu);
}
static void pipe_create(grpc_kick_fd_info *fd_info) {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
GPR_ASSERT(0 == pipe(pipefd));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1));
fd_info->read_fd = pipefd[0];
fd_info->write_fd = pipefd[1];
}
static void pipe_consume(grpc_kick_fd_info *fd_info) {
char buf[128];
int r;
for (;;) {
r = read(kick_state->pipe_info->pipe_read_fd, buf, sizeof(buf));
r = read(fd_info->read_fd, buf, sizeof(buf));
if (r > 0) continue;
if (r == 0) return;
switch (errno) {
@ -156,22 +164,44 @@ void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state) {
}
}
void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state) {
gpr_mu_lock(&kick_state->mu);
free_pipe(kick_state->pipe_info);
kick_state->pipe_info = NULL;
gpr_mu_unlock(&kick_state->mu);
static void pipe_kick(grpc_kick_fd_info *fd_info) {
char c = 0;
while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR)
;
}
void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
gpr_mu_lock(&kick_state->mu);
if (kick_state->pipe_info != NULL) {
char c = 0;
while (write(kick_state->pipe_info->pipe_write_fd, &c, 1) != 1 &&
errno == EINTR)
;
} else {
kick_state->kicked = 1;
static void pipe_destroy(grpc_kick_fd_info *fd_info) {
close(fd_info->read_fd);
close(fd_info->write_fd);
}
static const grpc_pollset_kick_vtable pipe_kick_vtable = {
pipe_create, pipe_consume, pipe_kick, pipe_destroy
};
static void global_init_common(void) {
fd_freelist = NULL;
gpr_mu_init(&fd_freelist_mu);
}
void grpc_pollset_kick_global_init_posix(void) {
global_init_common();
kick_vtable = &pipe_kick_vtable;
}
void grpc_pollset_kick_global_init(void) {
global_init_common();
kick_vtable = grpc_pollset_kick_eventfd_init();
if (kick_vtable == NULL) {
kick_vtable = &pipe_kick_vtable;
}
gpr_mu_unlock(&kick_state->mu);
}
void grpc_pollset_kick_global_destroy(void) {
while (fd_freelist != NULL) {
destroy_pipe();
}
gpr_mu_destroy(&fd_freelist_mu);
}

@ -36,12 +36,23 @@
#include <grpc/support/sync.h>
struct grpc_kick_pipe_info;
typedef struct grpc_kick_fd_info {
int read_fd;
int write_fd;
struct grpc_kick_fd_info *next;
} grpc_kick_fd_info;
typedef struct grpc_pollset_kick_vtable {
void (*create)(struct grpc_kick_fd_info *fd_info);
void (*consume)(struct grpc_kick_fd_info *fd_info);
void (*kick)(struct grpc_kick_fd_info *fd_info);
void (*destroy)(struct grpc_kick_fd_info *fd_info);
} grpc_pollset_kick_vtable;
typedef struct grpc_pollset_kick_state {
gpr_mu mu;
int kicked;
struct grpc_kick_pipe_info *pipe_info;
struct grpc_kick_fd_info *fd_info;
} grpc_pollset_kick_state;
#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_ */

@ -107,17 +107,23 @@ static void test_over_free(void) {
}
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_pollset_kick_global_init();
static void run_tests(void) {
test_allocation();
test_basic_kick();
test_non_poll_kick();
test_non_kick();
test_over_free();
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_pollset_kick_global_init();
run_tests();
grpc_pollset_kick_global_destroy();
grpc_pollset_kick_global_init_posix();
run_tests();
grpc_pollset_kick_global_destroy();
return 0;
}

@ -121,6 +121,7 @@
<ClInclude Include="..\..\src\core\iomgr\pollset.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_kick.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_kick_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_kick_eventfd.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\resolve_address.h" />
<ClInclude Include="..\..\src\core\iomgr\sockaddr.h" />
@ -250,6 +251,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_eventfd.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_posix.c">

@ -121,6 +121,7 @@
<ClInclude Include="..\..\src\core\iomgr\pollset.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_kick.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_kick_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_kick_eventfd.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\resolve_address.h" />
<ClInclude Include="..\..\src\core\iomgr\sockaddr.h" />
@ -250,6 +251,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_eventfd.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_posix.c">

Loading…
Cancel
Save