|
|
|
@ -57,7 +57,7 @@ |
|
|
|
|
//#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1
|
|
|
|
|
|
|
|
|
|
#define MAX_EPOLL_EVENTS 100 |
|
|
|
|
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 1 |
|
|
|
|
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 |
|
|
|
|
|
|
|
|
|
grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, |
|
|
|
|
"pollable_refcount"); |
|
|
|
@ -196,6 +196,7 @@ struct grpc_pollset_worker { |
|
|
|
|
|
|
|
|
|
struct grpc_pollset { |
|
|
|
|
gpr_mu mu; |
|
|
|
|
gpr_atm worker_count; |
|
|
|
|
pollable* active_pollable; |
|
|
|
|
bool kicked_without_poller; |
|
|
|
|
grpc_closure* shutdown_closure; |
|
|
|
@ -683,6 +684,7 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) { |
|
|
|
|
|
|
|
|
|
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { |
|
|
|
|
gpr_mu_init(&pollset->mu); |
|
|
|
|
gpr_atm_no_barrier_store(&pollset->worker_count, 0); |
|
|
|
|
pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); |
|
|
|
|
pollset->kicked_without_poller = false; |
|
|
|
|
pollset->shutdown_closure = nullptr; |
|
|
|
@ -756,8 +758,20 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, |
|
|
|
|
pollable* pollable_obj, bool drain) { |
|
|
|
|
GPR_TIMER_SCOPE("pollable_process_events", 0); |
|
|
|
|
static const char* err_desc = "pollset_process_events"; |
|
|
|
|
// Use a simple heuristic to determine how many fd events to process
|
|
|
|
|
// per loop iteration. (events/workers)
|
|
|
|
|
int handle_count = 1; |
|
|
|
|
int worker_count = gpr_atm_no_barrier_load(&pollset->worker_count); |
|
|
|
|
GPR_ASSERT(worker_count > 0); |
|
|
|
|
handle_count = |
|
|
|
|
(pollable_obj->event_count - pollable_obj->event_cursor) / worker_count; |
|
|
|
|
if (handle_count == 0) { |
|
|
|
|
handle_count = 1; |
|
|
|
|
} else if (handle_count > MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) { |
|
|
|
|
handle_count = MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL; |
|
|
|
|
} |
|
|
|
|
grpc_error* error = GRPC_ERROR_NONE; |
|
|
|
|
for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && |
|
|
|
|
for (int i = 0; (drain || i < handle_count) && |
|
|
|
|
pollable_obj->event_cursor != pollable_obj->event_count; |
|
|
|
|
i++) { |
|
|
|
|
int n = pollable_obj->event_cursor++; |
|
|
|
@ -882,6 +896,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, |
|
|
|
|
GPR_TIMER_SCOPE("begin_worker", 0); |
|
|
|
|
bool do_poll = |
|
|
|
|
(pollset->shutdown_closure == nullptr && !pollset->already_shutdown); |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&pollset->worker_count, 1); |
|
|
|
|
if (worker_hdl != nullptr) *worker_hdl = worker; |
|
|
|
|
worker->initialized_cv = false; |
|
|
|
|
worker->kicked = false; |
|
|
|
@ -962,6 +977,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, |
|
|
|
|
if (worker->initialized_cv) { |
|
|
|
|
gpr_cv_destroy(&worker->cv); |
|
|
|
|
} |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&pollset->worker_count, -1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#ifndef NDEBUG |
|
|
|
|