From 53b75d8c921cc80431802fe45f74699fa72c4829 Mon Sep 17 00:00:00 2001 From: Yunjia Wang Date: Tue, 9 Jul 2019 11:54:41 -0700 Subject: [PATCH] Change Get() signature, modify shut_down assertion & memoryorder --- src/core/lib/iomgr/executor/mpmcqueue.cc | 23 +++++++------------ src/core/lib/iomgr/executor/mpmcqueue.h | 12 +++++----- src/core/lib/iomgr/executor/threadpool.cc | 27 +++++++++++------------ src/core/lib/iomgr/executor/threadpool.h | 5 +++-- test/core/iomgr/threadpool_test.cc | 15 +++++-------- 5 files changed, 35 insertions(+), 47 deletions(-) diff --git a/src/core/lib/iomgr/executor/mpmcqueue.cc b/src/core/lib/iomgr/executor/mpmcqueue.cc index 581045245c4..7f916258b82 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.cc +++ b/src/core/lib/iomgr/executor/mpmcqueue.cc @@ -98,32 +98,25 @@ void InfLenFIFOQueue::Put(void* elem) { } } -void* InfLenFIFOQueue::Get() { - MutexLock l(&mu_); - if (count_.Load(MemoryOrder::RELAXED) == 0) { - num_waiters_++; - do { - wait_nonempty_.Wait(&mu_); - } while (count_.Load(MemoryOrder::RELAXED) == 0); - num_waiters_--; - } - GPR_DEBUG_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0); - return PopFront(); -} - void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) { MutexLock l(&mu_); if (count_.Load(MemoryOrder::RELAXED) == 0) { gpr_timespec start_time; - start_time = gpr_now(GPR_CLOCK_MONOTONIC); + if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) && + wait_time != nullptr) { + start_time = gpr_now(GPR_CLOCK_MONOTONIC); + } num_waiters_++; do { wait_nonempty_.Wait(&mu_); } while (count_.Load(MemoryOrder::RELAXED) == 0); num_waiters_--; - *wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time); + if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) && + wait_time != nullptr) { + *wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time); + } } GPR_DEBUG_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0); return PopFront(); diff --git a/src/core/lib/iomgr/executor/mpmcqueue.h b/src/core/lib/iomgr/executor/mpmcqueue.h index fdb3de00e73..5c62221e9d4 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.h +++ b/src/core/lib/iomgr/executor/mpmcqueue.h @@ -42,7 +42,8 @@ class MPMCQueueInterface { // Removes the oldest element from the queue and return it. // This might cause to block on empty queue depending on implementation. - virtual void* Get() GRPC_ABSTRACT; + // Optional argument for collecting stats purpose. + virtual void* Get(gpr_timespec* wait_time = nullptr) GRPC_ABSTRACT; // Returns number of elements in the queue currently virtual int count() const GRPC_ABSTRACT; @@ -65,12 +66,9 @@ class InfLenFIFOQueue : public MPMCQueueInterface { // Removes the oldest element from the queue and returns it. // This routine will cause the thread to block if queue is currently empty. - void* Get(); - - // Same as Get(), but will record how long waited when getting. - // This routine should be only called when debug trace is on and wants to - // collect stats data. - void* Get(gpr_timespec* wait_time); + // Argument wait_time should be passed in when trace flag turning on (for + // collecting stats info purpose.) + void* Get(gpr_timespec* wait_time = nullptr); // Returns number of elements in queue currently. // There might be concurrently add/remove on queue, so count might change diff --git a/src/core/lib/iomgr/executor/threadpool.cc b/src/core/lib/iomgr/executor/threadpool.cc index deeadc0f758..166bf0a34a0 100644 --- a/src/core/lib/iomgr/executor/threadpool.cc +++ b/src/core/lib/iomgr/executor/threadpool.cc @@ -29,21 +29,21 @@ void ThreadPoolWorker::Run() { if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { // Updates stats and print gpr_timespec wait_time = gpr_time_0(GPR_TIMESPAN); - elem = static_cast(queue_)->Get(&wait_time); - stats_.sleep_cycles = gpr_time_add(stats_.sleep_cycles, wait_time); + elem = queue_->Get(&wait_time); + stats_.sleep_time = gpr_time_add(stats_.sleep_time, wait_time); gpr_log(GPR_INFO, - "ThreadPool Worker [%s %d] Stats: sleep_cycles %f", - thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_cycles)); + "ThreadPool Worker [%s %d] Stats: sleep_time %f", + thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_time)); } else { - elem = static_cast(queue_)->Get(); + elem = queue_->Get(nullptr); } if (elem == nullptr) { break; } // Runs closure - grpc_experimental_completion_queue_functor* closure = + auto* closure = static_cast(elem); - closure->functor_run(closure->internal_next, closure->internal_success); + closure->functor_run(closure, closure->internal_success); } } @@ -70,7 +70,8 @@ size_t ThreadPool::DefaultStackSize() { } bool ThreadPool::HasBeenShutDown() { - return shut_down_.Load(MemoryOrder::ACQUIRE); + // For debug checking purpose, using RELAXED order is sufficient. + return shut_down_.Load(MemoryOrder::RELAXED); } ThreadPool::ThreadPool(int num_threads) : num_threads_(num_threads) { @@ -99,7 +100,8 @@ ThreadPool::ThreadPool(int num_threads, const char* thd_name, } ThreadPool::~ThreadPool() { - shut_down_.Store(true, MemoryOrder::RELEASE); + // For debug checking purpose, using RELAXED order is sufficient. + shut_down_.Store(true, MemoryOrder::RELAXED); for (int i = 0; i < num_threads_; ++i) { queue_->Put(nullptr); @@ -117,11 +119,8 @@ ThreadPool::~ThreadPool() { } void ThreadPool::Add(grpc_experimental_completion_queue_functor* closure) { - if (HasBeenShutDown()) { - gpr_log(GPR_ERROR, "ThreadPool Has Already Been Shut Down."); - } else { - queue_->Put(static_cast(closure)); - } + GPR_DEBUG_ASSERT(!HasBeenShutDown()); + queue_->Put(static_cast(closure)); } int ThreadPool::num_pending_closures() const { return queue_->count(); } diff --git a/src/core/lib/iomgr/executor/threadpool.h b/src/core/lib/iomgr/executor/threadpool.h index f0c54104ef1..d68cbbf7328 100644 --- a/src/core/lib/iomgr/executor/threadpool.h +++ b/src/core/lib/iomgr/executor/threadpool.h @@ -84,8 +84,8 @@ class ThreadPoolWorker { private: // struct for tracking stats of thread struct Stats { - gpr_timespec sleep_cycles; - Stats() { sleep_cycles = gpr_time_0(GPR_TIMESPAN); } + gpr_timespec sleep_time; + Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); } }; void Run(); // Pulls closures from queue and executes them @@ -145,6 +145,7 @@ class ThreadPool : public ThreadPoolInterface { // For ThreadPool, default stack size for mobile platform is 1952K. for other // platforms is 64K. size_t DefaultStackSize(); + // Internal Use Only for debug checking. bool HasBeenShutDown(); }; diff --git a/test/core/iomgr/threadpool_test.cc b/test/core/iomgr/threadpool_test.cc index a56db5b647a..c093c421b7b 100644 --- a/test/core/iomgr/threadpool_test.cc +++ b/test/core/iomgr/threadpool_test.cc @@ -25,8 +25,6 @@ #define THREAD_SMALL_ITERATION 100 #define THREAD_LARGE_ITERATION 10000 -grpc_core::Mutex mu; - // Simple functor for testing. It will count how many times being called. class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor { public: @@ -40,17 +38,15 @@ class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor { static void Run(struct grpc_experimental_completion_queue_functor* cb, int ok) { auto* callback = static_cast(cb); - grpc_core::MutexLock l(&mu); - callback->count_++; + callback->count_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED); } int count() { - grpc_core::MutexLock l(&mu); - return count_; + return count_.Load(grpc_core::MemoryOrder::RELAXED); } private: - int count_; + grpc_core::Atomic count_{0}; }; // Checks the given SimpleFunctorForAdd's count with a given number. @@ -66,8 +62,9 @@ class SimpleFunctorCheckForAdd ~SimpleFunctorCheckForAdd() {} static void Run(struct grpc_experimental_completion_queue_functor* cb, int ok) { - auto* callback = static_cast(cb); - GPR_ASSERT(callback->count_ == ok); + auto* callback = static_cast(cb); + auto* cb_check = static_cast(callback->internal_next); + GPR_ASSERT(cb_check->count_.Load(grpc_core::MemoryOrder::RELAXED) == ok); } };