[event_engine] Improve scaling in threadpool (#31234)

* fixes

* Automated change: Fix sanity tests

* fix

* fix

* fix

* fix

* fixes

* fixes

* fix

* fix-win

* fix iwyu

* fix

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/31242/head
Craig Tiller 2 years ago committed by GitHub
parent 3c96517fc0
commit 2d00d50c59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      BUILD
  2. 4
      CMakeLists.txt
  3. 2
      Makefile
  4. 8
      build_autogenerated.yaml
  5. 2
      config.m4
  6. 2
      config.w32
  7. 2
      gRPC-C++.podspec
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 2
      grpc.gyp
  11. 2
      package.xml
  12. 36
      src/core/lib/event_engine/executor/threaded_executor.cc
  13. 44
      src/core/lib/event_engine/executor/threaded_executor.h
  14. 1
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  15. 4
      src/core/lib/event_engine/posix_engine/posix_engine.h
  16. 120
      src/core/lib/event_engine/thread_pool.cc
  17. 35
      src/core/lib/event_engine/thread_pool.h
  18. 1
      src/core/lib/event_engine/windows/windows_engine.cc
  19. 4
      src/core/lib/event_engine/windows/windows_engine.h
  20. 1
      src/python/grpcio/grpc_core_dependencies.py
  21. 9
      test/core/end2end/tests/hpack_size.cc
  22. 34
      test/core/event_engine/thread_pool_test.cc
  23. 16
      test/core/event_engine/windows/iocp_test.cc
  24. 8
      test/core/event_engine/windows/win_socket_test.cc
  25. 4
      test/cpp/microbenchmarks/BUILD
  26. 2
      tools/doxygen/Doxyfile.c++.internal
  27. 2
      tools/doxygen/Doxyfile.core.internal

25
BUILD

@ -2445,23 +2445,6 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "event_engine_threaded_executor",
srcs = [
"src/core/lib/event_engine/executor/threaded_executor.cc",
],
hdrs = [
"src/core/lib/event_engine/executor/threaded_executor.h",
],
external_deps = ["absl/functional:any_invocable"],
deps = [
"event_engine_base_hdrs",
"event_engine_executor",
"event_engine_thread_pool",
"gpr_platform",
],
)
grpc_cc_library(
name = "common_event_engine_closures",
hdrs = ["src/core/lib/event_engine/common_closures.h"],
@ -2507,8 +2490,12 @@ grpc_cc_library(
"absl/time",
],
deps = [
"event_engine_base_hdrs",
"event_engine_executor",
"forkable",
"gpr",
"time",
"useful",
],
)
@ -2884,7 +2871,7 @@ grpc_cc_library(
deps = [
"event_engine_base_hdrs",
"event_engine_common",
"event_engine_threaded_executor",
"event_engine_thread_pool",
"event_engine_trace",
"event_engine_utils",
"gpr",
@ -2906,7 +2893,7 @@ grpc_cc_library(
deps = [
"event_engine_base_hdrs",
"event_engine_common",
"event_engine_threaded_executor",
"event_engine_thread_pool",
"event_engine_trace",
"event_engine_utils",
"gpr",

4
CMakeLists.txt generated

@ -2116,7 +2116,6 @@ add_library(grpc
src/core/lib/event_engine/channel_args_endpoint_config.cc
src/core/lib/event_engine/default_event_engine.cc
src/core/lib/event_engine/default_event_engine_factory.cc
src/core/lib/event_engine/executor/threaded_executor.cc
src/core/lib/event_engine/forkable.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/event_engine/posix_engine/posix_engine.cc
@ -2725,7 +2724,6 @@ add_library(grpc_unsecure
src/core/lib/event_engine/channel_args_endpoint_config.cc
src/core/lib/event_engine/default_event_engine.cc
src/core/lib/event_engine/default_event_engine_factory.cc
src/core/lib/event_engine/executor/threaded_executor.cc
src/core/lib/event_engine/forkable.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/event_engine/posix_engine/posix_engine.cc
@ -18549,6 +18547,7 @@ if(gRPC_BUILD_TESTS)
add_executable(thread_pool_test
src/core/lib/event_engine/forkable.cc
src/core/lib/event_engine/thread_pool.cc
src/core/lib/gprpp/time.cc
test/core/event_engine/thread_pool_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -18578,6 +18577,7 @@ target_link_libraries(thread_pool_test
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::statusor
gpr
)

2
Makefile generated

@ -1390,7 +1390,6 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/channel_args_endpoint_config.cc \
src/core/lib/event_engine/default_event_engine.cc \
src/core/lib/event_engine/default_event_engine_factory.cc \
src/core/lib/event_engine/executor/threaded_executor.cc \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
@ -1862,7 +1861,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/channel_args_endpoint_config.cc \
src/core/lib/event_engine/default_event_engine.cc \
src/core/lib/event_engine/default_event_engine_factory.cc \
src/core/lib/event_engine/executor/threaded_executor.cc \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \

@ -735,7 +735,6 @@ libs:
- src/core/lib/event_engine/default_event_engine.h
- src/core/lib/event_engine/default_event_engine_factory.h
- src/core/lib/event_engine/executor/executor.h
- src/core/lib/event_engine/executor/threaded_executor.h
- src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/poller.h
@ -1449,7 +1448,6 @@ libs:
- src/core/lib/event_engine/channel_args_endpoint_config.cc
- src/core/lib/event_engine/default_event_engine.cc
- src/core/lib/event_engine/default_event_engine_factory.cc
- src/core/lib/event_engine/executor/threaded_executor.cc
- src/core/lib/event_engine/forkable.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/event_engine/posix_engine/posix_engine.cc
@ -1937,7 +1935,6 @@ libs:
- src/core/lib/event_engine/default_event_engine.h
- src/core/lib/event_engine/default_event_engine_factory.h
- src/core/lib/event_engine/executor/executor.h
- src/core/lib/event_engine/executor/threaded_executor.h
- src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/poller.h
@ -2292,7 +2289,6 @@ libs:
- src/core/lib/event_engine/channel_args_endpoint_config.cc
- src/core/lib/event_engine/default_event_engine.cc
- src/core/lib/event_engine/default_event_engine_factory.cc
- src/core/lib/event_engine/executor/threaded_executor.cc
- src/core/lib/event_engine/forkable.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/event_engine/posix_engine/posix_engine.cc
@ -10099,16 +10095,20 @@ targets:
build: test
language: c++
headers:
- src/core/lib/event_engine/executor/executor.h
- src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/thread_pool.h
- src/core/lib/gprpp/notification.h
- src/core/lib/gprpp/time.h
src:
- src/core/lib/event_engine/forkable.cc
- src/core/lib/event_engine/thread_pool.cc
- src/core/lib/gprpp/time.cc
- test/core/event_engine/thread_pool_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/status:statusor
- gpr
- name: thread_quota_test
gtest: true

2
config.m4 generated

@ -472,7 +472,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/channel_args_endpoint_config.cc \
src/core/lib/event_engine/default_event_engine.cc \
src/core/lib/event_engine/default_event_engine_factory.cc \
src/core/lib/event_engine/executor/threaded_executor.cc \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
@ -1335,7 +1334,6 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/config)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/debug)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/executor)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/posix_engine)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/windows)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/experiments)

2
config.w32 generated

@ -438,7 +438,6 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\channel_args_endpoint_config.cc " +
"src\\core\\lib\\event_engine\\default_event_engine.cc " +
"src\\core\\lib\\event_engine\\default_event_engine_factory.cc " +
"src\\core\\lib\\event_engine\\executor\\threaded_executor.cc " +
"src\\core\\lib\\event_engine\\forkable.cc " +
"src\\core\\lib\\event_engine\\memory_allocator.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\posix_engine.cc " +
@ -1457,7 +1456,6 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\config");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\debug");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\executor");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\posix_engine");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\windows");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\experiments");

2
gRPC-C++.podspec generated

@ -685,7 +685,6 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/default_event_engine.h',
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/executor/executor.h',
'src/core/lib/event_engine/executor/threaded_executor.h',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/poller.h',
@ -1550,7 +1549,6 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/default_event_engine.h',
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/executor/executor.h',
'src/core/lib/event_engine/executor/threaded_executor.h',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/poller.h',

3
gRPC-Core.podspec generated

@ -1054,8 +1054,6 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/default_event_engine_factory.cc',
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/executor/executor.h',
'src/core/lib/event_engine/executor/threaded_executor.cc',
'src/core/lib/event_engine/executor/threaded_executor.h',
'src/core/lib/event_engine/forkable.cc',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h',
@ -2179,7 +2177,6 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/default_event_engine.h',
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/executor/executor.h',
'src/core/lib/event_engine/executor/threaded_executor.h',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/poller.h',

2
grpc.gemspec generated

@ -965,8 +965,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/default_event_engine_factory.cc )
s.files += %w( src/core/lib/event_engine/default_event_engine_factory.h )
s.files += %w( src/core/lib/event_engine/executor/executor.h )
s.files += %w( src/core/lib/event_engine/executor/threaded_executor.cc )
s.files += %w( src/core/lib/event_engine/executor/threaded_executor.h )
s.files += %w( src/core/lib/event_engine/forkable.cc )
s.files += %w( src/core/lib/event_engine/forkable.h )
s.files += %w( src/core/lib/event_engine/handle_containers.h )

2
grpc.gyp generated

@ -805,7 +805,6 @@
'src/core/lib/event_engine/channel_args_endpoint_config.cc',
'src/core/lib/event_engine/default_event_engine.cc',
'src/core/lib/event_engine/default_event_engine_factory.cc',
'src/core/lib/event_engine/executor/threaded_executor.cc',
'src/core/lib/event_engine/forkable.cc',
'src/core/lib/event_engine/memory_allocator.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
@ -1256,7 +1255,6 @@
'src/core/lib/event_engine/channel_args_endpoint_config.cc',
'src/core/lib/event_engine/default_event_engine.cc',
'src/core/lib/event_engine/default_event_engine_factory.cc',
'src/core/lib/event_engine/executor/threaded_executor.cc',
'src/core/lib/event_engine/forkable.cc',
'src/core/lib/event_engine/memory_allocator.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',

2
package.xml generated

@ -947,8 +947,6 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/default_event_engine_factory.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/default_event_engine_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/executor/executor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/executor/threaded_executor.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/executor/threaded_executor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/forkable.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/forkable.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/handle_containers.h" role="src" />

@ -1,36 +0,0 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/executor/threaded_executor.h"
#include <utility>
namespace grpc_event_engine {
namespace experimental {
ThreadedExecutor::ThreadedExecutor(int reserve_threads)
: thread_pool_(reserve_threads){};
void ThreadedExecutor::Run(EventEngine::Closure* closure) {
thread_pool_.Add([closure]() { closure->Run(); });
}
void ThreadedExecutor::Run(absl::AnyInvocable<void()> closure) {
thread_pool_.Add(std::move(closure));
}
} // namespace experimental
} // namespace grpc_event_engine

@ -1,44 +0,0 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_THREADED_EXECUTOR_H
#define GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_THREADED_EXECUTOR_H
#include <grpc/support/port_platform.h>
#include "absl/functional/any_invocable.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/executor/executor.h"
#include "src/core/lib/event_engine/thread_pool.h"
namespace grpc_event_engine {
namespace experimental {
class ThreadedExecutor : public Executor {
public:
explicit ThreadedExecutor(int reserve_threads);
~ThreadedExecutor() override = default;
void Run(EventEngine::Closure* closure) override;
void Run(absl::AnyInvocable<void()> closure) override;
private:
ThreadPool thread_pool_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_THREADED_EXECUTOR_H

@ -25,7 +25,6 @@
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/executor/threaded_executor.h"
#include "src/core/lib/event_engine/posix_engine/timer.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"

@ -31,9 +31,9 @@
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include "src/core/lib/event_engine/executor/threaded_executor.h"
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/thread_pool.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
@ -111,7 +111,7 @@ class PosixEventEngine final : public EventEngine {
TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_);
std::atomic<intptr_t> aba_token_{0};
posix_engine::TimerManager timer_manager_;
ThreadedExecutor executor_{2};
ThreadPool executor_;
};
} // namespace experimental

@ -20,15 +20,18 @@
#include "src/core/lib/event_engine/thread_pool.h"
#include <atomic>
#include <memory>
#include <utility>
#include "absl/base/attributes.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
namespace grpc_event_engine {
namespace experimental {
@ -39,29 +42,60 @@ namespace {
thread_local bool g_threadpool_thread;
} // namespace
void ThreadPool::StartThread(StatePtr state, bool throttled) {
void ThreadPool::StartThread(StatePtr state, StartThreadReason reason) {
state->thread_count.Add();
if (throttled && state->currently_starting_one_thread.exchange(
true, std::memory_order_relaxed)) {
state->thread_count.Remove();
return;
const auto now = grpc_core::Timestamp::Now();
switch (reason) {
case StartThreadReason::kNoWaitersWhenScheduling: {
auto time_since_last_start =
now - grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
state->last_started_thread.load(std::memory_order_relaxed));
if (time_since_last_start < grpc_core::Duration::Seconds(1)) {
state->thread_count.Remove();
return;
}
}
ABSL_FALLTHROUGH_INTENDED;
case StartThreadReason::kNoWaitersWhenFinishedStarting:
if (state->currently_starting_one_thread.exchange(
true, std::memory_order_relaxed)) {
state->thread_count.Remove();
return;
}
state->last_started_thread.store(now.milliseconds_after_process_epoch(),
std::memory_order_relaxed);
break;
case StartThreadReason::kInitialPool:
break;
}
struct ThreadArg {
StatePtr state;
bool throttled;
StartThreadReason reason;
};
grpc_core::Thread(
"event_engine",
[](void* arg) {
std::unique_ptr<ThreadArg> a(static_cast<ThreadArg*>(arg));
g_threadpool_thread = true;
if (a->throttled) {
GPR_ASSERT(a->state->currently_starting_one_thread.exchange(
false, std::memory_order_relaxed));
switch (a->reason) {
case StartThreadReason::kInitialPool:
break;
case StartThreadReason::kNoWaitersWhenFinishedStarting:
a->state->queue.SleepIfRunning();
ABSL_FALLTHROUGH_INTENDED;
case StartThreadReason::kNoWaitersWhenScheduling:
// Release throttling variable
GPR_ASSERT(a->state->currently_starting_one_thread.exchange(
false, std::memory_order_relaxed));
if (a->state->queue.IsBacklogged()) {
StartThread(a->state,
StartThreadReason::kNoWaitersWhenFinishedStarting);
}
break;
}
ThreadFunc(a->state);
},
new ThreadArg{state, throttled}, nullptr,
new ThreadArg{state, reason}, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
.Start();
}
@ -78,10 +112,18 @@ bool ThreadPool::Queue::Step() {
while (state_ == State::kRunning && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread.
// TODO(ctiller): wait some time in this case to be sure.
if (threads_waiting_ >= reserve_threads_) return false;
threads_waiting_++;
cv_.Wait(&mu_);
threads_waiting_--;
if (threads_waiting_ >= reserve_threads_) {
threads_waiting_++;
bool timeout = cv_.WaitWithTimeout(&mu_, absl::Seconds(30));
threads_waiting_--;
if (timeout && threads_waiting_ >= reserve_threads_) {
return false;
}
} else {
threads_waiting_++;
cv_.Wait(&mu_);
threads_waiting_--;
}
}
switch (state_) {
case State::kRunning:
@ -99,10 +141,9 @@ bool ThreadPool::Queue::Step() {
return true;
}
ThreadPool::ThreadPool(int reserve_threads)
: reserve_threads_(reserve_threads) {
for (int i = 0; i < reserve_threads; i++) {
StartThread(state_, /*throttled=*/false);
ThreadPool::ThreadPool() {
for (unsigned i = 0; i < reserve_threads_; i++) {
StartThread(state_, StartThreadReason::kInitialPool);
}
}
@ -116,12 +157,16 @@ ThreadPool::~ThreadPool() {
"shutting down");
}
void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
void ThreadPool::Run(absl::AnyInvocable<void()> callback) {
if (state_->queue.Add(std::move(callback))) {
StartThread(state_, /*throttled=*/true);
StartThread(state_, StartThreadReason::kNoWaitersWhenScheduling);
}
}
void ThreadPool::Run(EventEngine::Closure* closure) {
Run([closure]() { closure->Run(); });
}
bool ThreadPool::Queue::Add(absl::AnyInvocable<void()> callback) {
grpc_core::MutexLock lock(&mu_);
// Add works to the callbacks list
@ -130,13 +175,42 @@ bool ThreadPool::Queue::Add(absl::AnyInvocable<void()> callback) {
switch (state_) {
case State::kRunning:
case State::kShutdown:
return threads_waiting_ == 0;
return callbacks_.size() > threads_waiting_;
case State::kForking:
return false;
}
GPR_UNREACHABLE_CODE(return false);
}
bool ThreadPool::Queue::IsBacklogged() {
grpc_core::MutexLock lock(&mu_);
switch (state_) {
case State::kRunning:
case State::kShutdown:
return callbacks_.size() > 1;
case State::kForking:
return false;
}
GPR_UNREACHABLE_CODE(return false);
}
void ThreadPool::Queue::SleepIfRunning() {
grpc_core::MutexLock lock(&mu_);
auto end = grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now();
while (true) {
grpc_core::Timestamp now = grpc_core::Timestamp::Now();
if (now >= end) return;
switch (state_) {
case State::kRunning:
case State::kShutdown:
cv_.WaitWithTimeout(&mu_, absl::Milliseconds((end - now).millis()));
break;
case State::kForking:
return;
}
}
}
void ThreadPool::Queue::SetState(State state) {
grpc_core::MutexLock lock(&mu_);
if (state == State::kRunning) {
@ -187,8 +261,8 @@ void ThreadPool::PostforkChild() { Postfork(); }
void ThreadPool::Postfork() {
state_->queue.Reset();
for (int i = 0; i < reserve_threads_; i++) {
StartThread(state_, /*throttled=*/false);
for (unsigned i = 0; i < reserve_threads_; i++) {
StartThread(state_, StartThreadReason::kInitialPool);
}
}

@ -21,6 +21,8 @@
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <atomic>
#include <memory>
#include <queue>
@ -28,19 +30,25 @@
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/cpu.h>
#include "src/core/lib/event_engine/executor/executor.h"
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
namespace experimental {
class ThreadPool final : public grpc_event_engine::experimental::Forkable {
class ThreadPool final : public Forkable, public Executor {
public:
explicit ThreadPool(int reserve_threads);
ThreadPool();
// Ensures the thread pool is empty before destroying it.
~ThreadPool() override;
void Add(absl::AnyInvocable<void()> callback);
void Run(absl::AnyInvocable<void()> callback) override;
void Run(EventEngine::Closure* closure) override;
// Forkable
// Ensures that the thread pool is empty before forking.
@ -51,7 +59,8 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable {
private:
class Queue {
public:
explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {}
explicit Queue(unsigned reserve_threads)
: reserve_threads_(reserve_threads) {}
bool Step();
void SetShutdown() { SetState(State::kShutdown); }
void SetForking() { SetState(State::kForking); }
@ -59,6 +68,8 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable {
// Return true if we should also spin up a new thread.
bool Add(absl::AnyInvocable<void()> callback);
void Reset() { SetState(State::kRunning); }
bool IsBacklogged();
void SleepIfRunning();
private:
enum class State { kRunning, kShutdown, kForking };
@ -68,8 +79,8 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable {
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
std::queue<absl::AnyInvocable<void()>> callbacks_ ABSL_GUARDED_BY(mu_);
int threads_waiting_ ABSL_GUARDED_BY(mu_) = 0;
const int reserve_threads_;
unsigned threads_waiting_ ABSL_GUARDED_BY(mu_) = 0;
const unsigned reserve_threads_;
State state_ ABSL_GUARDED_BY(mu_) = State::kRunning;
};
@ -92,19 +103,27 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable {
// After pool creation we use this to rate limit creation of threads to one
// at a time.
std::atomic<bool> currently_starting_one_thread{false};
std::atomic<uint64_t> last_started_thread{0};
};
using StatePtr = std::shared_ptr<State>;
enum class StartThreadReason {
kInitialPool,
kNoWaitersWhenScheduling,
kNoWaitersWhenFinishedStarting,
};
static void ThreadFunc(StatePtr state);
// Start a new thread; throttled indicates whether the State::starting_thread
// variable is being used to throttle this threads creation against others or
// not: at thread pool startup we start several threads concurrently, but
// after that we only start one at a time.
static void StartThread(StatePtr state, bool throttled);
static void StartThread(StatePtr state, StartThreadReason reason);
void Postfork();
const int reserve_threads_;
const unsigned reserve_threads_ =
grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 32u);
const StatePtr state_ = std::make_shared<State>(reserve_threads_);
};

@ -26,7 +26,6 @@
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include "src/core/lib/event_engine/executor/threaded_executor.h"
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/trace.h"

@ -28,9 +28,9 @@
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include "src/core/lib/event_engine/executor/threaded_executor.h"
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/thread_pool.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
@ -108,7 +108,7 @@ class WindowsEventEngine : public EventEngine {
std::atomic<intptr_t> aba_token_{0};
posix_engine::TimerManager timer_manager_;
ThreadedExecutor executor_{2};
ThreadPool executor_;
IOCP iocp_;
};

@ -447,7 +447,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/channel_args_endpoint_config.cc',
'src/core/lib/event_engine/default_event_engine.cc',
'src/core/lib/event_engine/default_event_engine_factory.cc',
'src/core/lib/event_engine/executor/threaded_executor.cc',
'src/core/lib/event_engine/forkable.cc',
'src/core/lib/event_engine/memory_allocator.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',

@ -34,9 +34,11 @@
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/time.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/util/test_config.h"
@ -265,10 +267,9 @@ static void simple_request_body(grpc_end2end_test_config /*config*/,
extra_metadata[2].value =
grpc_slice_from_static_string(dragons[index % GPR_ARRAY_SIZE(dragons)]);
gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/foo"), nullptr,
deadline, nullptr);
gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
@ -309,7 +310,7 @@ static void simple_request_body(grpc_end2end_test_config /*config*/,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
cqv.Expect(tag(101), true);
cqv.Verify();
cqv.Verify(grpc_core::Duration::Seconds(120));
memset(ops, 0, sizeof(ops));
op = ops;
@ -337,7 +338,7 @@ static void simple_request_body(grpc_end2end_test_config /*config*/,
cqv.Expect(tag(102), true);
cqv.Expect(tag(1), true);
cqv.Verify();
cqv.Verify(grpc_core::Duration::Seconds(120));
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));

@ -29,16 +29,16 @@ namespace grpc_event_engine {
namespace experimental {
TEST(ThreadPoolTest, CanRunClosure) {
ThreadPool p(1);
ThreadPool p;
grpc_core::Notification n;
p.Add([&n] { n.Notify(); });
p.Run([&n] { n.Notify(); });
n.WaitForNotification();
}
TEST(ThreadPoolTest, CanDestroyInsideClosure) {
auto p = std::make_shared<ThreadPool>(1);
auto p = std::make_shared<ThreadPool>();
grpc_core::Notification n;
p->Add([p, &n]() mutable {
p->Run([p, &n]() mutable {
std::this_thread::sleep_for(std::chrono::seconds(1));
// This should delete the thread pool and not deadlock
p.reset();
@ -50,13 +50,13 @@ TEST(ThreadPoolTest, CanDestroyInsideClosure) {
}
TEST(ThreadPoolTest, CanSurviveFork) {
ThreadPool p(1);
ThreadPool p;
grpc_core::Notification n;
gpr_log(GPR_INFO, "add callback 1");
p.Add([&n, &p] {
gpr_log(GPR_INFO, "run callback 1");
p.Run([&n, &p] {
std::this_thread::sleep_for(std::chrono::seconds(1));
gpr_log(GPR_INFO, "add callback 2");
p.Add([&n] {
gpr_log(GPR_INFO, "run callback 2");
p.Run([&n] {
std::this_thread::sleep_for(std::chrono::seconds(1));
gpr_log(GPR_INFO, "notify");
n.Notify();
@ -69,8 +69,8 @@ TEST(ThreadPoolTest, CanSurviveFork) {
gpr_log(GPR_INFO, "postfork child");
p.PostforkChild();
grpc_core::Notification n2;
gpr_log(GPR_INFO, "add callback 3");
p.Add([&n2] {
gpr_log(GPR_INFO, "run callback 3");
p.Run([&n2] {
gpr_log(GPR_INFO, "notify");
n2.Notify();
});
@ -79,14 +79,14 @@ TEST(ThreadPoolTest, CanSurviveFork) {
}
void ScheduleSelf(ThreadPool* p) {
p->Add([p] { ScheduleSelf(p); });
p->Run([p] { ScheduleSelf(p); });
}
TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) {
ASSERT_DEATH_IF_SUPPORTED(
[] {
gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
ThreadPool p(1);
ThreadPool p;
ScheduleSelf(&p);
std::thread terminator([] {
std::this_thread::sleep_for(std::chrono::seconds(10));
@ -99,17 +99,17 @@ TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) {
void ScheduleTwiceUntilZero(ThreadPool* p, int n) {
if (n == 0) return;
p->Add([p, n] {
p->Run([p, n] {
ScheduleTwiceUntilZero(p, n - 1);
ScheduleTwiceUntilZero(p, n - 1);
});
}
TEST(ThreadPoolTest, CanStartLotsOfClosures) {
ThreadPool p(1);
// Our first thread pool implementation tried to create ~256k threads for this
ThreadPool p;
// Our first thread pool implementation tried to create ~1M threads for this
// test.
ScheduleTwiceUntilZero(&p, 18);
ScheduleTwiceUntilZero(&p, 20);
}
} // namespace experimental

@ -27,8 +27,8 @@
#include <grpc/support/log_windows.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/executor/threaded_executor.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/thread_pool.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/win_socket.h"
#include "src/core/lib/gprpp/notification.h"
@ -42,14 +42,14 @@ using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::IOCP;
using ::grpc_event_engine::experimental::Poller;
using ::grpc_event_engine::experimental::SelfDeletingClosure;
using ::grpc_event_engine::experimental::ThreadedExecutor;
using ::grpc_event_engine::experimental::ThreadPool;
using ::grpc_event_engine::experimental::WinSocket;
} // namespace
class IOCPTest : public testing::Test {};
TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
ThreadedExecutor executor{2};
ThreadPool executor;
IOCP iocp(&executor);
SOCKET sockpair[2];
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
@ -137,7 +137,7 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
}
TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
ThreadedExecutor executor{2};
ThreadPool executor;
IOCP iocp(&executor);
SOCKET sockpair[2];
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
@ -201,7 +201,7 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
}
TEST_F(IOCPTest, KickWorks) {
ThreadedExecutor executor{2};
ThreadPool executor;
IOCP iocp(&executor);
grpc_core::Notification kicked;
executor.Run([&iocp, &kicked] {
@ -225,7 +225,7 @@ TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
// TODO(hork): evaluate if a kick count is going to be useful.
// This documents the existing poller's behavior of maintaining a kick count,
// but it's unclear if it's going to be needed.
ThreadedExecutor executor{2};
ThreadPool executor;
IOCP iocp(&executor);
// kick twice
iocp.Kick();
@ -248,7 +248,7 @@ TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
}
TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) {
ThreadedExecutor executor{2};
ThreadPool executor;
IOCP iocp(&executor);
SOCKET sockpair[2];
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
@ -274,7 +274,7 @@ TEST_F(IOCPTest, StressTestThousandsOfSockets) {
for (int thread_n = 0; thread_n < thread_count; thread_n++) {
threads.emplace_back([thread_n, sockets_per_thread, &read_count,
&write_count] {
ThreadedExecutor executor{2};
ThreadPool executor;
IOCP iocp(&executor);
// Start a looping worker thread with a moderate timeout
std::thread iocp_worker([&iocp, &executor] {

@ -24,7 +24,7 @@
#include <grpc/support/log_windows.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/executor/threaded_executor.h"
#include "src/core/lib/event_engine/thread_pool.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/win_socket.h"
#include "src/core/lib/iomgr/error.h"
@ -34,14 +34,14 @@ namespace {
using ::grpc_event_engine::experimental::AnyInvocableClosure;
using ::grpc_event_engine::experimental::CreateSockpair;
using ::grpc_event_engine::experimental::IOCP;
using ::grpc_event_engine::experimental::ThreadedExecutor;
using ::grpc_event_engine::experimental::ThreadPool;
using ::grpc_event_engine::experimental::WinSocket;
} // namespace
class WinSocketTest : public testing::Test {};
TEST_F(WinSocketTest, ManualReadEventTriggeredWithoutIO) {
ThreadedExecutor executor{2};
ThreadPool executor;
SOCKET sockpair[2];
CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
WinSocket wrapped_client_socket(sockpair[0], &executor);
@ -66,7 +66,7 @@ TEST_F(WinSocketTest, ManualReadEventTriggeredWithoutIO) {
}
TEST_F(WinSocketTest, NotificationCalledImmediatelyOnShutdownWinSocket) {
ThreadedExecutor executor{2};
ThreadPool executor;
SOCKET sockpair[2];
CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
WinSocket wrapped_client_socket(sockpair[0], &executor);

@ -62,6 +62,10 @@ grpc_cc_test(
external_deps = [
"benchmark",
],
tags = [
"no_mac",
"no_windows",
],
uses_polling = False,
deps = [
":helpers",

@ -1950,8 +1950,6 @@ src/core/lib/event_engine/default_event_engine.h \
src/core/lib/event_engine/default_event_engine_factory.cc \
src/core/lib/event_engine/default_event_engine_factory.h \
src/core/lib/event_engine/executor/executor.h \
src/core/lib/event_engine/executor/threaded_executor.cc \
src/core/lib/event_engine/executor/threaded_executor.h \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/forkable.h \
src/core/lib/event_engine/handle_containers.h \

@ -1738,8 +1738,6 @@ src/core/lib/event_engine/default_event_engine.h \
src/core/lib/event_engine/default_event_engine_factory.cc \
src/core/lib/event_engine/default_event_engine_factory.h \
src/core/lib/event_engine/executor/executor.h \
src/core/lib/event_engine/executor/threaded_executor.cc \
src/core/lib/event_engine/executor/threaded_executor.h \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/forkable.h \
src/core/lib/event_engine/handle_containers.h \

Loading…
Cancel
Save