Add Schedule and DrainQueue to WorkSerializer (#27902)

* Add Schedule and DrainQueue to WorkSerializer

* Fix comments

* Reviewer comments

* Use acq_rel semantics instead of seq_cst

* Reviewer comments

* s/dummy/no-op

* Reviewer comments

* Fix

* Reviewer comments

* Reviewer comments
Branch: pull/27949/head
Author: Yash Tibrewal (committed by GitHub)
Parent: 65e300e55d
Commit: 8ac0def897
3 changed files:
  1. src/core/lib/iomgr/work_serializer.cc (152 lines changed)
  2. src/core/lib/iomgr/work_serializer.h (16 lines changed)
  3. test/core/iomgr/work_serializer_test.cc (89 lines changed)
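The work_serializer.h comments in this diff describe when to prefer the new Schedule()/DrainQueue() pair over Run(): Run() may invoke the callback inline on the calling thread, which can deadlock a caller that holds a lock the callback also needs. A minimal usage sketch of that pattern, assuming a hypothetical caller (Watcher, mu_, and HandleUpdate() are illustrative names, not part of this change):

#include <memory>

#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/work_serializer.h"

// Hypothetical caller: queues work while holding mu_, drains it afterwards.
class Watcher {
 public:
  void OnUpdate() {
    {
      grpc_core::MutexLock lock(&mu_);
      // Run() could execute HandleUpdate() inline while mu_ is still held;
      // Schedule() only enqueues the callback.
      work_serializer_->Schedule([this]() { HandleUpdate(); }, DEBUG_LOCATION);
    }
    // mu_ is released, so it is now safe to execute the queued callbacks.
    work_serializer_->DrainQueue();
  }

 private:
  void HandleUpdate();
  grpc_core::Mutex mu_;
  std::shared_ptr<grpc_core::WorkSerializer> work_serializer_;
};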

src/core/lib/iomgr/work_serializer.cc
@@ -24,28 +24,54 @@ namespace grpc_core {
DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");
struct CallbackWrapper {
CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc)
: callback(std::move(cb)), location(loc) {}
MultiProducerSingleConsumerQueue::Node mpscq_node;
const std::function<void()> callback;
const DebugLocation location;
};
class WorkSerializer::WorkSerializerImpl : public Orphanable {
public:
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location);
void Schedule(std::function<void()> callback,
const grpc_core::DebugLocation& location);
void DrainQueue();
void Orphan() override;
private:
void DrainQueue();
struct CallbackWrapper {
CallbackWrapper(std::function<void()> cb,
const grpc_core::DebugLocation& loc)
: callback(std::move(cb)), location(loc) {}
MultiProducerSingleConsumerQueue::Node mpscq_node;
const std::function<void()> callback;
const DebugLocation location;
};
// Callers of DrainQueueOwned should make sure to grab the lock on the
// workserializer with
//
// prev_ref_pair =
// refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
//
// and only invoke DrainQueueOwned() if there was previously no owner. Note
// that the queue size is also incremented as part of the fetch_add to allow
// the callers to add a callback to the queue if another thread already holds
// the lock to the work serializer.
void DrainQueueOwned();
// First 16 bits indicate ownership of the WorkSerializer, next 48 bits are
// queue size (i.e., refs).
static uint64_t MakeRefPair(uint16_t owners, uint64_t size) {
GPR_ASSERT(size >> 48 == 0);
return (static_cast<uint64_t>(owners) << 48) + static_cast<int64_t>(size);
}
static uint32_t GetOwners(uint64_t ref_pair) {
return static_cast<uint32_t>(ref_pair >> 48);
}
static uint32_t GetSize(uint64_t ref_pair) {
return static_cast<uint32_t>(ref_pair & 0xffffffffffffu);
}
// An initial size of 1 keeps track of whether the work serializer has been
// orphaned.
std::atomic<size_t> size_{1};
std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
MultiProducerSingleConsumerQueue queue_;
};
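Aside (not part of the diff): a standalone sketch of the owner/size packing that MakeRefPair(), GetOwners(), and GetSize() above implement; make_ref_pair here is just an illustrative local name.

#include <cassert>
#include <cstdint>

int main() {
  // Top 16 bits: owner count. Low 48 bits: queue size (including the base
  // ref of 1 that tracks orphaning).
  auto make_ref_pair = [](uint16_t owners, uint64_t size) {
    return (static_cast<uint64_t>(owners) << 48) + size;
  };
  uint64_t r = make_ref_pair(/*owners=*/1, /*size=*/3);
  assert((r >> 48) == 1);              // GetOwners(): one owning thread
  assert((r & 0xffffffffffffu) == 3);  // GetSize(): base ref + 2 queued callbacks
  return 0;
}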
@@ -55,23 +81,25 @@ void WorkSerializer::WorkSerializerImpl::Run(
gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
this, location.file(), location.line());
}
const size_t prev_size = size_.fetch_add(1);
// Increment queue size for the new callback and owner count to attempt to
// take ownership of the WorkSerializer.
const uint64_t prev_ref_pair =
refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
// The work serializer should not have been orphaned.
GPR_DEBUG_ASSERT(prev_size > 0);
if (prev_size == 1) {
// There is no other closure executing right now on this work serializer.
// Execute this closure immediately.
GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0);
if (GetOwners(prev_ref_pair) == 0) {
// We took ownership of the WorkSerializer. Invoke callback and drain queue.
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Executing immediately");
}
callback();
// Loan this thread to the work serializer thread and drain the queue.
DrainQueue();
DrainQueueOwned();
} else {
// Another thread is holding the WorkSerializer, so decrement the ownership
// count we just added and queue the callback.
refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
// There already are closures executing on this work serializer. Simply add
// this closure to the queue.
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper);
}
@@ -79,12 +107,27 @@ void WorkSerializer::WorkSerializerImpl::Run(
}
}
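Aside (not part of the diff): the fetch_add/fetch_sub dance above is the core of this change; a standalone sketch of the same try-acquire step, with illustrative names (TryAcquire, kOneOwner, kOneItem):

#include <atomic>
#include <cstdint>

// Bump the owner count and queue size in a single fetch_add. If another
// thread already owned the serializer, roll back only the owner increment;
// the size increment stays to account for the callback that will be queued.
// acq_rel ordering is assumed sufficient here, mirroring the change above.
bool TryAcquire(std::atomic<uint64_t>& refs) {
  constexpr uint64_t kOneOwner = uint64_t{1} << 48;  // top 16 bits: owners
  constexpr uint64_t kOneItem = 1;                   // low 48 bits: queue size
  const uint64_t prev =
      refs.fetch_add(kOneOwner + kOneItem, std::memory_order_acq_rel);
  if ((prev >> 48) == 0) return true;  // no previous owner: we own it now
  refs.fetch_sub(kOneOwner, std::memory_order_acq_rel);
  return false;
}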
void WorkSerializer::WorkSerializerImpl::Schedule(
std::function<void()> callback, const grpc_core::DebugLocation& location) {
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO,
"WorkSerializer::Schedule() %p Scheduling callback %p [%s:%d]",
this, cb_wrapper, location.file(), location.line());
}
refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_acq_rel);
queue_.Push(&cb_wrapper->mpscq_node);
}
void WorkSerializer::WorkSerializerImpl::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
}
size_t prev_size = size_.fetch_sub(1);
if (prev_size == 1) {
uint64_t prev_ref_pair =
refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
if (GetSize(prev_ref_pair) == 1) {
GPR_DEBUG_ASSERT(GetOwners(prev_ref_pair) == 0);
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Destroying");
}
@@ -93,30 +136,52 @@ void WorkSerializer::WorkSerializerImpl::Orphan() {
}
// The thread that calls this loans itself to the work serializer so as to
// execute all the scheduled callback. This is called from within
// WorkSerializer::Run() after executing a callback immediately, and hence size_
// is at least 1.
// execute all the scheduled callbacks.
void WorkSerializer::WorkSerializerImpl::DrainQueue() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
}
// Attempt to take ownership of the WorkSerializer. Also increment the queue
// size as required by `DrainQueueOwned()`.
const uint64_t prev_ref_pair =
refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
if (GetOwners(prev_ref_pair) == 0) {
// We took ownership of the WorkSerializer. Drain the queue.
DrainQueueOwned();
} else {
// Another thread is holding the WorkSerializer, so decrement the ownership
// count we just added and queue a no-op callback.
refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
CallbackWrapper* cb_wrapper = new CallbackWrapper([]() {}, DEBUG_LOCATION);
queue_.Push(&cb_wrapper->mpscq_node);
}
}
void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueueOwned() %p", this);
}
while (true) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
}
size_t prev_size = size_.fetch_sub(1);
GPR_DEBUG_ASSERT(prev_size >= 1);
// It is possible that while draining the queue, one of the callbacks ended
auto prev_ref_pair = refs_.fetch_sub(MakeRefPair(0, 1));
// It is possible that while draining the queue, the last callback ended
// up orphaning the work serializer. In that case, delete the object.
if (prev_size == 1) {
if (GetSize(prev_ref_pair) == 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Queue Drained. Destroying");
}
delete this;
return;
}
if (prev_size == 2) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Queue Drained");
if (GetSize(prev_ref_pair) == 2) {
// Queue drained. Give up ownership but only if queue remains empty. Note
// that we are using relaxed memory order semantics for the load on
// failure since we don't care about that value.
uint64_t expected = MakeRefPair(1, 1);
if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
return;
}
return;
}
// There is at least one callback on the queue. Pop the callback from the
// queue and execute it.
@@ -124,8 +189,8 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() {
bool empty_unused;
while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
// This can happen either due to a race condition within the mpscq
// implementation or because of a race with Run()
// This can happen due to a race condition within the mpscq
// implementation or because of a race with Run()/Schedule().
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Queue returned nullptr, trying again");
}
@@ -140,7 +205,9 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() {
}
}
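Aside (not part of the diff): the compare_exchange_strong above is what lets the owning thread give up ownership safely; a standalone sketch complementing the TryAcquire sketch earlier (TryRelease is an illustrative name):

#include <atomic>
#include <cstdint>

// Drop ownership only if the state is still exactly "one owner, size 1" (the
// base ref). If another thread queued work after the last pop, the CAS fails
// and the caller's drain loop simply iterates again.
bool TryRelease(std::atomic<uint64_t>& refs) {
  constexpr uint64_t kOneOwner = uint64_t{1} << 48;
  uint64_t expected = kOneOwner + 1;  // us as owner, base ref of 1
  return refs.compare_exchange_strong(expected, /*desired=*/1,
                                      std::memory_order_acq_rel,
                                      std::memory_order_relaxed);
}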
//
// WorkSerializer
//
WorkSerializer::WorkSerializer()
: impl_(MakeOrphanable<WorkSerializerImpl>()) {}
@@ -152,4 +219,11 @@ void WorkSerializer::Run(std::function<void()> callback,
impl_->Run(std::move(callback), location);
}
void WorkSerializer::Schedule(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
impl_->Schedule(std::move(callback), location);
}
void WorkSerializer::DrainQueue() { impl_->DrainQueue(); }
} // namespace grpc_core

src/core/lib/iomgr/work_serializer.h
@@ -45,13 +45,20 @@ namespace grpc_core {
// other callbacks from other threads might also be executed before Run()
// returns. Since an arbitrary set of callbacks might be executed when Run() is
// called, generally no locks should be held while calling Run().
// If a thread wants to preclude the possibility of the callback being invoked
// inline in Run() (for example, if a mutex lock is held and executing callbacks
// inline would cause a deadlock), it should use Schedule() instead and then
// invoke DrainQueue() when it is safe to invoke the callback.
class ABSL_LOCKABLE WorkSerializer {
public:
WorkSerializer();
~WorkSerializer();
// Runs a given callback.
// Runs a given callback on the work serializer. If there is no other thread
// currently executing the WorkSerializer, the callback is run immediately. In
// this case, the current thread is also borrowed for draining the queue for
// any callbacks that get added in the meantime.
//
// If you want to use clang thread annotation to make sure that callback is
// called by WorkSerializer only, you need to add the annotation to both the
@@ -70,6 +77,13 @@ class ABSL_LOCKABLE WorkSerializer {
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location);
// Schedule \a callback to be run later when the queue of callbacks is
// drained.
void Schedule(std::function<void()> callback,
const grpc_core::DebugLocation& location);
// Drains the queue of callbacks.
void DrainQueue();
private:
class WorkSerializerImpl;

test/core/iomgr/work_serializer_test.cc
@@ -35,7 +35,7 @@
namespace {
TEST(WorkSerializerTest, NoOp) { grpc_core::WorkSerializer lock; }
TEST(WorkSerializerTest, ExecuteOne) {
TEST(WorkSerializerTest, ExecuteOneRun) {
grpc_core::WorkSerializer lock;
gpr_event done;
gpr_event_init(&done);
@@ -45,6 +45,18 @@ TEST(WorkSerializerTest, ExecuteOne) {
nullptr);
}
TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
grpc_core::WorkSerializer lock;
gpr_event done;
gpr_event_init(&done);
lock.Schedule([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); },
DEBUG_LOCATION);
EXPECT_EQ(gpr_event_get(&done), nullptr);
lock.DrainQueue();
EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
}
class TestThread {
public:
explicit TestThread(grpc_core::WorkSerializer* lock)
@@ -103,6 +115,81 @@ TEST(WorkSerializerTest, ExecuteMany) {
}
}
}
class TestThreadScheduleAndDrain {
public:
explicit TestThreadScheduleAndDrain(grpc_core::WorkSerializer* lock)
: lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
gpr_event_init(&done_);
thread_.Start();
}
~TestThreadScheduleAndDrain() {
EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
nullptr);
thread_.Join();
}
private:
static void ExecuteManyLoop(void* arg) {
TestThreadScheduleAndDrain* self =
static_cast<TestThreadScheduleAndDrain*>(arg);
size_t n = 1;
for (size_t i = 0; i < 10; i++) {
for (size_t j = 0; j < 10000; j++) {
struct ExecutionArgs {
size_t* counter;
size_t value;
};
ExecutionArgs* c = new ExecutionArgs;
c->counter = &self->counter_;
c->value = n++;
self->lock_->Schedule(
[c]() {
EXPECT_TRUE(*c->counter == c->value - 1);
*c->counter = c->value;
delete c;
},
DEBUG_LOCATION);
}
self->lock_->DrainQueue();
// sleep for a little bit, to test other threads picking up the load
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
}
self->lock_->Run(
[self]() { gpr_event_set(&self->done_, reinterpret_cast<void*>(1)); },
DEBUG_LOCATION);
}
grpc_core::WorkSerializer* lock_ = nullptr;
grpc_core::Thread thread_;
size_t counter_ = 0;
gpr_event done_;
};
TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
grpc_core::WorkSerializer lock;
{
std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> threads;
for (size_t i = 0; i < 100; ++i) {
threads.push_back(absl::make_unique<TestThreadScheduleAndDrain>(&lock));
}
}
}
TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
grpc_core::WorkSerializer lock;
{
std::vector<std::unique_ptr<TestThread>> run_threads;
std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> schedule_threads;
for (size_t i = 0; i < 50; ++i) {
run_threads.push_back(absl::make_unique<TestThread>(&lock));
schedule_threads.push_back(
absl::make_unique<TestThreadScheduleAndDrain>(&lock));
}
}
}
} // namespace
int main(int argc, char** argv) {
