Reviewer comments

reviewable/pr21846/r2
Yash Tibrewal 5 years ago
parent 74309886bb
commit 60ec586e2c
  1. 33
      src/core/lib/iomgr/work_serializer.cc
  2. 31
      src/core/lib/iomgr/work_serializer.h

@ -33,6 +33,22 @@ struct CallbackWrapper {
const DebugLocation location;
};
class WorkSerializerImpl : public Orphanable {
public:
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location);
void Orphan() override;
private:
void DrainQueue();
// An initial size of 1 keeps track of whether the work serializer has been
// orphaned.
Atomic<size_t> size_{1};
MultiProducerSingleConsumerQueue queue_;
};
void WorkSerializerImpl::Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
@ -79,7 +95,7 @@ void WorkSerializerImpl::Orphan() {
// The thread that calls this loans itself to the work serializer so as to
// execute all the scheduled callback. This is called from within
// WorkSerializer::Run() after executing a callback immediately, and hence size_
// is atleast 1.
// is at least 1.
void WorkSerializerImpl::DrainQueue() {
while (true) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
@ -102,7 +118,7 @@ void WorkSerializerImpl::DrainQueue() {
}
return;
}
// There is atleast one callback on the queue. Pop the callback from the
// There is at least one callback on the queue. Pop the callback from the
// queue and execute it.
CallbackWrapper* cb_wrapper = nullptr;
bool empty_unused;
@ -123,4 +139,17 @@ void WorkSerializerImpl::DrainQueue() {
delete cb_wrapper;
}
}
// WorkSerializer
WorkSerializer::WorkSerializer()
: impl_(MakeOrphanable<WorkSerializerImpl>()) {}
WorkSerializer::~WorkSerializer() {}
void WorkSerializer::Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
impl_->Run(callback, location);
}
} // namespace grpc_core

@ -32,38 +32,29 @@
#define GRPC_CORE_LIB_IOMGR_WORK_SERIALIZER_H
namespace grpc_core {
extern DebugOnlyTraceFlag grpc_work_serializer_trace;
class WorkSerializerImpl : public Orphanable {
public:
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location);
void Orphan() override;
private:
void DrainQueue();
// An initial size of 1 keeps track of whether the work serializer has been
// orphaned.
Atomic<size_t> size_{1};
MultiProducerSingleConsumerQueue queue_;
};
class WorkSerializerImpl;
// WorkSerializer is a mechanism to schedule callbacks in a synchronized manner.
// All callbacks scheduled on a WorkSerializer instance will be executed
// serially in a borrowed thread. The API provides a FIFO guarantee to the
// execution of callbacks scheduled on the thread.
// When a thread calls Run() with a callback, the thread is considered borrowed.
// The callback might run inline, or it might run asynchronously in a different
// thread that is already inside of Run(). If the callback runs directly inline,
// other callbacks from other threads might also be executed before Run()
// returns. Since an arbitrary set of callbacks might be executed when Run() is
// called, generally no locks should be held while calling Run().
class WorkSerializer {
public:
WorkSerializer() { impl_ = MakeOrphanable<WorkSerializerImpl>(); }
WorkSerializer();
~WorkSerializer();
// TODO(yashkt): Replace grpc_core::DebugLocation with absl::SourceLocation
// once we can start using it directly.
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
impl_->Run(callback, location);
}
const grpc_core::DebugLocation& location);
private:
OrphanablePtr<WorkSerializerImpl> impl_;

Loading…
Cancel
Save