Modify format printing and Debug trace.

pull/19358/head
Yunjia Wang 6 years ago
parent e1146bd27d
commit 662aa1f153
  1. 44
      src/core/lib/iomgr/threadpool/mpmcqueue.cc
  2. 49
      src/core/lib/iomgr/threadpool/mpmcqueue.h

@ -16,21 +16,24 @@
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/threadpool/mpmcqueue.h"
#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include <inttypes.h>
#include <string.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {
DebugOnlyTraceFlag thread_pool_trace(false, "thread_pool_trace");
inline void* MPMCQueue::PopFront() {
void* result = queue_head_->content;
Node* head_to_remove = queue_head_;
@ -55,6 +58,10 @@ inline void* MPMCQueue::PopFront() {
gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), busy_time));
}
if (GRPC_TRACE_FLAG_ENABLED(thread_pool_trace)) {
PrintStats();
}
// Singal waiting thread
if (count_.Load(MemoryOrder::RELAXED) > 0 && num_waiters_ > 0) {
wait_nonempty_.Signal();
@ -70,10 +77,11 @@ MPMCQueue::MPMCQueue()
MPMCQueue::~MPMCQueue() {
GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0);
ReleasableMutexLock l(&mu_);
MutexLock l(&mu_);
GPR_ASSERT(num_waiters_ == 0);
l.Unlock();
PrintStats();
if (GRPC_TRACE_FLAG_ENABLED(thread_pool_trace)) {
PrintStats();
}
}
void MPMCQueue::Put(void* elem) {
@ -91,6 +99,9 @@ void MPMCQueue::Put(void* elem) {
// Update Stats info
stats_.num_started++;
if (GRPC_TRACE_FLAG_ENABLED(thread_pool_trace)) {
PrintStats();
}
if (num_waiters_ > 0) {
wait_nonempty_.Signal();
@ -111,28 +122,15 @@ void* MPMCQueue::Get() {
}
void MPMCQueue::PrintStats() {
MutexLock l(&mu_);
gpr_log(GPR_INFO, "STATS INFO:");
gpr_log(GPR_INFO, "num_started: %lu", stats_.num_started);
gpr_log(GPR_INFO, "num_completed: %lu", stats_.num_completed);
gpr_log(GPR_INFO, "total_queue_cycles: %d",
gpr_log(GPR_INFO, "num_started: %" PRIu64, stats_.num_started);
gpr_log(GPR_INFO, "num_completed: %" PRIu64, stats_.num_completed);
gpr_log(GPR_INFO, "total_queue_cycles: %" PRId32,
gpr_time_to_millis(stats_.total_queue_cycles));
gpr_log(GPR_INFO, "max_queue_cycles: %d",
gpr_log(GPR_INFO, "max_queue_cycles: %" PRId32,
gpr_time_to_millis(stats_.max_queue_cycles));
gpr_log(GPR_INFO, "busy_time_cycles: %d",
gpr_log(GPR_INFO, "busy_time_cycles: %" PRId32,
gpr_time_to_millis(stats_.busy_time_cycles));
}
MPMCQueue::Stats* MPMCQueue::queue_stats() {
MPMCQueue::Stats* result = new Stats();
MutexLock l(&mu_);
result->total_queue_cycles =
gpr_time_add(result->total_queue_cycles, stats_.total_queue_cycles);
result->max_queue_cycles =
gpr_time_add(result->max_queue_cycles, stats_.max_queue_cycles);
result->busy_time_cycles =
gpr_time_add(result->busy_time_cycles, stats_.busy_time_cycles);
return result;
}
} // namespace grpc_core

@ -48,25 +48,6 @@ class MPMCQueueInterface {
class MPMCQueue : public MPMCQueueInterface {
public:
struct Stats { // Stats of queue
uint64_t num_started; // Number of elements have been added to queue
uint64_t num_completed; // Number of elements have been removed from
// the queue
gpr_timespec total_queue_cycles; // Total waiting time that all the
// removed elements have spent in queue
gpr_timespec max_queue_cycles; // Max waiting time among all removed
// elements
gpr_timespec busy_time_cycles; // Accumulated amount of time that queue
// was not empty
Stats() {
num_started = 0;
num_completed = 0;
total_queue_cycles = gpr_time_0(GPR_TIMESPAN);
max_queue_cycles = gpr_time_0(GPR_TIMESPAN);
busy_time_cycles = gpr_time_0(GPR_TIMESPAN);
}
};
// Create a new Multiple-Producer-Multiple-Consumer Queue. The queue created
// will have infinite length.
explicit MPMCQueue();
@ -88,16 +69,12 @@ class MPMCQueue : public MPMCQueueInterface {
// quickly.
int count() const { return count_.Load(MemoryOrder::RELAXED); }
// Print out Stats. Time measurement are printed in millisecond.
void PrintStats();
// Return a copy of current stats info. This info will be changed quickly
// when queue is still running. This copy will not deleted by queue.
Stats* queue_stats();
private:
void* PopFront();
// Print out Stats. Time measurement are printed in millisecond.
void PrintStats();
struct Node {
Node* next; // Linking
void* content; // Points to actual element
@ -109,6 +86,26 @@ class MPMCQueue : public MPMCQueueInterface {
}
};
struct Stats { // Stats of queue
uint64_t num_started; // Number of elements have been added to queue
uint64_t num_completed; // Number of elements have been removed from
// the queue
gpr_timespec total_queue_cycles; // Total waiting time that all the
// removed elements have spent in queue
gpr_timespec max_queue_cycles; // Max waiting time among all removed
// elements
gpr_timespec busy_time_cycles; // Accumulated amount of time that queue
// was not empty
Stats() {
num_started = 0;
num_completed = 0;
total_queue_cycles = gpr_time_0(GPR_TIMESPAN);
max_queue_cycles = gpr_time_0(GPR_TIMESPAN);
busy_time_cycles = gpr_time_0(GPR_TIMESPAN);
}
};
Mutex mu_; // Protecting lock
CondVar wait_nonempty_; // Wait on empty queue on get
int num_waiters_; // Number of waiters

Loading…
Cancel
Save