From cfaa046a0127f9e40e63eaf5490164794d6ab253 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 5 May 2017 15:27:40 +0000 Subject: [PATCH] Integrate new tracers --- src/core/lib/iomgr/ev_epoll1_linux.c | 14 ----- src/core/lib/iomgr/ev_epollex_linux.c | 73 +++++++------------------- src/core/lib/iomgr/ev_epollsig_linux.c | 3 +- src/core/lib/iomgr/ev_posix.c | 2 +- src/core/lib/iomgr/ev_posix.h | 3 +- 5 files changed, 23 insertions(+), 72 deletions(-) diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 90c55465558..8aa69cd73a0 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -63,12 +63,8 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" -/* TODO: sreek: Right now, this wakes up all pollers. In future we should make - * sure to wake up one polling thread (which can wake up other threads if - * needed) */ static grpc_wakeup_fd global_wakeup_fd; static int g_epfd; -static gpr_atm g_timer_kick; /******************************************************************************* * Fd Declarations @@ -512,9 +508,6 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, for (int i = 0; i < r; i++) { void *data_ptr = events[i].data.ptr; if (data_ptr == &global_wakeup_fd) { - if (gpr_atm_no_barrier_cas(&g_timer_kick, 1, 0)) { - grpc_timer_consume_kick(); - } gpr_mu_lock(&g_wq_mu); grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list); gpr_mu_unlock(&g_wq_mu); @@ -799,11 +792,6 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) {} -static grpc_error *kick_poller(void) { - gpr_atm_no_barrier_store(&g_timer_kick, 1); - return grpc_wakeup_fd_wakeup(&global_wakeup_fd); -} - /******************************************************************************* * Workqueue Definitions */ @@ -951,8 +939,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .kick_poller = kick_poller, - .workqueue_ref = workqueue_ref, .workqueue_unref = workqueue_unref, .workqueue_scheduler = workqueue_scheduler, diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index b9f60d8e3ea..5a7c0448b65 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -64,11 +64,6 @@ #include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/spinlock.h" -/* TODO: sreek: Right now, this wakes up all pollers. In future we should make - * sure to wake up one polling thread (which can wake up other threads if - * needed) */ -static grpc_wakeup_fd global_wakeup_fd; - /******************************************************************************* * Pollset-set sibling link */ @@ -560,16 +555,6 @@ static grpc_error *pollable_materialize(pollable *p) { int new_epfd = epoll_create1(EPOLL_CLOEXEC); if (new_epfd < 0) { return GRPC_OS_ERROR(errno, "epoll_create1"); - } else { - struct epoll_event ev = { - .events = (uint32_t)(EPOLLIN | EPOLLET | EPOLLEXCLUSIVE), - .data.ptr = &global_wakeup_fd}; - if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != - 0) { - grpc_error *err = GRPC_OS_ERROR(errno, "epoll_ctl"); - close(new_epfd); - return err; - } } grpc_error *err = grpc_wakeup_fd_init(&p->wakeup); if (err != GRPC_ERROR_NONE) { @@ -639,22 +624,16 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { GPR_TLS_DECL(g_current_thread_pollset); GPR_TLS_DECL(g_current_thread_worker); -static bool global_wakeup_fd_initialized = false; /* Global state management */ static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_pollset); gpr_tls_init(&g_current_thread_worker); - grpc_error *error = GRPC_ERROR_NONE; - static const char *err_desc = "pollset_global_init"; - global_wakeup_fd_initialized = - append_error(&error, grpc_wakeup_fd_init(&global_wakeup_fd), err_desc); pollable_init(&g_empty_pollable, PO_EMPTY_POLLABLE); - return error; + return GRPC_ERROR_NONE; } static void pollset_global_shutdown(void) { - if (global_wakeup_fd_initialized) grpc_wakeup_fd_destroy(&global_wakeup_fd); pollable_destroy(&g_empty_pollable); gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); @@ -687,7 +666,7 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, grpc_pollset_worker *specific_worker) { - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kick %p tls_pollset=%p tls_worker=%p " "root_worker=(pollset:%p pollable:%p)", @@ -698,13 +677,13 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, if (specific_worker == NULL) { if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { if (pollset->root_worker == NULL) { - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", p); } pollset->kicked_without_poller = true; return GRPC_ERROR_NONE; } else { - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p); } grpc_error *err = pollable_materialize(p); @@ -712,25 +691,25 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, return grpc_wakeup_fd_wakeup(&p->wakeup); } } else { - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", p); } return GRPC_ERROR_NONE; } } else if (specific_worker->kicked) { - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); } return GRPC_ERROR_NONE; } else if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p); } specific_worker->kicked = true; return GRPC_ERROR_NONE; } else if (specific_worker == p->root_worker) { - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); } grpc_error *err = pollable_materialize(p); @@ -738,7 +717,7 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, specific_worker->kicked = true; return grpc_wakeup_fd_wakeup(&p->wakeup); } else { - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); } specific_worker->kicked = true; @@ -761,10 +740,6 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, return error; } -static grpc_error *kick_poller(void) { - return grpc_wakeup_fd_wakeup(&global_wakeup_fd); -} - static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollable_init(&pollset->pollable, PO_POLLSET); pollset->current_pollable = &g_empty_pollable; @@ -865,7 +840,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int timeout = poll_deadline_to_millis_timeout(deadline, now); - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout); } @@ -882,23 +857,15 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); } grpc_error *error = GRPC_ERROR_NONE; for (int i = 0; i < r; i++) { void *data_ptr = events[i].data.ptr; - if (data_ptr == &global_wakeup_fd) { - if (grpc_polling_trace) { - gpr_log(GPR_DEBUG, "PS:%p poll %p got global_wakeup_fd", pollset, p); - } - - grpc_timer_consume_kick(); - append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), - err_desc); - } else if (data_ptr == &p->wakeup) { - if (grpc_polling_trace) { + if (data_ptr == &p->wakeup) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p poll %p got pollset_wakeup", pollset, p); } append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc); @@ -908,7 +875,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0; bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (events[i].events & EPOLLOUT) != 0; - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p poll %p got fd %p: is_wq=%d cancel=%d read=%d " "write=%d", @@ -994,25 +961,25 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, if (worker->pollable != &pollset->pollable) { gpr_mu_unlock(&pollset->pollable.po.mu); } - if (grpc_polling_trace && worker->pollable->root_worker != worker) { + if (GRPC_TRACER_ON(grpc_polling_trace) && worker->pollable->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, worker->pollable, worker, poll_deadline_to_millis_timeout(deadline, *now)); } while (do_poll && worker->pollable->root_worker != worker) { if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) { - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, worker->pollable, worker); } do_poll = false; } else if (worker->kicked) { - if (grpc_polling_trace) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable, worker); } do_poll = false; - } else if (grpc_polling_trace && + } else if (GRPC_TRACER_ON(grpc_polling_trace) && worker->pollable->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset, worker->pollable, worker); @@ -1056,7 +1023,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; - if (0 && grpc_polling_trace) { + if (0 && GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 ".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p", pollset, worker_hdl, &worker, now.tv_sec, now.tv_nsec, @@ -1484,8 +1451,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .kick_poller = kick_poller, - .workqueue_ref = workqueue_ref, .workqueue_unref = workqueue_unref, .workqueue_scheduler = workqueue_scheduler, diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index 11be0e70a4e..d9ba77c6f04 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -65,9 +65,8 @@ #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) -/* TODO: sreek - Move this to init.c and initialize this like other tracers. */ #define GRPC_POLLING_TRACE(fmt, ...) \ - if (grpc_polling_trace) { \ + if (GRPC_TRACER_ON(grpc_polling_trace)) { \ gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \ } diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index d3302f4a807..e3d53d6d3d1 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -51,7 +51,7 @@ #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/support/env.h" -int grpc_polling_trace = 0; /* Disabled by default */ +grpc_tracer_flag grpc_polling_trace = GRPC_TRACER_INITIALIZER(false); /* Disabled by default */ /** Default poll() function - a pointer so that it can be overridden by some * tests */ diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index d70ed74ae1c..e013a6c9530 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -41,8 +41,9 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/workqueue.h" +#include "src/core/lib/debug/trace.h" -extern int grpc_polling_trace; /* Disabled by default */ +extern grpc_tracer_flag grpc_polling_trace; /* Disabled by default */ typedef struct grpc_fd grpc_fd;