Run callbacks on same thread if trigerred from background thread

pull/19406/head
Karthik Ravi Shankar 6 years ago
parent 6f73241f71
commit 7a22143d7a
  1. 24
      src/core/lib/surface/completion_queue.cc

@ -857,17 +857,20 @@ static void cq_end_op_for_callback(
}
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
if (internal) {
if (internal || grpc_iomgr_is_any_background_poller_thread()) {
grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
(error == GRPC_ERROR_NONE));
GRPC_ERROR_UNREF(error);
} else {
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(
functor_callback, functor,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
error);
return;
}
// Schedule the shutdown callback on a closure if not internal or triggered
// from a background poller thread.
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(
functor_callback, functor,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
error);
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@ -1352,6 +1355,13 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
if (grpc_iomgr_is_any_background_poller_thread()) {
grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
return;
}
// Schedule the shutdown callback on a closure if not internal or triggered
// from a background poller thread.
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(
functor_callback, callback,

Loading…
Cancel
Save