pull/18813/head
Karthik Ravi Shankar 6 years ago
parent 92aa0530fa
commit a89b1763af
  1. 29
      src/core/lib/surface/completion_queue.cc
  2. 3
      src/core/lib/surface/completion_queue.h
  3. 2
      src/core/lib/surface/server.cc

@ -201,7 +201,7 @@ struct cq_vtable {
bool (*begin_op)(grpc_completion_queue* cq, void* tag); bool (*begin_op)(grpc_completion_queue* cq, void* tag);
void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage, bool internal); void* done_arg, grpc_cq_completion* storage);
grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved); void* reserved);
grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
@ -358,17 +358,17 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
static void cq_end_op_for_next( static void cq_end_op_for_next(
grpc_completion_queue* cq, void* tag, grpc_error* error, grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal = false); grpc_cq_completion* storage);
static void cq_end_op_for_pluck( static void cq_end_op_for_pluck(
grpc_completion_queue* cq, void* tag, grpc_error* error, grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal = false); grpc_cq_completion* storage);
static void cq_end_op_for_callback( static void cq_end_op_for_callback(
grpc_completion_queue* cq, void* tag, grpc_error* error, grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal = false); grpc_cq_completion* storage);
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved); void* reserved);
@ -675,7 +675,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
static void cq_end_op_for_next( static void cq_end_op_for_next(
grpc_completion_queue* cq, void* tag, grpc_error* error, grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal) { grpc_cq_completion* storage) {
GPR_TIMER_SCOPE("cq_end_op_for_next", 0); GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
@ -754,7 +754,7 @@ static void cq_end_op_for_next(
static void cq_end_op_for_pluck( static void cq_end_op_for_pluck(
grpc_completion_queue* cq, void* tag, grpc_error* error, grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal) { grpc_cq_completion* storage) {
GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq); cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
@ -826,7 +826,7 @@ static void functor_callback(void* arg, grpc_error* error) {
static void cq_end_op_for_callback( static void cq_end_op_for_callback(
grpc_completion_queue* cq, void* tag, grpc_error* error, grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal) { grpc_cq_completion* storage) {
GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
@ -857,25 +857,18 @@ static void cq_end_op_for_callback(
} }
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag); auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
if (internal) { GRPC_CLOSURE_RUN(
grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
(error == GRPC_ERROR_NONE));
} else {
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE( GRPC_CLOSURE_CREATE(
functor_callback, functor, functor_callback, functor,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage, void* done_arg, grpc_cq_completion* storage) {
bool internal) { cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
} }
typedef struct { typedef struct {
@ -1353,7 +1346,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_RUN(
GRPC_CLOSURE_CREATE( GRPC_CLOSURE_CREATE(
functor_callback, callback, functor_callback, callback,
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),

@ -77,8 +77,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag);
grpc_cq_begin_op */ grpc_cq_begin_op */
void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error, void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage, void* done_arg, grpc_cq_completion* storage);
bool internal = false);
grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc); grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc);

@ -513,7 +513,7 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
} }
grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
rc, &rc->completion, true); rc, &rc->completion);
} }
static void publish_new_rpc(void* arg, grpc_error* error) { static void publish_new_rpc(void* arg, grpc_error* error) {

Loading…
Cancel
Save