diff --git a/src/core/lib/iomgr/executor/mpmcqueue.h b/src/core/lib/iomgr/executor/mpmcqueue.h index 2dee79737ca..a46e5ea28fd 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.h +++ b/src/core/lib/iomgr/executor/mpmcqueue.h @@ -49,8 +49,6 @@ class MPMCQueueInterface { // Returns number of elements in the queue currently virtual int count() const GRPC_ABSTRACT; - - GRPC_ABSTRACT_BASE_CLASS }; class InfLenFIFOQueue : public MPMCQueueInterface { diff --git a/test/core/iomgr/mpmcqueue_test.cc b/test/core/iomgr/mpmcqueue_test.cc index ebbf060961f..8f89b5c08bf 100644 --- a/test/core/iomgr/mpmcqueue_test.cc +++ b/test/core/iomgr/mpmcqueue_test.cc @@ -37,6 +37,47 @@ struct WorkItem { WorkItem(int i) : index(i) { done = false; } }; +// Thread for put items into queue +class ProducerThread { + public: + ProducerThread(grpc_core::InfLenFIFOQueue* queue, int start_index, + int num_items) + : start_index_(start_index), + num_items_(num_items), + queue_(queue) { + items_ = nullptr; + thd_ = grpc_core::Thread( + "mpmcq_test_mt_put_thd", + [](void* th) { static_cast(th)->Run(); }, this); + } + ~ProducerThread() { + for (int i = 0; i < num_items_; ++i) { + GPR_ASSERT(items_[i]->done); + delete items_[i]; + } + delete[] items_; + } + + void Start() { thd_.Start(); } + void Join() { thd_.Join(); } + + private: + void Run() { + items_ = new WorkItem*[num_items_]; + for (int i = 0; i < num_items_; ++i) { + items_[i] = new WorkItem(start_index_ + i); + queue_->Put(items_[i]); + } + } + + int start_index_; + int num_items_; + grpc_core::InfLenFIFOQueue* queue_; + grpc_core::Thread thd_; + WorkItem** items_; +}; + + static void ConsumerThread(void* args) { grpc_core::InfLenFIFOQueue* queue = static_cast(args); @@ -96,48 +137,6 @@ static void test_FIFO(void) { } } -// Thread for put items into queue -class ProducerThread { - public: - ProducerThread(grpc_core::InfLenFIFOQueue* queue, int start_index, - int num_items) - : start_index_(start_index), - num_items_(num_items), - queue_(queue) { - items_ = nullptr; - thd_ = grpc_core::Thread( - "mpmcq_test_mt_put_thd", - [](void* th) { static_cast(th)->Run(); }, this); - } - ~ProducerThread() { - for (int i = 0; i < num_items_; ++i) { - GPR_ASSERT(items_[i]->done); - delete items_[i]; - } - delete[] items_; - } - - void Start() { thd_.Start(); } - void Join() { thd_.Join(); } - - private: - void Run() { - items_ = new WorkItem*[num_items_]; - for (int i = 0; i < num_items_; ++i) { - items_[i] = new WorkItem(start_index_ + i); - queue_->Put(items_[i]); - } - } - - int start_index_; - int num_items_; - grpc_core::InfLenFIFOQueue* queue_; - grpc_core::Thread thd_; - WorkItem** items_; -}; - - - static void test_many_thread(void) { gpr_log(GPR_INFO, "test_many_thread"); const int num_work_thd = 10;