diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index cdf1020051f..d040e91b796 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -201,7 +201,7 @@ struct cq_vtable { bool (*begin_op)(grpc_completion_queue* cq, void* tag); void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal); grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, @@ -359,19 +359,19 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal = false); static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal = false); static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal = false); static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); @@ -679,7 +679,8 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage) { + void* done_arg, grpc_cq_completion* storage, + bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_next", 0); if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || @@ -759,7 +760,8 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage) { + void* done_arg, grpc_cq_completion* storage, + bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -831,7 +833,8 @@ static void functor_callback(void* arg, grpc_error* error) { static void cq_end_op_for_callback( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage) { + grpc_cq_completion* storage, + bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -862,19 +865,23 @@ static void cq_end_op_for_callback( } auto* functor = static_cast(tag); - GRPC_CLOSURE_SCHED( + if (internal) { + grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error == GRPC_ERROR_NONE)); + } else { + GRPC_CLOSURE_SCHED( GRPC_CLOSURE_CREATE( functor_callback, functor, grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), GRPC_ERROR_REF(error)); + } GRPC_ERROR_UNREF(error); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage) { - cq->vtable->end_op(cq, tag, error, done, done_arg, storage); + void* done_arg, grpc_cq_completion* storage, bool internal) { + cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); } typedef struct { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index d60fe6d6efe..f5b0822fcb4 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -77,7 +77,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag); grpc_cq_begin_op */ void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal); grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 2377c4d8f23..19f61c548d6 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -513,7 +513,7 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, } grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, - rc, &rc->completion); + rc, &rc->completion, true); } static void publish_new_rpc(void* arg, grpc_error* error) {