From 2d00d50c5903952c40a2b63d82f73ba11e5b77e5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 5 Oct 2022 00:17:23 -0700 Subject: [PATCH] [event_engine] Improve scaling in threadpool (#31234) * fixes * Automated change: Fix sanity tests * fix * fix * fix * fix * fixes * fixes * fix * fix-win * fix iwyu * fix * fix Co-authored-by: ctiller --- BUILD | 25 +--- CMakeLists.txt | 4 +- Makefile | 2 - build_autogenerated.yaml | 8 +- config.m4 | 2 - config.w32 | 2 - gRPC-C++.podspec | 2 - gRPC-Core.podspec | 3 - grpc.gemspec | 2 - grpc.gyp | 2 - package.xml | 2 - .../executor/threaded_executor.cc | 36 ------ .../event_engine/executor/threaded_executor.h | 44 ------- .../event_engine/posix_engine/posix_engine.cc | 1 - .../event_engine/posix_engine/posix_engine.h | 4 +- src/core/lib/event_engine/thread_pool.cc | 120 ++++++++++++++---- src/core/lib/event_engine/thread_pool.h | 35 +++-- .../event_engine/windows/windows_engine.cc | 1 - .../lib/event_engine/windows/windows_engine.h | 4 +- src/python/grpcio/grpc_core_dependencies.py | 1 - test/core/end2end/tests/hpack_size.cc | 9 +- test/core/event_engine/thread_pool_test.cc | 34 ++--- test/core/event_engine/windows/iocp_test.cc | 16 +-- .../event_engine/windows/win_socket_test.cc | 8 +- test/cpp/microbenchmarks/BUILD | 4 + tools/doxygen/Doxyfile.c++.internal | 2 - tools/doxygen/Doxyfile.core.internal | 2 - 27 files changed, 178 insertions(+), 197 deletions(-) delete mode 100644 src/core/lib/event_engine/executor/threaded_executor.cc delete mode 100644 src/core/lib/event_engine/executor/threaded_executor.h diff --git a/BUILD b/BUILD index 67d8be4e24d..22f49c44ff4 100644 --- a/BUILD +++ b/BUILD @@ -2445,23 +2445,6 @@ grpc_cc_library( ], ) -grpc_cc_library( - name = "event_engine_threaded_executor", - srcs = [ - "src/core/lib/event_engine/executor/threaded_executor.cc", - ], - hdrs = [ - "src/core/lib/event_engine/executor/threaded_executor.h", - ], - external_deps = ["absl/functional:any_invocable"], - deps = [ - "event_engine_base_hdrs", - "event_engine_executor", - "event_engine_thread_pool", - "gpr_platform", - ], -) - grpc_cc_library( name = "common_event_engine_closures", hdrs = ["src/core/lib/event_engine/common_closures.h"], @@ -2507,8 +2490,12 @@ grpc_cc_library( "absl/time", ], deps = [ + "event_engine_base_hdrs", + "event_engine_executor", "forkable", "gpr", + "time", + "useful", ], ) @@ -2884,7 +2871,7 @@ grpc_cc_library( deps = [ "event_engine_base_hdrs", "event_engine_common", - "event_engine_threaded_executor", + "event_engine_thread_pool", "event_engine_trace", "event_engine_utils", "gpr", @@ -2906,7 +2893,7 @@ grpc_cc_library( deps = [ "event_engine_base_hdrs", "event_engine_common", - "event_engine_threaded_executor", + "event_engine_thread_pool", "event_engine_trace", "event_engine_utils", "gpr", diff --git a/CMakeLists.txt b/CMakeLists.txt index 31e42a29f65..629b7c62260 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2116,7 +2116,6 @@ add_library(grpc src/core/lib/event_engine/channel_args_endpoint_config.cc src/core/lib/event_engine/default_event_engine.cc src/core/lib/event_engine/default_event_engine_factory.cc - src/core/lib/event_engine/executor/threaded_executor.cc src/core/lib/event_engine/forkable.cc src/core/lib/event_engine/memory_allocator.cc src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -2725,7 +2724,6 @@ add_library(grpc_unsecure src/core/lib/event_engine/channel_args_endpoint_config.cc src/core/lib/event_engine/default_event_engine.cc src/core/lib/event_engine/default_event_engine_factory.cc - src/core/lib/event_engine/executor/threaded_executor.cc src/core/lib/event_engine/forkable.cc src/core/lib/event_engine/memory_allocator.cc src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -18549,6 +18547,7 @@ if(gRPC_BUILD_TESTS) add_executable(thread_pool_test src/core/lib/event_engine/forkable.cc src/core/lib/event_engine/thread_pool.cc + src/core/lib/gprpp/time.cc test/core/event_engine/thread_pool_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc @@ -18578,6 +18577,7 @@ target_link_libraries(thread_pool_test ${_gRPC_ALLTARGETS_LIBRARIES} absl::flat_hash_set absl::any_invocable + absl::statusor gpr ) diff --git a/Makefile b/Makefile index 0bdcaec8008..73d333c34fd 100644 --- a/Makefile +++ b/Makefile @@ -1390,7 +1390,6 @@ LIBGRPC_SRC = \ src/core/lib/event_engine/channel_args_endpoint_config.cc \ src/core/lib/event_engine/default_event_engine.cc \ src/core/lib/event_engine/default_event_engine_factory.cc \ - src/core/lib/event_engine/executor/threaded_executor.cc \ src/core/lib/event_engine/forkable.cc \ src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ @@ -1862,7 +1861,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/event_engine/channel_args_endpoint_config.cc \ src/core/lib/event_engine/default_event_engine.cc \ src/core/lib/event_engine/default_event_engine_factory.cc \ - src/core/lib/event_engine/executor/threaded_executor.cc \ src/core/lib/event_engine/forkable.cc \ src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 4aaac2e6deb..aa8f245b708 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -735,7 +735,6 @@ libs: - src/core/lib/event_engine/default_event_engine.h - src/core/lib/event_engine/default_event_engine_factory.h - src/core/lib/event_engine/executor/executor.h - - src/core/lib/event_engine/executor/threaded_executor.h - src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/poller.h @@ -1449,7 +1448,6 @@ libs: - src/core/lib/event_engine/channel_args_endpoint_config.cc - src/core/lib/event_engine/default_event_engine.cc - src/core/lib/event_engine/default_event_engine_factory.cc - - src/core/lib/event_engine/executor/threaded_executor.cc - src/core/lib/event_engine/forkable.cc - src/core/lib/event_engine/memory_allocator.cc - src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -1937,7 +1935,6 @@ libs: - src/core/lib/event_engine/default_event_engine.h - src/core/lib/event_engine/default_event_engine_factory.h - src/core/lib/event_engine/executor/executor.h - - src/core/lib/event_engine/executor/threaded_executor.h - src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/poller.h @@ -2292,7 +2289,6 @@ libs: - src/core/lib/event_engine/channel_args_endpoint_config.cc - src/core/lib/event_engine/default_event_engine.cc - src/core/lib/event_engine/default_event_engine_factory.cc - - src/core/lib/event_engine/executor/threaded_executor.cc - src/core/lib/event_engine/forkable.cc - src/core/lib/event_engine/memory_allocator.cc - src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -10099,16 +10095,20 @@ targets: build: test language: c++ headers: + - src/core/lib/event_engine/executor/executor.h - src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/thread_pool.h - src/core/lib/gprpp/notification.h + - src/core/lib/gprpp/time.h src: - src/core/lib/event_engine/forkable.cc - src/core/lib/event_engine/thread_pool.cc + - src/core/lib/gprpp/time.cc - test/core/event_engine/thread_pool_test.cc deps: - absl/container:flat_hash_set - absl/functional:any_invocable + - absl/status:statusor - gpr - name: thread_quota_test gtest: true diff --git a/config.m4 b/config.m4 index 5d04f7a8975..32b303a8b0a 100644 --- a/config.m4 +++ b/config.m4 @@ -472,7 +472,6 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/event_engine/channel_args_endpoint_config.cc \ src/core/lib/event_engine/default_event_engine.cc \ src/core/lib/event_engine/default_event_engine_factory.cc \ - src/core/lib/event_engine/executor/threaded_executor.cc \ src/core/lib/event_engine/forkable.cc \ src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ @@ -1335,7 +1334,6 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/config) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/debug) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine) - PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/executor) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/posix_engine) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/windows) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/experiments) diff --git a/config.w32 b/config.w32 index 2bcaeb8f282..2074f6d2430 100644 --- a/config.w32 +++ b/config.w32 @@ -438,7 +438,6 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\event_engine\\channel_args_endpoint_config.cc " + "src\\core\\lib\\event_engine\\default_event_engine.cc " + "src\\core\\lib\\event_engine\\default_event_engine_factory.cc " + - "src\\core\\lib\\event_engine\\executor\\threaded_executor.cc " + "src\\core\\lib\\event_engine\\forkable.cc " + "src\\core\\lib\\event_engine\\memory_allocator.cc " + "src\\core\\lib\\event_engine\\posix_engine\\posix_engine.cc " + @@ -1457,7 +1456,6 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\config"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\debug"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine"); - FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\executor"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\posix_engine"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\windows"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\experiments"); diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 8c89429eae6..e1cfca4ced3 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -685,7 +685,6 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/default_event_engine.h', 'src/core/lib/event_engine/default_event_engine_factory.h', 'src/core/lib/event_engine/executor/executor.h', - 'src/core/lib/event_engine/executor/threaded_executor.h', 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/poller.h', @@ -1550,7 +1549,6 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/default_event_engine.h', 'src/core/lib/event_engine/default_event_engine_factory.h', 'src/core/lib/event_engine/executor/executor.h', - 'src/core/lib/event_engine/executor/threaded_executor.h', 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/poller.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 256d0778d6f..3a1e0388c07 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1054,8 +1054,6 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/default_event_engine_factory.cc', 'src/core/lib/event_engine/default_event_engine_factory.h', 'src/core/lib/event_engine/executor/executor.h', - 'src/core/lib/event_engine/executor/threaded_executor.cc', - 'src/core/lib/event_engine/executor/threaded_executor.h', 'src/core/lib/event_engine/forkable.cc', 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', @@ -2179,7 +2177,6 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/default_event_engine.h', 'src/core/lib/event_engine/default_event_engine_factory.h', 'src/core/lib/event_engine/executor/executor.h', - 'src/core/lib/event_engine/executor/threaded_executor.h', 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/poller.h', diff --git a/grpc.gemspec b/grpc.gemspec index bc4e3da4294..f90e4bc5a4a 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -965,8 +965,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/default_event_engine_factory.cc ) s.files += %w( src/core/lib/event_engine/default_event_engine_factory.h ) s.files += %w( src/core/lib/event_engine/executor/executor.h ) - s.files += %w( src/core/lib/event_engine/executor/threaded_executor.cc ) - s.files += %w( src/core/lib/event_engine/executor/threaded_executor.h ) s.files += %w( src/core/lib/event_engine/forkable.cc ) s.files += %w( src/core/lib/event_engine/forkable.h ) s.files += %w( src/core/lib/event_engine/handle_containers.h ) diff --git a/grpc.gyp b/grpc.gyp index 43e04c28795..72bbcdb5799 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -805,7 +805,6 @@ 'src/core/lib/event_engine/channel_args_endpoint_config.cc', 'src/core/lib/event_engine/default_event_engine.cc', 'src/core/lib/event_engine/default_event_engine_factory.cc', - 'src/core/lib/event_engine/executor/threaded_executor.cc', 'src/core/lib/event_engine/forkable.cc', 'src/core/lib/event_engine/memory_allocator.cc', 'src/core/lib/event_engine/posix_engine/posix_engine.cc', @@ -1256,7 +1255,6 @@ 'src/core/lib/event_engine/channel_args_endpoint_config.cc', 'src/core/lib/event_engine/default_event_engine.cc', 'src/core/lib/event_engine/default_event_engine_factory.cc', - 'src/core/lib/event_engine/executor/threaded_executor.cc', 'src/core/lib/event_engine/forkable.cc', 'src/core/lib/event_engine/memory_allocator.cc', 'src/core/lib/event_engine/posix_engine/posix_engine.cc', diff --git a/package.xml b/package.xml index 4b7c9c2f6cc..8689ca94c9e 100644 --- a/package.xml +++ b/package.xml @@ -947,8 +947,6 @@ - - diff --git a/src/core/lib/event_engine/executor/threaded_executor.cc b/src/core/lib/event_engine/executor/threaded_executor.cc deleted file mode 100644 index 8274db56cfb..00000000000 --- a/src/core/lib/event_engine/executor/threaded_executor.cc +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include "src/core/lib/event_engine/executor/threaded_executor.h" - -#include - -namespace grpc_event_engine { -namespace experimental { - -ThreadedExecutor::ThreadedExecutor(int reserve_threads) - : thread_pool_(reserve_threads){}; - -void ThreadedExecutor::Run(EventEngine::Closure* closure) { - thread_pool_.Add([closure]() { closure->Run(); }); -} - -void ThreadedExecutor::Run(absl::AnyInvocable closure) { - thread_pool_.Add(std::move(closure)); -} - -} // namespace experimental -} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/executor/threaded_executor.h b/src/core/lib/event_engine/executor/threaded_executor.h deleted file mode 100644 index 05dfeb2963a..00000000000 --- a/src/core/lib/event_engine/executor/threaded_executor.h +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_THREADED_EXECUTOR_H -#define GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_THREADED_EXECUTOR_H - -#include - -#include "absl/functional/any_invocable.h" - -#include - -#include "src/core/lib/event_engine/executor/executor.h" -#include "src/core/lib/event_engine/thread_pool.h" - -namespace grpc_event_engine { -namespace experimental { - -class ThreadedExecutor : public Executor { - public: - explicit ThreadedExecutor(int reserve_threads); - ~ThreadedExecutor() override = default; - void Run(EventEngine::Closure* closure) override; - void Run(absl::AnyInvocable closure) override; - - private: - ThreadPool thread_pool_; -}; - -} // namespace experimental -} // namespace grpc_event_engine - -#endif // GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_THREADED_EXECUTOR_H \ No newline at end of file diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index 07577b6a97b..eb5508cdae2 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -25,7 +25,6 @@ #include #include "src/core/lib/debug/trace.h" -#include "src/core/lib/event_engine/executor/threaded_executor.h" #include "src/core/lib/event_engine/posix_engine/timer.h" #include "src/core/lib/event_engine/trace.h" #include "src/core/lib/event_engine/utils.h" diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h index cfae3617865..babb197349d 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine.h @@ -31,9 +31,9 @@ #include #include -#include "src/core/lib/event_engine/executor/threaded_executor.h" #include "src/core/lib/event_engine/handle_containers.h" #include "src/core/lib/event_engine/posix_engine/timer_manager.h" +#include "src/core/lib/event_engine/thread_pool.h" #include "src/core/lib/gprpp/sync.h" namespace grpc_event_engine { @@ -111,7 +111,7 @@ class PosixEventEngine final : public EventEngine { TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_); std::atomic aba_token_{0}; posix_engine::TimerManager timer_manager_; - ThreadedExecutor executor_{2}; + ThreadPool executor_; }; } // namespace experimental diff --git a/src/core/lib/event_engine/thread_pool.cc b/src/core/lib/event_engine/thread_pool.cc index 9bb8889de31..774122bd005 100644 --- a/src/core/lib/event_engine/thread_pool.cc +++ b/src/core/lib/event_engine/thread_pool.cc @@ -20,15 +20,18 @@ #include "src/core/lib/event_engine/thread_pool.h" +#include #include #include +#include "absl/base/attributes.h" #include "absl/time/clock.h" #include "absl/time/time.h" #include #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/gprpp/time.h" namespace grpc_event_engine { namespace experimental { @@ -39,29 +42,60 @@ namespace { thread_local bool g_threadpool_thread; } // namespace -void ThreadPool::StartThread(StatePtr state, bool throttled) { +void ThreadPool::StartThread(StatePtr state, StartThreadReason reason) { state->thread_count.Add(); - if (throttled && state->currently_starting_one_thread.exchange( - true, std::memory_order_relaxed)) { - state->thread_count.Remove(); - return; + const auto now = grpc_core::Timestamp::Now(); + switch (reason) { + case StartThreadReason::kNoWaitersWhenScheduling: { + auto time_since_last_start = + now - grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( + state->last_started_thread.load(std::memory_order_relaxed)); + if (time_since_last_start < grpc_core::Duration::Seconds(1)) { + state->thread_count.Remove(); + return; + } + } + ABSL_FALLTHROUGH_INTENDED; + case StartThreadReason::kNoWaitersWhenFinishedStarting: + if (state->currently_starting_one_thread.exchange( + true, std::memory_order_relaxed)) { + state->thread_count.Remove(); + return; + } + state->last_started_thread.store(now.milliseconds_after_process_epoch(), + std::memory_order_relaxed); + break; + case StartThreadReason::kInitialPool: + break; } struct ThreadArg { StatePtr state; - bool throttled; + StartThreadReason reason; }; grpc_core::Thread( "event_engine", [](void* arg) { std::unique_ptr a(static_cast(arg)); g_threadpool_thread = true; - if (a->throttled) { - GPR_ASSERT(a->state->currently_starting_one_thread.exchange( - false, std::memory_order_relaxed)); + switch (a->reason) { + case StartThreadReason::kInitialPool: + break; + case StartThreadReason::kNoWaitersWhenFinishedStarting: + a->state->queue.SleepIfRunning(); + ABSL_FALLTHROUGH_INTENDED; + case StartThreadReason::kNoWaitersWhenScheduling: + // Release throttling variable + GPR_ASSERT(a->state->currently_starting_one_thread.exchange( + false, std::memory_order_relaxed)); + if (a->state->queue.IsBacklogged()) { + StartThread(a->state, + StartThreadReason::kNoWaitersWhenFinishedStarting); + } + break; } ThreadFunc(a->state); }, - new ThreadArg{state, throttled}, nullptr, + new ThreadArg{state, reason}, nullptr, grpc_core::Thread::Options().set_tracked(false).set_joinable(false)) .Start(); } @@ -78,10 +112,18 @@ bool ThreadPool::Queue::Step() { while (state_ == State::kRunning && callbacks_.empty()) { // If there are too many threads waiting, then quit this thread. // TODO(ctiller): wait some time in this case to be sure. - if (threads_waiting_ >= reserve_threads_) return false; - threads_waiting_++; - cv_.Wait(&mu_); - threads_waiting_--; + if (threads_waiting_ >= reserve_threads_) { + threads_waiting_++; + bool timeout = cv_.WaitWithTimeout(&mu_, absl::Seconds(30)); + threads_waiting_--; + if (timeout && threads_waiting_ >= reserve_threads_) { + return false; + } + } else { + threads_waiting_++; + cv_.Wait(&mu_); + threads_waiting_--; + } } switch (state_) { case State::kRunning: @@ -99,10 +141,9 @@ bool ThreadPool::Queue::Step() { return true; } -ThreadPool::ThreadPool(int reserve_threads) - : reserve_threads_(reserve_threads) { - for (int i = 0; i < reserve_threads; i++) { - StartThread(state_, /*throttled=*/false); +ThreadPool::ThreadPool() { + for (unsigned i = 0; i < reserve_threads_; i++) { + StartThread(state_, StartThreadReason::kInitialPool); } } @@ -116,12 +157,16 @@ ThreadPool::~ThreadPool() { "shutting down"); } -void ThreadPool::Add(absl::AnyInvocable callback) { +void ThreadPool::Run(absl::AnyInvocable callback) { if (state_->queue.Add(std::move(callback))) { - StartThread(state_, /*throttled=*/true); + StartThread(state_, StartThreadReason::kNoWaitersWhenScheduling); } } +void ThreadPool::Run(EventEngine::Closure* closure) { + Run([closure]() { closure->Run(); }); +} + bool ThreadPool::Queue::Add(absl::AnyInvocable callback) { grpc_core::MutexLock lock(&mu_); // Add works to the callbacks list @@ -130,13 +175,42 @@ bool ThreadPool::Queue::Add(absl::AnyInvocable callback) { switch (state_) { case State::kRunning: case State::kShutdown: - return threads_waiting_ == 0; + return callbacks_.size() > threads_waiting_; case State::kForking: return false; } GPR_UNREACHABLE_CODE(return false); } +bool ThreadPool::Queue::IsBacklogged() { + grpc_core::MutexLock lock(&mu_); + switch (state_) { + case State::kRunning: + case State::kShutdown: + return callbacks_.size() > 1; + case State::kForking: + return false; + } + GPR_UNREACHABLE_CODE(return false); +} + +void ThreadPool::Queue::SleepIfRunning() { + grpc_core::MutexLock lock(&mu_); + auto end = grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now(); + while (true) { + grpc_core::Timestamp now = grpc_core::Timestamp::Now(); + if (now >= end) return; + switch (state_) { + case State::kRunning: + case State::kShutdown: + cv_.WaitWithTimeout(&mu_, absl::Milliseconds((end - now).millis())); + break; + case State::kForking: + return; + } + } +} + void ThreadPool::Queue::SetState(State state) { grpc_core::MutexLock lock(&mu_); if (state == State::kRunning) { @@ -187,8 +261,8 @@ void ThreadPool::PostforkChild() { Postfork(); } void ThreadPool::Postfork() { state_->queue.Reset(); - for (int i = 0; i < reserve_threads_; i++) { - StartThread(state_, /*throttled=*/false); + for (unsigned i = 0; i < reserve_threads_; i++) { + StartThread(state_, StartThreadReason::kInitialPool); } } diff --git a/src/core/lib/event_engine/thread_pool.h b/src/core/lib/event_engine/thread_pool.h index f94f7ecffa3..4b93b5a443a 100644 --- a/src/core/lib/event_engine/thread_pool.h +++ b/src/core/lib/event_engine/thread_pool.h @@ -21,6 +21,8 @@ #include +#include + #include #include #include @@ -28,19 +30,25 @@ #include "absl/base/thread_annotations.h" #include "absl/functional/any_invocable.h" +#include +#include + +#include "src/core/lib/event_engine/executor/executor.h" #include "src/core/lib/event_engine/forkable.h" +#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/sync.h" namespace grpc_event_engine { namespace experimental { -class ThreadPool final : public grpc_event_engine::experimental::Forkable { +class ThreadPool final : public Forkable, public Executor { public: - explicit ThreadPool(int reserve_threads); + ThreadPool(); // Ensures the thread pool is empty before destroying it. ~ThreadPool() override; - void Add(absl::AnyInvocable callback); + void Run(absl::AnyInvocable callback) override; + void Run(EventEngine::Closure* closure) override; // Forkable // Ensures that the thread pool is empty before forking. @@ -51,7 +59,8 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable { private: class Queue { public: - explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {} + explicit Queue(unsigned reserve_threads) + : reserve_threads_(reserve_threads) {} bool Step(); void SetShutdown() { SetState(State::kShutdown); } void SetForking() { SetState(State::kForking); } @@ -59,6 +68,8 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable { // Return true if we should also spin up a new thread. bool Add(absl::AnyInvocable callback); void Reset() { SetState(State::kRunning); } + bool IsBacklogged(); + void SleepIfRunning(); private: enum class State { kRunning, kShutdown, kForking }; @@ -68,8 +79,8 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable { grpc_core::Mutex mu_; grpc_core::CondVar cv_; std::queue> callbacks_ ABSL_GUARDED_BY(mu_); - int threads_waiting_ ABSL_GUARDED_BY(mu_) = 0; - const int reserve_threads_; + unsigned threads_waiting_ ABSL_GUARDED_BY(mu_) = 0; + const unsigned reserve_threads_; State state_ ABSL_GUARDED_BY(mu_) = State::kRunning; }; @@ -92,19 +103,27 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable { // After pool creation we use this to rate limit creation of threads to one // at a time. std::atomic currently_starting_one_thread{false}; + std::atomic last_started_thread{0}; }; using StatePtr = std::shared_ptr; + enum class StartThreadReason { + kInitialPool, + kNoWaitersWhenScheduling, + kNoWaitersWhenFinishedStarting, + }; + static void ThreadFunc(StatePtr state); // Start a new thread; throttled indicates whether the State::starting_thread // variable is being used to throttle this threads creation against others or // not: at thread pool startup we start several threads concurrently, but // after that we only start one at a time. - static void StartThread(StatePtr state, bool throttled); + static void StartThread(StatePtr state, StartThreadReason reason); void Postfork(); - const int reserve_threads_; + const unsigned reserve_threads_ = + grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 32u); const StatePtr state_ = std::make_shared(reserve_threads_); }; diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc index 65e19fe8a31..22500290371 100644 --- a/src/core/lib/event_engine/windows/windows_engine.cc +++ b/src/core/lib/event_engine/windows/windows_engine.cc @@ -26,7 +26,6 @@ #include #include -#include "src/core/lib/event_engine/executor/threaded_executor.h" #include "src/core/lib/event_engine/handle_containers.h" #include "src/core/lib/event_engine/posix_engine/timer_manager.h" #include "src/core/lib/event_engine/trace.h" diff --git a/src/core/lib/event_engine/windows/windows_engine.h b/src/core/lib/event_engine/windows/windows_engine.h index 26cf816110f..00fb35203f2 100644 --- a/src/core/lib/event_engine/windows/windows_engine.h +++ b/src/core/lib/event_engine/windows/windows_engine.h @@ -28,9 +28,9 @@ #include #include -#include "src/core/lib/event_engine/executor/threaded_executor.h" #include "src/core/lib/event_engine/handle_containers.h" #include "src/core/lib/event_engine/posix_engine/timer_manager.h" +#include "src/core/lib/event_engine/thread_pool.h" #include "src/core/lib/event_engine/windows/iocp.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" @@ -108,7 +108,7 @@ class WindowsEventEngine : public EventEngine { std::atomic aba_token_{0}; posix_engine::TimerManager timer_manager_; - ThreadedExecutor executor_{2}; + ThreadPool executor_; IOCP iocp_; }; diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 8de01f66227..cced637f344 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -447,7 +447,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/event_engine/channel_args_endpoint_config.cc', 'src/core/lib/event_engine/default_event_engine.cc', 'src/core/lib/event_engine/default_event_engine_factory.cc', - 'src/core/lib/event_engine/executor/threaded_executor.cc', 'src/core/lib/event_engine/forkable.cc', 'src/core/lib/event_engine/memory_allocator.cc', 'src/core/lib/event_engine/posix_engine/posix_engine.cc', diff --git a/test/core/end2end/tests/hpack_size.cc b/test/core/end2end/tests/hpack_size.cc index 0bc80af2268..aa819ef9aff 100644 --- a/test/core/end2end/tests/hpack_size.cc +++ b/test/core/end2end/tests/hpack_size.cc @@ -34,9 +34,11 @@ #include #include #include +#include #include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/time.h" #include "test/core/end2end/cq_verifier.h" #include "test/core/end2end/end2end_tests.h" #include "test/core/util/test_config.h" @@ -265,10 +267,9 @@ static void simple_request_body(grpc_end2end_test_config /*config*/, extra_metadata[2].value = grpc_slice_from_static_string(dragons[index % GPR_ARRAY_SIZE(dragons)]); - gpr_timespec deadline = five_seconds_from_now(); c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, grpc_slice_from_static_string("/foo"), nullptr, - deadline, nullptr); + gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr); GPR_ASSERT(c); grpc_metadata_array_init(&initial_metadata_recv); @@ -309,7 +310,7 @@ static void simple_request_body(grpc_end2end_test_config /*config*/, &request_metadata_recv, f.cq, f.cq, tag(101)); GPR_ASSERT(GRPC_CALL_OK == error); cqv.Expect(tag(101), true); - cqv.Verify(); + cqv.Verify(grpc_core::Duration::Seconds(120)); memset(ops, 0, sizeof(ops)); op = ops; @@ -337,7 +338,7 @@ static void simple_request_body(grpc_end2end_test_config /*config*/, cqv.Expect(tag(102), true); cqv.Expect(tag(1), true); - cqv.Verify(); + cqv.Verify(grpc_core::Duration::Seconds(120)); GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc index ccffd884da4..605600ba375 100644 --- a/test/core/event_engine/thread_pool_test.cc +++ b/test/core/event_engine/thread_pool_test.cc @@ -29,16 +29,16 @@ namespace grpc_event_engine { namespace experimental { TEST(ThreadPoolTest, CanRunClosure) { - ThreadPool p(1); + ThreadPool p; grpc_core::Notification n; - p.Add([&n] { n.Notify(); }); + p.Run([&n] { n.Notify(); }); n.WaitForNotification(); } TEST(ThreadPoolTest, CanDestroyInsideClosure) { - auto p = std::make_shared(1); + auto p = std::make_shared(); grpc_core::Notification n; - p->Add([p, &n]() mutable { + p->Run([p, &n]() mutable { std::this_thread::sleep_for(std::chrono::seconds(1)); // This should delete the thread pool and not deadlock p.reset(); @@ -50,13 +50,13 @@ TEST(ThreadPoolTest, CanDestroyInsideClosure) { } TEST(ThreadPoolTest, CanSurviveFork) { - ThreadPool p(1); + ThreadPool p; grpc_core::Notification n; - gpr_log(GPR_INFO, "add callback 1"); - p.Add([&n, &p] { + gpr_log(GPR_INFO, "run callback 1"); + p.Run([&n, &p] { std::this_thread::sleep_for(std::chrono::seconds(1)); - gpr_log(GPR_INFO, "add callback 2"); - p.Add([&n] { + gpr_log(GPR_INFO, "run callback 2"); + p.Run([&n] { std::this_thread::sleep_for(std::chrono::seconds(1)); gpr_log(GPR_INFO, "notify"); n.Notify(); @@ -69,8 +69,8 @@ TEST(ThreadPoolTest, CanSurviveFork) { gpr_log(GPR_INFO, "postfork child"); p.PostforkChild(); grpc_core::Notification n2; - gpr_log(GPR_INFO, "add callback 3"); - p.Add([&n2] { + gpr_log(GPR_INFO, "run callback 3"); + p.Run([&n2] { gpr_log(GPR_INFO, "notify"); n2.Notify(); }); @@ -79,14 +79,14 @@ TEST(ThreadPoolTest, CanSurviveFork) { } void ScheduleSelf(ThreadPool* p) { - p->Add([p] { ScheduleSelf(p); }); + p->Run([p] { ScheduleSelf(p); }); } TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) { ASSERT_DEATH_IF_SUPPORTED( [] { gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR); - ThreadPool p(1); + ThreadPool p; ScheduleSelf(&p); std::thread terminator([] { std::this_thread::sleep_for(std::chrono::seconds(10)); @@ -99,17 +99,17 @@ TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) { void ScheduleTwiceUntilZero(ThreadPool* p, int n) { if (n == 0) return; - p->Add([p, n] { + p->Run([p, n] { ScheduleTwiceUntilZero(p, n - 1); ScheduleTwiceUntilZero(p, n - 1); }); } TEST(ThreadPoolTest, CanStartLotsOfClosures) { - ThreadPool p(1); - // Our first thread pool implementation tried to create ~256k threads for this + ThreadPool p; + // Our first thread pool implementation tried to create ~1M threads for this // test. - ScheduleTwiceUntilZero(&p, 18); + ScheduleTwiceUntilZero(&p, 20); } } // namespace experimental diff --git a/test/core/event_engine/windows/iocp_test.cc b/test/core/event_engine/windows/iocp_test.cc index a95d21f0cdd..1736446d3d4 100644 --- a/test/core/event_engine/windows/iocp_test.cc +++ b/test/core/event_engine/windows/iocp_test.cc @@ -27,8 +27,8 @@ #include #include "src/core/lib/event_engine/common_closures.h" -#include "src/core/lib/event_engine/executor/threaded_executor.h" #include "src/core/lib/event_engine/poller.h" +#include "src/core/lib/event_engine/thread_pool.h" #include "src/core/lib/event_engine/windows/iocp.h" #include "src/core/lib/event_engine/windows/win_socket.h" #include "src/core/lib/gprpp/notification.h" @@ -42,14 +42,14 @@ using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::IOCP; using ::grpc_event_engine::experimental::Poller; using ::grpc_event_engine::experimental::SelfDeletingClosure; -using ::grpc_event_engine::experimental::ThreadedExecutor; +using ::grpc_event_engine::experimental::ThreadPool; using ::grpc_event_engine::experimental::WinSocket; } // namespace class IOCPTest : public testing::Test {}; TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) { - ThreadedExecutor executor{2}; + ThreadPool executor; IOCP iocp(&executor); SOCKET sockpair[2]; CreateSockpair(sockpair, iocp.GetDefaultSocketFlags()); @@ -137,7 +137,7 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) { } TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) { - ThreadedExecutor executor{2}; + ThreadPool executor; IOCP iocp(&executor); SOCKET sockpair[2]; CreateSockpair(sockpair, iocp.GetDefaultSocketFlags()); @@ -201,7 +201,7 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) { } TEST_F(IOCPTest, KickWorks) { - ThreadedExecutor executor{2}; + ThreadPool executor; IOCP iocp(&executor); grpc_core::Notification kicked; executor.Run([&iocp, &kicked] { @@ -225,7 +225,7 @@ TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) { // TODO(hork): evaluate if a kick count is going to be useful. // This documents the existing poller's behavior of maintaining a kick count, // but it's unclear if it's going to be needed. - ThreadedExecutor executor{2}; + ThreadPool executor; IOCP iocp(&executor); // kick twice iocp.Kick(); @@ -248,7 +248,7 @@ TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) { } TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) { - ThreadedExecutor executor{2}; + ThreadPool executor; IOCP iocp(&executor); SOCKET sockpair[2]; CreateSockpair(sockpair, iocp.GetDefaultSocketFlags()); @@ -274,7 +274,7 @@ TEST_F(IOCPTest, StressTestThousandsOfSockets) { for (int thread_n = 0; thread_n < thread_count; thread_n++) { threads.emplace_back([thread_n, sockets_per_thread, &read_count, &write_count] { - ThreadedExecutor executor{2}; + ThreadPool executor; IOCP iocp(&executor); // Start a looping worker thread with a moderate timeout std::thread iocp_worker([&iocp, &executor] { diff --git a/test/core/event_engine/windows/win_socket_test.cc b/test/core/event_engine/windows/win_socket_test.cc index 4c7ae868ee7..5b73f5ce14c 100644 --- a/test/core/event_engine/windows/win_socket_test.cc +++ b/test/core/event_engine/windows/win_socket_test.cc @@ -24,7 +24,7 @@ #include #include "src/core/lib/event_engine/common_closures.h" -#include "src/core/lib/event_engine/executor/threaded_executor.h" +#include "src/core/lib/event_engine/thread_pool.h" #include "src/core/lib/event_engine/windows/iocp.h" #include "src/core/lib/event_engine/windows/win_socket.h" #include "src/core/lib/iomgr/error.h" @@ -34,14 +34,14 @@ namespace { using ::grpc_event_engine::experimental::AnyInvocableClosure; using ::grpc_event_engine::experimental::CreateSockpair; using ::grpc_event_engine::experimental::IOCP; -using ::grpc_event_engine::experimental::ThreadedExecutor; +using ::grpc_event_engine::experimental::ThreadPool; using ::grpc_event_engine::experimental::WinSocket; } // namespace class WinSocketTest : public testing::Test {}; TEST_F(WinSocketTest, ManualReadEventTriggeredWithoutIO) { - ThreadedExecutor executor{2}; + ThreadPool executor; SOCKET sockpair[2]; CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags()); WinSocket wrapped_client_socket(sockpair[0], &executor); @@ -66,7 +66,7 @@ TEST_F(WinSocketTest, ManualReadEventTriggeredWithoutIO) { } TEST_F(WinSocketTest, NotificationCalledImmediatelyOnShutdownWinSocket) { - ThreadedExecutor executor{2}; + ThreadPool executor; SOCKET sockpair[2]; CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags()); WinSocket wrapped_client_socket(sockpair[0], &executor); diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index 3c7d4244a9b..6f460218efe 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -62,6 +62,10 @@ grpc_cc_test( external_deps = [ "benchmark", ], + tags = [ + "no_mac", + "no_windows", + ], uses_polling = False, deps = [ ":helpers", diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 6977ac23506..deec3a9dd8c 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1950,8 +1950,6 @@ src/core/lib/event_engine/default_event_engine.h \ src/core/lib/event_engine/default_event_engine_factory.cc \ src/core/lib/event_engine/default_event_engine_factory.h \ src/core/lib/event_engine/executor/executor.h \ -src/core/lib/event_engine/executor/threaded_executor.cc \ -src/core/lib/event_engine/executor/threaded_executor.h \ src/core/lib/event_engine/forkable.cc \ src/core/lib/event_engine/forkable.h \ src/core/lib/event_engine/handle_containers.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 71bf2bf6611..d4d83418513 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1738,8 +1738,6 @@ src/core/lib/event_engine/default_event_engine.h \ src/core/lib/event_engine/default_event_engine_factory.cc \ src/core/lib/event_engine/default_event_engine_factory.h \ src/core/lib/event_engine/executor/executor.h \ -src/core/lib/event_engine/executor/threaded_executor.cc \ -src/core/lib/event_engine/executor/threaded_executor.h \ src/core/lib/event_engine/forkable.cc \ src/core/lib/event_engine/forkable.h \ src/core/lib/event_engine/handle_containers.h \