From 8ac0def89739723ca8f53b0534d02230c2b13e88 Mon Sep 17 00:00:00 2001
From: Yash Tibrewal
Date: Thu, 4 Nov 2021 13:41:27 -0700
Subject: [PATCH] Add Schedule and DrainQueue to WorkSerializer (#27902)

* Add Schedule and DrainQueue to WorkSerializer

* Fix comments

* Reviewer comments

* Use acq_rel semantics instead of seq_cst

* Reviewer comments

* s/dummy/no-op

* Reviewer comments

* Fix

* Reviewer comments

* Reviewer comments
---
 src/core/lib/iomgr/work_serializer.cc   | 152 ++++++++++++++++++------
 src/core/lib/iomgr/work_serializer.h    |  16 ++-
 test/core/iomgr/work_serializer_test.cc |  89 +++++++++++++-
 3 files changed, 216 insertions(+), 41 deletions(-)

diff --git a/src/core/lib/iomgr/work_serializer.cc b/src/core/lib/iomgr/work_serializer.cc
index 92a711deda7..a396c035951 100644
--- a/src/core/lib/iomgr/work_serializer.cc
+++ b/src/core/lib/iomgr/work_serializer.cc
@@ -24,28 +24,54 @@ namespace grpc_core {
 
 DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");
 
-struct CallbackWrapper {
-  CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc)
-      : callback(std::move(cb)), location(loc) {}
-
-  MultiProducerSingleConsumerQueue::Node mpscq_node;
-  const std::function<void()> callback;
-  const DebugLocation location;
-};
-
 class WorkSerializer::WorkSerializerImpl : public Orphanable {
  public:
   void Run(std::function<void()> callback,
            const grpc_core::DebugLocation& location);
-
+  void Schedule(std::function<void()> callback,
+                const grpc_core::DebugLocation& location);
+  void DrainQueue();
   void Orphan() override;
 
  private:
-  void DrainQueue();
+  struct CallbackWrapper {
+    CallbackWrapper(std::function<void()> cb,
+                    const grpc_core::DebugLocation& loc)
+        : callback(std::move(cb)), location(loc) {}
+
+    MultiProducerSingleConsumerQueue::Node mpscq_node;
+    const std::function<void()> callback;
+    const DebugLocation location;
+  };
+
+  // Callers of DrainQueueOwned should make sure to grab the lock on the
+  // workserializer with
+  //
+  //   prev_ref_pair =
+  //       refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
+  //
+  // and only invoke DrainQueueOwned() if there was previously no owner. Note
+  // that the queue size is also incremented as part of the fetch_add to allow
+  // the callers to add a callback to the queue if another thread already holds
+  // the lock to the work serializer.
+  void DrainQueueOwned();
+
+  // First 16 bits indicate ownership of the WorkSerializer, next 48 bits are
+  // queue size (i.e., refs).
+  static uint64_t MakeRefPair(uint16_t owners, uint64_t size) {
+    GPR_ASSERT(size >> 48 == 0);
+    return (static_cast<uint64_t>(owners) << 48) + static_cast<uint64_t>(size);
+  }
+  static uint32_t GetOwners(uint64_t ref_pair) {
+    return static_cast<uint32_t>(ref_pair >> 48);
+  }
+  static uint32_t GetSize(uint64_t ref_pair) {
+    return static_cast<uint32_t>(ref_pair & 0xffffffffffffu);
+  }
+
   // An initial size of 1 keeps track of whether the work serializer has been
   // orphaned.
-  std::atomic<size_t> size_{1};
+  std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
   MultiProducerSingleConsumerQueue queue_;
 };
 
@@ -55,23 +81,25 @@ void WorkSerializer::WorkSerializerImpl::Run(
     gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
             this, location.file(), location.line());
   }
-  const size_t prev_size = size_.fetch_add(1);
+  // Increment queue size for the new callback and owner count to attempt to
+  // take ownership of the WorkSerializer.
+  const uint64_t prev_ref_pair =
+      refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
   // The work serializer should not have been orphaned.
-  GPR_DEBUG_ASSERT(prev_size > 0);
-  if (prev_size == 1) {
-    // There is no other closure executing right now on this work serializer.
-    // Execute this closure immediately.
+  GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0);
+  if (GetOwners(prev_ref_pair) == 0) {
+    // We took ownership of the WorkSerializer. Invoke callback and drain queue.
     if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
       gpr_log(GPR_INFO, "  Executing immediately");
     }
     callback();
-    // Loan this thread to the work serializer thread and drain the queue.
-    DrainQueue();
+    DrainQueueOwned();
   } else {
+    // Another thread is holding the WorkSerializer, so decrement the ownership
+    // count we just added and queue the callback.
+    refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
     CallbackWrapper* cb_wrapper =
         new CallbackWrapper(std::move(callback), location);
-    // There already are closures executing on this work serializer. Simply add
-    // this closure to the queue.
     if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
       gpr_log(GPR_INFO, "  Scheduling on queue : item %p", cb_wrapper);
     }
@@ -79,12 +107,27 @@ void WorkSerializer::WorkSerializerImpl::Run(
   }
 }
 
+void WorkSerializer::WorkSerializerImpl::Schedule(
+    std::function<void()> callback, const grpc_core::DebugLocation& location) {
+  CallbackWrapper* cb_wrapper =
+      new CallbackWrapper(std::move(callback), location);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
+    gpr_log(GPR_INFO,
+            "WorkSerializer::Schedule() %p Scheduling callback %p [%s:%d]",
+            this, cb_wrapper, location.file(), location.line());
+  }
+  refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_acq_rel);
+  queue_.Push(&cb_wrapper->mpscq_node);
+}
+
 void WorkSerializer::WorkSerializerImpl::Orphan() {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
     gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
   }
-  size_t prev_size = size_.fetch_sub(1);
-  if (prev_size == 1) {
+  uint64_t prev_ref_pair =
+      refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
+  if (GetSize(prev_ref_pair) == 1) {
+    GPR_DEBUG_ASSERT(GetOwners(prev_ref_pair) == 0);
     if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
       gpr_log(GPR_INFO, "  Destroying");
     }
@@ -93,30 +136,52 @@ void WorkSerializer::WorkSerializerImpl::Orphan() {
 }
 
 // The thread that calls this loans itself to the work serializer so as to
-// execute all the scheduled callback. This is called from within
-// WorkSerializer::Run() after executing a callback immediately, and hence size_
-// is at least 1.
+// execute all the scheduled callbacks.
 void WorkSerializer::WorkSerializerImpl::DrainQueue() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
+    gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
+  }
+  // Attempt to take ownership of the WorkSerializer. Also increment the queue
+  // size as required by `DrainQueueOwned()`.
+  const uint64_t prev_ref_pair =
+      refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
+  if (GetOwners(prev_ref_pair) == 0) {
+    // We took ownership of the WorkSerializer. Drain the queue.
+    DrainQueueOwned();
+  } else {
+    // Another thread is holding the WorkSerializer, so decrement the ownership
+    // count we just added and queue a no-op callback.
+    refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
+    CallbackWrapper* cb_wrapper = new CallbackWrapper([]() {}, DEBUG_LOCATION);
+    queue_.Push(&cb_wrapper->mpscq_node);
+  }
+}
+
+void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
+    gpr_log(GPR_INFO, "WorkSerializer::DrainQueueOwned() %p", this);
+  }
   while (true) {
-    if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
-      gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
-    }
-    size_t prev_size = size_.fetch_sub(1);
-    GPR_DEBUG_ASSERT(prev_size >= 1);
-    // It is possible that while draining the queue, one of the callbacks ended
+    auto prev_ref_pair = refs_.fetch_sub(MakeRefPair(0, 1));
+    // It is possible that while draining the queue, the last callback ended
     // up orphaning the work serializer. In that case, delete the object.
-    if (prev_size == 1) {
+    if (GetSize(prev_ref_pair) == 1) {
       if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
         gpr_log(GPR_INFO, "  Queue Drained. Destroying");
       }
       delete this;
       return;
     }
-    if (prev_size == 2) {
-      if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
-        gpr_log(GPR_INFO, "  Queue Drained");
+    if (GetSize(prev_ref_pair) == 2) {
+      // Queue drained. Give up ownership but only if queue remains empty. Note
+      // that we are using relaxed memory order semantics for the load on
+      // failure since we don't care about that value.
+      uint64_t expected = MakeRefPair(1, 1);
+      if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
+                                        std::memory_order_acq_rel,
+                                        std::memory_order_relaxed)) {
+        return;
       }
-      return;
     }
     // There is at least one callback on the queue. Pop the callback from the
     // queue and execute it.
@@ -124,8 +189,8 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() {
     bool empty_unused;
     while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
                queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
-      // This can happen either due to a race condition within the mpscq
-      // implementation or because of a race with Run()
+      // This can happen due to a race condition within the mpscq
+      // implementation or because of a race with Run()/Schedule().
       if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
         gpr_log(GPR_INFO, "  Queue returned nullptr, trying again");
       }
@@ -140,7 +205,9 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() {
   }
 }
 
+//
 // WorkSerializer
+//
 
 WorkSerializer::WorkSerializer()
     : impl_(MakeOrphanable<WorkSerializerImpl>()) {}
@@ -152,4 +219,11 @@ void WorkSerializer::Run(std::function<void()> callback,
   impl_->Run(std::move(callback), location);
 }
 
+void WorkSerializer::Schedule(std::function<void()> callback,
+                              const grpc_core::DebugLocation& location) {
+  impl_->Schedule(std::move(callback), location);
+}
+
+void WorkSerializer::DrainQueue() { impl_->DrainQueue(); }
+
 }  // namespace grpc_core
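Before the header changes below, a standalone sketch of the packed owner/size counter may help orient readers. This is an editor's illustration, not part of the patch or the gRPC tree; main() and the local names are made up, and the helpers only mirror those in WorkSerializerImpl.

// Toy model of the refs_ bookkeeping used by WorkSerializerImpl above.
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdio>

// Upper 16 bits: number of owners (0 or 1). Lower 48 bits: queue size, which
// also doubles as the ref count that keeps the serializer alive.
uint64_t MakeRefPair(uint16_t owners, uint64_t size) {
  assert(size >> 48 == 0);
  return (static_cast<uint64_t>(owners) << 48) + size;
}
uint32_t GetOwners(uint64_t ref_pair) {
  return static_cast<uint32_t>(ref_pair >> 48);
}
uint64_t GetSize(uint64_t ref_pair) { return ref_pair & 0xffffffffffffu; }

int main() {
  // Initial state: no owner, size 1 (the "not yet orphaned" ref).
  std::atomic<uint64_t> refs{MakeRefPair(0, 1)};

  // Run()/DrainQueue() path: one fetch_add both registers a callback (size+1)
  // and claims ownership (owners+1). Whether we actually became the owner is
  // decided by the previous value.
  uint64_t prev = refs.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
  bool became_owner = (GetOwners(prev) == 0);
  std::printf("became_owner=%d owners=%u size=%llu\n", became_owner ? 1 : 0,
              static_cast<unsigned>(GetOwners(refs.load())),
              static_cast<unsigned long long>(GetSize(refs.load())));

  // DrainQueueOwned() path: after running a callback, drop one queue ref.
  refs.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);

  // If only the owner bit and the initial ref remain, release ownership with a
  // CAS; the CAS fails (and draining continues) if another thread queued a
  // callback in the meantime.
  uint64_t expected = MakeRefPair(1, 1);
  bool released = refs.compare_exchange_strong(expected, MakeRefPair(0, 1),
                                               std::memory_order_acq_rel,
                                               std::memory_order_relaxed);
  std::printf("released=%d\n", released ? 1 : 0);
  return 0;
}

The same arithmetic is what lets Run() and DrainQueue() decide ownership and register a callback with a single atomic fetch_add.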
diff --git a/src/core/lib/iomgr/work_serializer.h b/src/core/lib/iomgr/work_serializer.h
index 78debea3a66..8fa70a7a660 100644
--- a/src/core/lib/iomgr/work_serializer.h
+++ b/src/core/lib/iomgr/work_serializer.h
@@ -45,13 +45,20 @@ namespace grpc_core {
 // other callbacks from other threads might also be executed before Run()
 // returns. Since an arbitrary set of callbacks might be executed when Run() is
 // called, generally no locks should be held while calling Run().
+// If a thread wants to preclude the possibility of the callback being invoked
+// inline in Run() (for example, if a mutex lock is held and executing callbacks
+// inline would cause a deadlock), it should use Schedule() instead and then
+// invoke DrainQueue() when it is safe to invoke the callback.
 class ABSL_LOCKABLE WorkSerializer {
  public:
   WorkSerializer();
 
   ~WorkSerializer();
 
-  // Runs a given callback.
+  // Runs a given callback on the work serializer. If there is no other thread
+  // currently executing the WorkSerializer, the callback is run immediately. In
+  // this case, the current thread is also borrowed for draining the queue for
+  // any callbacks that get added in the meantime.
   //
   // If you want to use clang thread annotation to make sure that callback is
   // called by WorkSerializer only, you need to add the annotation to both the
@@ -70,6 +77,13 @@ class ABSL_LOCKABLE WorkSerializer {
   void Run(std::function<void()> callback,
            const grpc_core::DebugLocation& location);
 
+  // Schedule \a callback to be run later when the queue of callbacks is
+  // drained.
+  void Schedule(std::function<void()> callback,
+                const grpc_core::DebugLocation& location);
+  // Drains the queue of callbacks.
+  void DrainQueue();
+
  private:
   class WorkSerializerImpl;
 
diff --git a/test/core/iomgr/work_serializer_test.cc b/test/core/iomgr/work_serializer_test.cc
index 09258747800..624d6988713 100644
--- a/test/core/iomgr/work_serializer_test.cc
+++ b/test/core/iomgr/work_serializer_test.cc
@@ -35,7 +35,7 @@ namespace {
 
 TEST(WorkSerializerTest, NoOp) { grpc_core::WorkSerializer lock; }
 
-TEST(WorkSerializerTest, ExecuteOne) {
+TEST(WorkSerializerTest, ExecuteOneRun) {
   grpc_core::WorkSerializer lock;
   gpr_event done;
   gpr_event_init(&done);
@@ -45,6 +45,18 @@
           nullptr);
 }
 
+TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
+  grpc_core::WorkSerializer lock;
+  gpr_event done;
+  gpr_event_init(&done);
+  lock.Schedule([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); },
+                DEBUG_LOCATION);
+  EXPECT_EQ(gpr_event_get(&done), nullptr);
+  lock.DrainQueue();
+  EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
+              nullptr);
+}
+
 class TestThread {
  public:
   explicit TestThread(grpc_core::WorkSerializer* lock)
@@ -103,6 +115,81 @@ TEST(WorkSerializerTest, ExecuteMany) {
     }
   }
 }
+
+class TestThreadScheduleAndDrain {
+ public:
+  explicit TestThreadScheduleAndDrain(grpc_core::WorkSerializer* lock)
+      : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
+    gpr_event_init(&done_);
+    thread_.Start();
+  }
+
+  ~TestThreadScheduleAndDrain() {
+    EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
+              nullptr);
+    thread_.Join();
+  }
+
+ private:
+  static void ExecuteManyLoop(void* arg) {
+    TestThreadScheduleAndDrain* self =
+        static_cast<TestThreadScheduleAndDrain*>(arg);
+    size_t n = 1;
+    for (size_t i = 0; i < 10; i++) {
+      for (size_t j = 0; j < 10000; j++) {
+        struct ExecutionArgs {
+          size_t* counter;
+          size_t value;
+        };
+        ExecutionArgs* c = new ExecutionArgs;
+        c->counter = &self->counter_;
+        c->value = n++;
+        self->lock_->Schedule(
+            [c]() {
+              EXPECT_TRUE(*c->counter == c->value - 1);
+              *c->counter = c->value;
+              delete c;
+            },
+            DEBUG_LOCATION);
+      }
+      self->lock_->DrainQueue();
+      // sleep for a little bit, to test other threads picking up the load
+      gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
+    }
+    self->lock_->Run(
+        [self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
+        DEBUG_LOCATION);
+  }
+
+  grpc_core::WorkSerializer* lock_ = nullptr;
+  grpc_core::Thread thread_;
+  size_t counter_ = 0;
+  gpr_event done_;
+};
+
+TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
+  grpc_core::WorkSerializer lock;
+  {
+    std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> threads;
+    for (size_t i = 0; i < 100; ++i) {
+      threads.push_back(absl::make_unique<TestThreadScheduleAndDrain>(&lock));
+    }
+  }
+}
+
+TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
+  grpc_core::WorkSerializer lock;
+  {
+    std::vector<std::unique_ptr<TestThread>> run_threads;
+    std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> schedule_threads;
+    for (size_t i = 0; i < 50; ++i) {
+      run_threads.push_back(absl::make_unique<TestThread>(&lock));
+      schedule_threads.push_back(
+          absl::make_unique<TestThreadScheduleAndDrain>(&lock));
+    }
+  }
+}
+
 }  // namespace
 
 int main(int argc, char** argv) {
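As a usage illustration of the API documented in work_serializer.h above, the sketch below follows the pattern the new header comment describes: schedule while holding a lock, drain after releasing it. It is hypothetical and not part of the patch; MyResolver, mu_, OnUpdate, and OnUpdateLocked are invented names, and grpc_core::Mutex/MutexLock are assumed from gRPC's sync utilities.

#include <memory>

#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/work_serializer.h"

// Hypothetical component that must not run work-serializer callbacks while
// holding its own mutex.
class MyResolver {
 public:
  void OnUpdate() {
    {
      grpc_core::MutexLock lock(&mu_);
      // Run() could execute the callback inline on this thread; if the
      // callback also needed mu_, that would deadlock. Schedule() only queues.
      work_serializer_->Schedule([this]() { OnUpdateLocked(); },
                                 DEBUG_LOCATION);
    }
    // mu_ is released here, so it is now safe to drain (and run) callbacks.
    work_serializer_->DrainQueue();
  }

 private:
  // Runs serialized with every other callback on work_serializer_.
  void OnUpdateLocked() {}

  grpc_core::Mutex mu_;
  std::shared_ptr<grpc_core::WorkSerializer> work_serializer_ =
      std::make_shared<grpc_core::WorkSerializer>();
};

The ExecuteOneScheduleAndDrain test above exercises the same Schedule-then-DrainQueue sequence, minus the surrounding mutex.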