diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 3e13e657d35..3a5f48cb8f0 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -40,6 +40,7 @@ #include #include +#include #include #include #include @@ -62,6 +63,7 @@ #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/support/env.h" /* TODO: sreek - Move this to init.c and initialize this like other tracers. */ static int grpc_polling_trace = 0; /* Disabled by default */ @@ -73,6 +75,10 @@ static int grpc_polling_trace = 0; /* Disabled by default */ /* Uncomment the following to enable extra checks on poll_object operations */ /* #define PO_DEBUG */ +/* The maximum number of polling threads per polling island. By default no + limit */ +static int g_max_pollers_per_pi = INT_MAX; + static int grpc_wakeup_signal = -1; static bool is_grpc_wakeup_signal_initialized = false; @@ -97,9 +103,6 @@ void grpc_use_signal(int signum) { } } -/* The maximum number of polling threads per polling island */ -#define GRPC_MAX_POLLERS_PER_ISLAND 1 - struct polling_island; typedef enum { @@ -1466,7 +1469,7 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, return false; } -/* NOTE: May modify 'now' */ +/* NOTE: This function may modify 'now' */ static bool acquire_polling_lease(grpc_pollset_worker *worker, polling_island *pi, gpr_timespec deadline, gpr_timespec *now) { @@ -1475,7 +1478,7 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker, gpr_mu_lock(&pi->worker_list_mu); // LOCK long num_pollers = gpr_atm_no_barrier_load(&pi->poller_count); - if (num_pollers >= GRPC_MAX_POLLERS_PER_ISLAND) { + if (num_pollers >= g_max_pollers_per_pi) { push_back_worker_node(&pi->worker_list_head, &worker->pi_list_link); gpr_mu_unlock(&pi->worker_list_mu); // UNLOCK @@ -1493,20 +1496,21 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker, if (errno == EAGAIN) { is_timeout = true; } else { - /* TODO: sreek This should not happen. If we see these log messages, it - * means we are most likely something incorrect in the setup needed for - * sigwaitinfo/sigtimedwait */ - gpr_log(GPR_ERROR, "Failed with retcode: %d (timeout_ms: %d)", errno, + /* NOTE: This should not happen. If we see these log messages, it means + we are most likely doing something incorrect in the setup * needed + for sigwaitinfo/sigtimedwait */ + gpr_log(GPR_ERROR, + "sigtimedwait failed with retcode: %d (timeout_ms: %d)", errno, timeout_ms); } } /* Did the worker come out of sigtimedwait due to a thread that just - * exited epoll and kicking it (in release_polling_lease function). */ + exited epoll and kicking it (in release_polling_lease function). */ bool is_polling_turn = gpr_atm_acq_load(&worker->is_polling_turn); /* Did the worker come out of sigtimedwait due to a thread alerting it that - * some completion event was (likely) available in the completion queue */ + some completion event was (likely) available in the completion queue */ bool is_kicked = gpr_atm_no_barrier_load(&worker->is_kicked); if (is_kicked || is_timeout) { @@ -1522,11 +1526,11 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker, remove_worker_node(&worker->pi_list_link); /* It is important to read the num_pollers again under the lock so that we * have the latest num_pollers value that doesn't change while we are doing - * the "(num_pollers < GPRC_MAX_POLLERS_PER_ISLAND)" a a few lines below */ + * the "(num_pollers < g_max_pollers_per_pi)" a a few lines below */ num_pollers = gpr_atm_no_barrier_load(&pi->poller_count); } - if (num_pollers < GRPC_MAX_POLLERS_PER_ISLAND) { + if (num_pollers < g_max_pollers_per_pi) { gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); is_lease_acquired = true; } @@ -1536,7 +1540,7 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker, } static void release_polling_lease(polling_island *pi, grpc_error **error) { - gpr_mu_lock(&pi->worker_list_mu); // Lock + gpr_mu_lock(&pi->worker_list_mu); gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); worker_node *node = pop_front_worker_node(&pi->worker_list_head); @@ -1554,6 +1558,8 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline, sigset_t *sig_mask, grpc_error **error) { + /* Only g_max_pollers_per_pi threads can be doing polling in parallel. + If we cannot get a lease, we cannot continue to do epoll_pwait() */ if (!acquire_polling_lease(worker, pi, deadline, &now)) { return; } @@ -1563,6 +1569,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd, char *err_msg; const char *err_desc = "pollset_work_and_unlock"; + /* timeout_ms is the time between 'now' and 'deadline' */ int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); GRPC_SCHEDULING_START_BLOCKING_REGION; @@ -1570,6 +1577,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd, epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); GRPC_SCHEDULING_END_BLOCKING_REGION; + /* Give back the lease right away so that some other thread can enter */ release_polling_lease(pi, error); if (ep_rv < 0) { @@ -2116,6 +2124,21 @@ static bool is_epoll_available() { return true; } +/* This is mainly for testing purposes. Checks to see if environment variable + * GRPC_MAX_POLLERS_PER_PI is set and if so, assigns that value to the */ +static void set_max_pollers_per_island() { + char *s = gpr_getenv("GRPC_MAX_POLLERS_PER_PI"); + if (s) { + int max_pollers = (int)strtol(s, NULL, 10); + if (max_pollers > 0) { + g_max_pollers_per_pi = max_pollers; + } + } + + gpr_log(GPR_INFO, "Max number of pollers per polling island: %d", + g_max_pollers_per_pi); +} + const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { /* If use of signals is disabled, we cannot use epoll engine*/ if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) { @@ -2134,6 +2157,8 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { grpc_use_signal(SIGRTMIN + 6); } + set_max_pollers_per_island(); + fd_global_init(); if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {