Add constructor test case

pull/19544/head
Yunjia Wang 5 years ago
parent 7bc9aba863
commit c15d246f6a
  1. 4
      src/core/lib/iomgr/executor/mpmcqueue.h
  2. 2
      src/core/lib/iomgr/executor/threadpool.h
  3. 63
      test/core/iomgr/threadpool_test.cc

@ -66,8 +66,8 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
// Removes the oldest element from the queue and returns it. // Removes the oldest element from the queue and returns it.
// This routine will cause the thread to block if queue is currently empty. // This routine will cause the thread to block if queue is currently empty.
// Argument wait_time should be passed in when trace flag turning on (for // Argument wait_time should be passed in when turning on the trace flag
// collecting stats info purpose.) // grpc_thread_pool_trace (for collecting stats info purpose.)
void* Get(gpr_timespec* wait_time = nullptr); void* Get(gpr_timespec* wait_time = nullptr);
// Returns number of elements in queue currently. // Returns number of elements in queue currently.

@ -79,8 +79,6 @@ class ThreadPoolWorker {
void Start() { thd_.Start(); } void Start() { thd_.Start(); }
void Join() { thd_.Join(); } void Join() { thd_.Join(); }
// GRPC_ABSTRACT_BASE_CLASS
private: private:
// struct for tracking stats of thread // struct for tracking stats of thread
struct Stats { struct Stats {

@ -20,16 +20,16 @@
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
#define SMALL_THREAD_POOL_SIZE 20 const int kSmallThreadPoolSize = 20;
#define LARGE_THREAD_POOL_SIZE 100 const int kLargeThreadPoolSize = 100;
#define THREAD_SMALL_ITERATION 100 const int kThreadSmallIter = 100;
#define THREAD_LARGE_ITERATION 10000 const int kThreadLargeIter = 10000;
// Simple functor for testing. It will count how many times being called. // Simple functor for testing. It will count how many times being called.
class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor { class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor {
public: public:
friend class SimpleFunctorCheckForAdd; friend class SimpleFunctorCheckForAdd;
SimpleFunctorForAdd() : count_(0) { SimpleFunctorForAdd() {
functor_run = &SimpleFunctorForAdd::Run; functor_run = &SimpleFunctorForAdd::Run;
internal_next = this; internal_next = this;
internal_success = 0; internal_success = 0;
@ -51,32 +51,32 @@ class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor {
class SimpleFunctorCheckForAdd class SimpleFunctorCheckForAdd
: public grpc_experimental_completion_queue_functor { : public grpc_experimental_completion_queue_functor {
public: public:
SimpleFunctorCheckForAdd( SimpleFunctorCheckForAdd(int ok, int* count) : count_(count) {
struct grpc_experimental_completion_queue_functor* cb, int ok) {
functor_run = &SimpleFunctorCheckForAdd::Run; functor_run = &SimpleFunctorCheckForAdd::Run;
internal_next = cb;
internal_success = ok; internal_success = ok;
} }
~SimpleFunctorCheckForAdd() {} ~SimpleFunctorCheckForAdd() {}
static void Run(struct grpc_experimental_completion_queue_functor* cb, static void Run(struct grpc_experimental_completion_queue_functor* cb,
int ok) { int ok) {
auto* callback = static_cast<SimpleFunctorCheckForAdd*>(cb); auto* callback = static_cast<SimpleFunctorCheckForAdd*>(cb);
auto* cb_check = static_cast<SimpleFunctorForAdd*>(callback->internal_next); (*callback->count_)++;
GPR_ASSERT(cb_check->count_.Load(grpc_core::MemoryOrder::RELAXED) == ok); GPR_ASSERT(*callback->count_ == callback->internal_success);
} }
private:
int* count_;
}; };
static void test_add(void) { static void test_add(void) {
gpr_log(GPR_INFO, "test_add"); gpr_log(GPR_INFO, "test_add");
grpc_core::ThreadPool* pool = grpc_core::ThreadPool* pool =
grpc_core::New<grpc_core::ThreadPool>(SMALL_THREAD_POOL_SIZE, "test_add"); grpc_core::New<grpc_core::ThreadPool>(kSmallThreadPoolSize, "test_add");
SimpleFunctorForAdd* functor = grpc_core::New<SimpleFunctorForAdd>(); SimpleFunctorForAdd* functor = grpc_core::New<SimpleFunctorForAdd>();
for (int i = 0; i < THREAD_SMALL_ITERATION; ++i) { for (int i = 0; i < kThreadSmallIter; ++i) {
pool->Add(functor); pool->Add(functor);
} }
grpc_core::Delete(pool); grpc_core::Delete(pool);
GPR_ASSERT(functor->count() == THREAD_SMALL_ITERATION); GPR_ASSERT(functor->count() == kThreadSmallIter);
grpc_core::Delete(functor); grpc_core::Delete(functor);
gpr_log(GPR_DEBUG, "Done."); gpr_log(GPR_DEBUG, "Done.");
} }
@ -108,18 +108,32 @@ class WorkThread {
grpc_core::Thread thd_; grpc_core::Thread thd_;
}; };
static void test_constructor(void) {
// Size is 0 case
grpc_core::ThreadPool* pool_size_zero =
grpc_core::New<grpc_core::ThreadPool>(0);
GPR_ASSERT(pool_size_zero->pool_capacity() == 0);
Delete(pool_size_zero);
// Tests options
grpc_core::Thread::Options options;
options.set_stack_size(192 * 1024); // Random non-default value
grpc_core::ThreadPool* pool =
grpc_core::New<grpc_core::ThreadPool>(0, "test_constructor", options);
GPR_ASSERT(pool->thread_options().stack_size() == options.stack_size());
Delete(pool);
}
static void test_multi_add(void) { static void test_multi_add(void) {
gpr_log(GPR_INFO, "test_multi_add"); gpr_log(GPR_INFO, "test_multi_add");
const int num_work_thds = 10; const int num_work_thds = 10;
grpc_core::ThreadPool* pool = grpc_core::New<grpc_core::ThreadPool>( grpc_core::ThreadPool* pool = grpc_core::New<grpc_core::ThreadPool>(
LARGE_THREAD_POOL_SIZE, "test_multi_add"); kLargeThreadPoolSize, "test_multi_add");
SimpleFunctorForAdd* functor = grpc_core::New<SimpleFunctorForAdd>(); SimpleFunctorForAdd* functor = grpc_core::New<SimpleFunctorForAdd>();
WorkThread** work_thds = static_cast<WorkThread**>( WorkThread** work_thds = static_cast<WorkThread**>(
gpr_zalloc(sizeof(WorkThread*) * num_work_thds)); gpr_zalloc(sizeof(WorkThread*) * num_work_thds));
gpr_log(GPR_DEBUG, "Fork threads for adding..."); gpr_log(GPR_DEBUG, "Fork threads for adding...");
for (int i = 0; i < num_work_thds; ++i) { for (int i = 0; i < num_work_thds; ++i) {
work_thds[i] = work_thds[i] = grpc_core::New<WorkThread>(pool, functor, kThreadLargeIter);
grpc_core::New<WorkThread>(pool, functor, THREAD_LARGE_ITERATION);
work_thds[i]->Start(); work_thds[i]->Start();
} }
// Wait for all threads finish // Wait for all threads finish
@ -133,29 +147,27 @@ static void test_multi_add(void) {
gpr_log(GPR_DEBUG, "Waiting for all closures finish..."); gpr_log(GPR_DEBUG, "Waiting for all closures finish...");
// Destructor of thread pool will wait for all closures to finish // Destructor of thread pool will wait for all closures to finish
grpc_core::Delete(pool); grpc_core::Delete(pool);
GPR_ASSERT(functor->count() == THREAD_LARGE_ITERATION * num_work_thds); GPR_ASSERT(functor->count() == kThreadLargeIter * num_work_thds);
grpc_core::Delete(functor); grpc_core::Delete(functor);
gpr_log(GPR_DEBUG, "Done."); gpr_log(GPR_DEBUG, "Done.");
} }
static void test_one_thread_FIFO(void) { static void test_one_thread_FIFO(void) {
gpr_log(GPR_INFO, "test_one_thread_FIFO"); gpr_log(GPR_INFO, "test_one_thread_FIFO");
int counter = 0;
grpc_core::ThreadPool* pool = grpc_core::ThreadPool* pool =
grpc_core::New<grpc_core::ThreadPool>(1, "test_one_thread_FIFO"); grpc_core::New<grpc_core::ThreadPool>(1, "test_one_thread_FIFO");
SimpleFunctorForAdd* functor = grpc_core::New<SimpleFunctorForAdd>();
SimpleFunctorCheckForAdd** check_functors = SimpleFunctorCheckForAdd** check_functors =
static_cast<SimpleFunctorCheckForAdd**>(gpr_zalloc( static_cast<SimpleFunctorCheckForAdd**>(
sizeof(SimpleFunctorCheckForAdd*) * THREAD_SMALL_ITERATION)); gpr_zalloc(sizeof(SimpleFunctorCheckForAdd*) * kThreadSmallIter));
for (int i = 0; i < THREAD_SMALL_ITERATION; ++i) { for (int i = 0; i < kThreadSmallIter; ++i) {
pool->Add(functor);
check_functors[i] = check_functors[i] =
grpc_core::New<SimpleFunctorCheckForAdd>(functor, i + 1); grpc_core::New<SimpleFunctorCheckForAdd>(i + 1, &counter);
pool->Add(check_functors[i]); pool->Add(check_functors[i]);
} }
// Destructor of pool will wait until all closures finished. // Destructor of pool will wait until all closures finished.
grpc_core::Delete(pool); grpc_core::Delete(pool);
grpc_core::Delete(functor); for (int i = 0; i < kThreadSmallIter; ++i) {
for (int i = 0; i < THREAD_SMALL_ITERATION; ++i) {
grpc_core::Delete(check_functors[i]); grpc_core::Delete(check_functors[i]);
} }
gpr_free(check_functors); gpr_free(check_functors);
@ -165,6 +177,7 @@ static void test_one_thread_FIFO(void) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv); grpc::testing::TestEnvironment env(argc, argv);
grpc_init(); grpc_init();
test_constructor();
test_add(); test_add();
test_multi_add(); test_multi_add();
test_one_thread_FIFO(); test_one_thread_FIFO();

Loading…
Cancel
Save