diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 1601802d1f8..c2469a2006b 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -328,4 +328,10 @@ void Combiner::Run(grpc_closure* closure, grpc_error_handle error) { void Combiner::FinallyRun(grpc_closure* closure, grpc_error_handle error) { combiner_finally_exec(this, closure, error); } + +void Combiner::ForceOffload() { + gpr_atm_no_barrier_store(&initiating_exec_ctx_or_null, 0); + ExecCtx::Get()->SetReadyToFinishFlag(); +} + } // namespace grpc_core diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 0ab496b1bd7..2a4cd7e6da4 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -36,6 +36,8 @@ class Combiner { void Run(grpc_closure* closure, grpc_error_handle error); // TODO(yashkt) : Remove this method void FinallyRun(grpc_closure* closure, grpc_error_handle error); + // Force the next combiner execution to be offloaded + void ForceOffload(); Combiner* next_combiner_on_this_exec_ctx = nullptr; MultiProducerSingleConsumerQueue queue; // either: diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 3e3227cb2cd..34a85df79e7 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -184,6 +184,8 @@ class GRPC_DLL ExecCtx { } } + void SetReadyToFinishFlag() { flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; } + Timestamp Now() { return Timestamp::Now(); } void InvalidateNow() { time_cache_.InvalidateCache(); } void SetNowIomgrShutdown() { diff --git a/test/core/iomgr/combiner_test.cc b/test/core/iomgr/combiner_test.cc index de6f8c3aa7f..be51f13ea7a 100644 --- a/test/core/iomgr/combiner_test.cc +++ b/test/core/iomgr/combiner_test.cc @@ -18,6 +18,8 @@ #include "src/core/lib/iomgr/combiner.h" +#include + #include #include @@ -26,11 +28,11 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/crash.h" +#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/thd.h" #include "test/core/util/test_config.h" TEST(CombinerTest, TestNoOp) { - gpr_log(GPR_DEBUG, "test_no_op"); grpc_core::ExecCtx exec_ctx; GRPC_COMBINER_UNREF(grpc_combiner_create(), "test_no_op"); } @@ -40,8 +42,6 @@ static void set_event_to_true(void* value, grpc_error_handle /*error*/) { } TEST(CombinerTest, TestExecuteOne) { - gpr_log(GPR_DEBUG, "test_execute_one"); - grpc_core::Combiner* lock = grpc_combiner_create(); gpr_event done; gpr_event_init(&done); @@ -94,8 +94,6 @@ static void execute_many_loop(void* a) { } TEST(CombinerTest, TestExecuteMany) { - gpr_log(GPR_DEBUG, "test_execute_many"); - grpc_core::Combiner* lock = grpc_combiner_create(); grpc_core::Thread thds[10]; thd_args ta[GPR_ARRAY_SIZE(thds)]; @@ -122,13 +120,11 @@ static void in_finally(void* /*arg*/, grpc_error_handle /*error*/) { } static void add_finally(void* arg, grpc_error_handle /*error*/) { - static_cast(arg)->Run( + static_cast(arg)->FinallyRun( GRPC_CLOSURE_CREATE(in_finally, arg, nullptr), absl::OkStatus()); } TEST(CombinerTest, TestExecuteFinally) { - gpr_log(GPR_DEBUG, "test_execute_finally"); - grpc_core::Combiner* lock = grpc_combiner_create(); grpc_core::ExecCtx exec_ctx; gpr_event_init(&got_in_finally); @@ -140,6 +136,39 @@ TEST(CombinerTest, TestExecuteFinally) { GRPC_COMBINER_UNREF(lock, "test_execute_finally"); } +TEST(CombinerTest, TestForceOffload) { + grpc_core::Combiner* lock = grpc_combiner_create(); + grpc_core::ExecCtx exec_ctx; + grpc_core::Notification done; + const auto start_thread = std::this_thread::get_id(); + lock->Run(grpc_core::NewClosure([&](grpc_error_handle) { + // Initial execution should get done in the exec ctx flush below, + // so thread stays the same. + EXPECT_EQ(start_thread, std::this_thread::get_id()); + lock->Run(grpc_core::NewClosure([&](grpc_error_handle) { + // Next one should stick to the same thread too + // (proves we're not offloading all the time). + EXPECT_EQ(start_thread, std::this_thread::get_id()); + // Force the offload. + lock->ForceOffload(); + lock->Run( + grpc_core::NewClosure([&](grpc_error_handle) { + // We should see *not* the starting thread being + // the executor now. + EXPECT_NE(start_thread, + std::this_thread::get_id()); + done.Notify(); + }), + absl::OkStatus()); + }), + absl::OkStatus()); + }), + absl::OkStatus()); + exec_ctx.Flush(); + done.WaitForNotification(); + GRPC_COMBINER_UNREF(lock, "test_force_offload"); +} + int main(int argc, char** argv) { grpc::testing::TestEnvironment env(&argc, argv); ::testing::InitGoogleTest(&argc, argv);