From f50301fde8bbccb87bc1ae2c1a85bf48c268e9ba Mon Sep 17 00:00:00 2001 From: Yunjia Wang Date: Fri, 19 Jul 2019 10:36:10 -0700 Subject: [PATCH 1/5] Add LIFO and Chunked List --- src/core/lib/iomgr/executor/mpmcqueue.cc | 120 ++++++++++++++++++----- src/core/lib/iomgr/executor/mpmcqueue.h | 68 ++++++++++--- test/core/iomgr/mpmcqueue_test.cc | 45 +++++++++ 3 files changed, 191 insertions(+), 42 deletions(-) diff --git a/src/core/lib/iomgr/executor/mpmcqueue.cc b/src/core/lib/iomgr/executor/mpmcqueue.cc index 72c318e2bb1..7ef5cf40e14 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.cc +++ b/src/core/lib/iomgr/executor/mpmcqueue.cc @@ -26,18 +26,15 @@ DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool"); inline void* InfLenFIFOQueue::PopFront() { // Caller should already check queue is not empty and has already held the - // mutex. This function will only do the job of removal. + // mutex. This function will assume that there is at least one element in the + // queue (aka queue_head_ content is valid). void* result = queue_head_->content; - Node* head_to_remove = queue_head_; - queue_head_ = queue_head_->next; - count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED); + // Updates Stats when trace flag turned on. if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { - gpr_timespec wait_time = - gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), head_to_remove->insert_time); - - // Updates Stats info + gpr_timespec wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), + queue_head_->insert_time); stats_.num_completed++; stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time); stats_.max_queue_time = gpr_time_max( @@ -58,44 +55,87 @@ inline void* InfLenFIFOQueue::PopFront() { gpr_timespec_to_micros(stats_.busy_queue_time)); } - Delete(head_to_remove); + queue_head_ = queue_head_->next; // Signal waiting thread - if (count_.Load(MemoryOrder::RELAXED) > 0 && num_waiters_ > 0) { - wait_nonempty_.Signal(); + if (count_.Load(MemoryOrder::RELAXED) > 0) { + TopWaiter()->cv.Signal(); } return result; } +InfLenFIFOQueue::Node* InfLenFIFOQueue::AllocateNodes(int num) { + Node* new_chunk = static_cast(gpr_zalloc(sizeof(Node) * num)); + new_chunk[0].next = &new_chunk[1]; + new_chunk[num - 1].prev = &new_chunk[num - 2]; + for (int i = 1; i < num - 1; ++i) { + new_chunk[i].prev = &new_chunk[i - 1]; + new_chunk[i].next = &new_chunk[i + 1]; + } + return new_chunk; +} + +InfLenFIFOQueue::InfLenFIFOQueue() { + delete_list_size_ = 1024; + delete_list_ = + static_cast(gpr_zalloc(sizeof(Node*) * delete_list_size_)); + + Node* new_chunk = AllocateNodes(1024); + delete_list_[delete_list_count_++] = new_chunk; + queue_head_ = queue_tail_ = new_chunk; + new_chunk[0].prev = &new_chunk[1023]; + new_chunk[1023].next = &new_chunk[0]; + + waiters_.next = &waiters_; + waiters_.prev = &waiters_; +} + InfLenFIFOQueue::~InfLenFIFOQueue() { GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0); - GPR_ASSERT(num_waiters_ == 0); + for (size_t i = 0; i < delete_list_count_; ++i) { + gpr_free(delete_list_[i]); + } + gpr_free(delete_list_); } void InfLenFIFOQueue::Put(void* elem) { MutexLock l(&mu_); - Node* new_node = New(elem); - if (count_.Load(MemoryOrder::RELAXED) == 0) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { - busy_time = gpr_now(GPR_CLOCK_MONOTONIC); + int curr_count = count_.Load(MemoryOrder::RELAXED); + + if (queue_tail_ == queue_head_ && curr_count!= 0) { + // List is full. Expands list to double size by inserting new chunk of nodes + Node* new_chunk = AllocateNodes(curr_count); + delete_list_[delete_list_count_++] = new_chunk; + // Expands delete list on full. + if (delete_list_count_ == delete_list_size_) { + delete_list_size_ = delete_list_size_ * 2; + delete_list_ = static_cast( + gpr_realloc(delete_list_, sizeof(Node*) * delete_list_size_)); } - queue_head_ = queue_tail_ = new_node; - } else { - queue_tail_->next = new_node; - queue_tail_ = queue_tail_->next; + new_chunk[0].prev = queue_tail_->prev; + new_chunk[curr_count - 1].next = queue_head_; + queue_tail_->prev->next = new_chunk; + queue_head_->prev = &new_chunk[curr_count - 1]; + queue_tail_ = new_chunk; } - count_.Store(count_.Load(MemoryOrder::RELAXED) + 1, MemoryOrder::RELAXED); + queue_tail_->content = static_cast(elem); + // Updates Stats info if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { stats_.num_started++; gpr_log(GPR_INFO, "[InfLenFIFOQueue Put] num_started: %" PRIu64, stats_.num_started); + if (curr_count == 0) { + busy_time = gpr_now(GPR_CLOCK_MONOTONIC); + } + queue_tail_->insert_time = gpr_now(GPR_CLOCK_MONOTONIC); } - if (num_waiters_ > 0) { - wait_nonempty_.Signal(); - } + count_.Store(curr_count + 1, MemoryOrder::RELAXED); + queue_tail_ = queue_tail_->next; + + TopWaiter()->cv.Signal(); } void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) { @@ -108,11 +148,12 @@ void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) { start_time = gpr_now(GPR_CLOCK_MONOTONIC); } - num_waiters_++; + Waiter self; + PushWaiter(&self); do { - wait_nonempty_.Wait(&mu_); + self.cv.Wait(&mu_); } while (count_.Load(MemoryOrder::RELAXED) == 0); - num_waiters_--; + RemoveWaiter(&self); if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) && wait_time != nullptr) { *wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time); @@ -122,4 +163,29 @@ void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) { return PopFront(); } +size_t InfLenFIFOQueue::num_node() { + size_t num = 1024; + for (size_t i = 1; i < delete_list_count_; ++i) { + num = num* 2; + } + return num; +} + +void InfLenFIFOQueue::PushWaiter(Waiter* waiter) { + waiter->next = waiters_.next; + waiter->prev = &waiters_; + waiter->next->prev = waiter; + waiter->prev->next = waiter; +} + +void InfLenFIFOQueue::RemoveWaiter(Waiter* waiter) { + GPR_DEBUG_ASSERT(waiter != &waiters_); + waiter->next->prev = waiter->prev; + waiter->prev->next = waiter->next; +} + +InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() { + return waiters_.next; +} + } // namespace grpc_core diff --git a/src/core/lib/iomgr/executor/mpmcqueue.h b/src/core/lib/iomgr/executor/mpmcqueue.h index c6102b3add0..05e5a0a7579 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.h +++ b/src/core/lib/iomgr/executor/mpmcqueue.h @@ -54,7 +54,7 @@ class MPMCQueueInterface { class InfLenFIFOQueue : public MPMCQueueInterface { public: // Creates a new MPMC Queue. The queue created will have infinite length. - InfLenFIFOQueue() {} + InfLenFIFOQueue(); // Releases all resources held by the queue. The queue must be empty, and no // one waits on conditional variables. @@ -66,8 +66,8 @@ class InfLenFIFOQueue : public MPMCQueueInterface { // Removes the oldest element from the queue and returns it. // This routine will cause the thread to block if queue is currently empty. - // Argument wait_time should be passed in when turning on the trace flag - // grpc_thread_pool_trace (for collecting stats info purpose.) + // Argument wait_time should be passed in when trace flag turning on (for + // collecting stats info purpose.) void* Get(gpr_timespec* wait_time = nullptr); // Returns number of elements in queue currently. @@ -75,24 +75,30 @@ class InfLenFIFOQueue : public MPMCQueueInterface { // quickly. int count() const { return count_.Load(MemoryOrder::RELAXED); } - private: - // For Internal Use Only. - // Removes the oldest element from the queue and returns it. This routine - // will NOT check whether queue is empty, and it will NOT acquire mutex. - // Caller should do the check and acquire mutex before callling. - void* PopFront(); - struct Node { Node* next; // Linking + Node* prev; void* content; // Points to actual element gpr_timespec insert_time; // Time for stats - Node(void* c) : content(c) { - next = nullptr; - insert_time = gpr_now(GPR_CLOCK_MONOTONIC); + Node() { + next = prev = nullptr; + content = nullptr; } }; + // For test purpose only. Returns number of nodes allocated in queue. + // All allocated nodes will not be free until destruction of queue. + size_t num_node(); + + private: + // For Internal Use Only. + // Removes the oldest element from the queue and returns it. This routine + // will NOT check whether queue is empty, and it will NOT acquire mutex. + // Caller MUST check that queue is not empty and must acquire mutex before + // callling. + void* PopFront(); + // Stats of queue. This will only be collect when debug trace mode is on. // All printed stats info will have time measurement in microsecond. struct Stats { @@ -115,15 +121,47 @@ class InfLenFIFOQueue : public MPMCQueueInterface { } }; + // Node for waiting thread queue. Stands for one waiting thread, should have + // exact one thread waiting on its CondVar. + // Using a doubly linked list for waiting thread queue to wake up waiting + // threads in LIFO order to reduce cache misses. + struct Waiter { + CondVar cv; + Waiter* next; + Waiter* prev; + }; + + // Pushs waiter to the front of queue, require caller held mutex + void PushWaiter(Waiter* waiter); + + // Removes waiter from queue, require caller held mutex + void RemoveWaiter(Waiter* waiter); + + // Returns pointer to the waiter that should be waken up next, should be the + // last added waiter. + Waiter* TopWaiter(); + Mutex mu_; // Protecting lock - CondVar wait_nonempty_; // Wait on empty queue on get - int num_waiters_ = 0; // Number of waiters + Waiter waiters_; // Head of waiting thread queue + + Node** delete_list_ = nullptr; // Keeps track of all allocated array entries + // for deleting on destruction + size_t delete_list_count_ = 0; // Number of entries in list + size_t delete_list_size_ = 0; // Size of the list. List will be expanded to + // double size on full Node* queue_head_ = nullptr; // Head of the queue, remove position Node* queue_tail_ = nullptr; // End of queue, insert position Atomic count_{0}; // Number of elements in queue + Stats stats_; // Stats info gpr_timespec busy_time; // Start time of busy queue + + // Internal Helper. + // Allocates an array of nodes of size "num", links all nodes together except + // the first node's prev and last node's next. They should be set by caller + // manually afterward. + Node* AllocateNodes(int num); }; } // namespace grpc_core diff --git a/test/core/iomgr/mpmcqueue_test.cc b/test/core/iomgr/mpmcqueue_test.cc index a301832f608..68d1da65405 100644 --- a/test/core/iomgr/mpmcqueue_test.cc +++ b/test/core/iomgr/mpmcqueue_test.cc @@ -119,6 +119,50 @@ static void test_FIFO(void) { } } +static void test_space_efficiency(void) { + gpr_log(GPR_INFO, "test_space_efficiency"); + grpc_core::InfLenFIFOQueue queue; + for (int i = 0; i < 1024; ++i) { + queue.Put(static_cast(grpc_core::New(i))); + } + GPR_ASSERT(queue.num_node() == 1024); + for (int i = 0; i < 1024; ++i) { + WorkItem* item = static_cast(queue.Get()); + queue.Put(item); + } + GPR_ASSERT(queue.num_node() == 1024); + for (int i = 0; i < 1024; ++i) { + WorkItem* item = static_cast(queue.Get()); + grpc_core::Delete(item); + } + GPR_ASSERT(queue.num_node() == 1024); + GPR_ASSERT(queue.count() == 0); + // queue empty now + for (int i = 0; i < 4000; ++i) { + queue.Put(static_cast(grpc_core::New(i))); + } + GPR_ASSERT(queue.count() == 4000); + GPR_ASSERT(queue.num_node() == 4096); + for (int i = 0; i < 2000; ++i) { + WorkItem* item = static_cast(queue.Get()); + grpc_core::Delete(item); + } + GPR_ASSERT(queue.count() == 2000); + GPR_ASSERT(queue.num_node() == 4096); + for (int i = 0; i < 1000; ++i) { + queue.Put(static_cast(grpc_core::New(i))); + } + GPR_ASSERT(queue.count() == 3000); + GPR_ASSERT(queue.num_node() == 4096); + for (int i = 0; i < 3000; ++i) { + WorkItem* item = static_cast(queue.Get()); + grpc_core::Delete(item); + } + GPR_ASSERT(queue.count() == 0); + GPR_ASSERT(queue.num_node() == 4096); + gpr_log(GPR_DEBUG, "Done."); +} + static void test_many_thread(void) { gpr_log(GPR_INFO, "test_many_thread"); const int num_producer_threads = 10; @@ -172,6 +216,7 @@ int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init(); test_FIFO(); + test_space_efficiency(); test_many_thread(); grpc_shutdown(); return 0; From 3ec57262161aafe4a5d0d1c0a52400016c97ce74 Mon Sep 17 00:00:00 2001 From: Yunjia Wang Date: Fri, 19 Jul 2019 10:57:28 -0700 Subject: [PATCH 2/5] Re-format --- src/core/lib/iomgr/executor/mpmcqueue.cc | 12 +++++------- src/core/lib/iomgr/executor/mpmcqueue.h | 10 +++++----- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/core/lib/iomgr/executor/mpmcqueue.cc b/src/core/lib/iomgr/executor/mpmcqueue.cc index 7ef5cf40e14..15b2938a3ac 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.cc +++ b/src/core/lib/iomgr/executor/mpmcqueue.cc @@ -33,8 +33,8 @@ inline void* InfLenFIFOQueue::PopFront() { // Updates Stats when trace flag turned on. if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { - gpr_timespec wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), - queue_head_->insert_time); + gpr_timespec wait_time = + gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), queue_head_->insert_time); stats_.num_completed++; stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time); stats_.max_queue_time = gpr_time_max( @@ -103,7 +103,7 @@ void InfLenFIFOQueue::Put(void* elem) { int curr_count = count_.Load(MemoryOrder::RELAXED); - if (queue_tail_ == queue_head_ && curr_count!= 0) { + if (queue_tail_ == queue_head_ && curr_count != 0) { // List is full. Expands list to double size by inserting new chunk of nodes Node* new_chunk = AllocateNodes(curr_count); delete_list_[delete_list_count_++] = new_chunk; @@ -166,7 +166,7 @@ void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) { size_t InfLenFIFOQueue::num_node() { size_t num = 1024; for (size_t i = 1; i < delete_list_count_; ++i) { - num = num* 2; + num = num * 2; } return num; } @@ -184,8 +184,6 @@ void InfLenFIFOQueue::RemoveWaiter(Waiter* waiter) { waiter->prev->next = waiter->next; } -InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() { - return waiters_.next; -} +InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() { return waiters_.next; } } // namespace grpc_core diff --git a/src/core/lib/iomgr/executor/mpmcqueue.h b/src/core/lib/iomgr/executor/mpmcqueue.h index 05e5a0a7579..49bebf8af4b 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.h +++ b/src/core/lib/iomgr/executor/mpmcqueue.h @@ -76,7 +76,7 @@ class InfLenFIFOQueue : public MPMCQueueInterface { int count() const { return count_.Load(MemoryOrder::RELAXED); } struct Node { - Node* next; // Linking + Node* next; // Linking Node* prev; void* content; // Points to actual element gpr_timespec insert_time; // Time for stats @@ -141,8 +141,8 @@ class InfLenFIFOQueue : public MPMCQueueInterface { // last added waiter. Waiter* TopWaiter(); - Mutex mu_; // Protecting lock - Waiter waiters_; // Head of waiting thread queue + Mutex mu_; // Protecting lock + Waiter waiters_; // Head of waiting thread queue Node** delete_list_ = nullptr; // Keeps track of all allocated array entries // for deleting on destruction @@ -154,8 +154,8 @@ class InfLenFIFOQueue : public MPMCQueueInterface { Node* queue_tail_ = nullptr; // End of queue, insert position Atomic count_{0}; // Number of elements in queue - Stats stats_; // Stats info - gpr_timespec busy_time; // Start time of busy queue + Stats stats_; // Stats info + gpr_timespec busy_time; // Start time of busy queue // Internal Helper. // Allocates an array of nodes of size "num", links all nodes together except From 0161de3a566d030891c3ffb245b6ba17e46c613e Mon Sep 17 00:00:00 2001 From: Yunjia Wang Date: Tue, 23 Jul 2019 17:11:45 -0700 Subject: [PATCH 3/5] Addressing comments --- src/core/lib/iomgr/executor/mpmcqueue.cc | 20 +++++------- src/core/lib/iomgr/executor/mpmcqueue.h | 9 +++++- test/core/iomgr/mpmcqueue_test.cc | 40 ++++++++++++++---------- 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/src/core/lib/iomgr/executor/mpmcqueue.cc b/src/core/lib/iomgr/executor/mpmcqueue.cc index 15b2938a3ac..377a135e1f3 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.cc +++ b/src/core/lib/iomgr/executor/mpmcqueue.cc @@ -27,7 +27,7 @@ DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool"); inline void* InfLenFIFOQueue::PopFront() { // Caller should already check queue is not empty and has already held the // mutex. This function will assume that there is at least one element in the - // queue (aka queue_head_ content is valid). + // queue (i.e. queue_head_->content is valid). void* result = queue_head_->content; count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED); @@ -65,6 +65,7 @@ inline void* InfLenFIFOQueue::PopFront() { } InfLenFIFOQueue::Node* InfLenFIFOQueue::AllocateNodes(int num) { + num_nodes_ = num_nodes_ + num; Node* new_chunk = static_cast(gpr_zalloc(sizeof(Node) * num)); new_chunk[0].next = &new_chunk[1]; new_chunk[num - 1].prev = &new_chunk[num - 2]; @@ -76,11 +77,11 @@ InfLenFIFOQueue::Node* InfLenFIFOQueue::AllocateNodes(int num) { } InfLenFIFOQueue::InfLenFIFOQueue() { - delete_list_size_ = 1024; + delete_list_size_ = kDeleteListInitSize; delete_list_ = static_cast(gpr_zalloc(sizeof(Node*) * delete_list_size_)); - Node* new_chunk = AllocateNodes(1024); + Node* new_chunk = AllocateNodes(kDeleteListInitSize); delete_list_[delete_list_count_++] = new_chunk; queue_head_ = queue_tail_ = new_chunk; new_chunk[0].prev = &new_chunk[1023]; @@ -126,10 +127,11 @@ void InfLenFIFOQueue::Put(void* elem) { stats_.num_started++; gpr_log(GPR_INFO, "[InfLenFIFOQueue Put] num_started: %" PRIu64, stats_.num_started); + auto current_time = gpr_now(GPR_CLOCK_MONOTONIC); if (curr_count == 0) { - busy_time = gpr_now(GPR_CLOCK_MONOTONIC); + busy_time = current_time; } - queue_tail_->insert_time = gpr_now(GPR_CLOCK_MONOTONIC); + queue_tail_->insert_time = current_time; } count_.Store(curr_count + 1, MemoryOrder::RELAXED); @@ -163,14 +165,6 @@ void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) { return PopFront(); } -size_t InfLenFIFOQueue::num_node() { - size_t num = 1024; - for (size_t i = 1; i < delete_list_count_; ++i) { - num = num * 2; - } - return num; -} - void InfLenFIFOQueue::PushWaiter(Waiter* waiter) { waiter->next = waiters_.next; waiter->prev = &waiters_; diff --git a/src/core/lib/iomgr/executor/mpmcqueue.h b/src/core/lib/iomgr/executor/mpmcqueue.h index 49bebf8af4b..41e7e2a11be 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.h +++ b/src/core/lib/iomgr/executor/mpmcqueue.h @@ -89,7 +89,10 @@ class InfLenFIFOQueue : public MPMCQueueInterface { // For test purpose only. Returns number of nodes allocated in queue. // All allocated nodes will not be free until destruction of queue. - size_t num_node(); + int num_nodes() const { return num_nodes_; } + + // For test purpose only. Returns the initial number of nodes in queue. + int init_num_nodes() const { return kQueueInitNumNodes; } private: // For Internal Use Only. @@ -144,6 +147,9 @@ class InfLenFIFOQueue : public MPMCQueueInterface { Mutex mu_; // Protecting lock Waiter waiters_; // Head of waiting thread queue + const int kDeleteListInitSize = 1024; // Initial size for delete list + const int kQueueInitNumNodes = 1024; // Initial number of nodes allocated + Node** delete_list_ = nullptr; // Keeps track of all allocated array entries // for deleting on destruction size_t delete_list_count_ = 0; // Number of entries in list @@ -153,6 +159,7 @@ class InfLenFIFOQueue : public MPMCQueueInterface { Node* queue_head_ = nullptr; // Head of the queue, remove position Node* queue_tail_ = nullptr; // End of queue, insert position Atomic count_{0}; // Number of elements in queue + int num_nodes_ = 0; // Number of nodes allocated Stats stats_; // Stats info gpr_timespec busy_time; // Start time of busy queue diff --git a/test/core/iomgr/mpmcqueue_test.cc b/test/core/iomgr/mpmcqueue_test.cc index 68d1da65405..732c43a06f5 100644 --- a/test/core/iomgr/mpmcqueue_test.cc +++ b/test/core/iomgr/mpmcqueue_test.cc @@ -119,47 +119,53 @@ static void test_FIFO(void) { } } +// Test if queue's behavior of expanding is correct. (Only does expansion when +// it gets full, and each time expands to doubled size). static void test_space_efficiency(void) { gpr_log(GPR_INFO, "test_space_efficiency"); grpc_core::InfLenFIFOQueue queue; - for (int i = 0; i < 1024; ++i) { + for (int i = 0; i < queue.init_num_nodes(); ++i) { queue.Put(static_cast(grpc_core::New(i))); } - GPR_ASSERT(queue.num_node() == 1024); - for (int i = 0; i < 1024; ++i) { + // List should not have been expanded at this time. + GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes()); + for (int i = 0; i < queue.init_num_nodes(); ++i) { WorkItem* item = static_cast(queue.Get()); queue.Put(item); } - GPR_ASSERT(queue.num_node() == 1024); - for (int i = 0; i < 1024; ++i) { + GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes()); + for (int i = 0; i < queue.init_num_nodes(); ++i) { WorkItem* item = static_cast(queue.Get()); grpc_core::Delete(item); } - GPR_ASSERT(queue.num_node() == 1024); + GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes()); GPR_ASSERT(queue.count() == 0); // queue empty now - for (int i = 0; i < 4000; ++i) { + for (int i = 0; i < queue.init_num_nodes() * 2; ++i) { queue.Put(static_cast(grpc_core::New(i))); } - GPR_ASSERT(queue.count() == 4000); - GPR_ASSERT(queue.num_node() == 4096); - for (int i = 0; i < 2000; ++i) { + GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2); + // List should have been expanded once. + GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2); + for (int i = 0; i < queue.init_num_nodes(); ++i) { WorkItem* item = static_cast(queue.Get()); grpc_core::Delete(item); } - GPR_ASSERT(queue.count() == 2000); - GPR_ASSERT(queue.num_node() == 4096); - for (int i = 0; i < 1000; ++i) { + GPR_ASSERT(queue.count() == queue.init_num_nodes()); + // List will never shrink, should keep same number of node as before. + GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2); + for (int i = 0; i < queue.init_num_nodes() + 1; ++i) { queue.Put(static_cast(grpc_core::New(i))); } - GPR_ASSERT(queue.count() == 3000); - GPR_ASSERT(queue.num_node() == 4096); - for (int i = 0; i < 3000; ++i) { + GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2 + 1); + // List should have been expanded twice. + GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4); + for (int i = 0; i < queue.init_num_nodes() * 2 + 1; ++i) { WorkItem* item = static_cast(queue.Get()); grpc_core::Delete(item); } GPR_ASSERT(queue.count() == 0); - GPR_ASSERT(queue.num_node() == 4096); + GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4); gpr_log(GPR_DEBUG, "Done."); } From f78f3e4d6c712b92ff214b207c5221f3283a86e6 Mon Sep 17 00:00:00 2001 From: Yunjia Wang Date: Wed, 24 Jul 2019 10:36:51 -0700 Subject: [PATCH 4/5] Re-format --- src/core/lib/iomgr/executor/mpmcqueue.cc | 2 +- src/core/lib/iomgr/executor/mpmcqueue.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/iomgr/executor/mpmcqueue.cc b/src/core/lib/iomgr/executor/mpmcqueue.cc index 377a135e1f3..62226450a18 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.cc +++ b/src/core/lib/iomgr/executor/mpmcqueue.cc @@ -65,7 +65,7 @@ inline void* InfLenFIFOQueue::PopFront() { } InfLenFIFOQueue::Node* InfLenFIFOQueue::AllocateNodes(int num) { - num_nodes_ = num_nodes_ + num; + num_nodes_ = num_nodes_ + num; Node* new_chunk = static_cast(gpr_zalloc(sizeof(Node) * num)); new_chunk[0].next = &new_chunk[1]; new_chunk[num - 1].prev = &new_chunk[num - 2]; diff --git a/src/core/lib/iomgr/executor/mpmcqueue.h b/src/core/lib/iomgr/executor/mpmcqueue.h index 41e7e2a11be..0d44b7231d3 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.h +++ b/src/core/lib/iomgr/executor/mpmcqueue.h @@ -159,7 +159,7 @@ class InfLenFIFOQueue : public MPMCQueueInterface { Node* queue_head_ = nullptr; // Head of the queue, remove position Node* queue_tail_ = nullptr; // End of queue, insert position Atomic count_{0}; // Number of elements in queue - int num_nodes_ = 0; // Number of nodes allocated + int num_nodes_ = 0; // Number of nodes allocated Stats stats_; // Stats info gpr_timespec busy_time; // Start time of busy queue From 7c1081964180add6101925ee7fb7b7dea01f3636 Mon Sep 17 00:00:00 2001 From: Yunjia Wang Date: Thu, 25 Jul 2019 15:31:26 -0700 Subject: [PATCH 5/5] Fix comments --- src/core/lib/iomgr/executor/mpmcqueue.cc | 6 +++--- src/core/lib/iomgr/executor/mpmcqueue.h | 8 +++++--- test/core/iomgr/mpmcqueue_test.cc | 9 +++++---- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/core/lib/iomgr/executor/mpmcqueue.cc b/src/core/lib/iomgr/executor/mpmcqueue.cc index 62226450a18..74096a4c5b0 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.cc +++ b/src/core/lib/iomgr/executor/mpmcqueue.cc @@ -81,11 +81,11 @@ InfLenFIFOQueue::InfLenFIFOQueue() { delete_list_ = static_cast(gpr_zalloc(sizeof(Node*) * delete_list_size_)); - Node* new_chunk = AllocateNodes(kDeleteListInitSize); + Node* new_chunk = AllocateNodes(kQueueInitNumNodes); delete_list_[delete_list_count_++] = new_chunk; queue_head_ = queue_tail_ = new_chunk; - new_chunk[0].prev = &new_chunk[1023]; - new_chunk[1023].next = &new_chunk[0]; + new_chunk[0].prev = &new_chunk[kQueueInitNumNodes - 1]; + new_chunk[kQueueInitNumNodes - 1].next = &new_chunk[0]; waiters_.next = &waiters_; waiters_.prev = &waiters_; diff --git a/src/core/lib/iomgr/executor/mpmcqueue.h b/src/core/lib/iomgr/executor/mpmcqueue.h index 0d44b7231d3..ab5c484e094 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.h +++ b/src/core/lib/iomgr/executor/mpmcqueue.h @@ -88,7 +88,7 @@ class InfLenFIFOQueue : public MPMCQueueInterface { }; // For test purpose only. Returns number of nodes allocated in queue. - // All allocated nodes will not be free until destruction of queue. + // Any allocated node will be alive until the destruction of the queue. int num_nodes() const { return num_nodes_; } // For test purpose only. Returns the initial number of nodes in queue. @@ -147,8 +147,10 @@ class InfLenFIFOQueue : public MPMCQueueInterface { Mutex mu_; // Protecting lock Waiter waiters_; // Head of waiting thread queue - const int kDeleteListInitSize = 1024; // Initial size for delete list - const int kQueueInitNumNodes = 1024; // Initial number of nodes allocated + // Initial size for delete list + static const int kDeleteListInitSize = 1024; + // Initial number of nodes allocated + static const int kQueueInitNumNodes = 1024; Node** delete_list_ = nullptr; // Keeps track of all allocated array entries // for deleting on destruction diff --git a/test/core/iomgr/mpmcqueue_test.cc b/test/core/iomgr/mpmcqueue_test.cc index 732c43a06f5..9ebc1cc7363 100644 --- a/test/core/iomgr/mpmcqueue_test.cc +++ b/test/core/iomgr/mpmcqueue_test.cc @@ -127,7 +127,7 @@ static void test_space_efficiency(void) { for (int i = 0; i < queue.init_num_nodes(); ++i) { queue.Put(static_cast(grpc_core::New(i))); } - // List should not have been expanded at this time. + // Queue should not have been expanded at this time. GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes()); for (int i = 0; i < queue.init_num_nodes(); ++i) { WorkItem* item = static_cast(queue.Get()); @@ -138,6 +138,7 @@ static void test_space_efficiency(void) { WorkItem* item = static_cast(queue.Get()); grpc_core::Delete(item); } + // Queue never shrinks even it is empty. GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes()); GPR_ASSERT(queue.count() == 0); // queue empty now @@ -145,20 +146,20 @@ static void test_space_efficiency(void) { queue.Put(static_cast(grpc_core::New(i))); } GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2); - // List should have been expanded once. + // Queue should have been expanded once. GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2); for (int i = 0; i < queue.init_num_nodes(); ++i) { WorkItem* item = static_cast(queue.Get()); grpc_core::Delete(item); } GPR_ASSERT(queue.count() == queue.init_num_nodes()); - // List will never shrink, should keep same number of node as before. + // Queue will never shrink, should keep same number of node as before. GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2); for (int i = 0; i < queue.init_num_nodes() + 1; ++i) { queue.Put(static_cast(grpc_core::New(i))); } GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2 + 1); - // List should have been expanded twice. + // Queue should have been expanded twice. GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4); for (int i = 0; i < queue.init_num_nodes() * 2 + 1; ++i) { WorkItem* item = static_cast(queue.Get());