From 2a7e593ac4ed4a4eefb1c764d3b1e54879fe38fc Mon Sep 17 00:00:00 2001 From: Yunjia Wang Date: Fri, 21 Jun 2019 18:03:29 -0700 Subject: [PATCH] Change consumer thread to class --- test/core/iomgr/mpmcqueue_test.cc | 72 ++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/test/core/iomgr/mpmcqueue_test.cc b/test/core/iomgr/mpmcqueue_test.cc index 42852b8b583..792d9b602c2 100644 --- a/test/core/iomgr/mpmcqueue_test.cc +++ b/test/core/iomgr/mpmcqueue_test.cc @@ -58,7 +58,7 @@ class ProducerThread { private: void Run() { items_ = - static_cast(gpr_malloc(num_items_ * sizeof(WorkItem*))); + static_cast(gpr_zalloc(num_items_ * sizeof(WorkItem*))); for (int i = 0; i < num_items_; ++i) { items_[i] = grpc_core::New(start_index_ + i); queue_->Put(items_[i]); @@ -72,38 +72,52 @@ class ProducerThread { WorkItem** items_; }; -static void ConsumerThread(void* args) { - grpc_core::InfLenFIFOQueue* queue = - static_cast(args); +class ConsumerThread { + public: + ConsumerThread(grpc_core::InfLenFIFOQueue* queue) : queue_(queue) { + thd_ = grpc_core::Thread( + "mpmcq_test_get_thd", + [](void* th) { static_cast(th)->Run(); }, this); + } + ~ConsumerThread() {} - // count number of Get() called in this thread - int count = 0; + void Start() { thd_.Start(); } + void Join() { thd_.Join(); } - WorkItem* item; - while ((item = static_cast(queue->Get())) != nullptr) { - count++; - GPR_ASSERT(!item->done); - item->done = true; - } + private: + void Run() { + // count number of Get() called in this thread + int count = 0; + + WorkItem* item; + while ((item = static_cast(queue_->Get())) != nullptr) { + count++; + GPR_ASSERT(!item->done); + item->done = true; + } - gpr_log(GPR_DEBUG, "ConsumerThread: %d times of Get() called.", count); -} + gpr_log(GPR_DEBUG, "ConsumerThread: %d times of Get() called.", count); + } + grpc_core::InfLenFIFOQueue* queue_; + grpc_core::Thread thd_; +}; static void test_get_empty(void) { gpr_log(GPR_INFO, "test_get_empty"); grpc_core::InfLenFIFOQueue queue; GPR_ASSERT(queue.count() == 0); const int num_threads = 10; - grpc_core::Thread thds[num_threads]; + ConsumerThread** consumer_thds = static_cast( + gpr_zalloc(num_threads * sizeof(ConsumerThread*))); // Fork threads. Threads should block at the beginning since queue is empty. for (int i = 0; i < num_threads; ++i) { - thds[i] = grpc_core::Thread("mpmcq_test_ge_thd", ConsumerThread, &queue); - thds[i].Start(); + consumer_thds[i] = grpc_core::New(&queue); + consumer_thds[i]->Start(); } WorkItem** items = static_cast( - gpr_malloc(THREAD_LARGE_ITERATION * sizeof(WorkItem*))); + gpr_zalloc(THREAD_LARGE_ITERATION * sizeof(WorkItem*))); for (int i = 0; i < THREAD_LARGE_ITERATION; ++i) { items[i] = grpc_core::New(i); queue.Put(static_cast(items[i])); @@ -114,7 +128,7 @@ static void test_get_empty(void) { queue.Put(nullptr); } for (int i = 0; i < num_threads; ++i) { - thds[i].Join(); + consumer_thds[i]->Join(); } gpr_log(GPR_DEBUG, "Checking and Cleaning Up..."); for (int i = 0; i < THREAD_LARGE_ITERATION; ++i) { @@ -122,6 +136,10 @@ static void test_get_empty(void) { grpc_core::Delete(items[i]); } gpr_free(items); + for (int i = 0; i < num_threads; ++i) { + grpc_core::Delete(consumer_thds[i]); + } + gpr_free(consumer_thds); gpr_log(GPR_DEBUG, "Done."); } @@ -145,8 +163,9 @@ static void test_many_thread(void) { const int num_get_thd = 20; grpc_core::InfLenFIFOQueue queue; ProducerThread** work_thds = static_cast( - gpr_malloc(num_work_thd * sizeof(ProducerThread*))); - grpc_core::Thread get_thds[num_get_thd]; + gpr_zalloc(num_work_thd * sizeof(ProducerThread*))); + ConsumerThread** consumer_thds = static_cast( + gpr_zalloc(num_get_thd * sizeof(ConsumerThread*))); gpr_log(GPR_DEBUG, "Fork ProducerThread..."); for (int i = 0; i < num_work_thd; ++i) { @@ -157,9 +176,8 @@ static void test_many_thread(void) { gpr_log(GPR_DEBUG, "ProducerThread Started."); gpr_log(GPR_DEBUG, "Fork Getter Thread..."); for (int i = 0; i < num_get_thd; ++i) { - get_thds[i] = - grpc_core::Thread("mpmcq_test_mt_get_thd", ConsumerThread, &queue); - get_thds[i].Start(); + consumer_thds[i] = grpc_core::New(&queue); + consumer_thds[i]->Start(); } gpr_log(GPR_DEBUG, "Getter Thread Started."); gpr_log(GPR_DEBUG, "Waiting ProducerThread to finish..."); @@ -172,7 +190,7 @@ static void test_many_thread(void) { queue.Put(nullptr); } for (int i = 0; i < num_get_thd; ++i) { - get_thds[i].Join(); + consumer_thds[i]->Join(); } gpr_log(GPR_DEBUG, "All Getter Thread Terminated."); gpr_log(GPR_DEBUG, "Checking WorkItems and Cleaning Up..."); @@ -180,6 +198,10 @@ static void test_many_thread(void) { grpc_core::Delete(work_thds[i]); } gpr_free(work_thds); + for (int i = 0; i < num_get_thd; ++i) { + grpc_core::Delete(consumer_thds[i]); + } + gpr_free(consumer_thds); gpr_log(GPR_DEBUG, "Done."); }