diff --git a/src/core/lib/iomgr/executor/mpmcqueue.h b/src/core/lib/iomgr/executor/mpmcqueue.h index 5c62221e9d4..c6102b3add0 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.h +++ b/src/core/lib/iomgr/executor/mpmcqueue.h @@ -66,8 +66,8 @@ class InfLenFIFOQueue : public MPMCQueueInterface { // Removes the oldest element from the queue and returns it. // This routine will cause the thread to block if queue is currently empty. - // Argument wait_time should be passed in when trace flag turning on (for - // collecting stats info purpose.) + // Argument wait_time should be passed in when turning on the trace flag + // grpc_thread_pool_trace (for collecting stats info purpose.) void* Get(gpr_timespec* wait_time = nullptr); // Returns number of elements in queue currently. diff --git a/src/core/lib/iomgr/executor/threadpool.h b/src/core/lib/iomgr/executor/threadpool.h index d68cbbf7328..3f79bbe9d08 100644 --- a/src/core/lib/iomgr/executor/threadpool.h +++ b/src/core/lib/iomgr/executor/threadpool.h @@ -79,8 +79,6 @@ class ThreadPoolWorker { void Start() { thd_.Start(); } void Join() { thd_.Join(); } - // GRPC_ABSTRACT_BASE_CLASS - private: // struct for tracking stats of thread struct Stats { diff --git a/test/core/iomgr/threadpool_test.cc b/test/core/iomgr/threadpool_test.cc index e0a9d73f48b..4b36c3dc185 100644 --- a/test/core/iomgr/threadpool_test.cc +++ b/test/core/iomgr/threadpool_test.cc @@ -20,16 +20,16 @@ #include "test/core/util/test_config.h" -#define SMALL_THREAD_POOL_SIZE 20 -#define LARGE_THREAD_POOL_SIZE 100 -#define THREAD_SMALL_ITERATION 100 -#define THREAD_LARGE_ITERATION 10000 +const int kSmallThreadPoolSize = 20; +const int kLargeThreadPoolSize = 100; +const int kThreadSmallIter = 100; +const int kThreadLargeIter = 10000; // Simple functor for testing. It will count how many times being called. class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor { public: friend class SimpleFunctorCheckForAdd; - SimpleFunctorForAdd() : count_(0) { + SimpleFunctorForAdd() { functor_run = &SimpleFunctorForAdd::Run; internal_next = this; internal_success = 0; @@ -51,32 +51,32 @@ class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor { class SimpleFunctorCheckForAdd : public grpc_experimental_completion_queue_functor { public: - SimpleFunctorCheckForAdd( - struct grpc_experimental_completion_queue_functor* cb, int ok) { + SimpleFunctorCheckForAdd(int ok, int* count) : count_(count) { functor_run = &SimpleFunctorCheckForAdd::Run; - internal_next = cb; internal_success = ok; } ~SimpleFunctorCheckForAdd() {} static void Run(struct grpc_experimental_completion_queue_functor* cb, int ok) { auto* callback = static_cast(cb); - auto* cb_check = static_cast(callback->internal_next); - GPR_ASSERT(cb_check->count_.Load(grpc_core::MemoryOrder::RELAXED) == ok); + (*callback->count_)++; + GPR_ASSERT(*callback->count_ == callback->internal_success); } + private: + int* count_; }; static void test_add(void) { gpr_log(GPR_INFO, "test_add"); grpc_core::ThreadPool* pool = - grpc_core::New(SMALL_THREAD_POOL_SIZE, "test_add"); + grpc_core::New(kSmallThreadPoolSize, "test_add"); SimpleFunctorForAdd* functor = grpc_core::New(); - for (int i = 0; i < THREAD_SMALL_ITERATION; ++i) { + for (int i = 0; i < kThreadSmallIter; ++i) { pool->Add(functor); } grpc_core::Delete(pool); - GPR_ASSERT(functor->count() == THREAD_SMALL_ITERATION); + GPR_ASSERT(functor->count() == kThreadSmallIter); grpc_core::Delete(functor); gpr_log(GPR_DEBUG, "Done."); } @@ -108,18 +108,32 @@ class WorkThread { grpc_core::Thread thd_; }; +static void test_constructor(void) { + // Size is 0 case + grpc_core::ThreadPool* pool_size_zero = + grpc_core::New(0); + GPR_ASSERT(pool_size_zero->pool_capacity() == 0); + Delete(pool_size_zero); + // Tests options + grpc_core::Thread::Options options; + options.set_stack_size(192 * 1024); // Random non-default value + grpc_core::ThreadPool* pool = + grpc_core::New(0, "test_constructor", options); + GPR_ASSERT(pool->thread_options().stack_size() == options.stack_size()); + Delete(pool); +} + static void test_multi_add(void) { gpr_log(GPR_INFO, "test_multi_add"); const int num_work_thds = 10; grpc_core::ThreadPool* pool = grpc_core::New( - LARGE_THREAD_POOL_SIZE, "test_multi_add"); + kLargeThreadPoolSize, "test_multi_add"); SimpleFunctorForAdd* functor = grpc_core::New(); WorkThread** work_thds = static_cast( gpr_zalloc(sizeof(WorkThread*) * num_work_thds)); gpr_log(GPR_DEBUG, "Fork threads for adding..."); for (int i = 0; i < num_work_thds; ++i) { - work_thds[i] = - grpc_core::New(pool, functor, THREAD_LARGE_ITERATION); + work_thds[i] = grpc_core::New(pool, functor, kThreadLargeIter); work_thds[i]->Start(); } // Wait for all threads finish @@ -133,29 +147,27 @@ static void test_multi_add(void) { gpr_log(GPR_DEBUG, "Waiting for all closures finish..."); // Destructor of thread pool will wait for all closures to finish grpc_core::Delete(pool); - GPR_ASSERT(functor->count() == THREAD_LARGE_ITERATION * num_work_thds); + GPR_ASSERT(functor->count() == kThreadLargeIter * num_work_thds); grpc_core::Delete(functor); gpr_log(GPR_DEBUG, "Done."); } static void test_one_thread_FIFO(void) { gpr_log(GPR_INFO, "test_one_thread_FIFO"); + int counter = 0; grpc_core::ThreadPool* pool = grpc_core::New(1, "test_one_thread_FIFO"); - SimpleFunctorForAdd* functor = grpc_core::New(); SimpleFunctorCheckForAdd** check_functors = - static_cast(gpr_zalloc( - sizeof(SimpleFunctorCheckForAdd*) * THREAD_SMALL_ITERATION)); - for (int i = 0; i < THREAD_SMALL_ITERATION; ++i) { - pool->Add(functor); + static_cast( + gpr_zalloc(sizeof(SimpleFunctorCheckForAdd*) * kThreadSmallIter)); + for (int i = 0; i < kThreadSmallIter; ++i) { check_functors[i] = - grpc_core::New(functor, i + 1); + grpc_core::New(i + 1, &counter); pool->Add(check_functors[i]); } // Destructor of pool will wait until all closures finished. grpc_core::Delete(pool); - grpc_core::Delete(functor); - for (int i = 0; i < THREAD_SMALL_ITERATION; ++i) { + for (int i = 0; i < kThreadSmallIter; ++i) { grpc_core::Delete(check_functors[i]); } gpr_free(check_functors); @@ -165,6 +177,7 @@ static void test_one_thread_FIFO(void) { int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init(); + test_constructor(); test_add(); test_multi_add(); test_one_thread_FIFO();