diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index d0ed1a9f673..e796071eedc 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -34,7 +34,6 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gprpp/atomic.h" -#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -201,7 +200,7 @@ struct cq_vtable { bool (*begin_op)(grpc_completion_queue* cq, void* tag); void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, bool internal); + void* done_arg, grpc_cq_completion* storage); grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, @@ -355,20 +354,23 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); // queue. The done argument is a callback that will be invoked when it is // safe to free up that storage. The storage MUST NOT be freed until the // done callback is invoked. -static void cq_end_op_for_next( - grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal); - -static void cq_end_op_for_pluck( - grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal); - -static void cq_end_op_for_callback( - grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal); +static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, + grpc_error* error, + void (*done)(void* done_arg, + grpc_cq_completion* storage), + void* done_arg, grpc_cq_completion* storage); + +static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, + grpc_error* error, + void (*done)(void* done_arg, + grpc_cq_completion* storage), + void* done_arg, grpc_cq_completion* storage); + +static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag, + grpc_error* error, + void (*done)(void* done_arg, + grpc_cq_completion* storage), + void* done_arg, grpc_cq_completion* storage); static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); @@ -672,10 +674,11 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_NEXT) */ -static void cq_end_op_for_next( - grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal) { +static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, + grpc_error* error, + void (*done)(void* done_arg, + grpc_cq_completion* storage), + void* done_arg, grpc_cq_completion* storage) { GPR_TIMER_SCOPE("cq_end_op_for_next", 0); if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || @@ -751,10 +754,11 @@ static void cq_end_op_for_next( /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_PLUCK) */ -static void cq_end_op_for_pluck( - grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal) { +static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, + grpc_error* error, + void (*done)(void* done_arg, + grpc_cq_completion* storage), + void* done_arg, grpc_cq_completion* storage) { GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -817,19 +821,15 @@ static void cq_end_op_for_pluck( GRPC_ERROR_UNREF(error); } -static void functor_callback(void* arg, grpc_error* error) { - auto* functor = static_cast(arg); - functor->functor_run(functor, error == GRPC_ERROR_NONE); -} - /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */ static void cq_end_op_for_callback( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal) { + grpc_cq_completion* storage) { GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); + bool is_success = (error == GRPC_ERROR_NONE); if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && @@ -856,25 +856,16 @@ static void cq_end_op_for_callback( cq_finish_shutdown_callback(cq); } - auto* functor = static_cast(tag); - if (internal) { - grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, - (error == GRPC_ERROR_NONE)); - } else { - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE( - functor_callback, functor, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), - GRPC_ERROR_REF(error)); - } GRPC_ERROR_UNREF(error); + + auto* functor = static_cast(tag); + grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, is_success); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, - bool internal) { - cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); + void* done_arg, grpc_cq_completion* storage) { + cq->vtable->end_op(cq, tag, error, done, done_arg, storage); } typedef struct { @@ -1352,11 +1343,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE( - functor_callback, callback, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), - GRPC_ERROR_NONE); + grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true); } static void cq_shutdown_callback(grpc_completion_queue* cq) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 3ba9fbb8765..d60fe6d6efe 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -77,8 +77,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag); grpc_cq_begin_op */ void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, - bool internal = false); + void* done_arg, grpc_cq_completion* storage); grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 44de0cd42dd..5ecd5662c2c 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -513,7 +513,7 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, } grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, - rc, &rc->completion, true); + rc, &rc->completion); } static void publish_new_rpc(void* arg, grpc_error* error) { diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc index 4a33b934f43..7c3630eaf18 100644 --- a/test/core/surface/completion_queue_test.cc +++ b/test/core/surface/completion_queue_test.cc @@ -23,7 +23,6 @@ #include #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/iomgr.h" #include "test/core/util/test_config.h" @@ -360,19 +359,12 @@ static void test_pluck_after_shutdown(void) { static void test_callback(void) { grpc_completion_queue* cc; - static void* tags[128]; + void* tags[128]; grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; grpc_cq_polling_type polling_types[] = { GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; grpc_completion_queue_attributes attr; unsigned i; - static gpr_mu mu, shutdown_mu; - static gpr_cv cv, shutdown_cv; - static int cb_counter; - gpr_mu_init(&mu); - gpr_mu_init(&shutdown_mu); - gpr_cv_init(&cv); - gpr_cv_init(&shutdown_cv); LOG_TEST("test_callback"); @@ -384,11 +376,7 @@ static void test_callback(void) { } ~ShutdownCallback() {} static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { - gpr_mu_lock(&shutdown_mu); *static_cast(cb)->done_ = static_cast(ok); - // Signal when the shutdown callback is completed. - gpr_cv_signal(&shutdown_cv); - gpr_mu_unlock(&shutdown_mu); } private: @@ -403,9 +391,9 @@ static void test_callback(void) { for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) { int sumtags = 0; int counter = 0; - cb_counter = 0; { // reset exec_ctx types + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; attr.cq_polling_type = polling_types[pidx]; cc = grpc_completion_queue_create( @@ -421,13 +409,7 @@ static void test_callback(void) { int ok) { GPR_ASSERT(static_cast(ok)); auto* callback = static_cast(cb); - gpr_mu_lock(&mu); - cb_counter++; *callback->counter_ += callback->tag_; - if (cb_counter == GPR_ARRAY_SIZE(tags)) { - gpr_cv_signal(&cv); - } - gpr_mu_unlock(&mu); grpc_core::Delete(callback); }; @@ -447,34 +429,12 @@ static void test_callback(void) { nullptr, &completions[i]); } - gpr_mu_lock(&mu); - while (cb_counter != GPR_ARRAY_SIZE(tags)) { - // Wait for all the callbacks to complete. - gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_unlock(&mu); - shutdown_and_destroy(cc); - - gpr_mu_lock(&shutdown_mu); - while (!got_shutdown) { - // Wait for the shutdown callback to complete. - gpr_cv_wait(&shutdown_cv, &shutdown_mu, - gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_unlock(&shutdown_mu); } - - // Run the assertions to check if the test ran successfully. GPR_ASSERT(sumtags == counter); GPR_ASSERT(got_shutdown); got_shutdown = false; } - - gpr_cv_destroy(&cv); - gpr_cv_destroy(&shutdown_cv); - gpr_mu_destroy(&mu); - gpr_mu_destroy(&shutdown_mu); } struct thread_state { diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 8cf6def1073..a154324216b 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -374,34 +374,6 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpc) { SendRpcs(1, false); } -TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) { - MAYBE_SKIP_TEST; - ResetStub(); - std::mutex mu; - std::condition_variable cv; - bool done = false; - EchoRequest request; - request.set_message("Hello locked world."); - EchoResponse response; - ClientContext cli_ctx; - { - std::lock_guard l(mu); - stub_->experimental_async()->Echo( - &cli_ctx, &request, &response, - [&mu, &cv, &done, &request, &response](Status s) { - std::lock_guard l(mu); - EXPECT_TRUE(s.ok()); - EXPECT_EQ(request.message(), response.message()); - done = true; - cv.notify_one(); - }); - } - std::unique_lock l(mu); - while (!done) { - cv.wait(l); - } -} - TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { MAYBE_SKIP_TEST; ResetStub(); diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index edbff9c2be3..50eb9454fbe 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -150,9 +150,6 @@ static void shutdown_and_destroy(grpc_completion_queue* cc) { grpc_completion_queue_destroy(cc); } -static gpr_mu shutdown_mu, mu; -static gpr_cv shutdown_cv, cv; - // Tag completion queue iterate times class TagCallback : public grpc_experimental_completion_queue_functor { public: @@ -161,11 +158,8 @@ class TagCallback : public grpc_experimental_completion_queue_functor { } ~TagCallback() {} static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { - gpr_mu_lock(&mu); GPR_ASSERT(static_cast(ok)); *static_cast(cb)->iter_ += 1; - gpr_cv_signal(&cv); - gpr_mu_unlock(&mu); }; private: @@ -180,10 +174,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { } ~ShutdownCallback() {} static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { - gpr_mu_lock(&shutdown_mu); *static_cast(cb)->done_ = static_cast(ok); - gpr_cv_signal(&shutdown_cv); - gpr_mu_unlock(&shutdown_mu); } private: @@ -192,12 +183,8 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { TrackCounters track_counters; - int iteration = 0, current_iterations = 0; + int iteration = 0; TagCallback tag_cb(&iteration); - gpr_mu_init(&mu); - gpr_cv_init(&cv); - gpr_mu_init(&shutdown_mu); - gpr_cv_init(&shutdown_cv); bool got_shutdown = false; ShutdownCallback shutdown_cb(&got_shutdown); grpc_completion_queue* cc = @@ -211,29 +198,9 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { nullptr, &completion); } shutdown_and_destroy(cc); - - gpr_mu_lock(&mu); - current_iterations = static_cast(state.iterations()); - while (current_iterations != iteration) { - // Wait for all the callbacks to complete. - gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_unlock(&mu); - - gpr_mu_lock(&shutdown_mu); - while (!got_shutdown) { - // Wait for the shutdown callback to complete. - gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_unlock(&shutdown_mu); - GPR_ASSERT(got_shutdown); GPR_ASSERT(iteration == static_cast(state.iterations())); track_counters.Finish(state); - gpr_cv_destroy(&cv); - gpr_mu_destroy(&mu); - gpr_cv_destroy(&shutdown_cv); - gpr_mu_destroy(&shutdown_mu); } BENCHMARK(BM_Callback_CQ_Pass1Core); diff --git a/test/cpp/microbenchmarks/callback_streaming_ping_pong.h b/test/cpp/microbenchmarks/callback_streaming_ping_pong.h index 0d27e0efa50..9fb86bd8299 100644 --- a/test/cpp/microbenchmarks/callback_streaming_ping_pong.h +++ b/test/cpp/microbenchmarks/callback_streaming_ping_pong.h @@ -115,7 +115,7 @@ class BidiClient int msgs_size_; std::mutex mu; std::condition_variable cv; - bool done = false; + bool done; }; template