pull/19696/head
Yunjia Wang 5 years ago
parent f50301fde8
commit 3ec5726216
  1. 12
      src/core/lib/iomgr/executor/mpmcqueue.cc
  2. 10
      src/core/lib/iomgr/executor/mpmcqueue.h

@ -33,8 +33,8 @@ inline void* InfLenFIFOQueue::PopFront() {
// Updates Stats when trace flag turned on. // Updates Stats when trace flag turned on.
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
gpr_timespec wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), gpr_timespec wait_time =
queue_head_->insert_time); gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), queue_head_->insert_time);
stats_.num_completed++; stats_.num_completed++;
stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time); stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time);
stats_.max_queue_time = gpr_time_max( stats_.max_queue_time = gpr_time_max(
@ -103,7 +103,7 @@ void InfLenFIFOQueue::Put(void* elem) {
int curr_count = count_.Load(MemoryOrder::RELAXED); int curr_count = count_.Load(MemoryOrder::RELAXED);
if (queue_tail_ == queue_head_ && curr_count!= 0) { if (queue_tail_ == queue_head_ && curr_count != 0) {
// List is full. Expands list to double size by inserting new chunk of nodes // List is full. Expands list to double size by inserting new chunk of nodes
Node* new_chunk = AllocateNodes(curr_count); Node* new_chunk = AllocateNodes(curr_count);
delete_list_[delete_list_count_++] = new_chunk; delete_list_[delete_list_count_++] = new_chunk;
@ -166,7 +166,7 @@ void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
size_t InfLenFIFOQueue::num_node() { size_t InfLenFIFOQueue::num_node() {
size_t num = 1024; size_t num = 1024;
for (size_t i = 1; i < delete_list_count_; ++i) { for (size_t i = 1; i < delete_list_count_; ++i) {
num = num* 2; num = num * 2;
} }
return num; return num;
} }
@ -184,8 +184,6 @@ void InfLenFIFOQueue::RemoveWaiter(Waiter* waiter) {
waiter->prev->next = waiter->next; waiter->prev->next = waiter->next;
} }
InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() { InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() { return waiters_.next; }
return waiters_.next;
}
} // namespace grpc_core } // namespace grpc_core

@ -76,7 +76,7 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
int count() const { return count_.Load(MemoryOrder::RELAXED); } int count() const { return count_.Load(MemoryOrder::RELAXED); }
struct Node { struct Node {
Node* next; // Linking Node* next; // Linking
Node* prev; Node* prev;
void* content; // Points to actual element void* content; // Points to actual element
gpr_timespec insert_time; // Time for stats gpr_timespec insert_time; // Time for stats
@ -141,8 +141,8 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
// last added waiter. // last added waiter.
Waiter* TopWaiter(); Waiter* TopWaiter();
Mutex mu_; // Protecting lock Mutex mu_; // Protecting lock
Waiter waiters_; // Head of waiting thread queue Waiter waiters_; // Head of waiting thread queue
Node** delete_list_ = nullptr; // Keeps track of all allocated array entries Node** delete_list_ = nullptr; // Keeps track of all allocated array entries
// for deleting on destruction // for deleting on destruction
@ -154,8 +154,8 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
Node* queue_tail_ = nullptr; // End of queue, insert position Node* queue_tail_ = nullptr; // End of queue, insert position
Atomic<int> count_{0}; // Number of elements in queue Atomic<int> count_{0}; // Number of elements in queue
Stats stats_; // Stats info Stats stats_; // Stats info
gpr_timespec busy_time; // Start time of busy queue gpr_timespec busy_time; // Start time of busy queue
// Internal Helper. // Internal Helper.
// Allocates an array of nodes of size "num", links all nodes together except // Allocates an array of nodes of size "num", links all nodes together except

Loading…
Cancel
Save