Merge pull request #10493 from ctiller/epex

Change pollset rules
pull/10467/head
Craig Tiller 8 years ago committed by GitHub
commit 9da6b8b865
  1. 4
      src/core/lib/iomgr/ev_epoll_linux.c
  2. 4
      src/core/lib/iomgr/ev_poll_posix.c
  3. 4
      src/core/lib/iomgr/pollset.h
  4. 4
      src/core/lib/iomgr/pollset_windows.c
  5. 5
      src/core/lib/surface/completion_queue.c

@ -1717,7 +1717,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
worker.pt_id = pthread_self(); worker.pt_id = pthread_self();
gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0); gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);
*worker_hdl = &worker; if (worker_hdl) *worker_hdl = &worker;
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
@ -1795,7 +1795,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_lock(&pollset->po.mu); gpr_mu_lock(&pollset->po.mu);
} }
*worker_hdl = NULL; if (worker_hdl) *worker_hdl = NULL;
gpr_tls_set(&g_current_thread_pollset, (intptr_t)0); gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
gpr_tls_set(&g_current_thread_worker, (intptr_t)0); gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

@ -871,7 +871,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) { gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker; grpc_pollset_worker worker;
*worker_hdl = &worker; if (worker_hdl) *worker_hdl = &worker;
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
/* Avoid malloc for small number of elements. */ /* Avoid malloc for small number of elements. */
@ -1092,7 +1092,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->mu);
} }
} }
*worker_hdl = NULL; if (worker_hdl) *worker_hdl = NULL;
GPR_TIMER_END("pollset_work", 0); GPR_TIMER_END("pollset_work", 0);
GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error)); GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
return error; return error;

@ -75,6 +75,10 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
and it is guaranteed that it will not be released by grpc_pollset_work and it is guaranteed that it will not be released by grpc_pollset_work
AFTER worker has been destroyed. AFTER worker has been destroyed.
It's legal for worker to be NULL: in that case, this specific thread can not
be directly woken with a kick, but maybe be indirectly (with a kick against
the pollset as a whole).
Tries not to block past deadline. Tries not to block past deadline.
May call grpc_closure_list_run on grpc_closure_list, without holding the May call grpc_closure_list_run on grpc_closure_list, without holding the
pollset pollset

@ -120,7 +120,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) { gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker; grpc_pollset_worker worker;
*worker_hdl = &worker; if (worker_hdl) *worker_hdl = &worker;
int added_worker = 0; int added_worker = 0;
worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
@ -185,7 +185,7 @@ done:
remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET); remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
} }
gpr_cv_destroy(&worker.cv); gpr_cv_destroy(&worker.cv);
*worker_hdl = NULL; if (worker_hdl) *worker_hdl = NULL;
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }

@ -345,7 +345,6 @@ static void dump_pending_tags(grpc_completion_queue *cc) {}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) { gpr_timespec deadline, void *reserved) {
grpc_event ret; grpc_event ret;
grpc_pollset_worker *worker = NULL;
gpr_timespec now; gpr_timespec now;
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@ -426,8 +425,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_mu_lock(cc->mu); gpr_mu_lock(cc->mu);
continue; continue;
} else { } else {
grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL,
&worker, now, iteration_deadline); now, iteration_deadline);
if (err != GRPC_ERROR_NONE) { if (err != GRPC_ERROR_NONE) {
gpr_mu_unlock(cc->mu); gpr_mu_unlock(cc->mu);
const char *msg = grpc_error_string(err); const char *msg = grpc_error_string(err);

Loading…
Cancel
Save