Merge pull request #19544 from yunjiaw26/thread_pool

Threadpool
pull/19690/head
yunjiaw26 6 years ago committed by GitHub
commit 1e607d3dc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 3
      BUILD.gn
  3. 41
      CMakeLists.txt
  4. 42
      Makefile
  5. 12
      build.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 4
      grpc.gyp
  12. 2
      package.xml
  13. 13
      src/core/lib/iomgr/executor/mpmcqueue.cc
  14. 7
      src/core/lib/iomgr/executor/mpmcqueue.h
  15. 138
      src/core/lib/iomgr/executor/threadpool.cc
  16. 153
      src/core/lib/iomgr/executor/threadpool.h
  17. 1
      src/python/grpcio/grpc_core_dependencies.py
  18. 11
      test/core/iomgr/BUILD
  19. 192
      test/core/iomgr/threadpool_test.cc
  20. 1
      tools/doxygen/Doxyfile.c++.internal
  21. 2
      tools/doxygen/Doxyfile.core.internal
  22. 19
      tools/run_tests/generated/sources_and_headers.json
  23. 24
      tools/run_tests/generated/tests.json

@ -704,6 +704,7 @@ grpc_cc_library(
"src/core/lib/iomgr/exec_ctx.cc",
"src/core/lib/iomgr/executor.cc",
"src/core/lib/iomgr/executor/mpmcqueue.cc",
"src/core/lib/iomgr/executor/threadpool.cc",
"src/core/lib/iomgr/fork_posix.cc",
"src/core/lib/iomgr/fork_windows.cc",
"src/core/lib/iomgr/gethostname_fallback.cc",
@ -862,6 +863,7 @@ grpc_cc_library(
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/executor/mpmcqueue.h",
"src/core/lib/iomgr/executor/threadpool.h",
"src/core/lib/iomgr/gethostname.h",
"src/core/lib/iomgr/gevent_util.h",
"src/core/lib/iomgr/grpc_if_nametoindex.h",

@ -527,6 +527,8 @@ config("grpc_config") {
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/executor/mpmcqueue.cc",
"src/core/lib/iomgr/executor/mpmcqueue.h",
"src/core/lib/iomgr/executor/threadpool.cc",
"src/core/lib/iomgr/executor/threadpool.h",
"src/core/lib/iomgr/fork_posix.cc",
"src/core/lib/iomgr/fork_windows.cc",
"src/core/lib/iomgr/gethostname.h",
@ -1242,6 +1244,7 @@ config("grpc_config") {
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/executor/mpmcqueue.h",
"src/core/lib/iomgr/executor/threadpool.h",
"src/core/lib/iomgr/gethostname.h",
"src/core/lib/iomgr/grpc_if_nametoindex.h",
"src/core/lib/iomgr/internal_errqueue.h",

@ -427,6 +427,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c tcp_server_posix_test)
endif()
add_dependencies(buildtests_c tcp_server_uv_test)
add_dependencies(buildtests_c threadpool_test)
add_dependencies(buildtests_c time_averaged_stats_test)
add_dependencies(buildtests_c timeout_encoding_test)
add_dependencies(buildtests_c timer_heap_test)
@ -1035,6 +1036,7 @@ add_library(grpc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/executor/mpmcqueue.cc
src/core/lib/iomgr/executor/threadpool.cc
src/core/lib/iomgr/fork_posix.cc
src/core/lib/iomgr/fork_windows.cc
src/core/lib/iomgr/gethostname_fallback.cc
@ -1474,6 +1476,7 @@ add_library(grpc_cronet
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/executor/mpmcqueue.cc
src/core/lib/iomgr/executor/threadpool.cc
src/core/lib/iomgr/fork_posix.cc
src/core/lib/iomgr/fork_windows.cc
src/core/lib/iomgr/gethostname_fallback.cc
@ -1895,6 +1898,7 @@ add_library(grpc_test_util
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/executor/mpmcqueue.cc
src/core/lib/iomgr/executor/threadpool.cc
src/core/lib/iomgr/fork_posix.cc
src/core/lib/iomgr/fork_windows.cc
src/core/lib/iomgr/gethostname_fallback.cc
@ -2229,6 +2233,7 @@ add_library(grpc_test_util_unsecure
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/executor/mpmcqueue.cc
src/core/lib/iomgr/executor/threadpool.cc
src/core/lib/iomgr/fork_posix.cc
src/core/lib/iomgr/fork_windows.cc
src/core/lib/iomgr/gethostname_fallback.cc
@ -2539,6 +2544,7 @@ add_library(grpc_unsecure
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/executor/mpmcqueue.cc
src/core/lib/iomgr/executor/threadpool.cc
src/core/lib/iomgr/fork_posix.cc
src/core/lib/iomgr/fork_windows.cc
src/core/lib/iomgr/gethostname_fallback.cc
@ -3580,6 +3586,7 @@ add_library(grpc++_cronet
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/executor/mpmcqueue.cc
src/core/lib/iomgr/executor/threadpool.cc
src/core/lib/iomgr/fork_posix.cc
src/core/lib/iomgr/fork_windows.cc
src/core/lib/iomgr/gethostname_fallback.cc
@ -10517,6 +10524,40 @@ target_link_libraries(tcp_server_uv_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(threadpool_test
test/core/iomgr/threadpool_test.cc
)
target_include_directories(threadpool_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
)
target_link_libraries(threadpool_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr
)
# avoid dependency on libstdc++
if (_gRPC_CORE_NOSTDCXX_FLAGS)
set_target_properties(threadpool_test PROPERTIES LINKER_LANGUAGE C)
target_compile_options(threadpool_test PRIVATE $<$<COMPILE_LANGUAGE:CXX>:${_gRPC_CORE_NOSTDCXX_FLAGS}>)
endif()
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(time_averaged_stats_test
test/core/iomgr/time_averaged_stats_test.cc
)

@ -1132,6 +1132,7 @@ tcp_client_uv_test: $(BINDIR)/$(CONFIG)/tcp_client_uv_test
tcp_posix_test: $(BINDIR)/$(CONFIG)/tcp_posix_test
tcp_server_posix_test: $(BINDIR)/$(CONFIG)/tcp_server_posix_test
tcp_server_uv_test: $(BINDIR)/$(CONFIG)/tcp_server_uv_test
threadpool_test: $(BINDIR)/$(CONFIG)/threadpool_test
time_averaged_stats_test: $(BINDIR)/$(CONFIG)/time_averaged_stats_test
timeout_encoding_test: $(BINDIR)/$(CONFIG)/timeout_encoding_test
timer_heap_test: $(BINDIR)/$(CONFIG)/timer_heap_test
@ -1553,6 +1554,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/tcp_posix_test \
$(BINDIR)/$(CONFIG)/tcp_server_posix_test \
$(BINDIR)/$(CONFIG)/tcp_server_uv_test \
$(BINDIR)/$(CONFIG)/threadpool_test \
$(BINDIR)/$(CONFIG)/time_averaged_stats_test \
$(BINDIR)/$(CONFIG)/timeout_encoding_test \
$(BINDIR)/$(CONFIG)/timer_heap_test \
@ -2189,6 +2191,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/tcp_server_posix_test || ( echo test tcp_server_posix_test failed ; exit 1 )
$(E) "[RUN] Testing tcp_server_uv_test"
$(Q) $(BINDIR)/$(CONFIG)/tcp_server_uv_test || ( echo test tcp_server_uv_test failed ; exit 1 )
$(E) "[RUN] Testing threadpool_test"
$(Q) $(BINDIR)/$(CONFIG)/threadpool_test || ( echo test threadpool_test failed ; exit 1 )
$(E) "[RUN] Testing time_averaged_stats_test"
$(Q) $(BINDIR)/$(CONFIG)/time_averaged_stats_test || ( echo test time_averaged_stats_test failed ; exit 1 )
$(E) "[RUN] Testing timeout_encoding_test"
@ -3546,6 +3550,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname_fallback.cc \
@ -3976,6 +3981,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname_fallback.cc \
@ -4387,6 +4393,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname_fallback.cc \
@ -4705,6 +4712,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname_fallback.cc \
@ -4986,6 +4994,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname_fallback.cc \
@ -5997,6 +6006,7 @@ LIBGRPC++_CRONET_SRC = \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname_fallback.cc \
@ -13440,6 +13450,38 @@ endif
endif
THREADPOOL_TEST_SRC = \
test/core/iomgr/threadpool_test.cc \
THREADPOOL_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(THREADPOOL_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/threadpool_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/threadpool_test: $(THREADPOOL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) $(THREADPOOL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/threadpool_test
endif
$(OBJDIR)/$(CONFIG)/test/core/iomgr/threadpool_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_threadpool_test: $(THREADPOOL_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(THREADPOOL_TEST_OBJS:.o=.dep)
endif
endif
TIME_AVERAGED_STATS_TEST_SRC = \
test/core/iomgr/time_averaged_stats_test.cc \

@ -281,6 +281,7 @@ filegroups:
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/executor/mpmcqueue.cc
- src/core/lib/iomgr/executor/threadpool.cc
- src/core/lib/iomgr/fork_posix.cc
- src/core/lib/iomgr/fork_windows.cc
- src/core/lib/iomgr/gethostname_fallback.cc
@ -469,6 +470,7 @@ filegroups:
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/executor/mpmcqueue.h
- src/core/lib/iomgr/executor/threadpool.h
- src/core/lib/iomgr/gethostname.h
- src/core/lib/iomgr/grpc_if_nametoindex.h
- src/core/lib/iomgr/internal_errqueue.h
@ -3734,6 +3736,16 @@ targets:
- gpr
exclude_iomgrs:
- native
- name: threadpool_test
build: test
language: c
src:
- test/core/iomgr/threadpool_test.cc
deps:
- grpc_test_util
- grpc
- gpr
uses_polling: false
- name: time_averaged_stats_test
build: test
language: c

@ -128,6 +128,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname_fallback.cc \

@ -103,6 +103,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\iomgr\\exec_ctx.cc " +
"src\\core\\lib\\iomgr\\executor.cc " +
"src\\core\\lib\\iomgr\\executor\\mpmcqueue.cc " +
"src\\core\\lib\\iomgr\\executor\\threadpool.cc " +
"src\\core\\lib\\iomgr\\fork_posix.cc " +
"src\\core\\lib\\iomgr\\fork_windows.cc " +
"src\\core\\lib\\iomgr\\gethostname_fallback.cc " +

@ -479,6 +479,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/executor/mpmcqueue.h',
'src/core/lib/iomgr/executor/threadpool.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/grpc_if_nametoindex.h',
'src/core/lib/iomgr/internal_errqueue.h',
@ -685,6 +686,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/executor/mpmcqueue.h',
'src/core/lib/iomgr/executor/threadpool.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/grpc_if_nametoindex.h',
'src/core/lib/iomgr/internal_errqueue.h',

@ -438,6 +438,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/executor/mpmcqueue.h',
'src/core/lib/iomgr/executor/threadpool.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/grpc_if_nametoindex.h',
'src/core/lib/iomgr/internal_errqueue.h',
@ -592,6 +593,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/executor/mpmcqueue.cc',
'src/core/lib/iomgr/executor/threadpool.cc',
'src/core/lib/iomgr/fork_posix.cc',
'src/core/lib/iomgr/fork_windows.cc',
'src/core/lib/iomgr/gethostname_fallback.cc',
@ -1094,6 +1096,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/executor/mpmcqueue.h',
'src/core/lib/iomgr/executor/threadpool.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/grpc_if_nametoindex.h',
'src/core/lib/iomgr/internal_errqueue.h',

@ -372,6 +372,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/exec_ctx.h )
s.files += %w( src/core/lib/iomgr/executor.h )
s.files += %w( src/core/lib/iomgr/executor/mpmcqueue.h )
s.files += %w( src/core/lib/iomgr/executor/threadpool.h )
s.files += %w( src/core/lib/iomgr/gethostname.h )
s.files += %w( src/core/lib/iomgr/grpc_if_nametoindex.h )
s.files += %w( src/core/lib/iomgr/internal_errqueue.h )
@ -526,6 +527,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/exec_ctx.cc )
s.files += %w( src/core/lib/iomgr/executor.cc )
s.files += %w( src/core/lib/iomgr/executor/mpmcqueue.cc )
s.files += %w( src/core/lib/iomgr/executor/threadpool.cc )
s.files += %w( src/core/lib/iomgr/fork_posix.cc )
s.files += %w( src/core/lib/iomgr/fork_windows.cc )
s.files += %w( src/core/lib/iomgr/gethostname_fallback.cc )

@ -310,6 +310,7 @@
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/executor/mpmcqueue.cc',
'src/core/lib/iomgr/executor/threadpool.cc',
'src/core/lib/iomgr/fork_posix.cc',
'src/core/lib/iomgr/fork_windows.cc',
'src/core/lib/iomgr/gethostname_fallback.cc',
@ -687,6 +688,7 @@
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/executor/mpmcqueue.cc',
'src/core/lib/iomgr/executor/threadpool.cc',
'src/core/lib/iomgr/fork_posix.cc',
'src/core/lib/iomgr/fork_windows.cc',
'src/core/lib/iomgr/gethostname_fallback.cc',
@ -938,6 +940,7 @@
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/executor/mpmcqueue.cc',
'src/core/lib/iomgr/executor/threadpool.cc',
'src/core/lib/iomgr/fork_posix.cc',
'src/core/lib/iomgr/fork_windows.cc',
'src/core/lib/iomgr/gethostname_fallback.cc',
@ -1165,6 +1168,7 @@
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/executor/mpmcqueue.cc',
'src/core/lib/iomgr/executor/threadpool.cc',
'src/core/lib/iomgr/fork_posix.cc',
'src/core/lib/iomgr/fork_windows.cc',
'src/core/lib/iomgr/gethostname_fallback.cc',

@ -377,6 +377,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor/mpmcqueue.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor/threadpool.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/gethostname.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/grpc_if_nametoindex.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/internal_errqueue.h" role="src" />
@ -531,6 +532,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor/mpmcqueue.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor/threadpool.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/fork_posix.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/fork_windows.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/gethostname_fallback.cc" role="src" />

@ -98,14 +98,25 @@ void InfLenFIFOQueue::Put(void* elem) {
}
}
void* InfLenFIFOQueue::Get() {
void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
MutexLock l(&mu_);
if (count_.Load(MemoryOrder::RELAXED) == 0) {
gpr_timespec start_time;
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
wait_time != nullptr) {
start_time = gpr_now(GPR_CLOCK_MONOTONIC);
}
num_waiters_++;
do {
wait_nonempty_.Wait(&mu_);
} while (count_.Load(MemoryOrder::RELAXED) == 0);
num_waiters_--;
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
wait_time != nullptr) {
*wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time);
}
}
GPR_DEBUG_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0);
return PopFront();

@ -42,7 +42,8 @@ class MPMCQueueInterface {
// Removes the oldest element from the queue and return it.
// This might cause to block on empty queue depending on implementation.
virtual void* Get() GRPC_ABSTRACT;
// Optional argument for collecting stats purpose.
virtual void* Get(gpr_timespec* wait_time = nullptr) GRPC_ABSTRACT;
// Returns number of elements in the queue currently
virtual int count() const GRPC_ABSTRACT;
@ -65,7 +66,9 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
// Removes the oldest element from the queue and returns it.
// This routine will cause the thread to block if queue is currently empty.
void* Get();
// Argument wait_time should be passed in when turning on the trace flag
// grpc_thread_pool_trace (for collecting stats info purpose.)
void* Get(gpr_timespec* wait_time = nullptr);
// Returns number of elements in queue currently.
// There might be concurrently add/remove on queue, so count might change

@ -0,0 +1,138 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/executor/threadpool.h"
namespace grpc_core {
void ThreadPoolWorker::Run() {
while (true) {
void* elem;
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
// Updates stats and print
gpr_timespec wait_time = gpr_time_0(GPR_TIMESPAN);
elem = queue_->Get(&wait_time);
stats_.sleep_time = gpr_time_add(stats_.sleep_time, wait_time);
gpr_log(GPR_INFO,
"ThreadPool Worker [%s %d] Stats: sleep_time %f",
thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_time));
} else {
elem = queue_->Get(nullptr);
}
if (elem == nullptr) {
break;
}
// Runs closure
auto* closure =
static_cast<grpc_experimental_completion_queue_functor*>(elem);
closure->functor_run(closure, closure->internal_success);
}
}
void ThreadPool::SharedThreadPoolConstructor() {
// All worker threads in thread pool must be joinable.
thread_options_.set_joinable(true);
// Create at least 1 worker thread.
if (num_threads_ <= 0) num_threads_ = 1;
queue_ = New<InfLenFIFOQueue>();
threads_ = static_cast<ThreadPoolWorker**>(
gpr_zalloc(num_threads_ * sizeof(ThreadPoolWorker*)));
for (int i = 0; i < num_threads_; ++i) {
threads_[i] =
New<ThreadPoolWorker>(thd_name_, this, queue_, thread_options_, i);
threads_[i]->Start();
}
}
size_t ThreadPool::DefaultStackSize() {
#if defined(__ANDROID__) || defined(__APPLE__)
return 1952 * 1024;
#else
return 64 * 1024;
#endif
}
void ThreadPool::AssertHasNotBeenShutDown() {
// For debug checking purpose, using RELAXED order is sufficient.
GPR_DEBUG_ASSERT(!shut_down_.Load(MemoryOrder::RELAXED));
}
ThreadPool::ThreadPool(int num_threads) : num_threads_(num_threads) {
thd_name_ = "ThreadPoolWorker";
thread_options_ = Thread::Options();
thread_options_.set_stack_size(DefaultStackSize());
SharedThreadPoolConstructor();
}
ThreadPool::ThreadPool(int num_threads, const char* thd_name)
: num_threads_(num_threads), thd_name_(thd_name) {
thread_options_ = Thread::Options();
thread_options_.set_stack_size(DefaultStackSize());
SharedThreadPoolConstructor();
}
ThreadPool::ThreadPool(int num_threads, const char* thd_name,
const Thread::Options& thread_options)
: num_threads_(num_threads),
thd_name_(thd_name),
thread_options_(thread_options) {
if (thread_options_.stack_size() == 0) {
thread_options_.set_stack_size(DefaultStackSize());
}
SharedThreadPoolConstructor();
}
ThreadPool::~ThreadPool() {
// For debug checking purpose, using RELAXED order is sufficient.
shut_down_.Store(true, MemoryOrder::RELAXED);
for (int i = 0; i < num_threads_; ++i) {
queue_->Put(nullptr);
}
for (int i = 0; i < num_threads_; ++i) {
threads_[i]->Join();
}
for (int i = 0; i < num_threads_; ++i) {
Delete(threads_[i]);
}
gpr_free(threads_);
Delete(queue_);
}
void ThreadPool::Add(grpc_experimental_completion_queue_functor* closure) {
AssertHasNotBeenShutDown();
queue_->Put(static_cast<void*>(closure));
}
int ThreadPool::num_pending_closures() const { return queue_->count(); }
int ThreadPool::pool_capacity() const { return num_threads_; }
const Thread::Options& ThreadPool::thread_options() const {
return thread_options_;
}
const char* ThreadPool::thread_name() const { return thd_name_; }
} // namespace grpc_core

@ -0,0 +1,153 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
#define GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/executor/mpmcqueue.h"
namespace grpc_core {
// A base abstract base class for threadpool.
// Threadpool is an executor that maintains a pool of threads sitting around
// and waiting for closures. A threadpool also maintains a queue of pending
// closures, when closures appearing in the queue, the threads in pool will
// pull them out and execute them.
class ThreadPoolInterface {
public:
// Waits for all pending closures to complete, then shuts down thread pool.
virtual ~ThreadPoolInterface() {}
// Schedules a given closure for execution later.
// Depending on specific subclass implementation, this routine might cause
// current thread to be blocked (in case of unable to schedule).
// Closure should contain a function pointer and arguments it will take, more
// details for closure struct at /grpc/include/grpc/impl/codegen/grpc_types.h
virtual void Add(grpc_experimental_completion_queue_functor* closure)
GRPC_ABSTRACT;
// Returns the current number of pending closures
virtual int num_pending_closures() const GRPC_ABSTRACT;
// Returns the capacity of pool (number of worker threads in pool)
virtual int pool_capacity() const GRPC_ABSTRACT;
// Thread option accessor
virtual const Thread::Options& thread_options() const GRPC_ABSTRACT;
// Returns the thread name for threads in this ThreadPool.
virtual const char* thread_name() const GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
// Worker thread for threadpool. Executes closures in the queue, until getting a
// NULL closure.
class ThreadPoolWorker {
public:
ThreadPoolWorker(const char* thd_name, ThreadPoolInterface* pool,
MPMCQueueInterface* queue, Thread::Options& options,
int index)
: queue_(queue), thd_name_(thd_name), index_(index) {
thd_ = Thread(thd_name,
[](void* th) { static_cast<ThreadPoolWorker*>(th)->Run(); },
this, nullptr, options);
}
~ThreadPoolWorker() {}
void Start() { thd_.Start(); }
void Join() { thd_.Join(); }
private:
// struct for tracking stats of thread
struct Stats {
gpr_timespec sleep_time;
Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); }
};
void Run(); // Pulls closures from queue and executes them
MPMCQueueInterface* queue_; // Queue in thread pool to pull closures from
Thread thd_; // Thread wrapped in
Stats stats_; // Stats to be collected in run time
const char* thd_name_; // Name of thread
int index_; // Index in thread pool
};
// A fixed size thread pool implementation of abstract thread pool interface.
// In this implementation, the number of threads in pool is fixed, but the
// capacity of closure queue is unlimited.
class ThreadPool : public ThreadPoolInterface {
public:
// Creates a thread pool with size of "num_threads", with default thread name
// "ThreadPoolWorker" and all thread options set to default. If the given size
// is 0 or less, there will be 1 worker thread created inside pool.
ThreadPool(int num_threads);
// Same as ThreadPool(int num_threads) constructor, except
// that it also sets "thd_name" as the name of all threads in the thread pool.
ThreadPool(int num_threads, const char* thd_name);
// Same as ThreadPool(const char *thd_name, int num_threads) constructor,
// except that is also set thread_options for threads.
// Notes for stack size:
// If the stack size field of the passed in Thread::Options is set to default
// value 0, default ThreadPool stack size will be used. The current default
// stack size of this implementation is 1952K for mobile platform and 64K for
// all others.
ThreadPool(int num_threads, const char* thd_name,
const Thread::Options& thread_options);
// Waits for all pending closures to complete, then shuts down thread pool.
~ThreadPool() override;
// Adds given closure into pending queue immediately. Since closure queue has
// infinite length, this routine will not block.
void Add(grpc_experimental_completion_queue_functor* closure) override;
int num_pending_closures() const override;
int pool_capacity() const override;
const Thread::Options& thread_options() const override;
const char* thread_name() const override;
private:
int num_threads_ = 0;
const char* thd_name_ = nullptr;
Thread::Options thread_options_;
ThreadPoolWorker** threads_ = nullptr; // Array of worker threads
MPMCQueueInterface* queue_ = nullptr; // Closure queue
Atomic<bool> shut_down_{false}; // Destructor has been called if set to true
void SharedThreadPoolConstructor();
// For ThreadPool, default stack size for mobile platform is 1952K. for other
// platforms is 64K.
size_t DefaultStackSize();
// Internal Use Only for debug checking.
void AssertHasNotBeenShutDown();
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H */

@ -102,6 +102,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/executor/mpmcqueue.cc',
'src/core/lib/iomgr/executor/threadpool.cc',
'src/core/lib/iomgr/fork_posix.cc',
'src/core/lib/iomgr/fork_windows.cc',
'src/core/lib/iomgr/gethostname_fallback.cc',

@ -281,6 +281,17 @@ grpc_cc_test(
tags = ["no_windows"],
)
grpc_cc_test(
name = "threadpool_test",
srcs = ["threadpool_test.cc"],
language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "time_averaged_stats_test",
srcs = ["time_averaged_stats_test.cc"],

@ -0,0 +1,192 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/iomgr/executor/threadpool.h"
#include "test/core/util/test_config.h"
static const int kSmallThreadPoolSize = 20;
static const int kLargeThreadPoolSize = 100;
static const int kThreadSmallIter = 100;
static const int kThreadLargeIter = 10000;
static void test_size_zero(void) {
gpr_log(GPR_INFO, "test_size_zero");
grpc_core::ThreadPool* pool_size_zero =
grpc_core::New<grpc_core::ThreadPool>(0);
GPR_ASSERT(pool_size_zero->pool_capacity() == 1);
Delete(pool_size_zero);
}
static void test_constructor_option(void) {
gpr_log(GPR_INFO, "test_constructor_option");
// Tests options
grpc_core::Thread::Options options;
options.set_stack_size(192 * 1024); // Random non-default value
grpc_core::ThreadPool* pool = grpc_core::New<grpc_core::ThreadPool>(
0, "test_constructor_option", options);
GPR_ASSERT(pool->thread_options().stack_size() == options.stack_size());
Delete(pool);
}
// Simple functor for testing. It will count how many times being called.
class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor {
public:
friend class SimpleFunctorCheckForAdd;
SimpleFunctorForAdd() {
functor_run = &SimpleFunctorForAdd::Run;
internal_next = this;
internal_success = 0;
}
~SimpleFunctorForAdd() {}
static void Run(struct grpc_experimental_completion_queue_functor* cb,
int ok) {
auto* callback = static_cast<SimpleFunctorForAdd*>(cb);
callback->count_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
}
int count() { return count_.Load(grpc_core::MemoryOrder::RELAXED); }
private:
grpc_core::Atomic<int> count_{0};
};
static void test_add(void) {
gpr_log(GPR_INFO, "test_add");
grpc_core::ThreadPool* pool =
grpc_core::New<grpc_core::ThreadPool>(kSmallThreadPoolSize, "test_add");
SimpleFunctorForAdd* functor = grpc_core::New<SimpleFunctorForAdd>();
for (int i = 0; i < kThreadSmallIter; ++i) {
pool->Add(functor);
}
grpc_core::Delete(pool);
GPR_ASSERT(functor->count() == kThreadSmallIter);
grpc_core::Delete(functor);
gpr_log(GPR_DEBUG, "Done.");
}
// Thread that adds closures to pool
class WorkThread {
public:
WorkThread(grpc_core::ThreadPool* pool, SimpleFunctorForAdd* cb, int num_add)
: num_add_(num_add), cb_(cb), pool_(pool) {
thd_ = grpc_core::Thread(
"thread_pool_test_add_thd",
[](void* th) { static_cast<WorkThread*>(th)->Run(); }, this);
}
~WorkThread() {}
void Start() { thd_.Start(); }
void Join() { thd_.Join(); }
private:
void Run() {
for (int i = 0; i < num_add_; ++i) {
pool_->Add(cb_);
}
}
int num_add_;
SimpleFunctorForAdd* cb_;
grpc_core::ThreadPool* pool_;
grpc_core::Thread thd_;
};
static void test_multi_add(void) {
gpr_log(GPR_INFO, "test_multi_add");
const int num_work_thds = 10;
grpc_core::ThreadPool* pool = grpc_core::New<grpc_core::ThreadPool>(
kLargeThreadPoolSize, "test_multi_add");
SimpleFunctorForAdd* functor = grpc_core::New<SimpleFunctorForAdd>();
WorkThread** work_thds = static_cast<WorkThread**>(
gpr_zalloc(sizeof(WorkThread*) * num_work_thds));
gpr_log(GPR_DEBUG, "Fork threads for adding...");
for (int i = 0; i < num_work_thds; ++i) {
work_thds[i] = grpc_core::New<WorkThread>(pool, functor, kThreadLargeIter);
work_thds[i]->Start();
}
// Wait for all threads finish
gpr_log(GPR_DEBUG, "Waiting for all work threads finish...");
for (int i = 0; i < num_work_thds; ++i) {
work_thds[i]->Join();
grpc_core::Delete(work_thds[i]);
}
gpr_free(work_thds);
gpr_log(GPR_DEBUG, "Done.");
gpr_log(GPR_DEBUG, "Waiting for all closures finish...");
// Destructor of thread pool will wait for all closures to finish
grpc_core::Delete(pool);
GPR_ASSERT(functor->count() == kThreadLargeIter * num_work_thds);
grpc_core::Delete(functor);
gpr_log(GPR_DEBUG, "Done.");
}
// Checks the current count with a given number.
class SimpleFunctorCheckForAdd
: public grpc_experimental_completion_queue_functor {
public:
SimpleFunctorCheckForAdd(int ok, int* count) : count_(count) {
functor_run = &SimpleFunctorCheckForAdd::Run;
internal_success = ok;
}
~SimpleFunctorCheckForAdd() {}
static void Run(struct grpc_experimental_completion_queue_functor* cb,
int ok) {
auto* callback = static_cast<SimpleFunctorCheckForAdd*>(cb);
(*callback->count_)++;
GPR_ASSERT(*callback->count_ == callback->internal_success);
}
private:
int* count_;
};
static void test_one_thread_FIFO(void) {
gpr_log(GPR_INFO, "test_one_thread_FIFO");
int counter = 0;
grpc_core::ThreadPool* pool =
grpc_core::New<grpc_core::ThreadPool>(1, "test_one_thread_FIFO");
SimpleFunctorCheckForAdd** check_functors =
static_cast<SimpleFunctorCheckForAdd**>(
gpr_zalloc(sizeof(SimpleFunctorCheckForAdd*) * kThreadSmallIter));
for (int i = 0; i < kThreadSmallIter; ++i) {
check_functors[i] =
grpc_core::New<SimpleFunctorCheckForAdd>(i + 1, &counter);
pool->Add(check_functors[i]);
}
// Destructor of pool will wait until all closures finished.
grpc_core::Delete(pool);
for (int i = 0; i < kThreadSmallIter; ++i) {
grpc_core::Delete(check_functors[i]);
}
gpr_free(check_functors);
gpr_log(GPR_DEBUG, "Done.");
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
test_size_zero();
test_constructor_option();
test_add();
test_multi_add();
test_one_thread_FIFO();
grpc_shutdown();
return 0;
}

@ -1145,6 +1145,7 @@ src/core/lib/iomgr/ev_posix.h \
src/core/lib/iomgr/exec_ctx.h \
src/core/lib/iomgr/executor.h \
src/core/lib/iomgr/executor/mpmcqueue.h \
src/core/lib/iomgr/executor/threadpool.h \
src/core/lib/iomgr/gethostname.h \
src/core/lib/iomgr/grpc_if_nametoindex.h \
src/core/lib/iomgr/internal_errqueue.h \

@ -1237,6 +1237,8 @@ src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor.h \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/mpmcqueue.h \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/executor/threadpool.h \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname.h \

@ -2251,6 +2251,22 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"grpc",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c",
"name": "threadpool_test",
"src": [
"test/core/iomgr/threadpool_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
@ -8541,6 +8557,7 @@
"src/core/lib/iomgr/exec_ctx.cc",
"src/core/lib/iomgr/executor.cc",
"src/core/lib/iomgr/executor/mpmcqueue.cc",
"src/core/lib/iomgr/executor/threadpool.cc",
"src/core/lib/iomgr/fork_posix.cc",
"src/core/lib/iomgr/fork_windows.cc",
"src/core/lib/iomgr/gethostname_fallback.cc",
@ -8730,6 +8747,7 @@
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/executor/mpmcqueue.h",
"src/core/lib/iomgr/executor/threadpool.h",
"src/core/lib/iomgr/gethostname.h",
"src/core/lib/iomgr/grpc_if_nametoindex.h",
"src/core/lib/iomgr/internal_errqueue.h",
@ -8889,6 +8907,7 @@
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/executor/mpmcqueue.h",
"src/core/lib/iomgr/executor/threadpool.h",
"src/core/lib/iomgr/gethostname.h",
"src/core/lib/iomgr/grpc_if_nametoindex.h",
"src/core/lib/iomgr/internal_errqueue.h",

@ -2793,6 +2793,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "threadpool_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save