Linux compile fixes

pull/11888/head
Craig Tiller 7 years ago
parent 19b9af0cb8
commit 203977925c
  1. 48
      src/core/lib/iomgr/ev_epoll1_linux.c
  2. 70
      src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
  3. 4
      src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
  4. 68
      src/core/lib/iomgr/ev_epollex_linux.c
  5. 37
      src/core/lib/iomgr/ev_epollsig_linux.c
  6. 3
      src/core/lib/iomgr/timer_generic.c
  7. 6
      test/core/iomgr/ev_epollsig_linux_test.c
  8. 22
      test/core/iomgr/pollset_set_test.c

@ -25,6 +25,7 @@
#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
#include <limits.h>
#include <poll.h> #include <poll.h>
#include <pthread.h> #include <pthread.h>
#include <string.h> #include <string.h>
@ -444,30 +445,24 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
#define MAX_EPOLL_EVENTS 100 #define MAX_EPOLL_EVENTS 100
static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
gpr_timespec now) { grpc_millis millis) {
gpr_timespec timeout; if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx);
return -1; if (delta > INT_MAX)
} return INT_MAX;
else if (delta < 0)
if (gpr_time_cmp(deadline, now) <= 0) {
return 0; return 0;
} else
return (int)delta;
static const gpr_timespec round_up = {
.clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
timeout = gpr_time_sub(deadline, now);
int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
return millis >= 1 ? millis : 1;
} }
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_timespec now, gpr_timespec deadline) { grpc_millis deadline) {
struct epoll_event events[MAX_EPOLL_EVENTS]; struct epoll_event events[MAX_EPOLL_EVENTS];
static const char *err_desc = "pollset_poll"; static const char *err_desc = "pollset_poll";
int timeout = poll_deadline_to_millis_timeout(deadline, now); int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
if (timeout != 0) { if (timeout != 0) {
GRPC_SCHEDULING_START_BLOCKING_REGION; GRPC_SCHEDULING_START_BLOCKING_REGION;
@ -505,9 +500,10 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
return error; return error;
} }
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, gpr_timespec *now, grpc_pollset_worker *worker,
gpr_timespec deadline) { grpc_pollset_worker **worker_hdl,
grpc_millis deadline) {
if (worker_hdl != NULL) *worker_hdl = worker; if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false; worker->initialized_cv = false;
worker->kick_state = UNKICKED; worker->kick_state = UNKICKED;
@ -562,12 +558,13 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_cv_init(&worker->cv); gpr_cv_init(&worker->cv);
while (worker->kick_state == UNKICKED && while (worker->kick_state == UNKICKED &&
pollset->shutdown_closure == NULL) { pollset->shutdown_closure == NULL) {
if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && if (gpr_cv_wait(&worker->cv, &pollset->mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)) &&
worker->kick_state == UNKICKED) { worker->kick_state == UNKICKED) {
worker->kick_state = KICKED; worker->kick_state = KICKED;
} }
} }
*now = gpr_now(now->clock_type); grpc_exec_ctx_invalidate_now(exec_ctx);
} }
return worker->kick_state == DESIGNATED_POLLER && return worker->kick_state == DESIGNATED_POLLER &&
@ -694,7 +691,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
ensure that it is held by the time the function returns */ ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) { grpc_millis deadline) {
grpc_pollset_worker worker; grpc_pollset_worker worker;
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work"; static const char *err_desc = "pollset_work";
@ -703,13 +700,12 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) {
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutdown_closure); GPR_ASSERT(!pollset->shutdown_closure);
GPR_ASSERT(!pollset->seen_inactive); GPR_ASSERT(!pollset->seen_inactive);
gpr_mu_unlock(&pollset->mu); gpr_mu_unlock(&pollset->mu);
append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline), append_error(&error, pollset_epoll(exec_ctx, pollset, deadline), err_desc);
err_desc);
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->mu);
gpr_tls_set(&g_current_thread_worker, 0); gpr_tls_set(&g_current_thread_worker, 0);
} }

@ -1196,30 +1196,16 @@ static struct timespec millis_to_timespec(int millis) {
return linux_ts; return linux_ts;
} }
/* Convert a timespec to milliseconds: static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
- Very small or negative poll times are clamped to zero to do a non-blocking grpc_millis millis) {
poll (which becomes spin polling) if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
- Other small values are rounded up to one millisecond grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx);
- Longer than a millisecond polls are rounded up to the next nearest if (delta > INT_MAX)
millisecond to avoid spinning return INT_MAX;
- Infinite timeouts are converted to -1 */ else if (delta < 0)
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
static const int64_t max_spin_polling_us = 10;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
return -1;
}
if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
max_spin_polling_us,
GPR_TIMESPAN))) <= 0) {
return 0; return 0;
} else
timeout = gpr_time_sub(deadline, now); return (int)delta;
int millis = gpr_time_to_millis(gpr_time_add(
timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
return millis >= 1 ? millis : 1;
} }
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
@ -1287,10 +1273,9 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->po.mu); gpr_mu_destroy(&pollset->po.mu);
} }
/* NOTE: This function may modify 'now' */ static bool acquire_polling_lease(grpc_exec_ctx *exec_ctx,
static bool acquire_polling_lease(grpc_pollset_worker *worker, grpc_pollset_worker *worker,
polling_island *pi, gpr_timespec deadline, polling_island *pi, grpc_millis *deadline) {
gpr_timespec *now) {
bool is_lease_acquired = false; bool is_lease_acquired = false;
gpr_mu_lock(&pi->worker_list_mu); // LOCK gpr_mu_lock(&pi->worker_list_mu); // LOCK
@ -1302,7 +1287,7 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker,
bool is_timeout = false; bool is_timeout = false;
int ret; int ret;
int timeout_ms = poll_deadline_to_millis_timeout(deadline, *now); int timeout_ms = poll_deadline_to_millis_timeout(exec_ctx, *deadline);
if (timeout_ms == -1) { if (timeout_ms == -1) {
ret = sigwaitinfo(&g_wakeup_sig_set, NULL); ret = sigwaitinfo(&g_wakeup_sig_set, NULL);
} else { } else {
@ -1325,18 +1310,13 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker,
} }
} }
/* Did the worker come out of sigtimedwait due to a thread that just
exited epoll and kicking it (in release_polling_lease function). */
bool is_polling_turn = gpr_atm_acq_load(&worker->is_polling_turn);
/* Did the worker come out of sigtimedwait due to a thread alerting it that /* Did the worker come out of sigtimedwait due to a thread alerting it that
some completion event was (likely) available in the completion queue */ some completion event was (likely) available in the completion queue */
bool is_kicked = gpr_atm_no_barrier_load(&worker->is_kicked); bool is_kicked = gpr_atm_no_barrier_load(&worker->is_kicked);
if (is_kicked || is_timeout) { if (is_kicked || is_timeout) {
*now = deadline; /* Essentially make the epoll timeout = 0 */ *deadline = grpc_exec_ctx_now(
} else if (is_polling_turn) { exec_ctx); /* Essentially make the epoll timeout = 0 */
*now = gpr_now(GPR_CLOCK_MONOTONIC); /* Reduce the epoll timeout */
} }
gpr_mu_lock(&pi->worker_list_mu); // LOCK gpr_mu_lock(&pi->worker_list_mu); // LOCK
@ -1376,11 +1356,11 @@ static void release_polling_lease(polling_island *pi, grpc_error **error) {
static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd, static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
grpc_pollset *pollset, polling_island *pi, grpc_pollset *pollset, polling_island *pi,
grpc_pollset_worker *worker, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline, grpc_millis deadline, sigset_t *sig_mask,
sigset_t *sig_mask, grpc_error **error) { grpc_error **error) {
/* Only g_max_pollers_per_pi threads can be doing polling in parallel. /* Only g_max_pollers_per_pi threads can be doing polling in parallel.
If we cannot get a lease, we cannot continue to do epoll_pwait() */ If we cannot get a lease, we cannot continue to do epoll_pwait() */
if (!acquire_polling_lease(worker, pi, deadline, &now)) { if (!acquire_polling_lease(exec_ctx, worker, pi, &deadline)) {
return; return;
} }
@ -1390,12 +1370,12 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
const char *err_desc = "pollset_work_and_unlock"; const char *err_desc = "pollset_work_and_unlock";
/* timeout_ms is the time between 'now' and 'deadline' */ /* timeout_ms is the time between 'now' and 'deadline' */
int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); int timeout_ms = poll_deadline_to_millis_timeout(exec_ctx, deadline);
GRPC_SCHEDULING_START_BLOCKING_REGION; GRPC_SCHEDULING_START_BLOCKING_REGION;
ep_rv = ep_rv =
epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
GRPC_SCHEDULING_END_BLOCKING_REGION; GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
/* Give back the lease right away so that some other thread can enter */ /* Give back the lease right away so that some other thread can enter */
release_polling_lease(pi, error); release_polling_lease(pi, error);
@ -1450,8 +1430,8 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset, grpc_pollset *pollset,
grpc_pollset_worker *worker, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline, grpc_millis deadline, sigset_t *sig_mask,
sigset_t *sig_mask, grpc_error **error) { grpc_error **error) {
int epoll_fd = -1; int epoll_fd = -1;
polling_island *pi = NULL; polling_island *pi = NULL;
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0); GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
@ -1499,7 +1479,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&pollset->po.mu); gpr_mu_unlock(&pollset->po.mu);
g_current_thread_polling_island = pi; g_current_thread_polling_island = pi;
pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now, deadline, pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, deadline,
sig_mask, error); sig_mask, error);
g_current_thread_polling_island = NULL; g_current_thread_polling_island = NULL;
@ -1521,7 +1501,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
ensure that it is held by the time the function returns */ ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) { grpc_millis deadline) {
GPR_TIMER_BEGIN("pollset_work", 0); GPR_TIMER_BEGIN("pollset_work", 0);
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
@ -1577,7 +1557,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
push_front_worker(pollset, &worker); /* Add worker to pollset */ push_front_worker(pollset, &worker); /* Add worker to pollset */
pollset_work_and_unlock(exec_ctx, pollset, &worker, now, deadline, pollset_work_and_unlock(exec_ctx, pollset, &worker, deadline,
&g_orig_sigmask, &error); &g_orig_sigmask, &error);
grpc_exec_ctx_flush(exec_ctx); grpc_exec_ctx_flush(exec_ctx);

@ -838,7 +838,7 @@ static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps,
ensure that it is held by the time the function returns */ ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) { grpc_millis deadline) {
GPR_TIMER_BEGIN("pollset_work", 0); GPR_TIMER_BEGIN("pollset_work", 0);
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
@ -861,7 +861,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
push_front_worker(pollset, &worker); push_front_worker(pollset, &worker);
gpr_cv_wait(&worker.kick_cv, &pollset->mu, gpr_cv_wait(&worker.kick_cv, &pollset->mu,
gpr_convert_clock_type(deadline, GPR_CLOCK_REALTIME)); grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME));
/* pollset->mu locked here */ /* pollset->mu locked here */
remove_worker(pollset, &worker); remove_worker(pollset, &worker);

@ -25,6 +25,7 @@
#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
#include <limits.h>
#include <poll.h> #include <poll.h>
#include <pthread.h> #include <pthread.h>
#include <string.h> #include <string.h>
@ -683,29 +684,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &pollset->pollable.po.mu; *mu = &pollset->pollable.po.mu;
} }
/* Convert a timespec to milliseconds: static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
- Very small or negative poll times are clamped to zero to do a non-blocking grpc_millis millis) {
poll (which becomes spin polling) if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
- Other small values are rounded up to one millisecond grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx);
- Longer than a millisecond polls are rounded up to the next nearest if (delta > INT_MAX)
millisecond to avoid spinning return INT_MAX;
- Infinite timeouts are converted to -1 */ else if (delta < 0)
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
return -1;
}
if (gpr_time_cmp(deadline, now) <= 0) {
return 0; return 0;
} else
return (int)delta;
static const gpr_timespec round_up = {
.clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
timeout = gpr_time_sub(deadline, now);
int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
return millis >= 1 ? millis : 1;
} }
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
@ -799,9 +787,8 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
} }
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollable *p, gpr_timespec now, pollable *p, grpc_millis deadline) {
gpr_timespec deadline) { int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
int timeout = poll_deadline_to_millis_timeout(deadline, now);
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
char *desc = pollable_desc(p); char *desc = pollable_desc(p);
@ -872,9 +859,10 @@ static worker_remove_result worker_remove(grpc_pollset_worker **root,
} }
/* Return true if this thread should poll */ /* Return true if this thread should poll */
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, gpr_timespec *now, grpc_pollset_worker *worker,
gpr_timespec deadline) { grpc_pollset_worker **worker_hdl,
grpc_millis deadline) {
bool do_poll = true; bool do_poll = true;
if (worker_hdl != NULL) *worker_hdl = worker; if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false; worker->initialized_cv = false;
@ -897,10 +885,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->pollable->root_worker != worker) { worker->pollable->root_worker != worker) {
gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
worker->pollable, worker, worker->pollable, worker,
poll_deadline_to_millis_timeout(deadline, *now)); poll_deadline_to_millis_timeout(exec_ctx, deadline));
} }
while (do_poll && worker->pollable->root_worker != worker) { while (do_poll && worker->pollable->root_worker != worker) {
if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) { if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
worker->pollable, worker); worker->pollable, worker);
@ -923,7 +912,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_mu_lock(&pollset->pollable.po.mu); gpr_mu_lock(&pollset->pollable.po.mu);
gpr_mu_lock(&worker->pollable->po.mu); gpr_mu_lock(&worker->pollable->po.mu);
} }
*now = gpr_now(now->clock_type); grpc_exec_ctx_invalidate_now(exec_ctx);
} }
return do_poll && pollset->shutdown_closure == NULL && return do_poll && pollset->shutdown_closure == NULL &&
@ -954,14 +943,13 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
ensure that it is held by the time the function returns */ ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) { grpc_millis deadline) {
grpc_pollset_worker worker; grpc_pollset_worker worker;
if (0 && GRPC_TRACER_ON(grpc_polling_trace)) { if (0 && GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR
".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p", " deadline=%" PRIdPTR " kwp=%d root_worker=%p",
pollset, worker_hdl, &worker, now.tv_sec, now.tv_nsec, pollset, worker_hdl, &worker, grpc_exec_ctx_now(exec_ctx), deadline,
deadline.tv_sec, deadline.tv_nsec, pollset->kicked_without_poller, pollset->kicked_without_poller, pollset->root_worker);
pollset->root_worker);
} }
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work"; static const char *err_desc = "pollset_work";
@ -972,7 +960,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (pollset->current_pollable != &pollset->pollable) { if (pollset->current_pollable != &pollset->pollable) {
gpr_mu_lock(&pollset->current_pollable->po.mu); gpr_mu_lock(&pollset->current_pollable->po.mu);
} }
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutdown_closure); GPR_ASSERT(!pollset->shutdown_closure);
@ -982,8 +970,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
} }
gpr_mu_unlock(&pollset->pollable.po.mu); gpr_mu_unlock(&pollset->pollable.po.mu);
if (pollset->event_cursor == pollset->event_count) { if (pollset->event_cursor == pollset->event_count) {
append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, append_error(&error,
now, deadline), pollset_epoll(exec_ctx, pollset, worker.pollable, deadline),
err_desc); err_desc);
} }
append_error(&error, pollset_process_events(exec_ctx, pollset, false), append_error(&error, pollset_process_events(exec_ctx, pollset, false),

@ -25,6 +25,7 @@
#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
#include <limits.h>
#include <poll.h> #include <poll.h>
#include <pthread.h> #include <pthread.h>
#include <signal.h> #include <signal.h>
@ -1088,30 +1089,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->shutdown_done = NULL; pollset->shutdown_done = NULL;
} }
/* Convert a timespec to milliseconds: static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
- Very small or negative poll times are clamped to zero to do a non-blocking grpc_millis millis) {
poll (which becomes spin polling) if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
- Other small values are rounded up to one millisecond grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx);
- Longer than a millisecond polls are rounded up to the next nearest if (delta > INT_MAX)
millisecond to avoid spinning return INT_MAX;
- Infinite timeouts are converted to -1 */ else if (delta < 0)
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
static const int64_t max_spin_polling_us = 10;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
return -1;
}
if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
max_spin_polling_us,
GPR_TIMESPAN))) <= 0) {
return 0; return 0;
} else
timeout = gpr_time_sub(deadline, now); return (int)delta;
int millis = gpr_time_to_millis(gpr_time_add(
timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
return millis >= 1 ? millis : 1;
} }
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
@ -1307,10 +1294,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
ensure that it is held by the time the function returns */ ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) { grpc_millis deadline) {
GPR_TIMER_BEGIN("pollset_work", 0); GPR_TIMER_BEGIN("pollset_work", 0);
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); int timeout_ms = poll_deadline_to_millis_timeout(exec_ctx, deadline);
sigset_t new_mask; sigset_t new_mask;

@ -225,7 +225,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
return; return;
} }
grpc_time_averaged_stats_add_sample(&shard->stats, (deadline - now) / 1000.0); grpc_time_averaged_stats_add_sample(&shard->stats,
(double)(deadline - now) / 1000.0);
if (deadline < shard->queue_deadline_cap) { if (deadline < shard->queue_deadline_cap) {
is_first_timer = grpc_timer_heap_add(&shard->heap, timer); is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
} else { } else {

@ -236,10 +236,8 @@ static void test_threading_loop(void *arg) {
grpc_pollset_worker *worker; grpc_pollset_worker *worker;
gpr_mu_lock(shared->mu); gpr_mu_lock(shared->mu);
GPR_ASSERT(GRPC_LOG_IF_ERROR( GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work", "pollset_work", grpc_pollset_work(&exec_ctx, shared->pollset, &worker,
grpc_pollset_work(&exec_ctx, shared->pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC))));
gpr_mu_unlock(shared->mu); gpr_mu_unlock(shared->mu);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }

@ -202,7 +202,7 @@ static void pollset_set_test_basic() {
*/ */
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker *worker; grpc_pollset_worker *worker;
gpr_timespec deadline; grpc_millis deadline;
test_fd tfds[10]; test_fd tfds[10];
test_pollset pollsets[3]; test_pollset pollsets[3];
@ -255,10 +255,10 @@ static void pollset_set_test_basic() {
make_test_fds_readable(tfds, num_fds); make_test_fds_readable(tfds, num_fds);
gpr_mu_lock(pollsets[i].mu); gpr_mu_lock(pollsets[i].mu);
deadline = grpc_timeout_milliseconds_to_deadline(2); deadline =
grpc_timespec_to_millis(grpc_timeout_milliseconds_to_deadline(2));
GPR_ASSERT(GRPC_ERROR_NONE == GPR_ASSERT(GRPC_ERROR_NONE ==
grpc_pollset_work(&exec_ctx, pollsets[i].ps, &worker, grpc_pollset_work(&exec_ctx, pollsets[i].ps, &worker, deadline));
gpr_now(GPR_CLOCK_MONOTONIC), deadline));
gpr_mu_unlock(pollsets[i].mu); gpr_mu_unlock(pollsets[i].mu);
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
@ -307,7 +307,7 @@ void pollset_set_test_dup_fds() {
*/ */
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker *worker; grpc_pollset_worker *worker;
gpr_timespec deadline; grpc_millis deadline;
test_fd tfds[3]; test_fd tfds[3];
test_pollset pollset; test_pollset pollset;
@ -337,10 +337,9 @@ void pollset_set_test_dup_fds() {
make_test_fds_readable(tfds, num_fds); make_test_fds_readable(tfds, num_fds);
gpr_mu_lock(pollset.mu); gpr_mu_lock(pollset.mu);
deadline = grpc_timeout_milliseconds_to_deadline(2); deadline = grpc_timespec_to_millis(grpc_timeout_milliseconds_to_deadline(2));
GPR_ASSERT(GRPC_ERROR_NONE == GPR_ASSERT(GRPC_ERROR_NONE ==
grpc_pollset_work(&exec_ctx, pollset.ps, &worker, grpc_pollset_work(&exec_ctx, pollset.ps, &worker, deadline));
gpr_now(GPR_CLOCK_MONOTONIC), deadline));
gpr_mu_unlock(pollset.mu); gpr_mu_unlock(pollset.mu);
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
@ -380,7 +379,7 @@ void pollset_set_test_empty_pollset() {
*/ */
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker *worker; grpc_pollset_worker *worker;
gpr_timespec deadline; grpc_millis deadline;
test_fd tfds[3]; test_fd tfds[3];
test_pollset pollsets[2]; test_pollset pollsets[2];
@ -406,10 +405,9 @@ void pollset_set_test_empty_pollset() {
make_test_fds_readable(tfds, num_fds); make_test_fds_readable(tfds, num_fds);
gpr_mu_lock(pollsets[0].mu); gpr_mu_lock(pollsets[0].mu);
deadline = grpc_timeout_milliseconds_to_deadline(2); deadline = grpc_timespec_to_millis(grpc_timeout_milliseconds_to_deadline(2));
GPR_ASSERT(GRPC_ERROR_NONE == GPR_ASSERT(GRPC_ERROR_NONE ==
grpc_pollset_work(&exec_ctx, pollsets[0].ps, &worker, grpc_pollset_work(&exec_ctx, pollsets[0].ps, &worker, deadline));
gpr_now(GPR_CLOCK_MONOTONIC), deadline));
gpr_mu_unlock(pollsets[0].mu); gpr_mu_unlock(pollsets[0].mu);
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);

Loading…
Cancel
Save