@@ -246,8 +246,11 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
 }
 
 void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
-                       grpc_pollset_worker *worker, gpr_timespec now,
+                       grpc_pollset_worker **worker_hdl, gpr_timespec now,
                        gpr_timespec deadline) {
+  grpc_pollset_worker worker;
+  *worker_hdl = &worker;
+
   /* pollset->mu already held */
   int added_worker = 0;
   int locked = 1;
@@ -255,16 +258,16 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   int keep_polling = 0;
   GPR_TIMER_BEGIN("grpc_pollset_work", 0);
   /* this must happen before we (potentially) drop pollset->mu */
-  worker->next = worker->prev = NULL;
-  worker->reevaluate_polling_on_wakeup = 0;
+  worker.next = worker.prev = NULL;
+  worker.reevaluate_polling_on_wakeup = 0;
   if (pollset->local_wakeup_cache != NULL) {
-    worker->wakeup_fd = pollset->local_wakeup_cache;
-    pollset->local_wakeup_cache = worker->wakeup_fd->next;
+    worker.wakeup_fd = pollset->local_wakeup_cache;
+    pollset->local_wakeup_cache = worker.wakeup_fd->next;
   } else {
-    worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd));
-    grpc_wakeup_fd_init(&worker->wakeup_fd->fd);
+    worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
+    grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
   }
-  worker->kicked_specifically = 0;
+  worker.kicked_specifically = 0;
   /* If there's work waiting for the pollset to be idle, and the
      pollset is idle, then do that work */
   if (!grpc_pollset_has_workers(pollset) &&
@@ -293,13 +296,13 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     keep_polling = 0;
     if (!pollset->kicked_without_pollers) {
       if (!added_worker) {
-        push_front_worker(pollset, worker);
+        push_front_worker(pollset, &worker);
         added_worker = 1;
-        gpr_tls_set(&g_current_thread_worker, (intptr_t)worker);
+        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
       }
       gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
       GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
-      pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker,
+      pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker,
                                              deadline, now);
       GPR_TIMER_END("maybe_work_and_unlock", 0);
       locked = 0;
@@ -321,10 +324,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     /* If we're forced to re-evaluate polling (via grpc_pollset_kick with
        GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
        a loop */
-    if (worker->reevaluate_polling_on_wakeup) {
-      worker->reevaluate_polling_on_wakeup = 0;
+    if (worker.reevaluate_polling_on_wakeup) {
+      worker.reevaluate_polling_on_wakeup = 0;
       pollset->kicked_without_pollers = 0;
-      if (queued_work || worker->kicked_specifically) {
+      if (queued_work || worker.kicked_specifically) {
         /* If there's queued work on the list, then set the deadline to be
            immediate so we get back out of the polling loop quickly */
         deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
@@ -333,12 +336,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     }
   }
   if (added_worker) {
-    remove_worker(pollset, worker);
+    remove_worker(pollset, &worker);
     gpr_tls_set(&g_current_thread_worker, 0);
   }
   /* release wakeup fd to the local pool */
-  worker->wakeup_fd->next = pollset->local_wakeup_cache;
-  pollset->local_wakeup_cache = worker->wakeup_fd;
+  worker.wakeup_fd->next = pollset->local_wakeup_cache;
+  pollset->local_wakeup_cache = worker.wakeup_fd;
   /* check shutdown conditions */
   if (pollset->shutting_down) {
     if (grpc_pollset_has_workers(pollset)) {
@@ -360,6 +363,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
       gpr_mu_lock(&pollset->mu);
     }
   }
+  *worker_hdl = NULL;
   GPR_TIMER_END("grpc_pollset_work", 0);
 }
 