Merge pull request #157 from dklempner/eventfd

Add support for eventfd based kicking on linux.
pull/255/head
Craig Tiller 10 years ago
commit 3b56a57b25
  1. 56
      Makefile
  2. 12
      build.json
  3. 2
      include/grpc/support/port_platform.h
  4. 142
      src/core/iomgr/pollset_kick.c
  5. 10
      src/core/iomgr/pollset_kick.h
  6. 10
      src/core/iomgr/pollset_kick_posix.h
  7. 4
      src/core/iomgr/pollset_kick_windows.h
  8. 82
      src/core/iomgr/wakeup_fd_eventfd.c
  9. 53
      src/core/iomgr/wakeup_fd_nospecial.c
  10. 93
      src/core/iomgr/wakeup_fd_pipe.c
  11. 41
      src/core/iomgr/wakeup_fd_pipe.h
  12. 70
      src/core/iomgr/wakeup_fd_posix.c
  13. 102
      src/core/iomgr/wakeup_fd_posix.h
  14. 16
      test/core/iomgr/poll_kick_posix_test.c
  15. 2
      tools/run_tests/tests.json
  16. 12
      vsprojects/vs2013/grpc.vcxproj
  17. 20
      vsprojects/vs2013/grpc.vcxproj.filters
  18. 12
      vsprojects/vs2013/grpc_unsecure.vcxproj
  19. 20
      vsprojects/vs2013/grpc_unsecure.vcxproj.filters

File diff suppressed because one or more lines are too long

@ -61,6 +61,8 @@
"src/core/iomgr/tcp_posix.h",
"src/core/iomgr/tcp_server.h",
"src/core/iomgr/time_averaged_stats.h",
"src/core/iomgr/wakeup_fd_posix.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/statistics/census_interface.h",
"src/core/statistics/census_log.h",
"src/core/statistics/census_rpc_stats.h",
@ -125,7 +127,7 @@
"src/core/iomgr/fd_posix.c",
"src/core/iomgr/iomgr.c",
"src/core/iomgr/iomgr_posix.c",
"src/core/iomgr/pollset_kick_posix.c",
"src/core/iomgr/pollset_kick.c",
"src/core/iomgr/pollset_multipoller_with_poll_posix.c",
"src/core/iomgr/pollset_posix.c",
"src/core/iomgr/pollset_windows.c",
@ -138,6 +140,10 @@
"src/core/iomgr/tcp_posix.c",
"src/core/iomgr/tcp_server_posix.c",
"src/core/iomgr/time_averaged_stats.c",
"src/core/iomgr/wakeup_fd_eventfd.c",
"src/core/iomgr/wakeup_fd_nospecial.c",
"src/core/iomgr/wakeup_fd_pipe.c",
"src/core/iomgr/wakeup_fd_posix.c",
"src/core/statistics/census_init.c",
"src/core/statistics/census_log.c",
"src/core/statistics/census_rpc_stats.c",
@ -1262,11 +1268,11 @@
]
},
{
"name": "poll_kick_test",
"name": "poll_kick_posix_test",
"build": "test",
"language": "c",
"src": [
"test/core/iomgr/poll_kick_test.c"
"test/core/iomgr/poll_kick_posix_test.c"
],
"deps": [
"grpc_test_util",

@ -68,6 +68,8 @@
#define GPR_GCC_ATOMIC 1
#define GPR_LINUX 1
#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
#define GPR_POSIX_HAS_SPECIAL_WAKEUP_FD 1
#define GPR_LINUX_EVENTFD 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
#define GPR_POSIX_STRING 1

@ -34,98 +34,74 @@
#include <grpc/support/port_platform.h>
#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/pollset_kick_posix.h"
#include "src/core/iomgr/pollset_kick.h"
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
/* This implementation is based on a freelist of pipes. */
#define GRPC_MAX_CACHED_PIPES 50
#define GRPC_PIPE_LOW_WATERMARK 25
/* This implementation is based on a freelist of wakeup fds, with extra logic to
* handle kicks while there is no attached fd. */
typedef struct grpc_kick_pipe_info {
int pipe_read_fd;
int pipe_write_fd;
struct grpc_kick_pipe_info *next;
} grpc_kick_pipe_info;
#define GRPC_MAX_CACHED_WFDS 50
#define GRPC_WFD_LOW_WATERMARK 25
static grpc_kick_pipe_info *pipe_freelist = NULL;
static int pipe_freelist_count = 0;
static gpr_mu pipe_freelist_mu;
static grpc_kick_fd_info *fd_freelist = NULL;
static int fd_freelist_count = 0;
static gpr_mu fd_freelist_mu;
static grpc_kick_pipe_info *allocate_pipe(void) {
grpc_kick_pipe_info *info;
gpr_mu_lock(&pipe_freelist_mu);
if (pipe_freelist != NULL) {
info = pipe_freelist;
pipe_freelist = pipe_freelist->next;
--pipe_freelist_count;
static grpc_kick_fd_info *allocate_wfd(void) {
grpc_kick_fd_info *info;
gpr_mu_lock(&fd_freelist_mu);
if (fd_freelist != NULL) {
info = fd_freelist;
fd_freelist = fd_freelist->next;
--fd_freelist_count;
} else {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
GPR_ASSERT(0 == pipe(pipefd));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1));
info = gpr_malloc(sizeof(*info));
info->pipe_read_fd = pipefd[0];
info->pipe_write_fd = pipefd[1];
grpc_wakeup_fd_create(&info->wakeup_fd);
info->next = NULL;
}
gpr_mu_unlock(&pipe_freelist_mu);
gpr_mu_unlock(&fd_freelist_mu);
return info;
}
static void destroy_pipe(void) {
/* assumes pipe_freelist_mu is held */
grpc_kick_pipe_info *current = pipe_freelist;
pipe_freelist = pipe_freelist->next;
pipe_freelist_count--;
close(current->pipe_read_fd);
close(current->pipe_write_fd);
static void destroy_wfd(void) {
/* assumes fd_freelist_mu is held */
grpc_kick_fd_info *current = fd_freelist;
fd_freelist = fd_freelist->next;
fd_freelist_count--;
grpc_wakeup_fd_destroy(&current->wakeup_fd);
gpr_free(current);
}
static void free_pipe(grpc_kick_pipe_info *pipe_info) {
gpr_mu_lock(&pipe_freelist_mu);
pipe_info->next = pipe_freelist;
pipe_freelist = pipe_info;
pipe_freelist_count++;
if (pipe_freelist_count > GRPC_MAX_CACHED_PIPES) {
while (pipe_freelist_count > GRPC_PIPE_LOW_WATERMARK) {
destroy_pipe();
static void free_wfd(grpc_kick_fd_info *fd_info) {
gpr_mu_lock(&fd_freelist_mu);
fd_info->next = fd_freelist;
fd_freelist = fd_info;
fd_freelist_count++;
if (fd_freelist_count > GRPC_MAX_CACHED_WFDS) {
while (fd_freelist_count > GRPC_WFD_LOW_WATERMARK) {
destroy_wfd();
}
}
gpr_mu_unlock(&pipe_freelist_mu);
}
void grpc_pollset_kick_global_init() {
pipe_freelist = NULL;
gpr_mu_init(&pipe_freelist_mu);
}
void grpc_pollset_kick_global_destroy() {
while (pipe_freelist != NULL) {
destroy_pipe();
}
gpr_mu_destroy(&pipe_freelist_mu);
gpr_mu_unlock(&fd_freelist_mu);
}
void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) {
gpr_mu_init(&kick_state->mu);
kick_state->kicked = 0;
kick_state->pipe_info = NULL;
kick_state->fd_info = NULL;
}
void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) {
gpr_mu_destroy(&kick_state->mu);
GPR_ASSERT(kick_state->pipe_info == NULL);
GPR_ASSERT(kick_state->fd_info == NULL);
}
int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) {
@ -135,49 +111,43 @@ int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) {
gpr_mu_unlock(&kick_state->mu);
return -1;
}
kick_state->pipe_info = allocate_pipe();
kick_state->fd_info = allocate_wfd();
gpr_mu_unlock(&kick_state->mu);
return kick_state->pipe_info->pipe_read_fd;
return GRPC_WAKEUP_FD_GET_READ_FD(&kick_state->fd_info->wakeup_fd);
}
void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state) {
char buf[128];
int r;
for (;;) {
r = read(kick_state->pipe_info->pipe_read_fd, buf, sizeof(buf));
if (r > 0) continue;
if (r == 0) return;
switch (errno) {
case EAGAIN:
return;
case EINTR:
continue;
default:
gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno));
return;
}
}
grpc_wakeup_fd_consume_wakeup(&kick_state->fd_info->wakeup_fd);
}
void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state) {
gpr_mu_lock(&kick_state->mu);
free_pipe(kick_state->pipe_info);
kick_state->pipe_info = NULL;
free_wfd(kick_state->fd_info);
kick_state->fd_info = NULL;
gpr_mu_unlock(&kick_state->mu);
}
void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
gpr_mu_lock(&kick_state->mu);
if (kick_state->pipe_info != NULL) {
char c = 0;
while (write(kick_state->pipe_info->pipe_write_fd, &c, 1) != 1 &&
errno == EINTR)
;
if (kick_state->fd_info != NULL) {
grpc_wakeup_fd_wakeup(&kick_state->fd_info->wakeup_fd);
} else {
kick_state->kicked = 1;
}
gpr_mu_unlock(&kick_state->mu);
}
#endif
void grpc_pollset_kick_global_init_fallback_fd(void) {
grpc_wakeup_fd_global_init_force_fallback();
}
void grpc_pollset_kick_global_init(void) {
grpc_wakeup_fd_global_init();
}
void grpc_pollset_kick_global_destroy(void) {
grpc_wakeup_fd_global_destroy();
}
#endif /* GPR_POSIX_SOCKET */

@ -36,9 +36,6 @@
#include <grpc/support/port_platform.h>
/* This is an abstraction around the typical pipe mechanism for waking up a
thread sitting in a poll() style call. */
#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/pollset_kick_posix.h"
#endif
@ -47,12 +44,19 @@
#include "src/core/iomgr/pollset_kick_windows.h"
#endif
/* This is an abstraction around the typical pipe mechanism for waking up a
thread sitting in a poll() style call. */
void grpc_pollset_kick_global_init(void);
void grpc_pollset_kick_global_destroy(void);
void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state);
void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state);
/* Guarantees a pure posix implementation rather than a specialized one, if
* applicable. Intended for testing. */
void grpc_pollset_kick_global_init_fallback_fd(void);
/* Must be called before entering poll(). If return value is -1, this consumed
an existing kick. Otherwise the return value is an FD to add to the poll set.
*/

@ -34,14 +34,18 @@
#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_
#define __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/sync.h>
struct grpc_kick_pipe_info;
typedef struct grpc_kick_fd_info {
grpc_wakeup_fd_info wakeup_fd;
struct grpc_kick_fd_info *next;
} grpc_kick_fd_info;
typedef struct grpc_pollset_kick_state {
gpr_mu mu;
int kicked;
struct grpc_kick_pipe_info *pipe_info;
struct grpc_kick_fd_info *fd_info;
} grpc_pollset_kick_state;
#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_ */
#endif /* __GRPC_INTERNALIOMGR_POLLSET_KICK_POSIX_H_ */

@ -36,10 +36,10 @@
#include <grpc/support/sync.h>
struct grpc_kick_pipe_info;
struct grpc_kick_fd_info;
typedef struct grpc_pollset_kick_state {
int unused;
} grpc_pollset_kick_state;
#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_WINDOWS_H_ */
#endif /* __GRPC_INTERNALIOMGR_POLLSET_KICK_WINDOWS_H_ */

@ -0,0 +1,82 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc/support/port_platform.h>
#ifdef GPR_LINUX_EVENTFD
#include <errno.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/log.h>
static void eventfd_create(grpc_wakeup_fd_info *fd_info) {
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
/* TODO(klempner): Handle failure more gracefully */
GPR_ASSERT(efd >= 0);
fd_info->read_fd = efd;
fd_info->write_fd = -1;
}
static void eventfd_consume(grpc_wakeup_fd_info *fd_info) {
eventfd_t value;
int err;
do {
err = eventfd_read(fd_info->read_fd, &value);
} while (err < 0 && errno == EINTR);
}
static void eventfd_wakeup(grpc_wakeup_fd_info *fd_info) {
int err;
do {
err = eventfd_write(fd_info->read_fd, 1);
} while (err < 0 && errno == EINTR);
}
static void eventfd_destroy(grpc_wakeup_fd_info *fd_info) {
close(fd_info->read_fd);
}
static int eventfd_check_availability(void) {
/* TODO(klempner): Actually check if eventfd is available */
return 1;
}
const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable = {
eventfd_create, eventfd_consume, eventfd_wakeup, eventfd_destroy,
eventfd_check_availability
};
#endif /* GPR_LINUX_EVENTFD */

@ -0,0 +1,53 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
* This is a dummy file to provide an invalid specialized_wakeup_fd_vtable on
* systems without anything better than pipe.
*/
#include <grpc/support/port_platform.h>
#ifndef GPR_POSIX_HAS_SPECIAL_WAKEUP_FD
#include "src/core/iomgr/wakeup_fd.h"
static int check_availability_invalid(void) {
return 0;
}
const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable = {
NULL, NULL, NULL, NULL, check_availability_invalid
};
#endif /* GPR_POSIX_HAS_SPECIAL_WAKEUP */

@ -0,0 +1,93 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/* TODO(klempner): Allow this code to be disabled. */
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/log.h>
static void pipe_create(grpc_wakeup_fd_info *fd_info) {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
GPR_ASSERT(0 == pipe(pipefd));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1));
fd_info->read_fd = pipefd[0];
fd_info->write_fd = pipefd[1];
}
static void pipe_consume(grpc_wakeup_fd_info *fd_info) {
char buf[128];
int r;
for (;;) {
r = read(fd_info->read_fd, buf, sizeof(buf));
if (r > 0) continue;
if (r == 0) return;
switch (errno) {
case EAGAIN:
return;
case EINTR:
continue;
default:
gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno));
return;
}
}
}
static void pipe_wakeup(grpc_wakeup_fd_info *fd_info) {
char c = 0;
while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR)
;
}
static void pipe_destroy(grpc_wakeup_fd_info *fd_info) {
close(fd_info->read_fd);
close(fd_info->write_fd);
}
static int pipe_check_availability(void) {
/* Assume that pipes are always available. */
return 1;
}
const grpc_wakeup_fd_vtable pipe_wakeup_fd_vtable = {
pipe_create, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability
};

@ -0,0 +1,41 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPC_INTERNAL_IOMGR_WAKEUP_FD_PIPE_H_
#define __GRPC_INTERNAL_IOMGR_WAKEUP_FD_PIPE_H_
#include "src/core/iomgr/wakeup_fd_posix.h"
extern grpc_wakeup_fd_vtable pipe_wakeup_fd_vtable;
#endif /* __GRPC_INTERNAL_IOMGR_WAKEUP_FD_PIPE_H_ */

@ -0,0 +1,70 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/iomgr/wakeup_fd_posix.h"
#include "src/core/iomgr/wakeup_fd_pipe.h"
#include <stddef.h>
static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL;
void grpc_wakeup_fd_global_init(void) {
if (specialized_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &specialized_wakeup_fd_vtable;
} else {
wakeup_fd_vtable = &pipe_wakeup_fd_vtable;
}
}
void grpc_wakeup_fd_global_init_force_fallback(void) {
wakeup_fd_vtable = &pipe_wakeup_fd_vtable;
}
void grpc_wakeup_fd_global_destroy(void) {
wakeup_fd_vtable = NULL;
}
void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info) {
wakeup_fd_vtable->create(fd_info);
}
void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info) {
wakeup_fd_vtable->consume(fd_info);
}
void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info) {
wakeup_fd_vtable->wakeup(fd_info);
}
void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info) {
wakeup_fd_vtable->destroy(fd_info);
}

@ -0,0 +1,102 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
* wakeup_fd abstracts the concept of a file descriptor for the purpose of
* waking up a thread in select()/poll()/epoll_wait()/etc.
* The poll() family of system calls provide a way for a thread to block until
* there is activity on one (or more) of a set of file descriptors. An
* application may wish to wake up this thread to do non file related work. The
* typical way to do this is to add a pipe to the set of file descriptors, then
* write to the pipe to wake up the thread in poll().
*
* Linux has a lighter weight eventfd specifically designed for this purpose.
* wakeup_fd abstracts the difference between the two.
*
* Setup:
* 1. Before calling anything, call global_init() at least once.
* 1. Call grpc_wakeup_fd_create() to get a wakeup_fd.
* 2. Add the result of GRPC_WAKEUP_FD_FD to the set of monitored file
* descriptors for the poll() style API you are using. Monitor the file
* descriptor for readability.
* 3. To tear down, call grpc_wakeup_fd_destroy(). This closes the underlying
* file descriptor.
*
* Usage:
* 1. To wake up a polling thread, call grpc_wakeup_fd_wakeup() on a wakeup_fd
* it is monitoring.
* 2. If the polling thread was awakened by a wakeup_fd event, call
* grpc_wakeup_fd_consume_wakeup() on it.
*/
#ifndef __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_
#define __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_
typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info;
void grpc_wakeup_fd_global_init(void);
void grpc_wakeup_fd_global_destroy(void);
void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info);
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
/* Force using the fallback implementation. This is intended for testing
* purposes only.*/
void grpc_wakeup_fd_global_init_force_fallback(void);
/* Private structures; don't access their fields directly outside of wakeup fd
* code. */
struct grpc_wakeup_fd_info {
int read_fd;
int write_fd;
};
typedef struct grpc_wakeup_fd_vtable {
void (*create)(grpc_wakeup_fd_info *fd_info);
void (*consume)(grpc_wakeup_fd_info *fd_info);
void (*wakeup)(grpc_wakeup_fd_info *fd_info);
void (*destroy)(grpc_wakeup_fd_info *fd_info);
/* Must be called before calling any other functions */
int (*check_availability)(void);
} grpc_wakeup_fd_vtable;
/* Defined in some specialized implementation's .c file, or by
* wakeup_fd_nospecial.c if no such implementation exists. */
extern const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable;
#endif /* __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_ */

@ -107,17 +107,23 @@ static void test_over_free(void) {
}
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_pollset_kick_global_init();
static void run_tests(void) {
test_allocation();
test_basic_kick();
test_non_poll_kick();
test_non_kick();
test_over_free();
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_pollset_kick_global_init();
run_tests();
grpc_pollset_kick_global_destroy();
grpc_pollset_kick_global_init_fallback_fd();
run_tests();
grpc_pollset_kick_global_destroy();
return 0;
}

@ -207,7 +207,7 @@
},
{
"language": "c",
"name": "poll_kick_test"
"name": "poll_kick_posix_test"
},
{
"language": "c",

@ -134,6 +134,8 @@
<ClInclude Include="..\..\src\core\iomgr\tcp_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\tcp_server.h" />
<ClInclude Include="..\..\src\core\iomgr\time_averaged_stats.h" />
<ClInclude Include="..\..\src\core\iomgr\wakeup_fd_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\wakeup_fd_pipe.h" />
<ClInclude Include="..\..\src\core\statistics\census_interface.h" />
<ClInclude Include="..\..\src\core\statistics\census_log.h" />
<ClInclude Include="..\..\src\core\statistics\census_rpc_stats.h" />
@ -250,7 +252,7 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c">
<ClCompile Include="..\..\src\core\iomgr\pollset_kick.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
</ClCompile>
@ -276,6 +278,14 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\time_averaged_stats.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_eventfd.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_nospecial.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_pipe.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\statistics\census_init.c">
</ClCompile>
<ClCompile Include="..\..\src\core\statistics\census_log.c">

@ -118,7 +118,7 @@
<ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c">
<ClCompile Include="..\..\src\core\iomgr\pollset_kick.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
@ -157,6 +157,18 @@
<ClCompile Include="..\..\src\core\iomgr\time_averaged_stats.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_eventfd.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_nospecial.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_pipe.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_posix.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\statistics\census_init.c">
<Filter>src\core\statistics</Filter>
</ClCompile>
@ -464,6 +476,12 @@
<ClInclude Include="..\..\src\core\iomgr\time_averaged_stats.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\iomgr\wakeup_fd_posix.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\iomgr\wakeup_fd_pipe.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\statistics\census_interface.h">
<Filter>src\core\statistics</Filter>
</ClInclude>

@ -134,6 +134,8 @@
<ClInclude Include="..\..\src\core\iomgr\tcp_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\tcp_server.h" />
<ClInclude Include="..\..\src\core\iomgr\time_averaged_stats.h" />
<ClInclude Include="..\..\src\core\iomgr\wakeup_fd_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\wakeup_fd_pipe.h" />
<ClInclude Include="..\..\src\core\statistics\census_interface.h" />
<ClInclude Include="..\..\src\core\statistics\census_log.h" />
<ClInclude Include="..\..\src\core\statistics\census_rpc_stats.h" />
@ -250,7 +252,7 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c">
<ClCompile Include="..\..\src\core\iomgr\pollset_kick.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
</ClCompile>
@ -276,6 +278,14 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\time_averaged_stats.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_eventfd.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_nospecial.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_pipe.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\statistics\census_init.c">
</ClCompile>
<ClCompile Include="..\..\src\core\statistics\census_log.c">

@ -79,7 +79,7 @@
<ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c">
<ClCompile Include="..\..\src\core\iomgr\pollset_kick.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">
@ -118,6 +118,18 @@
<ClCompile Include="..\..\src\core\iomgr\time_averaged_stats.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_eventfd.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_nospecial.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_pipe.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\wakeup_fd_posix.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\statistics\census_init.c">
<Filter>src\core\statistics</Filter>
</ClCompile>
@ -389,6 +401,12 @@
<ClInclude Include="..\..\src\core\iomgr\time_averaged_stats.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\iomgr\wakeup_fd_posix.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\iomgr\wakeup_fd_pipe.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\statistics\census_interface.h">
<Filter>src\core\statistics</Filter>
</ClInclude>

Loading…
Cancel
Save