diff --git a/BUILD b/BUILD index b5d7761a9a1..474fc053ad6 100644 --- a/BUILD +++ b/BUILD @@ -704,6 +704,7 @@ grpc_cc_library( "src/core/lib/iomgr/exec_ctx.cc", "src/core/lib/iomgr/executor.cc", "src/core/lib/iomgr/executor/mpmcqueue.cc", + "src/core/lib/iomgr/executor/threadpool.cc", "src/core/lib/iomgr/fork_posix.cc", "src/core/lib/iomgr/fork_windows.cc", "src/core/lib/iomgr/gethostname_fallback.cc", @@ -862,6 +863,7 @@ grpc_cc_library( "src/core/lib/iomgr/exec_ctx.h", "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/executor/mpmcqueue.h", + "src/core/lib/iomgr/executor/threadpool.h", "src/core/lib/iomgr/gethostname.h", "src/core/lib/iomgr/gevent_util.h", "src/core/lib/iomgr/grpc_if_nametoindex.h", diff --git a/BUILD.gn b/BUILD.gn index 177f4082adf..9dacd14bb45 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -527,6 +527,8 @@ config("grpc_config") { "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/executor/mpmcqueue.cc", "src/core/lib/iomgr/executor/mpmcqueue.h", + "src/core/lib/iomgr/executor/threadpool.cc", + "src/core/lib/iomgr/executor/threadpool.h", "src/core/lib/iomgr/fork_posix.cc", "src/core/lib/iomgr/fork_windows.cc", "src/core/lib/iomgr/gethostname.h", @@ -1242,6 +1244,7 @@ config("grpc_config") { "src/core/lib/iomgr/exec_ctx.h", "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/executor/mpmcqueue.h", + "src/core/lib/iomgr/executor/threadpool.h", "src/core/lib/iomgr/gethostname.h", "src/core/lib/iomgr/grpc_if_nametoindex.h", "src/core/lib/iomgr/internal_errqueue.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index a68f5caff81..726638aa004 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -427,6 +427,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_c tcp_server_posix_test) endif() add_dependencies(buildtests_c tcp_server_uv_test) +add_dependencies(buildtests_c threadpool_test) add_dependencies(buildtests_c time_averaged_stats_test) add_dependencies(buildtests_c timeout_encoding_test) add_dependencies(buildtests_c timer_heap_test) @@ -1035,6 +1036,7 @@ add_library(grpc src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc src/core/lib/iomgr/executor/mpmcqueue.cc + src/core/lib/iomgr/executor/threadpool.cc src/core/lib/iomgr/fork_posix.cc src/core/lib/iomgr/fork_windows.cc src/core/lib/iomgr/gethostname_fallback.cc @@ -1474,6 +1476,7 @@ add_library(grpc_cronet src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc src/core/lib/iomgr/executor/mpmcqueue.cc + src/core/lib/iomgr/executor/threadpool.cc src/core/lib/iomgr/fork_posix.cc src/core/lib/iomgr/fork_windows.cc src/core/lib/iomgr/gethostname_fallback.cc @@ -1895,6 +1898,7 @@ add_library(grpc_test_util src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc src/core/lib/iomgr/executor/mpmcqueue.cc + src/core/lib/iomgr/executor/threadpool.cc src/core/lib/iomgr/fork_posix.cc src/core/lib/iomgr/fork_windows.cc src/core/lib/iomgr/gethostname_fallback.cc @@ -2229,6 +2233,7 @@ add_library(grpc_test_util_unsecure src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc src/core/lib/iomgr/executor/mpmcqueue.cc + src/core/lib/iomgr/executor/threadpool.cc src/core/lib/iomgr/fork_posix.cc src/core/lib/iomgr/fork_windows.cc src/core/lib/iomgr/gethostname_fallback.cc @@ -2539,6 +2544,7 @@ add_library(grpc_unsecure src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc src/core/lib/iomgr/executor/mpmcqueue.cc + src/core/lib/iomgr/executor/threadpool.cc src/core/lib/iomgr/fork_posix.cc src/core/lib/iomgr/fork_windows.cc src/core/lib/iomgr/gethostname_fallback.cc @@ -3580,6 +3586,7 @@ add_library(grpc++_cronet src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc src/core/lib/iomgr/executor/mpmcqueue.cc + src/core/lib/iomgr/executor/threadpool.cc src/core/lib/iomgr/fork_posix.cc src/core/lib/iomgr/fork_windows.cc src/core/lib/iomgr/gethostname_fallback.cc @@ -10517,6 +10524,40 @@ target_link_libraries(tcp_server_uv_test endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(threadpool_test + test/core/iomgr/threadpool_test.cc +) + + +target_include_directories(threadpool_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR} +) + +target_link_libraries(threadpool_test + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc + gpr +) + + # avoid dependency on libstdc++ + if (_gRPC_CORE_NOSTDCXX_FLAGS) + set_target_properties(threadpool_test PROPERTIES LINKER_LANGUAGE C) + target_compile_options(threadpool_test PRIVATE $<$:${_gRPC_CORE_NOSTDCXX_FLAGS}>) + endif() + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(time_averaged_stats_test test/core/iomgr/time_averaged_stats_test.cc ) diff --git a/Makefile b/Makefile index 952b20155fd..4ca15ac0f47 100644 --- a/Makefile +++ b/Makefile @@ -1132,6 +1132,7 @@ tcp_client_uv_test: $(BINDIR)/$(CONFIG)/tcp_client_uv_test tcp_posix_test: $(BINDIR)/$(CONFIG)/tcp_posix_test tcp_server_posix_test: $(BINDIR)/$(CONFIG)/tcp_server_posix_test tcp_server_uv_test: $(BINDIR)/$(CONFIG)/tcp_server_uv_test +threadpool_test: $(BINDIR)/$(CONFIG)/threadpool_test time_averaged_stats_test: $(BINDIR)/$(CONFIG)/time_averaged_stats_test timeout_encoding_test: $(BINDIR)/$(CONFIG)/timeout_encoding_test timer_heap_test: $(BINDIR)/$(CONFIG)/timer_heap_test @@ -1553,6 +1554,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/tcp_posix_test \ $(BINDIR)/$(CONFIG)/tcp_server_posix_test \ $(BINDIR)/$(CONFIG)/tcp_server_uv_test \ + $(BINDIR)/$(CONFIG)/threadpool_test \ $(BINDIR)/$(CONFIG)/time_averaged_stats_test \ $(BINDIR)/$(CONFIG)/timeout_encoding_test \ $(BINDIR)/$(CONFIG)/timer_heap_test \ @@ -2189,6 +2191,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/tcp_server_posix_test || ( echo test tcp_server_posix_test failed ; exit 1 ) $(E) "[RUN] Testing tcp_server_uv_test" $(Q) $(BINDIR)/$(CONFIG)/tcp_server_uv_test || ( echo test tcp_server_uv_test failed ; exit 1 ) + $(E) "[RUN] Testing threadpool_test" + $(Q) $(BINDIR)/$(CONFIG)/threadpool_test || ( echo test threadpool_test failed ; exit 1 ) $(E) "[RUN] Testing time_averaged_stats_test" $(Q) $(BINDIR)/$(CONFIG)/time_averaged_stats_test || ( echo test time_averaged_stats_test failed ; exit 1 ) $(E) "[RUN] Testing timeout_encoding_test" @@ -3546,6 +3550,7 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/executor/mpmcqueue.cc \ + src/core/lib/iomgr/executor/threadpool.cc \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname_fallback.cc \ @@ -3976,6 +3981,7 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/executor/mpmcqueue.cc \ + src/core/lib/iomgr/executor/threadpool.cc \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname_fallback.cc \ @@ -4387,6 +4393,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/executor/mpmcqueue.cc \ + src/core/lib/iomgr/executor/threadpool.cc \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname_fallback.cc \ @@ -4705,6 +4712,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/executor/mpmcqueue.cc \ + src/core/lib/iomgr/executor/threadpool.cc \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname_fallback.cc \ @@ -4986,6 +4994,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/executor/mpmcqueue.cc \ + src/core/lib/iomgr/executor/threadpool.cc \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname_fallback.cc \ @@ -5997,6 +6006,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/executor/mpmcqueue.cc \ + src/core/lib/iomgr/executor/threadpool.cc \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname_fallback.cc \ @@ -13440,6 +13450,38 @@ endif endif +THREADPOOL_TEST_SRC = \ + test/core/iomgr/threadpool_test.cc \ + +THREADPOOL_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(THREADPOOL_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/threadpool_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/threadpool_test: $(THREADPOOL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(THREADPOOL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/threadpool_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/iomgr/threadpool_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_threadpool_test: $(THREADPOOL_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(THREADPOOL_TEST_OBJS:.o=.dep) +endif +endif + + TIME_AVERAGED_STATS_TEST_SRC = \ test/core/iomgr/time_averaged_stats_test.cc \ diff --git a/build.yaml b/build.yaml index c36b3c72594..6d69c419b71 100644 --- a/build.yaml +++ b/build.yaml @@ -281,6 +281,7 @@ filegroups: - src/core/lib/iomgr/exec_ctx.cc - src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/executor/mpmcqueue.cc + - src/core/lib/iomgr/executor/threadpool.cc - src/core/lib/iomgr/fork_posix.cc - src/core/lib/iomgr/fork_windows.cc - src/core/lib/iomgr/gethostname_fallback.cc @@ -469,6 +470,7 @@ filegroups: - src/core/lib/iomgr/exec_ctx.h - src/core/lib/iomgr/executor.h - src/core/lib/iomgr/executor/mpmcqueue.h + - src/core/lib/iomgr/executor/threadpool.h - src/core/lib/iomgr/gethostname.h - src/core/lib/iomgr/grpc_if_nametoindex.h - src/core/lib/iomgr/internal_errqueue.h @@ -3734,6 +3736,16 @@ targets: - gpr exclude_iomgrs: - native +- name: threadpool_test + build: test + language: c + src: + - test/core/iomgr/threadpool_test.cc + deps: + - grpc_test_util + - grpc + - gpr + uses_polling: false - name: time_averaged_stats_test build: test language: c diff --git a/config.m4 b/config.m4 index 1791a3ca27b..50b789bb7bb 100644 --- a/config.m4 +++ b/config.m4 @@ -128,6 +128,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/executor/mpmcqueue.cc \ + src/core/lib/iomgr/executor/threadpool.cc \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname_fallback.cc \ diff --git a/config.w32 b/config.w32 index ddb5a39bc6c..7048f83facf 100644 --- a/config.w32 +++ b/config.w32 @@ -103,6 +103,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\iomgr\\exec_ctx.cc " + "src\\core\\lib\\iomgr\\executor.cc " + "src\\core\\lib\\iomgr\\executor\\mpmcqueue.cc " + + "src\\core\\lib\\iomgr\\executor\\threadpool.cc " + "src\\core\\lib\\iomgr\\fork_posix.cc " + "src\\core\\lib\\iomgr\\fork_windows.cc " + "src\\core\\lib\\iomgr\\gethostname_fallback.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 76c979b8885..d2680ee5d03 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -479,6 +479,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/executor/mpmcqueue.h', + 'src/core/lib/iomgr/executor/threadpool.h', 'src/core/lib/iomgr/gethostname.h', 'src/core/lib/iomgr/grpc_if_nametoindex.h', 'src/core/lib/iomgr/internal_errqueue.h', @@ -685,6 +686,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/executor/mpmcqueue.h', + 'src/core/lib/iomgr/executor/threadpool.h', 'src/core/lib/iomgr/gethostname.h', 'src/core/lib/iomgr/grpc_if_nametoindex.h', 'src/core/lib/iomgr/internal_errqueue.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index c3055ae15b9..47cf5b8d47f 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -438,6 +438,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/executor/mpmcqueue.h', + 'src/core/lib/iomgr/executor/threadpool.h', 'src/core/lib/iomgr/gethostname.h', 'src/core/lib/iomgr/grpc_if_nametoindex.h', 'src/core/lib/iomgr/internal_errqueue.h', @@ -592,6 +593,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/executor/mpmcqueue.cc', + 'src/core/lib/iomgr/executor/threadpool.cc', 'src/core/lib/iomgr/fork_posix.cc', 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', @@ -1094,6 +1096,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/executor/mpmcqueue.h', + 'src/core/lib/iomgr/executor/threadpool.h', 'src/core/lib/iomgr/gethostname.h', 'src/core/lib/iomgr/grpc_if_nametoindex.h', 'src/core/lib/iomgr/internal_errqueue.h', diff --git a/grpc.gemspec b/grpc.gemspec index 0249e0a1fcd..1f4cf237ada 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -372,6 +372,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/exec_ctx.h ) s.files += %w( src/core/lib/iomgr/executor.h ) s.files += %w( src/core/lib/iomgr/executor/mpmcqueue.h ) + s.files += %w( src/core/lib/iomgr/executor/threadpool.h ) s.files += %w( src/core/lib/iomgr/gethostname.h ) s.files += %w( src/core/lib/iomgr/grpc_if_nametoindex.h ) s.files += %w( src/core/lib/iomgr/internal_errqueue.h ) @@ -526,6 +527,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/exec_ctx.cc ) s.files += %w( src/core/lib/iomgr/executor.cc ) s.files += %w( src/core/lib/iomgr/executor/mpmcqueue.cc ) + s.files += %w( src/core/lib/iomgr/executor/threadpool.cc ) s.files += %w( src/core/lib/iomgr/fork_posix.cc ) s.files += %w( src/core/lib/iomgr/fork_windows.cc ) s.files += %w( src/core/lib/iomgr/gethostname_fallback.cc ) diff --git a/grpc.gyp b/grpc.gyp index 0bd5be75f00..5eeab74d4c7 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -310,6 +310,7 @@ 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/executor/mpmcqueue.cc', + 'src/core/lib/iomgr/executor/threadpool.cc', 'src/core/lib/iomgr/fork_posix.cc', 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', @@ -687,6 +688,7 @@ 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/executor/mpmcqueue.cc', + 'src/core/lib/iomgr/executor/threadpool.cc', 'src/core/lib/iomgr/fork_posix.cc', 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', @@ -938,6 +940,7 @@ 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/executor/mpmcqueue.cc', + 'src/core/lib/iomgr/executor/threadpool.cc', 'src/core/lib/iomgr/fork_posix.cc', 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', @@ -1165,6 +1168,7 @@ 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/executor/mpmcqueue.cc', + 'src/core/lib/iomgr/executor/threadpool.cc', 'src/core/lib/iomgr/fork_posix.cc', 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', diff --git a/package.xml b/package.xml index 4467a11aae4..584c6510a60 100644 --- a/package.xml +++ b/package.xml @@ -377,6 +377,7 @@ + @@ -531,6 +532,7 @@ + diff --git a/src/core/lib/iomgr/executor/mpmcqueue.cc b/src/core/lib/iomgr/executor/mpmcqueue.cc index 5eac7e29087..72c318e2bb1 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.cc +++ b/src/core/lib/iomgr/executor/mpmcqueue.cc @@ -98,14 +98,25 @@ void InfLenFIFOQueue::Put(void* elem) { } } -void* InfLenFIFOQueue::Get() { +void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) { MutexLock l(&mu_); + if (count_.Load(MemoryOrder::RELAXED) == 0) { + gpr_timespec start_time; + if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) && + wait_time != nullptr) { + start_time = gpr_now(GPR_CLOCK_MONOTONIC); + } + num_waiters_++; do { wait_nonempty_.Wait(&mu_); } while (count_.Load(MemoryOrder::RELAXED) == 0); num_waiters_--; + if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) && + wait_time != nullptr) { + *wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time); + } } GPR_DEBUG_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0); return PopFront(); diff --git a/src/core/lib/iomgr/executor/mpmcqueue.h b/src/core/lib/iomgr/executor/mpmcqueue.h index 8ece95699c7..c6102b3add0 100644 --- a/src/core/lib/iomgr/executor/mpmcqueue.h +++ b/src/core/lib/iomgr/executor/mpmcqueue.h @@ -42,7 +42,8 @@ class MPMCQueueInterface { // Removes the oldest element from the queue and return it. // This might cause to block on empty queue depending on implementation. - virtual void* Get() GRPC_ABSTRACT; + // Optional argument for collecting stats purpose. + virtual void* Get(gpr_timespec* wait_time = nullptr) GRPC_ABSTRACT; // Returns number of elements in the queue currently virtual int count() const GRPC_ABSTRACT; @@ -65,7 +66,9 @@ class InfLenFIFOQueue : public MPMCQueueInterface { // Removes the oldest element from the queue and returns it. // This routine will cause the thread to block if queue is currently empty. - void* Get(); + // Argument wait_time should be passed in when turning on the trace flag + // grpc_thread_pool_trace (for collecting stats info purpose.) + void* Get(gpr_timespec* wait_time = nullptr); // Returns number of elements in queue currently. // There might be concurrently add/remove on queue, so count might change diff --git a/src/core/lib/iomgr/executor/threadpool.cc b/src/core/lib/iomgr/executor/threadpool.cc new file mode 100644 index 00000000000..e203252ef08 --- /dev/null +++ b/src/core/lib/iomgr/executor/threadpool.cc @@ -0,0 +1,138 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/core/lib/iomgr/executor/threadpool.h" + +namespace grpc_core { + +void ThreadPoolWorker::Run() { + while (true) { + void* elem; + + if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { + // Updates stats and print + gpr_timespec wait_time = gpr_time_0(GPR_TIMESPAN); + elem = queue_->Get(&wait_time); + stats_.sleep_time = gpr_time_add(stats_.sleep_time, wait_time); + gpr_log(GPR_INFO, + "ThreadPool Worker [%s %d] Stats: sleep_time %f", + thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_time)); + } else { + elem = queue_->Get(nullptr); + } + if (elem == nullptr) { + break; + } + // Runs closure + auto* closure = + static_cast(elem); + closure->functor_run(closure, closure->internal_success); + } +} + +void ThreadPool::SharedThreadPoolConstructor() { + // All worker threads in thread pool must be joinable. + thread_options_.set_joinable(true); + + // Create at least 1 worker thread. + if (num_threads_ <= 0) num_threads_ = 1; + + queue_ = New(); + threads_ = static_cast( + gpr_zalloc(num_threads_ * sizeof(ThreadPoolWorker*))); + for (int i = 0; i < num_threads_; ++i) { + threads_[i] = + New(thd_name_, this, queue_, thread_options_, i); + threads_[i]->Start(); + } +} + +size_t ThreadPool::DefaultStackSize() { +#if defined(__ANDROID__) || defined(__APPLE__) + return 1952 * 1024; +#else + return 64 * 1024; +#endif +} + +void ThreadPool::AssertHasNotBeenShutDown() { + // For debug checking purpose, using RELAXED order is sufficient. + GPR_DEBUG_ASSERT(!shut_down_.Load(MemoryOrder::RELAXED)); +} + +ThreadPool::ThreadPool(int num_threads) : num_threads_(num_threads) { + thd_name_ = "ThreadPoolWorker"; + thread_options_ = Thread::Options(); + thread_options_.set_stack_size(DefaultStackSize()); + SharedThreadPoolConstructor(); +} + +ThreadPool::ThreadPool(int num_threads, const char* thd_name) + : num_threads_(num_threads), thd_name_(thd_name) { + thread_options_ = Thread::Options(); + thread_options_.set_stack_size(DefaultStackSize()); + SharedThreadPoolConstructor(); +} + +ThreadPool::ThreadPool(int num_threads, const char* thd_name, + const Thread::Options& thread_options) + : num_threads_(num_threads), + thd_name_(thd_name), + thread_options_(thread_options) { + if (thread_options_.stack_size() == 0) { + thread_options_.set_stack_size(DefaultStackSize()); + } + SharedThreadPoolConstructor(); +} + +ThreadPool::~ThreadPool() { + // For debug checking purpose, using RELAXED order is sufficient. + shut_down_.Store(true, MemoryOrder::RELAXED); + + for (int i = 0; i < num_threads_; ++i) { + queue_->Put(nullptr); + } + + for (int i = 0; i < num_threads_; ++i) { + threads_[i]->Join(); + } + + for (int i = 0; i < num_threads_; ++i) { + Delete(threads_[i]); + } + gpr_free(threads_); + Delete(queue_); +} + +void ThreadPool::Add(grpc_experimental_completion_queue_functor* closure) { + AssertHasNotBeenShutDown(); + queue_->Put(static_cast(closure)); +} + +int ThreadPool::num_pending_closures() const { return queue_->count(); } + +int ThreadPool::pool_capacity() const { return num_threads_; } + +const Thread::Options& ThreadPool::thread_options() const { + return thread_options_; +} + +const char* ThreadPool::thread_name() const { return thd_name_; } +} // namespace grpc_core diff --git a/src/core/lib/iomgr/executor/threadpool.h b/src/core/lib/iomgr/executor/threadpool.h new file mode 100644 index 00000000000..7b33fb32f36 --- /dev/null +++ b/src/core/lib/iomgr/executor/threadpool.h @@ -0,0 +1,153 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H +#define GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H + +#include + +#include + +#include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/executor/mpmcqueue.h" + +namespace grpc_core { + +// A base abstract base class for threadpool. +// Threadpool is an executor that maintains a pool of threads sitting around +// and waiting for closures. A threadpool also maintains a queue of pending +// closures, when closures appearing in the queue, the threads in pool will +// pull them out and execute them. +class ThreadPoolInterface { + public: + // Waits for all pending closures to complete, then shuts down thread pool. + virtual ~ThreadPoolInterface() {} + + // Schedules a given closure for execution later. + // Depending on specific subclass implementation, this routine might cause + // current thread to be blocked (in case of unable to schedule). + // Closure should contain a function pointer and arguments it will take, more + // details for closure struct at /grpc/include/grpc/impl/codegen/grpc_types.h + virtual void Add(grpc_experimental_completion_queue_functor* closure) + GRPC_ABSTRACT; + + // Returns the current number of pending closures + virtual int num_pending_closures() const GRPC_ABSTRACT; + + // Returns the capacity of pool (number of worker threads in pool) + virtual int pool_capacity() const GRPC_ABSTRACT; + + // Thread option accessor + virtual const Thread::Options& thread_options() const GRPC_ABSTRACT; + + // Returns the thread name for threads in this ThreadPool. + virtual const char* thread_name() const GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS +}; + +// Worker thread for threadpool. Executes closures in the queue, until getting a +// NULL closure. +class ThreadPoolWorker { + public: + ThreadPoolWorker(const char* thd_name, ThreadPoolInterface* pool, + MPMCQueueInterface* queue, Thread::Options& options, + int index) + : queue_(queue), thd_name_(thd_name), index_(index) { + thd_ = Thread(thd_name, + [](void* th) { static_cast(th)->Run(); }, + this, nullptr, options); + } + + ~ThreadPoolWorker() {} + + void Start() { thd_.Start(); } + void Join() { thd_.Join(); } + + private: + // struct for tracking stats of thread + struct Stats { + gpr_timespec sleep_time; + Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); } + }; + + void Run(); // Pulls closures from queue and executes them + + MPMCQueueInterface* queue_; // Queue in thread pool to pull closures from + Thread thd_; // Thread wrapped in + Stats stats_; // Stats to be collected in run time + const char* thd_name_; // Name of thread + int index_; // Index in thread pool +}; + +// A fixed size thread pool implementation of abstract thread pool interface. +// In this implementation, the number of threads in pool is fixed, but the +// capacity of closure queue is unlimited. +class ThreadPool : public ThreadPoolInterface { + public: + // Creates a thread pool with size of "num_threads", with default thread name + // "ThreadPoolWorker" and all thread options set to default. If the given size + // is 0 or less, there will be 1 worker thread created inside pool. + ThreadPool(int num_threads); + + // Same as ThreadPool(int num_threads) constructor, except + // that it also sets "thd_name" as the name of all threads in the thread pool. + ThreadPool(int num_threads, const char* thd_name); + + // Same as ThreadPool(const char *thd_name, int num_threads) constructor, + // except that is also set thread_options for threads. + // Notes for stack size: + // If the stack size field of the passed in Thread::Options is set to default + // value 0, default ThreadPool stack size will be used. The current default + // stack size of this implementation is 1952K for mobile platform and 64K for + // all others. + ThreadPool(int num_threads, const char* thd_name, + const Thread::Options& thread_options); + + // Waits for all pending closures to complete, then shuts down thread pool. + ~ThreadPool() override; + + // Adds given closure into pending queue immediately. Since closure queue has + // infinite length, this routine will not block. + void Add(grpc_experimental_completion_queue_functor* closure) override; + + int num_pending_closures() const override; + int pool_capacity() const override; + const Thread::Options& thread_options() const override; + const char* thread_name() const override; + + private: + int num_threads_ = 0; + const char* thd_name_ = nullptr; + Thread::Options thread_options_; + ThreadPoolWorker** threads_ = nullptr; // Array of worker threads + MPMCQueueInterface* queue_ = nullptr; // Closure queue + + Atomic shut_down_{false}; // Destructor has been called if set to true + + void SharedThreadPoolConstructor(); + // For ThreadPool, default stack size for mobile platform is 1952K. for other + // platforms is 64K. + size_t DefaultStackSize(); + // Internal Use Only for debug checking. + void AssertHasNotBeenShutDown(); +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 72e19f02081..6a8b208b914 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -102,6 +102,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/executor/mpmcqueue.cc', + 'src/core/lib/iomgr/executor/threadpool.cc', 'src/core/lib/iomgr/fork_posix.cc', 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD index af67d5de25e..808a635b80a 100644 --- a/test/core/iomgr/BUILD +++ b/test/core/iomgr/BUILD @@ -281,6 +281,17 @@ grpc_cc_test( tags = ["no_windows"], ) +grpc_cc_test( + name = "threadpool_test", + srcs = ["threadpool_test.cc"], + language = "C++", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_test( name = "time_averaged_stats_test", srcs = ["time_averaged_stats_test.cc"], diff --git a/test/core/iomgr/threadpool_test.cc b/test/core/iomgr/threadpool_test.cc new file mode 100644 index 00000000000..ac94af170e2 --- /dev/null +++ b/test/core/iomgr/threadpool_test.cc @@ -0,0 +1,192 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/executor/threadpool.h" + +#include "test/core/util/test_config.h" + +static const int kSmallThreadPoolSize = 20; +static const int kLargeThreadPoolSize = 100; +static const int kThreadSmallIter = 100; +static const int kThreadLargeIter = 10000; + +static void test_size_zero(void) { + gpr_log(GPR_INFO, "test_size_zero"); + grpc_core::ThreadPool* pool_size_zero = + grpc_core::New(0); + GPR_ASSERT(pool_size_zero->pool_capacity() == 1); + Delete(pool_size_zero); +} + +static void test_constructor_option(void) { + gpr_log(GPR_INFO, "test_constructor_option"); + // Tests options + grpc_core::Thread::Options options; + options.set_stack_size(192 * 1024); // Random non-default value + grpc_core::ThreadPool* pool = grpc_core::New( + 0, "test_constructor_option", options); + GPR_ASSERT(pool->thread_options().stack_size() == options.stack_size()); + Delete(pool); +} + +// Simple functor for testing. It will count how many times being called. +class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor { + public: + friend class SimpleFunctorCheckForAdd; + SimpleFunctorForAdd() { + functor_run = &SimpleFunctorForAdd::Run; + internal_next = this; + internal_success = 0; + } + ~SimpleFunctorForAdd() {} + static void Run(struct grpc_experimental_completion_queue_functor* cb, + int ok) { + auto* callback = static_cast(cb); + callback->count_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED); + } + + int count() { return count_.Load(grpc_core::MemoryOrder::RELAXED); } + + private: + grpc_core::Atomic count_{0}; +}; + +static void test_add(void) { + gpr_log(GPR_INFO, "test_add"); + grpc_core::ThreadPool* pool = + grpc_core::New(kSmallThreadPoolSize, "test_add"); + + SimpleFunctorForAdd* functor = grpc_core::New(); + for (int i = 0; i < kThreadSmallIter; ++i) { + pool->Add(functor); + } + grpc_core::Delete(pool); + GPR_ASSERT(functor->count() == kThreadSmallIter); + grpc_core::Delete(functor); + gpr_log(GPR_DEBUG, "Done."); +} + +// Thread that adds closures to pool +class WorkThread { + public: + WorkThread(grpc_core::ThreadPool* pool, SimpleFunctorForAdd* cb, int num_add) + : num_add_(num_add), cb_(cb), pool_(pool) { + thd_ = grpc_core::Thread( + "thread_pool_test_add_thd", + [](void* th) { static_cast(th)->Run(); }, this); + } + ~WorkThread() {} + + void Start() { thd_.Start(); } + void Join() { thd_.Join(); } + + private: + void Run() { + for (int i = 0; i < num_add_; ++i) { + pool_->Add(cb_); + } + } + + int num_add_; + SimpleFunctorForAdd* cb_; + grpc_core::ThreadPool* pool_; + grpc_core::Thread thd_; +}; + +static void test_multi_add(void) { + gpr_log(GPR_INFO, "test_multi_add"); + const int num_work_thds = 10; + grpc_core::ThreadPool* pool = grpc_core::New( + kLargeThreadPoolSize, "test_multi_add"); + SimpleFunctorForAdd* functor = grpc_core::New(); + WorkThread** work_thds = static_cast( + gpr_zalloc(sizeof(WorkThread*) * num_work_thds)); + gpr_log(GPR_DEBUG, "Fork threads for adding..."); + for (int i = 0; i < num_work_thds; ++i) { + work_thds[i] = grpc_core::New(pool, functor, kThreadLargeIter); + work_thds[i]->Start(); + } + // Wait for all threads finish + gpr_log(GPR_DEBUG, "Waiting for all work threads finish..."); + for (int i = 0; i < num_work_thds; ++i) { + work_thds[i]->Join(); + grpc_core::Delete(work_thds[i]); + } + gpr_free(work_thds); + gpr_log(GPR_DEBUG, "Done."); + gpr_log(GPR_DEBUG, "Waiting for all closures finish..."); + // Destructor of thread pool will wait for all closures to finish + grpc_core::Delete(pool); + GPR_ASSERT(functor->count() == kThreadLargeIter * num_work_thds); + grpc_core::Delete(functor); + gpr_log(GPR_DEBUG, "Done."); +} + +// Checks the current count with a given number. +class SimpleFunctorCheckForAdd + : public grpc_experimental_completion_queue_functor { + public: + SimpleFunctorCheckForAdd(int ok, int* count) : count_(count) { + functor_run = &SimpleFunctorCheckForAdd::Run; + internal_success = ok; + } + ~SimpleFunctorCheckForAdd() {} + static void Run(struct grpc_experimental_completion_queue_functor* cb, + int ok) { + auto* callback = static_cast(cb); + (*callback->count_)++; + GPR_ASSERT(*callback->count_ == callback->internal_success); + } + + private: + int* count_; +}; + +static void test_one_thread_FIFO(void) { + gpr_log(GPR_INFO, "test_one_thread_FIFO"); + int counter = 0; + grpc_core::ThreadPool* pool = + grpc_core::New(1, "test_one_thread_FIFO"); + SimpleFunctorCheckForAdd** check_functors = + static_cast( + gpr_zalloc(sizeof(SimpleFunctorCheckForAdd*) * kThreadSmallIter)); + for (int i = 0; i < kThreadSmallIter; ++i) { + check_functors[i] = + grpc_core::New(i + 1, &counter); + pool->Add(check_functors[i]); + } + // Destructor of pool will wait until all closures finished. + grpc_core::Delete(pool); + for (int i = 0; i < kThreadSmallIter; ++i) { + grpc_core::Delete(check_functors[i]); + } + gpr_free(check_functors); + gpr_log(GPR_DEBUG, "Done."); +} + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(argc, argv); + grpc_init(); + test_size_zero(); + test_constructor_option(); + test_add(); + test_multi_add(); + test_one_thread_FIFO(); + grpc_shutdown(); + return 0; +} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 626887290be..96da09eefc6 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1145,6 +1145,7 @@ src/core/lib/iomgr/ev_posix.h \ src/core/lib/iomgr/exec_ctx.h \ src/core/lib/iomgr/executor.h \ src/core/lib/iomgr/executor/mpmcqueue.h \ +src/core/lib/iomgr/executor/threadpool.h \ src/core/lib/iomgr/gethostname.h \ src/core/lib/iomgr/grpc_if_nametoindex.h \ src/core/lib/iomgr/internal_errqueue.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 5ac8256ce4b..9bb01cb67f8 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1237,6 +1237,8 @@ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/executor.h \ src/core/lib/iomgr/executor/mpmcqueue.cc \ src/core/lib/iomgr/executor/mpmcqueue.h \ +src/core/lib/iomgr/executor/threadpool.cc \ +src/core/lib/iomgr/executor/threadpool.h \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 0b56c7b32df..b2feea44d53 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -2251,6 +2251,22 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "grpc", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c", + "name": "threadpool_test", + "src": [ + "test/core/iomgr/threadpool_test.cc" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", @@ -8541,6 +8557,7 @@ "src/core/lib/iomgr/exec_ctx.cc", "src/core/lib/iomgr/executor.cc", "src/core/lib/iomgr/executor/mpmcqueue.cc", + "src/core/lib/iomgr/executor/threadpool.cc", "src/core/lib/iomgr/fork_posix.cc", "src/core/lib/iomgr/fork_windows.cc", "src/core/lib/iomgr/gethostname_fallback.cc", @@ -8730,6 +8747,7 @@ "src/core/lib/iomgr/exec_ctx.h", "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/executor/mpmcqueue.h", + "src/core/lib/iomgr/executor/threadpool.h", "src/core/lib/iomgr/gethostname.h", "src/core/lib/iomgr/grpc_if_nametoindex.h", "src/core/lib/iomgr/internal_errqueue.h", @@ -8889,6 +8907,7 @@ "src/core/lib/iomgr/exec_ctx.h", "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/executor/mpmcqueue.h", + "src/core/lib/iomgr/executor/threadpool.h", "src/core/lib/iomgr/gethostname.h", "src/core/lib/iomgr/grpc_if_nametoindex.h", "src/core/lib/iomgr/internal_errqueue.h", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index f4757e5ef62..d4a19741729 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2793,6 +2793,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "threadpool_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,