Fix logical thread orphaning

reviewable/pr21361/r3
Yash Tibrewal 5 years ago
parent e36ede8c67
commit 0ae85fe758
  1. 31
      src/core/lib/iomgr/logical_thread.cc
  2. 5
      src/core/lib/iomgr/logical_thread.h
  3. 9
      test/core/iomgr/logical_thread_test.cc

@ -40,7 +40,9 @@ void LogicalThreadImpl::Run(std::function<void()> callback,
this, location.file(), location.line()); this, location.file(), location.line());
} }
const size_t prev_size = size_.FetchAdd(1); const size_t prev_size = size_.FetchAdd(1);
if (prev_size == 0) { // The logical thread should not have been orphaned.
GPR_DEBUG_ASSERT(prev_size > 0);
if (prev_size == 1) {
// There is no other closure executing right now on this logical thread. // There is no other closure executing right now on this logical thread.
// Execute this closure immediately. // Execute this closure immediately.
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
@ -49,11 +51,6 @@ void LogicalThreadImpl::Run(std::function<void()> callback,
callback(); callback();
// Loan this thread to the logical thread and drain the queue. // Loan this thread to the logical thread and drain the queue.
DrainQueue(); DrainQueue();
// It is possible that while draining the queue, one of the callbacks ended
// up orphaning the logical thread. In that case, delete the object.
if (orphaned_.Load(MemoryOrder::ACQUIRE)) {
delete this;
}
} else { } else {
CallbackWrapper* cb_wrapper = CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location); new CallbackWrapper(std::move(callback), location);
@ -67,10 +64,15 @@ void LogicalThreadImpl::Run(std::function<void()> callback,
} }
void LogicalThreadImpl::Orphan() { void LogicalThreadImpl::Orphan() {
if (size_.Load(MemoryOrder::ACQUIRE) == 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, "LogicalThread::Orphan() %p", this);
}
size_t prev_size = size_.FetchSub(1);
if (prev_size == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, " Destroying");
}
delete this; delete this;
} else {
orphaned_.Store(true, MemoryOrder::RELEASE);
} }
} }
@ -85,11 +87,20 @@ void LogicalThreadImpl::DrainQueue() {
} }
size_t prev_size = size_.FetchSub(1); size_t prev_size = size_.FetchSub(1);
GPR_DEBUG_ASSERT(prev_size >= 1); GPR_DEBUG_ASSERT(prev_size >= 1);
// It is possible that while draining the queue, one of the callbacks ended
// up orphaning the logical thread. In that case, delete the object.
if (prev_size == 1) { if (prev_size == 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, " Queue Drained. Destroying");
}
delete this;
return;
}
if (prev_size == 2) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, " Queue Drained"); gpr_log(GPR_INFO, " Queue Drained");
} }
break; return;
} }
// There is atleast one callback on the queue. Pop the callback from the // There is atleast one callback on the queue. Pop the callback from the
// queue and execute it. // queue and execute it.

@ -44,8 +44,9 @@ class LogicalThreadImpl : public Orphanable {
private: private:
void DrainQueue(); void DrainQueue();
Atomic<size_t> size_{0}; // An initial size of 1 keeps track of whether the logical thread has been
Atomic<bool> orphaned_{false}; // orphaned.
Atomic<size_t> size_{1};
MultiProducerSingleConsumerQueue queue_; MultiProducerSingleConsumerQueue queue_;
}; };

@ -31,13 +31,10 @@
namespace { namespace {
TEST(LogicalThreadTest, NoOp) { TEST(LogicalThreadTest, NoOp) {
grpc_core::ExecCtx exec_ctx; auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
{ auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>(); }
} }
TEST(LogicalThreadTest, ExecuteOne) { TEST(LogicalThreadTest, ExecuteOne) {
grpc_core::ExecCtx exec_ctx;
{
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>(); auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
gpr_event done; gpr_event done;
gpr_event_init(&done); gpr_event_init(&done);
@ -45,7 +42,6 @@ TEST(LogicalThreadTest, ExecuteOne) {
EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
nullptr); nullptr);
} }
}
class TestThread { class TestThread {
public: public:
@ -97,8 +93,6 @@ class TestThread {
}; };
TEST(LogicalThreadTest, ExecuteMany) { TEST(LogicalThreadTest, ExecuteMany) {
grpc_core::ExecCtx exec_ctx;
{
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>(); auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
{ {
std::vector<std::unique_ptr<TestThread>> threads; std::vector<std::unique_ptr<TestThread>> threads;
@ -107,7 +101,6 @@ TEST(LogicalThreadTest, ExecuteMany) {
} }
} }
} }
}
} // namespace } // namespace
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save