From 6fc7d2b18f51a2a9d1069d2e1c168d76b810fd44 Mon Sep 17 00:00:00 2001 From: Yunjia Wang Date: Mon, 17 Jun 2019 14:47:28 -0700 Subject: [PATCH] fix undefined reference to operator delete for MPMCQueue interface class --- build.yaml | 20 +++--- src/core/lib/iomgr/threadpool/mpmcqueue.cc | 44 ++++++------ src/core/lib/iomgr/threadpool/mpmcqueue.h | 78 ++++++++-------------- test/core/iomgr/mpmcqueue_test.cc | 43 +++--------- 4 files changed, 67 insertions(+), 118 deletions(-) diff --git a/build.yaml b/build.yaml index 91d3cfe8c75..a02d7f01fcb 100644 --- a/build.yaml +++ b/build.yaml @@ -3268,15 +3268,6 @@ targets: - grpc - gpr uses_polling: false -- name: mpmcqueue_test - build: test - language: c - src: - - test/core/iomgr/mpmcqueue_test.cc - deps: - - grpc_test_util - - grpc - - gpr - name: multiple_server_queues_test build: test language: c @@ -5239,6 +5230,17 @@ targets: - grpc++ - grpc - gpr +- name: mpmcqueue_test + build: test + language: c++ + src: + - test/core/iomgr/mpmcqueue_test.cc + deps: + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr - name: nonblocking_test gtest: true build: test diff --git a/src/core/lib/iomgr/threadpool/mpmcqueue.cc b/src/core/lib/iomgr/threadpool/mpmcqueue.cc index f33ab0468a8..e641ba70faa 100644 --- a/src/core/lib/iomgr/threadpool/mpmcqueue.cc +++ b/src/core/lib/iomgr/threadpool/mpmcqueue.cc @@ -16,46 +16,42 @@ * */ -#include - #include "src/core/lib/iomgr/threadpool/mpmcqueue.h" -#include - #include #include #include +#include #include #include +#include #include "src/core/lib/gprpp/sync.h" namespace grpc_core { - - inline void* MPMCQueue::PopFront() { void* result = queue_head_->content; Node* head_to_remove = queue_head_; queue_head_ = queue_head_->next; count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED); - gpr_timespec wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), - head_to_remove->insert_time); - // gpr_free(head_to_remove); + gpr_timespec wait_time = + gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), head_to_remove->insert_time); + delete head_to_remove; // Update Stats info stats_.num_completed++; - stats_.total_queue_cycles = gpr_time_add(stats_.total_queue_cycles, - wait_time); + stats_.total_queue_cycles = + gpr_time_add(stats_.total_queue_cycles, wait_time); stats_.max_queue_cycles = gpr_time_max( gpr_convert_clock_type(stats_.max_queue_cycles, GPR_TIMESPAN), wait_time); if (count_.Load(MemoryOrder::RELAXED) == 0) { - stats_.busy_time_cycles = gpr_time_add( - stats_.busy_time_cycles, - gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), busy_time)); + stats_.busy_time_cycles = + gpr_time_add(stats_.busy_time_cycles, + gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), busy_time)); } // Singal waiting thread @@ -66,7 +62,8 @@ inline void* MPMCQueue::PopFront() { return result; } -MPMCQueue::MPMCQueue() : num_waiters_(0), queue_head_(0), queue_tail_(0) { +MPMCQueue::MPMCQueue() : num_waiters_(0), queue_head_(nullptr), + queue_tail_(nullptr) { count_.Store(0, MemoryOrder::RELAXED); } @@ -80,10 +77,7 @@ MPMCQueue::~MPMCQueue() { void MPMCQueue::Put(void* elem) { MutexLock l(&mu_); - // Node* new_node = static_cast(gpr_malloc(sizeof(Node))); - // new_node->next = nullptr; - // new_node->content = elem; - // new_node->insert_time = gpr_now(GPR_CLOCK_PRECISE); + Node* new_node = static_cast(new Node(elem)); if (count_.Load(MemoryOrder::RELAXED) == 0) { busy_time = gpr_now(GPR_CLOCK_PRECISE); @@ -131,12 +125,12 @@ void MPMCQueue::PrintStats() { MPMCQueue::Stats* MPMCQueue::queue_stats() { MPMCQueue::Stats* result = new Stats(); MutexLock l(&mu_); - result->total_queue_cycles = gpr_time_add(result->total_queue_cycles, - stats_.total_queue_cycles); - result->max_queue_cycles = gpr_time_add(result->max_queue_cycles, - stats_.max_queue_cycles); - result->busy_time_cycles = gpr_time_add(result->busy_time_cycles, - stats_.busy_time_cycles); + result->total_queue_cycles = + gpr_time_add(result->total_queue_cycles, stats_.total_queue_cycles); + result->max_queue_cycles = + gpr_time_add(result->max_queue_cycles, stats_.max_queue_cycles); + result->busy_time_cycles = + gpr_time_add(result->busy_time_cycles, stats_.busy_time_cycles); return result; } diff --git a/src/core/lib/iomgr/threadpool/mpmcqueue.h b/src/core/lib/iomgr/threadpool/mpmcqueue.h index 153784ae939..18d30e876ab 100644 --- a/src/core/lib/iomgr/threadpool/mpmcqueue.h +++ b/src/core/lib/iomgr/threadpool/mpmcqueue.h @@ -16,15 +16,15 @@ * */ -#ifndef GRPC_CORE_LIB_IOMGR_MPMCQUEUE_H -#define GRPC_CORE_LIB_IOMGR_MPMCQUEUE_H +#ifndef GRPC_CORE_LIB_IOMGR_THREADPOOL_MPMCQUEUE_H +#define GRPC_CORE_LIB_IOMGR_THREADPOOL_MPMCQUEUE_H +#include #include +#include -#include #include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/gprpp/sync.h" -#include namespace grpc_core { @@ -36,7 +36,7 @@ class MPMCQueueInterface { // Put elem into queue immediately at the end of queue. // This might cause to block on full queue depending on implementation. - virtual void Put(void *elem) = 0; + virtual void Put(void* elem) = 0; // Remove the oldest element from the queue and return it. // This might cause to block on empty queue depending on implementation. @@ -48,16 +48,16 @@ class MPMCQueueInterface { class MPMCQueue : public MPMCQueueInterface { public: - struct Stats { // Stats of queue - uint64_t num_started; // Number of elements have been added to queue - uint64_t num_completed; // Number of elements have been removed from - // the queue - gpr_timespec total_queue_cycles; // Total waiting time that all the - // removed elements have spent in queue - gpr_timespec max_queue_cycles; // Max waiting time among all removed - // elements - gpr_timespec busy_time_cycles; // Accumulated amount of time that queue - // was not empty + struct Stats { // Stats of queue + uint64_t num_started; // Number of elements have been added to queue + uint64_t num_completed; // Number of elements have been removed from + // the queue + gpr_timespec total_queue_cycles; // Total waiting time that all the + // removed elements have spent in queue + gpr_timespec max_queue_cycles; // Max waiting time among all removed + // elements + gpr_timespec busy_time_cycles; // Accumulated amount of time that queue + // was not empty Stats() { num_started = 0; @@ -66,23 +66,7 @@ class MPMCQueue : public MPMCQueueInterface { max_queue_cycles = gpr_time_0(GPR_TIMESPAN); busy_time_cycles = gpr_time_0(GPR_TIMESPAN); } - void* operator new(size_t n) { - void* p = gpr_malloc(n); - return p; - } - - void operator delete(void* p) { - gpr_free(p); - } }; - void* operator new(size_t n) { - void* p = gpr_malloc(n); - return p; - } - - void operator delete(void* p) { - gpr_free(p); - } // Create a new Multiple-Producer-Multiple-Consumer Queue. The queue created // will have infinite length. explicit MPMCQueue(); @@ -115,34 +99,26 @@ class MPMCQueue : public MPMCQueueInterface { void* PopFront(); struct Node { - Node *next; // Linking - void *content; // Points to actual element - gpr_timespec insert_time; // Time for stats + Node* next; // Linking + void* content; // Points to actual element + gpr_timespec insert_time; // Time for stats Node(void* c) : content(c) { next = nullptr; insert_time = gpr_now(GPR_CLOCK_PRECISE); } - void* operator new(size_t n) { - void* p = gpr_malloc(n); - return p; - } - - void operator delete(void* p) { - gpr_free(p); - } }; - Mutex mu_; // Protecting lock - CondVar wait_nonempty_; // Wait on empty queue on get - int num_waiters_; // Number of waiters + Mutex mu_; // Protecting lock + CondVar wait_nonempty_; // Wait on empty queue on get + int num_waiters_; // Number of waiters - Node *queue_head_; // Head of the queue, remove position - Node *queue_tail_; // End of queue, insert position - Atomic count_; // Number of elements in queue - Stats stats_; // Stats info - gpr_timespec busy_time; // Start time of busy queue + Node* queue_head_; // Head of the queue, remove position + Node* queue_tail_; // End of queue, insert position + Atomic count_; // Number of elements in queue + Stats stats_; // Stats info + gpr_timespec busy_time; // Start time of busy queue }; } // namespace grpc_core -#endif /* GRPC_CORE_LIB_IOMGR_MPMCQUEUE_H */ +#endif /* GRPC_CORE_LIB_IOMGR_THREADPOOL_MPMCQUEUE_H */ diff --git a/test/core/iomgr/mpmcqueue_test.cc b/test/core/iomgr/mpmcqueue_test.cc index d562f3c6722..60d06d30d77 100644 --- a/test/core/iomgr/mpmcqueue_test.cc +++ b/test/core/iomgr/mpmcqueue_test.cc @@ -42,17 +42,7 @@ struct WorkItem { int index; bool done; - WorkItem(int i) : index(i) { - done = false; - } - void* operator new(size_t n) { - void* p = gpr_malloc(n); - return p; - } - - void operator delete(void* p) { - gpr_free(p); - } + WorkItem(int i) : index(i) { done = false; } }; static void test_small_queue(void) { @@ -132,38 +122,28 @@ static void test_large_queue(void) { class WorkThread { public: WorkThread(grpc_core::MPMCQueue* mpmcqueue, int start_index, int num_items) - : start_index_(start_index), num_items_(num_items), + : start_index_(start_index), + num_items_(num_items), mpmcqueue_(mpmcqueue) { items_ = NULL; thd_ = grpc_core::Thread( "mpmcq_test_mt_put_thd", - [](void* th) { static_cast(th)->Run(); }, - this); + [](void* th) { static_cast(th)->Run(); }, this); } ~WorkThread() { for (int i = 0; i < num_items_; ++i) { GPR_ASSERT(items_[i]->done); delete items_[i]; } - gpr_free(items_); + delete[] items_; } void Start() { thd_.Start(); } void Join() { thd_.Join(); } - void* operator new(size_t n) { - void* p = gpr_malloc(n); - return p; - } - - void operator delete(void* p) { - gpr_free(p); - } - private: void Run() { - items_ = static_cast( - gpr_malloc(sizeof(WorkItem*) * num_items_)); + items_ = new WorkItem*[num_items_]; for (int i = 0; i < num_items_; ++i) { items_[i] = new WorkItem(start_index_ + i); mpmcqueue_->Put(items_[i]); @@ -177,7 +157,6 @@ class WorkThread { WorkItem** items_; }; - static void test_many_get_thd(void* args) { grpc_core::MPMCQueue* mpmcqueue = static_cast(args); @@ -199,8 +178,7 @@ static void test_many_thread(void) { const int num_work_thd = 10; const int num_get_thd = 20; grpc_core::MPMCQueue mpmcqueue; - WorkThread** work_thds = - static_cast(gpr_malloc(sizeof(WorkThread*) * num_work_thd)); + WorkThread** work_thds = new WorkThread*[num_work_thd]; grpc_core::Thread get_thds[num_get_thd]; gpr_log(GPR_DEBUG, "Fork WorkThread..."); @@ -212,8 +190,8 @@ static void test_many_thread(void) { gpr_log(GPR_DEBUG, "WorkThread Started."); gpr_log(GPR_DEBUG, "For Getter Thread..."); for (int i = 0; i < num_get_thd; ++i) { - get_thds[i] = grpc_core::Thread("mpmcq_test_mt_get_thd", - test_many_get_thd, &mpmcqueue); + get_thds[i] = grpc_core::Thread("mpmcq_test_mt_get_thd", test_many_get_thd, + &mpmcqueue); get_thds[i].Start(); } gpr_log(GPR_DEBUG, "Getter Thread Started."); @@ -234,11 +212,10 @@ static void test_many_thread(void) { for (int i = 0; i < num_work_thd; ++i) { delete work_thds[i]; } - gpr_free(work_thds); + delete[] work_thds; gpr_log(GPR_DEBUG, "Done."); } - int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init();