diff --git a/src/core/lib/iomgr/logical_thread.cc b/src/core/lib/iomgr/logical_thread.cc index aa608cf8891..de1ab580cd5 100644 --- a/src/core/lib/iomgr/logical_thread.cc +++ b/src/core/lib/iomgr/logical_thread.cc @@ -40,7 +40,9 @@ void LogicalThreadImpl::Run(std::function callback, this, location.file(), location.line()); } const size_t prev_size = size_.FetchAdd(1); - if (prev_size == 0) { + // The logical thread should not have been orphaned. + GPR_DEBUG_ASSERT(prev_size > 0); + if (prev_size == 1) { // There is no other closure executing right now on this logical thread. // Execute this closure immediately. if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { @@ -49,11 +51,6 @@ void LogicalThreadImpl::Run(std::function callback, callback(); // Loan this thread to the logical thread and drain the queue. DrainQueue(); - // It is possible that while draining the queue, one of the callbacks ended - // up orphaning the logical thread. In that case, delete the object. - if (orphaned_.Load(MemoryOrder::ACQUIRE)) { - delete this; - } } else { CallbackWrapper* cb_wrapper = new CallbackWrapper(std::move(callback), location); @@ -67,10 +64,15 @@ void LogicalThreadImpl::Run(std::function callback, } void LogicalThreadImpl::Orphan() { - if (size_.Load(MemoryOrder::ACQUIRE) == 0) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { + gpr_log(GPR_INFO, "LogicalThread::Orphan() %p", this); + } + size_t prev_size = size_.FetchSub(1); + if (prev_size == 0) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { + gpr_log(GPR_INFO, " Destroying"); + } delete this; - } else { - orphaned_.Store(true, MemoryOrder::RELEASE); } } @@ -85,11 +87,20 @@ void LogicalThreadImpl::DrainQueue() { } size_t prev_size = size_.FetchSub(1); GPR_DEBUG_ASSERT(prev_size >= 1); + // It is possible that while draining the queue, one of the callbacks ended + // up orphaning the logical thread. In that case, delete the object. if (prev_size == 1) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { + gpr_log(GPR_INFO, " Queue Drained. Destroying"); + } + delete this; + return; + } + if (prev_size == 2) { if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { gpr_log(GPR_INFO, " Queue Drained"); } - break; + return; } // There is atleast one callback on the queue. Pop the callback from the // queue and execute it. diff --git a/src/core/lib/iomgr/logical_thread.h b/src/core/lib/iomgr/logical_thread.h index b3294a6a527..9184d2ce106 100644 --- a/src/core/lib/iomgr/logical_thread.h +++ b/src/core/lib/iomgr/logical_thread.h @@ -44,8 +44,9 @@ class LogicalThreadImpl : public Orphanable { private: void DrainQueue(); - Atomic size_{0}; - Atomic orphaned_{false}; + // An initial size of 1 keeps track of whether the logical thread has been + // orphaned. + Atomic size_{1}; MultiProducerSingleConsumerQueue queue_; }; diff --git a/test/core/iomgr/logical_thread_test.cc b/test/core/iomgr/logical_thread_test.cc index 88dde0b5df4..0d449e171ae 100644 --- a/test/core/iomgr/logical_thread_test.cc +++ b/test/core/iomgr/logical_thread_test.cc @@ -31,20 +31,16 @@ namespace { TEST(LogicalThreadTest, NoOp) { - grpc_core::ExecCtx exec_ctx; - { auto lock = grpc_core::MakeRefCounted(); } + auto lock = grpc_core::MakeRefCounted(); } TEST(LogicalThreadTest, ExecuteOne) { - grpc_core::ExecCtx exec_ctx; - { - auto lock = grpc_core::MakeRefCounted(); - gpr_event done; - gpr_event_init(&done); - lock->Run([&done]() { gpr_event_set(&done, (void*)1); }, DEBUG_LOCATION); - EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != - nullptr); - } + auto lock = grpc_core::MakeRefCounted(); + gpr_event done; + gpr_event_init(&done); + lock->Run([&done]() { gpr_event_set(&done, (void*)1); }, DEBUG_LOCATION); + EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != + nullptr); } class TestThread { @@ -97,14 +93,11 @@ class TestThread { }; TEST(LogicalThreadTest, ExecuteMany) { - grpc_core::ExecCtx exec_ctx; + auto lock = grpc_core::MakeRefCounted(); { - auto lock = grpc_core::MakeRefCounted(); - { - std::vector> threads; - for (size_t i = 0; i < 100; ++i) { - threads.push_back(std::unique_ptr(new TestThread(lock))); - } + std::vector> threads; + for (size_t i = 0; i < 100; ++i) { + threads.push_back(std::unique_ptr(new TestThread(lock))); } } }