Make max-pollers configurable via GRPC_MAX_POLLERS_PER_PI env variable

pull/10803/head
Sree Kuchibhotla 8 years ago
parent 7f59e9d3ac
commit 22b8bb1e08
1 changed file (53 lines changed):
  src/core/lib/iomgr/ev_epoll_linux.c
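For context, a usage sketch: the variable is read once, during event-engine initialization, so it must be in the process environment before grpc_init() runs. The value 2 and the use of POSIX setenv() below are illustrative, not part of this commit:

#include <stdlib.h>
#include <grpc/grpc.h>

int main(void) {
  /* Must happen before grpc_init(): GRPC_MAX_POLLERS_PER_PI is read only
     once, inside grpc_init_epoll_linux() */
  setenv("GRPC_MAX_POLLERS_PER_PI", "2", 1 /* overwrite */);
  grpc_init();
  /* ... create channels/servers and run the application ... */
  grpc_shutdown();
  return 0;
}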

@@ -40,6 +40,7 @@
 #include <assert.h>
 #include <errno.h>
+#include <limits.h>
 #include <poll.h>
 #include <pthread.h>
 #include <signal.h>
@@ -62,6 +63,7 @@
 #include "src/core/lib/iomgr/workqueue.h"
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/support/env.h"
 
 /* TODO: sreek - Move this to init.c and initialize this like other tracers. */
 static int grpc_polling_trace = 0; /* Disabled by default */
@@ -73,6 +75,10 @@ static int grpc_polling_trace = 0; /* Disabled by default */
 
 /* Uncomment the following to enable extra checks on poll_object operations */
 /* #define PO_DEBUG */
 
+/* The maximum number of polling threads per polling island. By default no
+   limit */
+static int g_max_pollers_per_pi = INT_MAX;
+
 static int grpc_wakeup_signal = -1;
 static bool is_grpc_wakeup_signal_initialized = false;
@@ -97,9 +103,6 @@ void grpc_use_signal(int signum) {
   }
 }
 
-/* The maximum number of polling threads per polling island */
-#define GRPC_MAX_POLLERS_PER_ISLAND 1
-
 struct polling_island;
 
 typedef enum {
@@ -1466,7 +1469,7 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
     return false;
   }
 }
 
-/* NOTE: May modify 'now' */
+/* NOTE: This function may modify 'now' */
 static bool acquire_polling_lease(grpc_pollset_worker *worker,
                                   polling_island *pi, gpr_timespec deadline,
                                   gpr_timespec *now) {
@@ -1475,7 +1478,7 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker,
   gpr_mu_lock(&pi->worker_list_mu);  // LOCK
   long num_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
 
-  if (num_pollers >= GRPC_MAX_POLLERS_PER_ISLAND) {
+  if (num_pollers >= g_max_pollers_per_pi) {
     push_back_worker_node(&pi->worker_list_head, &worker->pi_list_link);
     gpr_mu_unlock(&pi->worker_list_mu);  // UNLOCK
@@ -1493,20 +1496,21 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker,
       if (errno == EAGAIN) {
         is_timeout = true;
       } else {
-        /* TODO: sreek This should not happen. If we see these log messages, it
-         * means we are most likely doing something incorrect in the setup
-         * needed for sigwaitinfo/sigtimedwait */
-        gpr_log(GPR_ERROR, "Failed with retcode: %d (timeout_ms: %d)", errno,
-                timeout_ms);
+        /* NOTE: This should not happen. If we see these log messages, it means
+           we are most likely doing something incorrect in the setup needed
+           for sigwaitinfo/sigtimedwait */
+        gpr_log(GPR_ERROR,
+                "sigtimedwait failed with retcode: %d (timeout_ms: %d)", errno,
+                timeout_ms);
       }
     }
 
     /* Did the worker come out of sigtimedwait due to a thread that just
-     * exited epoll and kicking it (in release_polling_lease function). */
+       exited epoll and kicking it (in release_polling_lease function). */
     bool is_polling_turn = gpr_atm_acq_load(&worker->is_polling_turn);
 
     /* Did the worker come out of sigtimedwait due to a thread alerting it that
-     * some completion event was (likely) available in the completion queue */
+       some completion event was (likely) available in the completion queue */
     bool is_kicked = gpr_atm_no_barrier_load(&worker->is_kicked);
 
     if (is_kicked || is_timeout) {
@@ -1522,11 +1526,11 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker,
     remove_worker_node(&worker->pi_list_link);
 
     /* It is important to read the num_pollers again under the lock so that we
      * have the latest num_pollers value that doesn't change while we are doing
-     * the "(num_pollers < GPRC_MAX_POLLERS_PER_ISLAND)" a a few lines below */
+     * the "(num_pollers < g_max_pollers_per_pi)" check a few lines below */
     num_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
   }
 
-  if (num_pollers < GRPC_MAX_POLLERS_PER_ISLAND) {
+  if (num_pollers < g_max_pollers_per_pi) {
     gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
     is_lease_acquired = true;
   }
@@ -1536,7 +1540,7 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker,
 }
 
 static void release_polling_lease(polling_island *pi, grpc_error **error) {
-  gpr_mu_lock(&pi->worker_list_mu);  // Lock
+  gpr_mu_lock(&pi->worker_list_mu);
   gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
 
   worker_node *node = pop_front_worker_node(&pi->worker_list_head);
@@ -1554,6 +1558,8 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
                                    grpc_pollset_worker *worker,
                                    gpr_timespec now, gpr_timespec deadline,
                                    sigset_t *sig_mask, grpc_error **error) {
+  /* Only g_max_pollers_per_pi threads can be doing polling in parallel.
+     If we cannot get a lease, we cannot continue to do epoll_pwait() */
   if (!acquire_polling_lease(worker, pi, deadline, &now)) {
     return;
   }
@@ -1563,6 +1569,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
   char *err_msg;
   const char *err_desc = "pollset_work_and_unlock";
 
+  /* timeout_ms is the time between 'now' and 'deadline' */
   int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
 
   GRPC_SCHEDULING_START_BLOCKING_REGION;
@@ -1570,6 +1577,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
       epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
   GRPC_SCHEDULING_END_BLOCKING_REGION;
 
+  /* Give back the lease right away so that some other thread can enter */
   release_polling_lease(pi, error);
 
   if (ep_rv < 0) {
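The acquire/release pair above brackets the blocking epoll_pwait() so that at most g_max_pollers_per_pi threads poll one island at a time. A minimal sketch of the same gating idea, using a plain mutex/condvar pair instead of gRPC's signal-based parking (sigtimedwait plus the island's worker list) and without the deadline handling of acquire_polling_lease:

#include <pthread.h>

typedef struct {
  pthread_mutex_t mu;
  pthread_cond_t cv;
  int pollers;     /* threads currently holding a lease */
  int max_pollers; /* plays the role of g_max_pollers_per_pi */
} poll_gate;

static poll_gate g_gate = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
                           0, 2 /* e.g. GRPC_MAX_POLLERS_PER_PI=2 */};

static void poll_gate_acquire(poll_gate *g) {
  pthread_mutex_lock(&g->mu);
  while (g->pollers >= g->max_pollers) {
    /* Park until a poller releases its lease (gRPC parks in sigtimedwait
       and is woken by a kick instead) */
    pthread_cond_wait(&g->cv, &g->mu);
  }
  g->pollers++;
  pthread_mutex_unlock(&g->mu);
}

static void poll_gate_release(poll_gate *g) {
  pthread_mutex_lock(&g->mu);
  g->pollers--;
  /* Wake one waiter, analogous to release_polling_lease() kicking the
     worker at the front of the island's worker list */
  pthread_cond_signal(&g->cv);
  pthread_mutex_unlock(&g->mu);
}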
@@ -2116,6 +2124,21 @@ static bool is_epoll_available() {
   return true;
 }
 
+/* This is mainly for testing purposes. Checks to see if environment variable
+ * GRPC_MAX_POLLERS_PER_PI is set and if so, assigns that value to
+ * g_max_pollers_per_pi */
+static void set_max_pollers_per_island() {
+  char *s = gpr_getenv("GRPC_MAX_POLLERS_PER_PI");
+  if (s) {
+    int max_pollers = (int)strtol(s, NULL, 10);
+    if (max_pollers > 0) {
+      g_max_pollers_per_pi = max_pollers;
+    }
+    gpr_free(s); /* gpr_getenv returns a heap copy owned by the caller */
+  }
+
+  gpr_log(GPR_INFO, "Max number of pollers per polling island: %d",
+          g_max_pollers_per_pi);
+}
+
 const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
   /* If use of signals is disabled, we cannot use epoll engine*/
   if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
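A note on the parsing in set_max_pollers_per_island(): strtol() returns 0 for unparsable input, so junk strings and non-positive numbers both fail the max_pollers > 0 guard and leave the INT_MAX "no limit" default in place. A standalone illustration of that behavior (parse_max_pollers is a hypothetical helper, not part of the patch):

#include <limits.h>
#include <stdio.h>
#include <stdlib.h>

/* Mirrors the guard in set_max_pollers_per_island() */
static int parse_max_pollers(const char *s) {
  int v = (int)strtol(s, NULL, 10);
  return (v > 0) ? v : INT_MAX; /* keep the "no limit" default */
}

int main(void) {
  printf("%d\n", parse_max_pollers("4"));    /* 4 */
  printf("%d\n", parse_max_pollers("0"));    /* INT_MAX: non-positive ignored */
  printf("%d\n", parse_max_pollers("junk")); /* INT_MAX: strtol yields 0 */
  return 0;
}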
@@ -2134,6 +2157,8 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
     grpc_use_signal(SIGRTMIN + 6);
   }
 
+  set_max_pollers_per_island();
+
   fd_global_init();
 
   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
