pull/19519/head
Yunjia Wang 6 years ago
parent 74c862c682
commit 500cb1f99b
  1. 10
      src/core/lib/iomgr/executor/threadpool.h
  2. 11
      test/cpp/microbenchmarks/bm_threadpool.cc

@ -19,9 +19,8 @@
#ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
#define GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/executor/mpmcqueue.h"
@ -100,7 +99,7 @@ class ThreadPoolWorker {
// A fixed size thread pool implementation of abstract thread pool interface.
// In this implementation, the number of threads in pool is fixed, but the
// capacity of closure queue is unlimited.
class ThreadPool : public ThreadPoolInterface {
class ThreadPool : public ThreadPoolInterface {
public:
// Creates a thread pool with size of "num_threads", with default thread name
// "ThreadPoolWorker" and all thread options set to default.
@ -108,7 +107,7 @@ class ThreadPool : public ThreadPoolInterface {
// Same as ThreadPool(int num_threads) constructor, except
// that it also sets "thd_name" as the name of all threads in the thread pool.
ThreadPool(int num_threads, const char *thd_name);
ThreadPool(int num_threads, const char* thd_name);
// Same as ThreadPool(const char *thd_name, int num_threads) constructor,
// except that is also set thread_options for threads.
@ -117,7 +116,7 @@ class ThreadPool : public ThreadPoolInterface {
// value 0, default ThreadPool stack size will be used. The current default
// stack size of this implementation is 1952K for mobile platform and 64K for
// all others.
ThreadPool(int num_threads, const char *thd_name,
ThreadPool(int num_threads, const char* thd_name,
const Thread::Options& thread_options);
// Waits for all pending closures to complete, then shuts down thread pool.
@ -148,7 +147,6 @@ class ThreadPool : public ThreadPoolInterface {
bool HasBeenShutDown();
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_THREADPOOL_THREADPOOL_H */

@ -47,6 +47,7 @@ class BlockingCounter {
cv_.wait(l);
}
}
private:
int count_;
std::mutex mu_;
@ -61,7 +62,7 @@ class BlockingCounter {
class AddAnotherFunctor : public grpc_experimental_completion_queue_functor {
public:
AddAnotherFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter,
int num_add)
int num_add)
: pool_(pool), counter_(counter), num_add_(num_add) {
functor_run = &AddAnotherFunctor::Run;
internal_next = this;
@ -132,7 +133,6 @@ BENCHMARK(BM_ThreadPool8AddAnother)
->UseRealTime()
->Ranges({{1, 1024}, {524288, 1048576}}); // 512K ~ 1M
static void BM_ThreadPool16AddAnother(benchmark::State& state) {
ThreadPoolAddAnotherHelper(state, 16);
}
@ -140,7 +140,6 @@ BENCHMARK(BM_ThreadPool16AddAnother)
->UseRealTime()
->Ranges({{1, 1024}, {524288, 1048576}});
static void BM_ThreadPool32AddAnother(benchmark::State& state) {
ThreadPoolAddAnotherHelper(state, 32);
}
@ -176,10 +175,8 @@ BENCHMARK(BM_ThreadPool2048AddAnother)
->UseRealTime()
->Ranges({{1, 1024}, {524288, 1048576}});
// A functor class that will delete self on end of running.
class SuicideFunctorForAdd
: public grpc_experimental_completion_queue_functor {
class SuicideFunctorForAdd : public grpc_experimental_completion_queue_functor {
public:
SuicideFunctorForAdd() {
functor_run = &SuicideFunctorForAdd::Run;
@ -273,7 +270,7 @@ class ShortWorkFunctorForAdd
val_ = 0;
}
~ShortWorkFunctorForAdd() {}
static void Run(grpc_experimental_completion_queue_functor *cb, int ok) {
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
auto* callback = static_cast<ShortWorkFunctorForAdd*>(cb);
for (int i = 0; i < 1000; ++i) {
callback->val_++;

Loading…
Cancel
Save