diff --git a/src/core/lib/iomgr/threadpool/mpmcqueue.cc b/src/core/lib/iomgr/threadpool/mpmcqueue.cc index b2e09b9c81b..64f7d19a2a7 100644 --- a/src/core/lib/iomgr/threadpool/mpmcqueue.cc +++ b/src/core/lib/iomgr/threadpool/mpmcqueue.cc @@ -16,21 +16,24 @@ * */ -#include - #include "src/core/lib/iomgr/threadpool/mpmcqueue.h" #include #include #include +#include #include #include +#include #include +#include "src/core/lib/debug/stats.h" #include "src/core/lib/gprpp/sync.h" namespace grpc_core { +DebugOnlyTraceFlag thread_pool_trace(false, "thread_pool_trace"); + inline void* MPMCQueue::PopFront() { void* result = queue_head_->content; Node* head_to_remove = queue_head_; @@ -55,6 +58,10 @@ inline void* MPMCQueue::PopFront() { gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), busy_time)); } + if (GRPC_TRACE_FLAG_ENABLED(thread_pool_trace)) { + PrintStats(); + } + // Singal waiting thread if (count_.Load(MemoryOrder::RELAXED) > 0 && num_waiters_ > 0) { wait_nonempty_.Signal(); @@ -70,10 +77,11 @@ MPMCQueue::MPMCQueue() MPMCQueue::~MPMCQueue() { GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0); - ReleasableMutexLock l(&mu_); + MutexLock l(&mu_); GPR_ASSERT(num_waiters_ == 0); - l.Unlock(); - PrintStats(); + if (GRPC_TRACE_FLAG_ENABLED(thread_pool_trace)) { + PrintStats(); + } } void MPMCQueue::Put(void* elem) { @@ -91,6 +99,9 @@ void MPMCQueue::Put(void* elem) { // Update Stats info stats_.num_started++; + if (GRPC_TRACE_FLAG_ENABLED(thread_pool_trace)) { + PrintStats(); + } if (num_waiters_ > 0) { wait_nonempty_.Signal(); @@ -111,28 +122,15 @@ void* MPMCQueue::Get() { } void MPMCQueue::PrintStats() { - MutexLock l(&mu_); gpr_log(GPR_INFO, "STATS INFO:"); - gpr_log(GPR_INFO, "num_started: %lu", stats_.num_started); - gpr_log(GPR_INFO, "num_completed: %lu", stats_.num_completed); - gpr_log(GPR_INFO, "total_queue_cycles: %d", + gpr_log(GPR_INFO, "num_started: %" PRIu64, stats_.num_started); + gpr_log(GPR_INFO, "num_completed: %" PRIu64, stats_.num_completed); + gpr_log(GPR_INFO, "total_queue_cycles: %" PRId32, gpr_time_to_millis(stats_.total_queue_cycles)); - gpr_log(GPR_INFO, "max_queue_cycles: %d", + gpr_log(GPR_INFO, "max_queue_cycles: %" PRId32, gpr_time_to_millis(stats_.max_queue_cycles)); - gpr_log(GPR_INFO, "busy_time_cycles: %d", + gpr_log(GPR_INFO, "busy_time_cycles: %" PRId32, gpr_time_to_millis(stats_.busy_time_cycles)); } -MPMCQueue::Stats* MPMCQueue::queue_stats() { - MPMCQueue::Stats* result = new Stats(); - MutexLock l(&mu_); - result->total_queue_cycles = - gpr_time_add(result->total_queue_cycles, stats_.total_queue_cycles); - result->max_queue_cycles = - gpr_time_add(result->max_queue_cycles, stats_.max_queue_cycles); - result->busy_time_cycles = - gpr_time_add(result->busy_time_cycles, stats_.busy_time_cycles); - return result; -} - } // namespace grpc_core diff --git a/src/core/lib/iomgr/threadpool/mpmcqueue.h b/src/core/lib/iomgr/threadpool/mpmcqueue.h index 8c8395e1d9d..b39c9d81f4f 100644 --- a/src/core/lib/iomgr/threadpool/mpmcqueue.h +++ b/src/core/lib/iomgr/threadpool/mpmcqueue.h @@ -48,25 +48,6 @@ class MPMCQueueInterface { class MPMCQueue : public MPMCQueueInterface { public: - struct Stats { // Stats of queue - uint64_t num_started; // Number of elements have been added to queue - uint64_t num_completed; // Number of elements have been removed from - // the queue - gpr_timespec total_queue_cycles; // Total waiting time that all the - // removed elements have spent in queue - gpr_timespec max_queue_cycles; // Max waiting time among all removed - // elements - gpr_timespec busy_time_cycles; // Accumulated amount of time that queue - // was not empty - - Stats() { - num_started = 0; - num_completed = 0; - total_queue_cycles = gpr_time_0(GPR_TIMESPAN); - max_queue_cycles = gpr_time_0(GPR_TIMESPAN); - busy_time_cycles = gpr_time_0(GPR_TIMESPAN); - } - }; // Create a new Multiple-Producer-Multiple-Consumer Queue. The queue created // will have infinite length. explicit MPMCQueue(); @@ -88,16 +69,12 @@ class MPMCQueue : public MPMCQueueInterface { // quickly. int count() const { return count_.Load(MemoryOrder::RELAXED); } - // Print out Stats. Time measurement are printed in millisecond. - void PrintStats(); - - // Return a copy of current stats info. This info will be changed quickly - // when queue is still running. This copy will not deleted by queue. - Stats* queue_stats(); - private: void* PopFront(); + // Print out Stats. Time measurement are printed in millisecond. + void PrintStats(); + struct Node { Node* next; // Linking void* content; // Points to actual element @@ -109,6 +86,26 @@ class MPMCQueue : public MPMCQueueInterface { } }; + struct Stats { // Stats of queue + uint64_t num_started; // Number of elements have been added to queue + uint64_t num_completed; // Number of elements have been removed from + // the queue + gpr_timespec total_queue_cycles; // Total waiting time that all the + // removed elements have spent in queue + gpr_timespec max_queue_cycles; // Max waiting time among all removed + // elements + gpr_timespec busy_time_cycles; // Accumulated amount of time that queue + // was not empty + + Stats() { + num_started = 0; + num_completed = 0; + total_queue_cycles = gpr_time_0(GPR_TIMESPAN); + max_queue_cycles = gpr_time_0(GPR_TIMESPAN); + busy_time_cycles = gpr_time_0(GPR_TIMESPAN); + } + }; + Mutex mu_; // Protecting lock CondVar wait_nonempty_; // Wait on empty queue on get int num_waiters_; // Number of waiters