Merge pull request #8862 from ctiller/fixit1

Fix mac build
pull/8864/head
Craig Tiller 8 years ago committed by GitHub
commit 87ce813efd
  1. 26
      src/core/lib/iomgr/ev_epoll_linux.c
  2. 12
      src/core/lib/iomgr/ev_poll_posix.c
  3. 1
      src/core/lib/iomgr/ev_posix.h

@ -72,6 +72,11 @@ static int grpc_polling_trace = 0; /* Disabled by default */
static int grpc_wakeup_signal = -1; static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false; static bool is_grpc_wakeup_signal_initialized = false;
/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
* sure to wake up one polling thread (which can wake up other threads if
* needed) */
static grpc_wakeup_fd global_wakeup_fd;
/* Implements the function defined in grpc_posix.h. This function might be /* Implements the function defined in grpc_posix.h. This function might be
* called before even calling grpc_init() to set either a different signal to * called before even calling grpc_init() to set either a different signal to
* use. If signum == -1, then the use of signals is disabled */ * use. If signum == -1, then the use of signals is disabled */
@ -438,7 +443,7 @@ static void polling_island_add_wakeup_fd_locked(polling_island *pi,
"epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with " "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
"error: %d (%s)", "error: %d (%s)",
pi->epoll_fd, pi->epoll_fd,
GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd), errno,
strerror(errno)); strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg); gpr_free(err_msg);
@ -541,7 +546,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
goto done; goto done;
} }
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error);
polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error); polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
if (initial_fd != NULL) { if (initial_fd != NULL) {
@ -843,11 +848,6 @@ static void polling_island_global_shutdown() {
* alarm 'epoch'). This wakeup_fd gives us something to alert on when such a * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
* case occurs. */ * case occurs. */
/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
* sure to wake up one polling thread (which can wake up other threads if
* needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;
static grpc_fd *fd_freelist = NULL; static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu; static gpr_mu fd_freelist_mu;
@ -1163,11 +1163,11 @@ static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_pollset); gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker); gpr_tls_init(&g_current_thread_worker);
poller_kick_init(); poller_kick_init();
return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); return grpc_wakeup_fd_init(&global_wakeup_fd);
} }
static void pollset_global_shutdown(void) { static void pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); grpc_wakeup_fd_destroy(&global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker); gpr_tls_destroy(&g_current_thread_worker);
} }
@ -1274,7 +1274,7 @@ static grpc_error *pollset_kick(grpc_pollset *p,
} }
static grpc_error *kick_poller(void) { static grpc_error *kick_poller(void) {
return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} }
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
@ -1501,13 +1501,13 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
for (int i = 0; i < ep_rv; ++i) { for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr; void *data_ptr = ep_ev[i].data.ptr;
if (data_ptr == &grpc_global_wakeup_fd) { if (data_ptr == &global_wakeup_fd) {
append_error(error, append_error(error,
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc); err_desc);
} else if (data_ptr == &pi->workqueue_wakeup_fd) { } else if (data_ptr == &pi->workqueue_wakeup_fd) {
append_error(error, append_error(error,
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc); err_desc);
maybe_do_workqueue_work(exec_ctx, pi); maybe_do_workqueue_work(exec_ctx, pi);
} else if (data_ptr == &polling_island_wakeup_fd) { } else if (data_ptr == &polling_island_wakeup_fd) {

@ -120,6 +120,8 @@ struct grpc_fd {
grpc_pollset *read_notifier_pollset; grpc_pollset *read_notifier_pollset;
}; };
static grpc_wakeup_fd global_wakeup_fd;
/* Begin polling on an fd. /* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that or writability interest changes, the pollset can be kicked to pick up that
@ -769,17 +771,17 @@ static grpc_error *pollset_kick(grpc_pollset *p,
static grpc_error *pollset_global_init(void) { static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker); gpr_tls_init(&g_current_thread_worker);
return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); return grpc_wakeup_fd_init(&global_wakeup_fd);
} }
static void pollset_global_shutdown(void) { static void pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); grpc_wakeup_fd_destroy(&global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_poller); gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker); gpr_tls_destroy(&g_current_thread_worker);
} }
static grpc_error *kick_poller(void) { static grpc_error *kick_poller(void) {
return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} }
/* main interface */ /* main interface */
@ -947,7 +949,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
fd_count = 0; fd_count = 0;
pfd_count = 2; pfd_count = 2;
pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd);
pfds[0].events = POLLIN; pfds[0].events = POLLIN;
pfds[0].revents = 0; pfds[0].revents = 0;
pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd); pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
@ -1002,7 +1004,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
} else { } else {
if (pfds[0].revents & POLLIN_CHECK) { if (pfds[0].revents & POLLIN_CHECK) {
work_combine_error( work_combine_error(
&error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); &error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd));
} }
if (pfds[1].revents & POLLIN_CHECK) { if (pfds[1].revents & POLLIN_CHECK) {
work_combine_error( work_combine_error(

@ -183,6 +183,5 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
/* override to allow tests to hook poll() usage */ /* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function; extern grpc_poll_function_type grpc_poll_function;
extern grpc_wakeup_fd grpc_global_wakeup_fd;
#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ #endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */

Loading…
Cancel
Save