diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index e796071eedc..d0ed1a9f673 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -34,6 +34,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gprpp/atomic.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -200,7 +201,7 @@ struct cq_vtable { bool (*begin_op)(grpc_completion_queue* cq, void* tag); void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal); grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, @@ -354,23 +355,20 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); // queue. The done argument is a callback that will be invoked when it is // safe to free up that storage. The storage MUST NOT be freed until the // done callback is invoked. -static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(void* done_arg, - grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); - -static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(void* done_arg, - grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); - -static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(void* done_arg, - grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); +static void cq_end_op_for_next( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal); + +static void cq_end_op_for_pluck( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal); + +static void cq_end_op_for_callback( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal); static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); @@ -674,11 +672,10 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_NEXT) */ -static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(void* done_arg, - grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage) { +static void cq_end_op_for_next( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_next", 0); if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || @@ -754,11 +751,10 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_PLUCK) */ -static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(void* done_arg, - grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage) { +static void cq_end_op_for_pluck( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -821,15 +817,19 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, GRPC_ERROR_UNREF(error); } +static void functor_callback(void* arg, grpc_error* error) { + auto* functor = static_cast(arg); + functor->functor_run(functor, error == GRPC_ERROR_NONE); +} + /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */ static void cq_end_op_for_callback( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage) { + grpc_cq_completion* storage, bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); - bool is_success = (error == GRPC_ERROR_NONE); if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && @@ -856,16 +856,25 @@ static void cq_end_op_for_callback( cq_finish_shutdown_callback(cq); } - GRPC_ERROR_UNREF(error); - auto* functor = static_cast(tag); - grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, is_success); + if (internal) { + grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, + (error == GRPC_ERROR_NONE)); + } else { + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE( + functor_callback, functor, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage) { - cq->vtable->end_op(cq, tag, error, done, done_arg, storage); + void* done_arg, grpc_cq_completion* storage, + bool internal) { + cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); } typedef struct { @@ -1343,7 +1352,11 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE( + functor_callback, callback, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_NONE); } static void cq_shutdown_callback(grpc_completion_queue* cq) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index d60fe6d6efe..3ba9fbb8765 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -77,7 +77,8 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag); grpc_cq_begin_op */ void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, + bool internal = false); grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 2377c4d8f23..19f61c548d6 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -513,7 +513,7 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, } grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, - rc, &rc->completion); + rc, &rc->completion, true); } static void publish_new_rpc(void* arg, grpc_error* error) { diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc index 7c3630eaf18..4a33b934f43 100644 --- a/test/core/surface/completion_queue_test.cc +++ b/test/core/surface/completion_queue_test.cc @@ -23,6 +23,7 @@ #include #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/iomgr.h" #include "test/core/util/test_config.h" @@ -359,12 +360,19 @@ static void test_pluck_after_shutdown(void) { static void test_callback(void) { grpc_completion_queue* cc; - void* tags[128]; + static void* tags[128]; grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; grpc_cq_polling_type polling_types[] = { GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; grpc_completion_queue_attributes attr; unsigned i; + static gpr_mu mu, shutdown_mu; + static gpr_cv cv, shutdown_cv; + static int cb_counter; + gpr_mu_init(&mu); + gpr_mu_init(&shutdown_mu); + gpr_cv_init(&cv); + gpr_cv_init(&shutdown_cv); LOG_TEST("test_callback"); @@ -376,7 +384,11 @@ static void test_callback(void) { } ~ShutdownCallback() {} static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + gpr_mu_lock(&shutdown_mu); *static_cast(cb)->done_ = static_cast(ok); + // Signal when the shutdown callback is completed. + gpr_cv_signal(&shutdown_cv); + gpr_mu_unlock(&shutdown_mu); } private: @@ -391,9 +403,9 @@ static void test_callback(void) { for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) { int sumtags = 0; int counter = 0; + cb_counter = 0; { // reset exec_ctx types - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; attr.cq_polling_type = polling_types[pidx]; cc = grpc_completion_queue_create( @@ -409,7 +421,13 @@ static void test_callback(void) { int ok) { GPR_ASSERT(static_cast(ok)); auto* callback = static_cast(cb); + gpr_mu_lock(&mu); + cb_counter++; *callback->counter_ += callback->tag_; + if (cb_counter == GPR_ARRAY_SIZE(tags)) { + gpr_cv_signal(&cv); + } + gpr_mu_unlock(&mu); grpc_core::Delete(callback); }; @@ -429,12 +447,34 @@ static void test_callback(void) { nullptr, &completions[i]); } + gpr_mu_lock(&mu); + while (cb_counter != GPR_ARRAY_SIZE(tags)) { + // Wait for all the callbacks to complete. + gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&mu); + shutdown_and_destroy(cc); + + gpr_mu_lock(&shutdown_mu); + while (!got_shutdown) { + // Wait for the shutdown callback to complete. + gpr_cv_wait(&shutdown_cv, &shutdown_mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&shutdown_mu); } + + // Run the assertions to check if the test ran successfully. GPR_ASSERT(sumtags == counter); GPR_ASSERT(got_shutdown); got_shutdown = false; } + + gpr_cv_destroy(&cv); + gpr_cv_destroy(&shutdown_cv); + gpr_mu_destroy(&mu); + gpr_mu_destroy(&shutdown_mu); } struct thread_state { diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index a154324216b..8cf6def1073 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -374,6 +374,34 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpc) { SendRpcs(1, false); } +TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) { + MAYBE_SKIP_TEST; + ResetStub(); + std::mutex mu; + std::condition_variable cv; + bool done = false; + EchoRequest request; + request.set_message("Hello locked world."); + EchoResponse response; + ClientContext cli_ctx; + { + std::lock_guard l(mu); + stub_->experimental_async()->Echo( + &cli_ctx, &request, &response, + [&mu, &cv, &done, &request, &response](Status s) { + std::lock_guard l(mu); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(request.message(), response.message()); + done = true; + cv.notify_one(); + }); + } + std::unique_lock l(mu); + while (!done) { + cv.wait(l); + } +} + TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { MAYBE_SKIP_TEST; ResetStub(); diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 50eb9454fbe..edbff9c2be3 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -150,6 +150,9 @@ static void shutdown_and_destroy(grpc_completion_queue* cc) { grpc_completion_queue_destroy(cc); } +static gpr_mu shutdown_mu, mu; +static gpr_cv shutdown_cv, cv; + // Tag completion queue iterate times class TagCallback : public grpc_experimental_completion_queue_functor { public: @@ -158,8 +161,11 @@ class TagCallback : public grpc_experimental_completion_queue_functor { } ~TagCallback() {} static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + gpr_mu_lock(&mu); GPR_ASSERT(static_cast(ok)); *static_cast(cb)->iter_ += 1; + gpr_cv_signal(&cv); + gpr_mu_unlock(&mu); }; private: @@ -174,7 +180,10 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { } ~ShutdownCallback() {} static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + gpr_mu_lock(&shutdown_mu); *static_cast(cb)->done_ = static_cast(ok); + gpr_cv_signal(&shutdown_cv); + gpr_mu_unlock(&shutdown_mu); } private: @@ -183,8 +192,12 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { TrackCounters track_counters; - int iteration = 0; + int iteration = 0, current_iterations = 0; TagCallback tag_cb(&iteration); + gpr_mu_init(&mu); + gpr_cv_init(&cv); + gpr_mu_init(&shutdown_mu); + gpr_cv_init(&shutdown_cv); bool got_shutdown = false; ShutdownCallback shutdown_cb(&got_shutdown); grpc_completion_queue* cc = @@ -198,9 +211,29 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { nullptr, &completion); } shutdown_and_destroy(cc); + + gpr_mu_lock(&mu); + current_iterations = static_cast(state.iterations()); + while (current_iterations != iteration) { + // Wait for all the callbacks to complete. + gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&mu); + + gpr_mu_lock(&shutdown_mu); + while (!got_shutdown) { + // Wait for the shutdown callback to complete. + gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&shutdown_mu); + GPR_ASSERT(got_shutdown); GPR_ASSERT(iteration == static_cast(state.iterations())); track_counters.Finish(state); + gpr_cv_destroy(&cv); + gpr_mu_destroy(&mu); + gpr_cv_destroy(&shutdown_cv); + gpr_mu_destroy(&shutdown_mu); } BENCHMARK(BM_Callback_CQ_Pass1Core); diff --git a/test/cpp/microbenchmarks/callback_streaming_ping_pong.h b/test/cpp/microbenchmarks/callback_streaming_ping_pong.h index 9fb86bd8299..0d27e0efa50 100644 --- a/test/cpp/microbenchmarks/callback_streaming_ping_pong.h +++ b/test/cpp/microbenchmarks/callback_streaming_ping_pong.h @@ -115,7 +115,7 @@ class BidiClient int msgs_size_; std::mutex mu; std::condition_variable cv; - bool done; + bool done = false; }; template