From e9b4e168ba76163e5dfde30148d6bb7f5edd2563 Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Thu, 30 Apr 2020 15:38:28 -0700
Subject: [PATCH 1/3] Declare ApplicationCallbackExecCtx at channel's core entry points

---
 src/core/lib/surface/channel.cc | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 3a359a154df..bcfa5e6207d 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -329,6 +329,7 @@ char* grpc_channel_get_target(grpc_channel* channel) {
 
 void grpc_channel_get_info(grpc_channel* channel,
                            const grpc_channel_info* channel_info) {
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
   grpc_channel_element* elem =
       grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
@@ -336,6 +337,7 @@ void grpc_channel_get_info(grpc_channel* channel,
 }
 
 void grpc_channel_reset_connect_backoff(grpc_channel* channel) {
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
   GRPC_API_TRACE("grpc_channel_reset_connect_backoff(channel=%p)", 1,
                  (channel));
@@ -386,6 +388,7 @@ grpc_call* grpc_channel_create_call(grpc_channel* channel,
                                     grpc_slice method, const grpc_slice* host,
                                     gpr_timespec deadline, void* reserved) {
   GPR_ASSERT(!reserved);
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
   grpc_call* call = grpc_channel_create_call_internal(
       channel, parent_call, propagation_mask, cq, nullptr,
@@ -449,6 +452,7 @@ void* grpc_channel_register_call(grpc_channel* channel, const char* method,
       "grpc_channel_register_call(channel=%p, method=%s, host=%s, reserved=%p)",
       4, (channel, method, host, reserved));
   GPR_ASSERT(!reserved);
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
 
   grpc_core::MutexLock lock(&channel->registration_table->mu);
@@ -481,6 +485,7 @@ grpc_call* grpc_channel_create_registered_call(
       registered_call_handle, deadline.tv_sec, deadline.tv_nsec,
       (int)deadline.clock_type, reserved));
   GPR_ASSERT(!reserved);
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
   grpc_call* call = grpc_channel_create_call_internal(
       channel, parent_call, propagation_mask, completion_queue, nullptr,
@@ -532,6 +537,7 @@ void grpc_channel_destroy_internal(grpc_channel* channel) {
 }
 
 void grpc_channel_destroy(grpc_channel* channel) {
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
   grpc_channel_destroy_internal(channel);
 }
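The channel.cc change above is only useful because ApplicationCallbackExecCtx behaves as a scoped, thread-local work queue: declaring one at the top of a C-core entry point lets callbacks produced during that call be queued on the caller's own thread and drained when the scope exits, rather than depending on some other thread to run them. For readers unfamiliar with the class, here is a minimal sketch of that scoping pattern; ScopedCallbackQueue, Available, and Enqueue are simplified stand-ins invented for illustration, not the real gRPC types or signatures.

// Sketch only: a simplified stand-in for the thread-local work queue that
// ApplicationCallbackExecCtx provides.  Names are illustrative, not gRPC's.
#include <functional>
#include <utility>
#include <vector>

class ScopedCallbackQueue {
 public:
  ScopedCallbackQueue() {
    // The outermost scope on this thread owns the queue; nested scopes no-op.
    if (current_ == nullptr) current_ = this;
  }
  ~ScopedCallbackQueue() {
    if (current_ != this) return;
    // Drain callbacks queued while the entry-point function was running.
    while (!queue_.empty()) {
      std::vector<std::function<void()>> batch;
      batch.swap(queue_);
      for (auto& cb : batch) cb();
    }
    current_ = nullptr;
  }
  // True iff some enclosing scope has declared a queue on this thread; this
  // is what the Available() check added in the next patch corresponds to.
  static bool Available() { return current_ != nullptr; }
  // Callers are expected to check Available() before enqueuing.
  static void Enqueue(std::function<void()> cb) {
    current_->queue_.push_back(std::move(cb));
  }

 private:
  std::vector<std::function<void()>> queue_;
  static thread_local ScopedCallbackQueue* current_;
};

thread_local ScopedCallbackQueue* ScopedCallbackQueue::current_ = nullptr;

The real class also tracks flags and an intrusive list of functors (see the head_ member in the next patch), but the scope-owns-the-queue idea is the part these entry-point declarations rely on.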
From 415b71c98dce7af2de0b488ca9c5fdece512781a Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Thu, 7 May 2020 08:12:41 -0700
Subject: [PATCH 2/3] Defensive programming: ACEC is an optimization and not strictly needed

---
 src/core/lib/iomgr/exec_ctx.h            | 5 +++++
 src/core/lib/surface/completion_queue.cc | 9 ++++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 4e746bc8505..ebea7cf6bfe 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -357,6 +357,11 @@ class ApplicationCallbackExecCtx {
   /** Global shutdown for ApplicationCallbackExecCtx. Called by init. */
   static void GlobalShutdown(void) { gpr_tls_destroy(&callback_exec_ctx_); }
 
+  static bool Available() {
+    return reinterpret_cast<ApplicationCallbackExecCtx*>(
+               gpr_tls_get(&callback_exec_ctx_)) != nullptr;
+  }
+
  private:
   uintptr_t flags_{0u};
   grpc_experimental_completion_queue_functor* head_{nullptr};
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 60cbb244678..7c0c9aeb972 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -874,8 +874,15 @@ static void cq_end_op_for_callback(
     cq_finish_shutdown_callback(cq);
   }
 
+  // If possible, schedule the callback onto an existing thread-local
+  // ApplicationCallbackExecCtx, which is a work queue. This is possible for:
+  // 1. The callback is internally-generated and there is an ACEC available
+  // 2. The callback is marked inlineable and there is an ACEC available
+  // 3. We are already running in a background poller thread (which always has
+  //    an ACEC available at the base of the stack).
   auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
-  if (internal || functor->inlineable ||
+  if (((internal || functor->inlineable) &&
+       grpc_core::ApplicationCallbackExecCtx::Available()) ||
       grpc_iomgr_is_any_background_poller_thread()) {
     grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
                                                    (error == GRPC_ERROR_NONE));
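The new comment in cq_end_op_for_callback spells out when the thread-local queue may be used; the commit title's point is that this queue is an optimization only, so when no ApplicationCallbackExecCtx is available the callback must still reach the application through some slower path that always works. A rough sketch of that decision shape follows; CallbackFunctor, AcecAvailable, InBackgroundPollerThread, AcecEnqueue, and SubmitToExecutor are placeholder names for this sketch, not gRPC APIs, and their stub bodies exist only so the example stands alone.

// Sketch of the guarded fast path vs. defensive fallback; all helpers here
// are placeholders standing in for gRPC internals.
#include <functional>

struct CallbackFunctor {
  bool inlineable = false;
  std::function<void(bool)> run;  // invoked with the success flag
};

// Stub environment so the sketch compiles on its own; the real code consults
// ApplicationCallbackExecCtx and the iomgr background pollers instead.
thread_local bool g_acec_on_this_thread = false;
bool AcecAvailable() { return g_acec_on_this_thread; }
bool InBackgroundPollerThread() { return false; }
void AcecEnqueue(CallbackFunctor* f, bool ok) { f->run(ok); }       // stand-in
void SubmitToExecutor(CallbackFunctor* f, bool ok) { f->run(ok); }  // stand-in

void ScheduleCallback(CallbackFunctor* f, bool internal, bool ok) {
  // Shape of the condition after this patch: internal or inlineable callbacks
  // take the cheap path only when a queue is actually available; background
  // poller threads always have one; everything else takes the fallback.
  if (((internal || f->inlineable) && AcecAvailable()) ||
      InBackgroundPollerThread()) {
    AcecEnqueue(f, ok);       // optimization path
  } else {
    SubmitToExecutor(f, ok);  // defensive path, always correct
  }
}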
From 611dccb4926ae9f48e4f03e5fde39993ecf0a0b4 Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Thu, 7 May 2020 08:46:22 -0700
Subject: [PATCH 3/3] Add a test without WritesDone and reorder destruction

---
 .../end2end/client_callback_end2end_test.cc | 27 ++++++++++++++-----
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
index c428cab8cce..4f8bfeba372 100644
--- a/test/cpp/end2end/client_callback_end2end_test.cc
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -299,7 +299,7 @@ class ClientCallbackEnd2endTest
     }
   }
 
-  void SendGenericEchoAsBidi(int num_rpcs, int reuses) {
+  void SendGenericEchoAsBidi(int num_rpcs, int reuses, bool do_writes_done) {
     const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
     grpc::string test_string("");
     for (int i = 0; i < num_rpcs; i++) {
@@ -308,8 +308,8 @@ class ClientCallbackEnd2endTest
                                                                   ByteBuffer> {
        public:
         Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name,
-               const grpc::string& test_str, int reuses)
-            : reuses_remaining_(reuses) {
+               const grpc::string& test_str, int reuses, bool do_writes_done)
+            : reuses_remaining_(reuses), do_writes_done_(do_writes_done) {
          activate_ = [this, test, method_name, test_str] {
            if (reuses_remaining_ > 0) {
              cli_ctx_.reset(new ClientContext);
@@ -329,7 +329,11 @@ class ClientCallbackEnd2endTest
          };
          activate_();
        }
-       void OnWriteDone(bool /*ok*/) override { StartWritesDone(); }
+       void OnWriteDone(bool /*ok*/) override {
+         if (do_writes_done_) {
+           StartWritesDone();
+         }
+       }
        void OnReadDone(bool /*ok*/) override {
          EchoResponse response;
          EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
@@ -355,7 +359,10 @@ class ClientCallbackEnd2endTest
        std::mutex mu_;
        std::condition_variable cv_;
        bool done_ = false;
-     } rpc{this, kMethodName, test_string, reuses};
+       const bool do_writes_done_;
+     };
+
+     Client rpc(this, kMethodName, test_string, reuses, do_writes_done);
      rpc.Await();
    }
 
@@ -517,13 +524,19 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
   MAYBE_SKIP_TEST;
   ResetStub();
-  SendGenericEchoAsBidi(10, 1);
+  SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true);
 }
 
 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
   MAYBE_SKIP_TEST;
   ResetStub();
-  SendGenericEchoAsBidi(10, 10);
+  SendGenericEchoAsBidi(10, 10, /*do_writes_done=*/true);
+}
+
+TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) {
+  MAYBE_SKIP_TEST;
+  ResetStub();
+  SendGenericEchoAsBidi(1, 1, /*do_writes_done=*/false);
 }
 
 #if GRPC_ALLOW_EXCEPTIONS
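From the application's side, the new GenericRpcNoWritesDone case covers a callback-API bidi stream in which the client never calls StartWritesDone(), so the write side is only closed when the RPC itself finishes. A rough sketch of such a reactor against the generic stub is below; it is illustrative rather than copied from the test, the OneShotBidi name is invented, and setup, error handling, and completion signaling are omitted.

// Illustrative sketch, not taken from the test: a generic callback bidi
// reactor that sends one message and deliberately never calls
// StartWritesDone().
#include <grpcpp/generic/generic_stub.h>
#include <grpcpp/grpcpp.h>

class OneShotBidi : public grpc::experimental::ClientBidiReactor<
                        grpc::ByteBuffer, grpc::ByteBuffer> {
 public:
  OneShotBidi(grpc::GenericStub* stub, const grpc::string& method,
              const grpc::ByteBuffer& request)
      : request_(request) {
    stub->experimental().PrepareBidiStreamingCall(&ctx_, method, this);
    StartWrite(&request_);   // single write...
    StartRead(&response_);   // ...and a single read of the echoed reply
    StartCall();
  }
  void OnWriteDone(bool /*ok*/) override {
    // No StartWritesDone() here: the write side is closed only when the call
    // finishes, which is the situation the new test exercises.
  }
  void OnReadDone(bool /*ok*/) override {}
  void OnDone(const grpc::Status& /*status*/) override {
    // A real caller would signal completion here (condition variable, etc.).
  }

 private:
  grpc::ClientContext ctx_;
  grpc::ByteBuffer request_;
  grpc::ByteBuffer response_;
};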