Change Get() signature, modify shut_down assertion & memoryorder

pull/19544/head
Yunjia Wang 5 years ago
parent 6518c2c67d
commit 53b75d8c92
  1. 19
      src/core/lib/iomgr/executor/mpmcqueue.cc
  2. 12
      src/core/lib/iomgr/executor/mpmcqueue.h
  3. 25
      src/core/lib/iomgr/executor/threadpool.cc
  4. 5
      src/core/lib/iomgr/executor/threadpool.h
  5. 15
      test/core/iomgr/threadpool_test.cc

@ -98,33 +98,26 @@ void InfLenFIFOQueue::Put(void* elem) {
}
}
void* InfLenFIFOQueue::Get() {
MutexLock l(&mu_);
if (count_.Load(MemoryOrder::RELAXED) == 0) {
num_waiters_++;
do {
wait_nonempty_.Wait(&mu_);
} while (count_.Load(MemoryOrder::RELAXED) == 0);
num_waiters_--;
}
GPR_DEBUG_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0);
return PopFront();
}
void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
MutexLock l(&mu_);
if (count_.Load(MemoryOrder::RELAXED) == 0) {
gpr_timespec start_time;
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
wait_time != nullptr) {
start_time = gpr_now(GPR_CLOCK_MONOTONIC);
}
num_waiters_++;
do {
wait_nonempty_.Wait(&mu_);
} while (count_.Load(MemoryOrder::RELAXED) == 0);
num_waiters_--;
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
wait_time != nullptr) {
*wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time);
}
}
GPR_DEBUG_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0);
return PopFront();
}

@ -42,7 +42,8 @@ class MPMCQueueInterface {
// Removes the oldest element from the queue and return it.
// This might cause to block on empty queue depending on implementation.
virtual void* Get() GRPC_ABSTRACT;
// Optional argument for collecting stats purpose.
virtual void* Get(gpr_timespec* wait_time = nullptr) GRPC_ABSTRACT;
// Returns number of elements in the queue currently
virtual int count() const GRPC_ABSTRACT;
@ -65,12 +66,9 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
// Removes the oldest element from the queue and returns it.
// This routine will cause the thread to block if queue is currently empty.
void* Get();
// Same as Get(), but will record how long waited when getting.
// This routine should be only called when debug trace is on and wants to
// collect stats data.
void* Get(gpr_timespec* wait_time);
// Argument wait_time should be passed in when trace flag turning on (for
// collecting stats info purpose.)
void* Get(gpr_timespec* wait_time = nullptr);
// Returns number of elements in queue currently.
// There might be concurrently add/remove on queue, so count might change

@ -29,21 +29,21 @@ void ThreadPoolWorker::Run() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
// Updates stats and print
gpr_timespec wait_time = gpr_time_0(GPR_TIMESPAN);
elem = static_cast<InfLenFIFOQueue*>(queue_)->Get(&wait_time);
stats_.sleep_cycles = gpr_time_add(stats_.sleep_cycles, wait_time);
elem = queue_->Get(&wait_time);
stats_.sleep_time = gpr_time_add(stats_.sleep_time, wait_time);
gpr_log(GPR_INFO,
"ThreadPool Worker [%s %d] Stats: sleep_cycles %f",
thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_cycles));
"ThreadPool Worker [%s %d] Stats: sleep_time %f",
thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_time));
} else {
elem = static_cast<InfLenFIFOQueue*>(queue_)->Get();
elem = queue_->Get(nullptr);
}
if (elem == nullptr) {
break;
}
// Runs closure
grpc_experimental_completion_queue_functor* closure =
auto* closure =
static_cast<grpc_experimental_completion_queue_functor*>(elem);
closure->functor_run(closure->internal_next, closure->internal_success);
closure->functor_run(closure, closure->internal_success);
}
}
@ -70,7 +70,8 @@ size_t ThreadPool::DefaultStackSize() {
}
bool ThreadPool::HasBeenShutDown() {
return shut_down_.Load(MemoryOrder::ACQUIRE);
// For debug checking purpose, using RELAXED order is sufficient.
return shut_down_.Load(MemoryOrder::RELAXED);
}
ThreadPool::ThreadPool(int num_threads) : num_threads_(num_threads) {
@ -99,7 +100,8 @@ ThreadPool::ThreadPool(int num_threads, const char* thd_name,
}
ThreadPool::~ThreadPool() {
shut_down_.Store(true, MemoryOrder::RELEASE);
// For debug checking purpose, using RELAXED order is sufficient.
shut_down_.Store(true, MemoryOrder::RELAXED);
for (int i = 0; i < num_threads_; ++i) {
queue_->Put(nullptr);
@ -117,12 +119,9 @@ ThreadPool::~ThreadPool() {
}
void ThreadPool::Add(grpc_experimental_completion_queue_functor* closure) {
if (HasBeenShutDown()) {
gpr_log(GPR_ERROR, "ThreadPool Has Already Been Shut Down.");
} else {
GPR_DEBUG_ASSERT(!HasBeenShutDown());
queue_->Put(static_cast<void*>(closure));
}
}
int ThreadPool::num_pending_closures() const { return queue_->count(); }

@ -84,8 +84,8 @@ class ThreadPoolWorker {
private:
// struct for tracking stats of thread
struct Stats {
gpr_timespec sleep_cycles;
Stats() { sleep_cycles = gpr_time_0(GPR_TIMESPAN); }
gpr_timespec sleep_time;
Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); }
};
void Run(); // Pulls closures from queue and executes them
@ -145,6 +145,7 @@ class ThreadPool : public ThreadPoolInterface {
// For ThreadPool, default stack size for mobile platform is 1952K. for other
// platforms is 64K.
size_t DefaultStackSize();
// Internal Use Only for debug checking.
bool HasBeenShutDown();
};

@ -25,8 +25,6 @@
#define THREAD_SMALL_ITERATION 100
#define THREAD_LARGE_ITERATION 10000
grpc_core::Mutex mu;
// Simple functor for testing. It will count how many times being called.
class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor {
public:
@ -40,17 +38,15 @@ class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor {
static void Run(struct grpc_experimental_completion_queue_functor* cb,
int ok) {
auto* callback = static_cast<SimpleFunctorForAdd*>(cb);
grpc_core::MutexLock l(&mu);
callback->count_++;
callback->count_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
}
int count() {
grpc_core::MutexLock l(&mu);
return count_;
return count_.Load(grpc_core::MemoryOrder::RELAXED);
}
private:
int count_;
grpc_core::Atomic<int> count_{0};
};
// Checks the given SimpleFunctorForAdd's count with a given number.
@ -66,8 +62,9 @@ class SimpleFunctorCheckForAdd
~SimpleFunctorCheckForAdd() {}
static void Run(struct grpc_experimental_completion_queue_functor* cb,
int ok) {
auto* callback = static_cast<SimpleFunctorForAdd*>(cb);
GPR_ASSERT(callback->count_ == ok);
auto* callback = static_cast<SimpleFunctorCheckForAdd*>(cb);
auto* cb_check = static_cast<SimpleFunctorForAdd*>(callback->internal_next);
GPR_ASSERT(cb_check->count_.Load(grpc_core::MemoryOrder::RELAXED) == ok);
}
};

Loading…
Cancel
Save