mirror of https://github.com/grpc/grpc.git
[EventEngine] Implement work-stealing in the EventEngine ThreadPool (#32869)
This PR implements a work-stealing thread pool for use inside EventEngine implementations. Because of historical risks here, I've guarded the new implementation behind an experiment flag: `GRPC_EXPERIMENTS=work_stealing`. Current default behavior is the original thread pool implementation. Benchmarks look very promising: ``` bazel test \ --test_timeout=300 \ --config=opt -c opt \ --test_output=streamed \ --test_arg='--benchmark_format=csv' \ --test_arg='--benchmark_min_time=0.15' \ --test_arg='--benchmark_filter=_FanOut' \ --test_arg='--benchmark_repetitions=15' \ --test_arg='--benchmark_report_aggregates_only=true' \ test/cpp/microbenchmarks:bm_thread_pool ``` 2023-05-04: `bm_thread_pool` benchmark results on my local machine (64 core ThreadRipper PRO 3995WX, 256GB memory), comparing this PR to master: ![image](https://user-images.githubusercontent.com/295906/236315252-35ed237e-7626-486c-acfa-71a36f783d22.png) 2023-05-04: `bm_thread_pool` benchmark results in the Linux RBE environment (unsure of machine configuration, likely small), comparing this PR to master. ![image](https://user-images.githubusercontent.com/295906/236317164-2c5acbeb-fdac-4737-9b2d-4df9c41cb825.png) --------- Co-authored-by: drfloob <drfloob@users.noreply.github.com>pull/33043/head
parent
7df0e11755
commit
3fb738b9b1
60 changed files with 1848 additions and 1098 deletions
@ -0,0 +1,50 @@ |
||||
// Copyright 2023 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_POOL_H |
||||
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_POOL_H |
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <memory> |
||||
|
||||
#include "absl/functional/any_invocable.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/event_engine/forkable.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
// Interface for all EventEngine ThreadPool implementations
|
||||
class ThreadPool : public Forkable { |
||||
public: |
||||
// Asserts Quiesce was called.
|
||||
~ThreadPool() override = default; |
||||
// Shut down the pool, and wait for all threads to exit.
|
||||
// This method is safe to call from within a ThreadPool thread.
|
||||
virtual void Quiesce() = 0; |
||||
// Run must not be called after Quiesce completes
|
||||
virtual void Run(absl::AnyInvocable<void()> callback) = 0; |
||||
virtual void Run(EventEngine::Closure* closure) = 0; |
||||
}; |
||||
|
||||
// Creates a default thread pool.
|
||||
std::shared_ptr<ThreadPool> MakeThreadPool(size_t reserve_threads); |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_POOL_H
|
@ -0,0 +1,460 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2015 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h" |
||||
|
||||
#include <atomic> |
||||
#include <memory> |
||||
#include <utility> |
||||
|
||||
#include "absl/time/clock.h" |
||||
#include "absl/time/time.h" |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/backoff/backoff.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/event_engine/common_closures.h" |
||||
#include "src/core/lib/event_engine/thread_local.h" |
||||
#include "src/core/lib/event_engine/trace.h" |
||||
#include "src/core/lib/event_engine/work_queue/basic_work_queue.h" |
||||
#include "src/core/lib/event_engine/work_queue/work_queue.h" |
||||
#include "src/core/lib/gprpp/thd.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
namespace { |
||||
constexpr grpc_core::Duration kIdleThreadLimit = |
||||
grpc_core::Duration::Seconds(20); |
||||
constexpr grpc_core::Duration kTimeBetweenThrottledThreadStarts = |
||||
grpc_core::Duration::Seconds(1); |
||||
constexpr grpc_core::Duration kWorkerThreadMinSleepBetweenChecks{ |
||||
grpc_core::Duration::Milliseconds(33)}; |
||||
constexpr grpc_core::Duration kWorkerThreadMaxSleepBetweenChecks{ |
||||
grpc_core::Duration::Seconds(3)}; |
||||
constexpr grpc_core::Duration kLifeguardMinSleepBetweenChecks{ |
||||
grpc_core::Duration::Milliseconds(50)}; |
||||
constexpr grpc_core::Duration kLifeguardMaxSleepBetweenChecks{ |
||||
grpc_core::Duration::Seconds(1)}; |
||||
constexpr absl::Duration kSleepBetweenQuiesceCheck{absl::Milliseconds(10)}; |
||||
} // namespace
|
||||
|
||||
thread_local WorkQueue* g_local_queue = nullptr; |
||||
|
||||
// -------- WorkStealingThreadPool --------
|
||||
|
||||
// Creates the shared implementation object and immediately starts it.
// Start() has to run after construction because it relies on
// shared_from_this().
WorkStealingThreadPool::WorkStealingThreadPool(size_t reserve_threads)
    : pool_(std::make_shared<WorkStealingThreadPoolImpl>(reserve_threads)) {
  pool_->Start();
}
||||
|
||||
// Forwards to the shared implementation, which performs the actual shutdown.
void WorkStealingThreadPool::Quiesce() {
  pool_->Quiesce();
}
||||
|
||||
// Destroying a pool that was never quiesced is a programming error.
WorkStealingThreadPool::~WorkStealingThreadPool() {
  GPR_ASSERT(pool_->IsQuiesced());
}
||||
|
||||
// Wraps the callback in a SelfDeletingClosure and schedules it through the
// closure overload.
void WorkStealingThreadPool::Run(absl::AnyInvocable<void()> callback) {
  EventEngine::Closure* wrapped =
      SelfDeletingClosure::Create(std::move(callback));
  Run(wrapped);
}
||||
|
||||
// Hands the closure to the shared implementation for queueing.
void WorkStealingThreadPool::Run(EventEngine::Closure* closure) {
  pool_->Run(closure);
}
||||
|
||||
// -------- WorkStealingThreadPool::TheftRegistry --------
|
||||
|
||||
// Makes `queue` visible to StealOne(). Takes mu_ for the insertion.
void WorkStealingThreadPool::TheftRegistry::Enroll(WorkQueue* queue) {
  grpc_core::MutexLock guard(&mu_);
  queues_.emplace(queue);
}
||||
|
||||
// Removes `queue` from the set that StealOne() scans. Takes mu_ for the
// removal.
void WorkStealingThreadPool::TheftRegistry::Unenroll(WorkQueue* queue) {
  grpc_core::MutexLock guard(&mu_);
  queues_.erase(queue);
}
||||
|
||||
// Scans the enrolled queues under mu_ and returns the first closure that any
// of them yields, or nullptr when every enrolled queue came up empty.
EventEngine::Closure* WorkStealingThreadPool::TheftRegistry::StealOne() {
  grpc_core::MutexLock guard(&mu_);
  for (WorkQueue* victim : queues_) {
    EventEngine::Closure* stolen = victim->PopMostRecent();
    if (stolen != nullptr) {
      return stolen;
    }
  }
  return nullptr;
}
||||
|
||||
// Forkable hook: delegates pre-fork handling to the implementation.
void WorkStealingThreadPool::PrepareFork() {
  pool_->PrepareFork();
}
||||
|
||||
// Forkable hook for the parent process; the same handling as the child.
void WorkStealingThreadPool::PostforkParent() {
  pool_->Postfork();
}
||||
|
||||
// Forkable hook for the child process; the same handling as the parent.
void WorkStealingThreadPool::PostforkChild() {
  pool_->Postfork();
}
||||
|
||||
// -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl --------
|
||||
|
||||
// Records the reserve thread count. No threads are created here; that
// happens in Start().
WorkStealingThreadPool::WorkStealingThreadPoolImpl::WorkStealingThreadPoolImpl(
    size_t reserve_threads)
    : reserve_threads_(reserve_threads), lifeguard_() {}
||||
|
||||
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() { |
||||
lifeguard_.Start(shared_from_this()); |
||||
for (size_t i = 0; i < reserve_threads_; i++) { |
||||
StartThread(); |
||||
} |
||||
} |
||||
|
||||
// Queues `closure` for execution.
// A pool worker thread (identified by a non-null g_local_queue) keeps the
// closure on its own queue and signals nobody. Any other caller pushes to the
// global queue and wakes one waiter.
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Run(
    EventEngine::Closure* closure) {
  GPR_DEBUG_ASSERT(quiesced_.load(std::memory_order_relaxed) == false);
  WorkQueue* local_queue = g_local_queue;
  if (local_queue == nullptr) {
    queue_.Add(closure);
    work_signal_.Signal();
  } else {
    local_queue->Add(closure);
  }
}
||||
|
||||
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread() { |
||||
last_started_thread_.store( |
||||
grpc_core::Timestamp::Now().milliseconds_after_process_epoch(), |
||||
std::memory_order_relaxed); |
||||
grpc_core::Thread( |
||||
"event_engine", |
||||
[](void* arg) { |
||||
ThreadState* worker = static_cast<ThreadState*>(arg); |
||||
worker->ThreadBody(); |
||||
delete worker; |
||||
}, |
||||
new ThreadState(shared_from_this()), nullptr, |
||||
grpc_core::Thread::Options().set_tracked(false).set_joinable(false)) |
||||
.Start(); |
||||
} |
||||
|
||||
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() { |
||||
SetShutdown(true); |
||||
// Wait until all threads have exited.
|
||||
// Note that if this is a threadpool thread then we won't exit this thread
|
||||
// until all other threads have exited, so we need to wait for just one thread
|
||||
// running instead of zero.
|
||||
bool is_threadpool_thread = g_local_queue != nullptr; |
||||
thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount, |
||||
is_threadpool_thread ? 1 : 0, |
||||
"shutting down", work_signal()); |
||||
GPR_ASSERT(queue_.Empty()); |
||||
quiesced_.store(true, std::memory_order_relaxed); |
||||
lifeguard_.BlockUntilShutdown(); |
||||
} |
||||
|
||||
bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetThrottled( |
||||
bool throttled) { |
||||
return throttled_.exchange(throttled, std::memory_order_relaxed); |
||||
} |
||||
|
||||
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetShutdown( |
||||
bool is_shutdown) { |
||||
auto was_shutdown = shutdown_.exchange(is_shutdown); |
||||
GPR_ASSERT(is_shutdown != was_shutdown); |
||||
work_signal_.SignalAll(); |
||||
} |
||||
|
||||
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::SetForking( |
||||
bool is_forking) { |
||||
auto was_forking = forking_.exchange(is_forking); |
||||
GPR_ASSERT(is_forking != was_forking); |
||||
} |
||||
|
||||
bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsForking() { |
||||
return forking_.load(std::memory_order_relaxed); |
||||
} |
||||
|
||||
bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsShutdown() { |
||||
return shutdown_.load(std::memory_order_relaxed); |
||||
} |
||||
|
||||
bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsQuiesced() { |
||||
return quiesced_.load(std::memory_order_relaxed); |
||||
} |
||||
|
||||
// Raises the forking flag, then blocks until no worker threads are alive and
// the lifeguard thread has stopped.
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() {
  SetForking(true);
  constexpr int kNoThreadsMayRemain = 0;
  thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount,
                                        kNoThreadsMayRemain, "forking",
                                        &work_signal_);
  lifeguard_.BlockUntilShutdown();
}
||||
|
||||
// Clears the forking flag and restarts the lifeguard and reserve workers.
// Used for both the parent and the child side of a fork.
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() {
  SetForking(false);
  Start();
}
||||
|
||||
// -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard
|
||||
// --------
|
||||
|
||||
// Configures the backoff that paces the lifeguard's checks: it starts at
// kLifeguardMinSleepBetweenChecks and grows by a factor of 1.3 up to
// kLifeguardMaxSleepBetweenChecks.
WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard()
    : backoff_(grpc_core::BackOff::Options()
                   .set_initial_backoff(kLifeguardMinSleepBetweenChecks)
                   .set_max_backoff(kLifeguardMaxSleepBetweenChecks)
                   .set_multiplier(1.3)) {}
||||
|
||||
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Start( |
||||
std::shared_ptr<WorkStealingThreadPoolImpl> pool) { |
||||
pool_ = std::move(pool); |
||||
grpc_core::Thread( |
||||
"lifeguard", |
||||
[](void* arg) { |
||||
auto* lifeguard = static_cast<Lifeguard*>(arg); |
||||
lifeguard->LifeguardMain(); |
||||
}, |
||||
this, nullptr, |
||||
grpc_core::Thread::Options().set_tracked(false).set_joinable(false)) |
||||
.Start(); |
||||
} |
||||
|
||||
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: |
||||
LifeguardMain() { |
||||
thread_running_.store(true); |
||||
while (true) { |
||||
absl::SleepFor(absl::Milliseconds( |
||||
(backoff_.NextAttemptTime() - grpc_core::Timestamp::Now()).millis())); |
||||
if (pool_->IsForking()) break; |
||||
if (pool_->IsShutdown() && pool_->IsQuiesced()) break; |
||||
MaybeStartNewThread(); |
||||
} |
||||
thread_running_.store(false); |
||||
pool_.reset(); |
||||
} |
||||
|
||||
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: |
||||
BlockUntilShutdown() { |
||||
while (thread_running_.load()) { |
||||
absl::SleepFor(kSleepBetweenQuiesceCheck); |
||||
} |
||||
} |
||||
|
||||
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: |
||||
MaybeStartNewThread() { |
||||
// No new threads are started when forking.
|
||||
// No new work is done when forking needs to begin.
|
||||
if (pool_->forking_.load()) return; |
||||
int busy_thread_count = |
||||
pool_->thread_count_.GetCount(CounterType::kBusyCount); |
||||
int living_thread_count = |
||||
pool_->thread_count_.GetCount(CounterType::kLivingThreadCount); |
||||
// Wake an idle worker thread if there's global work to be had.
|
||||
if (busy_thread_count < living_thread_count) { |
||||
if (!pool_->queue_.Empty()) { |
||||
pool_->work_signal()->Signal(); |
||||
backoff_.Reset(); |
||||
} |
||||
// Idle threads will eventually wake up for an attempt at work stealing.
|
||||
return; |
||||
} |
||||
// No new threads if in the throttled state.
|
||||
// However, all workers are busy, so the Lifeguard should be more
|
||||
// vigilant about checking whether a new thread must be started.
|
||||
if (grpc_core::Timestamp::Now() - |
||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
||||
pool_->last_started_thread_) < |
||||
kTimeBetweenThrottledThreadStarts) { |
||||
backoff_.Reset(); |
||||
return; |
||||
} |
||||
// All workers are busy and the pool is not throttled. Start a new thread.
|
||||
// TODO(hork): new threads may spawn when there is no work in the global
|
||||
// queue, nor any work to steal. Add more sophisticated logic about when to
|
||||
// start a thread.
|
||||
GRPC_EVENT_ENGINE_TRACE( |
||||
"Starting new ThreadPool thread due to backlog (total threads: %d)", |
||||
living_thread_count + 1); |
||||
pool_->StartThread(); |
||||
// Tell the lifeguard to monitor the pool more closely.
|
||||
backoff_.Reset(); |
||||
} |
||||
|
||||
// -------- WorkStealingThreadPool::ThreadState --------
|
||||
|
||||
// Takes shared ownership of the pool, registers this thread in the living
// thread count for as long as the ThreadState exists, and sets up the idle
// backoff (kWorkerThreadMinSleepBetweenChecks growing by 1.3x up to
// kWorkerThreadMaxSleepBetweenChecks).
WorkStealingThreadPool::ThreadState::ThreadState(
    std::shared_ptr<WorkStealingThreadPoolImpl> pool)
    : pool_(std::move(pool)),
      auto_thread_count_(pool_->thread_count(),
                         CounterType::kLivingThreadCount),
      backoff_(grpc_core::BackOff::Options()
                   .set_initial_backoff(kWorkerThreadMinSleepBetweenChecks)
                   .set_max_backoff(kWorkerThreadMaxSleepBetweenChecks)
                   .set_multiplier(1.3)) {}
||||
|
||||
void WorkStealingThreadPool::ThreadState::ThreadBody() { |
||||
g_local_queue = new BasicWorkQueue(); |
||||
pool_->theft_registry()->Enroll(g_local_queue); |
||||
ThreadLocal::SetIsEventEngineThread(true); |
||||
while (Step()) { |
||||
// loop until the thread should no longer run
|
||||
} |
||||
// cleanup
|
||||
if (pool_->IsForking()) { |
||||
// TODO(hork): consider WorkQueue::AddAll(WorkQueue*)
|
||||
EventEngine::Closure* closure; |
||||
while (!g_local_queue->Empty()) { |
||||
closure = g_local_queue->PopMostRecent(); |
||||
if (closure != nullptr) { |
||||
pool_->queue()->Add(closure); |
||||
} |
||||
} |
||||
} |
||||
GPR_ASSERT(g_local_queue->Empty()); |
||||
pool_->theft_registry()->Unenroll(g_local_queue); |
||||
delete g_local_queue; |
||||
} |
||||
|
||||
void WorkStealingThreadPool::ThreadState::SleepIfRunning() { |
||||
if (pool_->IsForking()) return; |
||||
absl::SleepFor( |
||||
absl::Milliseconds(kTimeBetweenThrottledThreadStarts.millis())); |
||||
} |
||||
|
||||
bool WorkStealingThreadPool::ThreadState::Step() { |
||||
if (pool_->IsForking()) return false; |
||||
auto* closure = g_local_queue->PopMostRecent(); |
||||
// If local work is available, run it.
|
||||
if (closure != nullptr) { |
||||
ThreadCount::AutoThreadCount auto_busy{pool_->thread_count(), |
||||
CounterType::kBusyCount}; |
||||
closure->Run(); |
||||
return true; |
||||
} |
||||
// Thread shutdown exit condition (ignoring fork). All must be true:
|
||||
// * shutdown was called
|
||||
// * the local queue is empty
|
||||
// * the global queue is empty
|
||||
// * the steal pool returns nullptr
|
||||
bool should_run_again = false; |
||||
grpc_core::Timestamp start_time{grpc_core::Timestamp::Now()}; |
||||
// Wait until work is available or until shut down.
|
||||
while (!pool_->IsForking()) { |
||||
// Pull from the global queue next
|
||||
// TODO(hork): consider an empty check for performance wins. Depends on the
|
||||
// queue implementation, the BasicWorkQueue takes two locks when you do an
|
||||
// empty check then pop.
|
||||
closure = pool_->queue()->PopMostRecent(); |
||||
if (closure != nullptr) { |
||||
should_run_again = true; |
||||
break; |
||||
}; |
||||
// Try stealing if the queue is empty
|
||||
closure = pool_->theft_registry()->StealOne(); |
||||
if (closure != nullptr) { |
||||
should_run_again = true; |
||||
break; |
||||
} |
||||
// No closures were retrieved from anywhere.
|
||||
// Quit the thread if the pool has been shut down.
|
||||
if (pool_->IsShutdown()) break; |
||||
bool timed_out = pool_->work_signal()->WaitWithTimeout( |
||||
backoff_.NextAttemptTime() - grpc_core::Timestamp::Now()); |
||||
// Quit a thread if the pool has more than it requires, and this thread
|
||||
// has been idle long enough.
|
||||
if (timed_out && |
||||
pool_->thread_count()->GetCount(CounterType::kLivingThreadCount) > |
||||
pool_->reserve_threads() && |
||||
grpc_core::Timestamp::Now() - start_time > kIdleThreadLimit) { |
||||
return false; |
||||
} |
||||
} |
||||
if (pool_->IsForking()) { |
||||
// save the closure since we aren't going to execute it.
|
||||
if (closure != nullptr) g_local_queue->Add(closure); |
||||
return false; |
||||
} |
||||
if (closure != nullptr) { |
||||
ThreadCount::AutoThreadCount auto_busy{pool_->thread_count(), |
||||
CounterType::kBusyCount}; |
||||
closure->Run(); |
||||
} |
||||
backoff_.Reset(); |
||||
return should_run_again; |
||||
} |
||||
|
||||
// -------- WorkStealingThreadPool::ThreadCount --------
|
||||
|
||||
// Increments the counter selected by `counter_type`.
void WorkStealingThreadPool::ThreadCount::Add(CounterType counter_type) {
  auto& counter = thread_counts_[counter_type];
  counter.fetch_add(1, std::memory_order_relaxed);
}
||||
|
||||
// Decrements the counter selected by `counter_type`.
void WorkStealingThreadPool::ThreadCount::Remove(CounterType counter_type) {
  auto& counter = thread_counts_[counter_type];
  counter.fetch_sub(1, std::memory_order_relaxed);
}
||||
|
||||
// Blocks while the selected counter is above `desired_threads`.
//
// Each round sleeps for kSleepBetweenQuiesceCheck and then wakes every waiter
// on `work_signal` so that idle workers notice the state change that should
// make them exit. Progress is logged at most once every three seconds; `why`
// is only used in that log message.
void WorkStealingThreadPool::ThreadCount::BlockUntilThreadCount(
    CounterType counter_type, int desired_threads, const char* why,
    WorkSignal* work_signal) {
  auto& counter = thread_counts_[counter_type];
  auto last_log_time = grpc_core::Timestamp::Now();
  for (int remaining = counter.load(std::memory_order_relaxed);
       remaining > desired_threads;
       remaining = counter.load(std::memory_order_relaxed)) {
    absl::SleepFor(kSleepBetweenQuiesceCheck);
    work_signal->SignalAll();
    const bool log_due = grpc_core::Timestamp::Now() - last_log_time >
                         grpc_core::Duration::Seconds(3);
    if (log_due) {
      gpr_log(GPR_DEBUG,
              "Waiting for thread pool to idle before %s. (%d to %d)", why,
              remaining, desired_threads);
      last_log_time = grpc_core::Timestamp::Now();
    }
  }
}
||||
|
||||
// Returns a relaxed snapshot of the counter selected by `counter_type`.
size_t WorkStealingThreadPool::ThreadCount::GetCount(CounterType counter_type) {
  const size_t snapshot =
      thread_counts_[counter_type].load(std::memory_order_relaxed);
  return snapshot;
}
||||
|
||||
WorkStealingThreadPool::ThreadCount::AutoThreadCount::AutoThreadCount( |
||||
ThreadCount* counter, CounterType counter_type) |
||||
: counter_(counter), counter_type_(counter_type) { |
||||
counter_->Add(counter_type_); |
||||
} |
||||
|
||||
// Undoes the increment performed by the constructor.
WorkStealingThreadPool::ThreadCount::AutoThreadCount::~AutoThreadCount() {
  counter_->Remove(counter_type_);
}
||||
|
||||
// -------- WorkStealingThreadPool::WorkSignal --------
|
||||
|
||||
// Wakes one thread waiting in WaitWithTimeout(), holding mu_ while
// signalling.
void WorkStealingThreadPool::WorkSignal::Signal() {
  grpc_core::MutexLock guard(&mu_);
  cv_.Signal();
}
||||
|
||||
// Wakes every thread waiting in WaitWithTimeout(), holding mu_ while
// signalling.
void WorkStealingThreadPool::WorkSignal::SignalAll() {
  grpc_core::MutexLock guard(&mu_);
  cv_.SignalAll();
}
||||
|
||||
// Waits on the condition variable for at most `time` (converted to whole
// milliseconds). Returns whether the wait timed out, as reported by the
// underlying CondVar.
bool WorkStealingThreadPool::WorkSignal::WaitWithTimeout(
    grpc_core::Duration time) {
  const absl::Duration limit = absl::Milliseconds(time.millis());
  grpc_core::MutexLock guard(&mu_);
  return cv_.WaitWithTimeout(&mu_, limit);
}
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
@ -0,0 +1,246 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2015 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_WORK_STEALING_THREAD_POOL_H |
||||
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_WORK_STEALING_THREAD_POOL_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
#include <stdint.h> |
||||
|
||||
#include <atomic> |
||||
#include <memory> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/container/flat_hash_set.h" |
||||
#include "absl/functional/any_invocable.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/backoff/backoff.h" |
||||
#include "src/core/lib/event_engine/thread_pool/thread_pool.h" |
||||
#include "src/core/lib/event_engine/work_queue/basic_work_queue.h" |
||||
#include "src/core/lib/event_engine/work_queue/work_queue.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
class WorkStealingThreadPool final : public ThreadPool { |
||||
public: |
||||
explicit WorkStealingThreadPool(size_t reserve_threads); |
||||
// Asserts Quiesce was called.
|
||||
~WorkStealingThreadPool() override; |
||||
// Shut down the pool, and wait for all threads to exit.
|
||||
// This method is safe to call from within a ThreadPool thread.
|
||||
void Quiesce() override; |
||||
// Run must not be called after Quiesce completes
|
||||
void Run(absl::AnyInvocable<void()> callback) override; |
||||
void Run(EventEngine::Closure* closure) override; |
||||
|
||||
// Forkable
|
||||
// These methods are exposed on the public object to allow for testing.
|
||||
void PrepareFork() override; |
||||
void PostforkParent() override; |
||||
void PostforkChild() override; |
||||
|
||||
private: |
||||
// A basic communication mechanism to signal waiting threads that work is
|
||||
// available.
|
||||
class WorkSignal { |
||||
public: |
||||
void Signal(); |
||||
void SignalAll(); |
||||
// Returns whether a timeout occurred.
|
||||
bool WaitWithTimeout(grpc_core::Duration time); |
||||
|
||||
private: |
||||
grpc_core::Mutex mu_; |
||||
grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
// Types of thread counts.
|
||||
// Note this is intentionally not an enum class, the keys are used as indexes
|
||||
// into the ThreadCount's private array.
|
||||
enum CounterType { |
||||
kLivingThreadCount = 0, |
||||
kBusyCount, |
||||
}; |
||||
|
||||
class ThreadCount { |
||||
public: |
||||
// Adds 1 to the thread count for that counter type.
|
||||
void Add(CounterType counter_type); |
||||
// Subtracts 1 from the thread count for that counter type.
|
||||
void Remove(CounterType counter_type); |
||||
// Blocks until the thread count for that type reaches `desired_threads`.
|
||||
void BlockUntilThreadCount(CounterType counter_type, int desired_threads, |
||||
const char* why, WorkSignal* work_signal); |
||||
// Returns the current thread count for the tracked type.
|
||||
size_t GetCount(CounterType counter_type); |
||||
|
||||
// Adds and removes thread counts on construction and destruction
|
||||
class AutoThreadCount { |
||||
public: |
||||
AutoThreadCount(ThreadCount* counter, CounterType counter_type); |
||||
~AutoThreadCount(); |
||||
|
||||
private: |
||||
ThreadCount* counter_; |
||||
CounterType counter_type_; |
||||
}; |
||||
|
||||
private: |
||||
std::atomic<size_t> thread_counts_[2]{{0}, {0}}; |
||||
}; |
||||
|
||||
// A pool of WorkQueues that participate in work stealing.
|
||||
//
|
||||
// Every worker thread registers and unregisters its thread-local thread pool
|
||||
// here, and steals closures from other threads when work is otherwise
|
||||
// unavailable.
|
||||
class TheftRegistry { |
||||
public: |
||||
// Allow any member of the registry to steal from the provided queue.
|
||||
void Enroll(WorkQueue* queue) ABSL_LOCKS_EXCLUDED(mu_); |
||||
// Disallow work stealing from the provided queue.
|
||||
void Unenroll(WorkQueue* queue) ABSL_LOCKS_EXCLUDED(mu_); |
||||
// Returns one closure from another thread, or nullptr if none are
|
||||
// available.
|
||||
EventEngine::Closure* StealOne() ABSL_LOCKS_EXCLUDED(mu_); |
||||
|
||||
private: |
||||
grpc_core::Mutex mu_; |
||||
absl::flat_hash_set<WorkQueue*> queues_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
// An implementation of the ThreadPool
|
||||
// This object is held as a shared_ptr between the owning ThreadPool and each
|
||||
// worker thread. This design allows a ThreadPool worker thread to be the last
|
||||
// owner of the ThreadPool itself.
|
||||
class WorkStealingThreadPoolImpl |
||||
: public std::enable_shared_from_this<WorkStealingThreadPoolImpl> { |
||||
public: |
||||
explicit WorkStealingThreadPoolImpl(size_t reserve_threads); |
||||
// Start all threads.
|
||||
void Start(); |
||||
// Add a closure to a work queue, preferably a thread-local queue if
|
||||
// available, otherwise the global queue.
|
||||
void Run(EventEngine::Closure* closure); |
||||
// Start a new thread.
|
||||
// The reason argument determines whether thread creation is rate-limited;
|
||||
// threads created to populate the initial pool are not rate-limited, but
|
||||
// all others thread creation scenarios are rate-limited.
|
||||
void StartThread(); |
||||
// Shut down the pool, and wait for all threads to exit.
|
||||
// This method is safe to call from within a ThreadPool thread.
|
||||
void Quiesce(); |
||||
// Sets a throttled state.
|
||||
// After the initial pool has been created, if the pool is backlogged when a
|
||||
// new thread has started, it is rate limited.
|
||||
// Returns the previous throttling state.
|
||||
bool SetThrottled(bool throttle); |
||||
// Set the shutdown flag.
|
||||
void SetShutdown(bool is_shutdown); |
||||
// Set the forking flag.
|
||||
void SetForking(bool is_forking); |
||||
// Forkable
|
||||
// Ensures that the thread pool is empty before forking.
|
||||
// Postfork parent and child have the same behavior.
|
||||
void PrepareFork(); |
||||
void Postfork(); |
||||
// Accessor methods
|
||||
bool IsShutdown(); |
||||
bool IsForking(); |
||||
bool IsQuiesced(); |
||||
size_t reserve_threads() { return reserve_threads_; } |
||||
ThreadCount* thread_count() { return &thread_count_; } |
||||
TheftRegistry* theft_registry() { return &theft_registry_; } |
||||
WorkQueue* queue() { return &queue_; } |
||||
WorkSignal* work_signal() { return &work_signal_; } |
||||
|
||||
private: |
||||
// Lifeguard monitors the pool and keeps it healthy.
|
||||
// It has two main responsibilities:
|
||||
// * scale the pool to match demand.
|
||||
// * distribute work to worker threads if the global queue is backing up
|
||||
// and there are threads that can accept work.
|
||||
class Lifeguard { |
||||
public: |
||||
Lifeguard(); |
||||
// Start the lifeguard thread.
|
||||
void Start(std::shared_ptr<WorkStealingThreadPoolImpl> pool); |
||||
// Block until the lifeguard thread is shut down.
|
||||
void BlockUntilShutdown(); |
||||
|
||||
private: |
||||
// The main body of the lifeguard thread.
|
||||
void LifeguardMain(); |
||||
// Starts a new thread if the pool is backlogged
|
||||
void MaybeStartNewThread(); |
||||
std::shared_ptr<WorkStealingThreadPoolImpl> pool_; |
||||
grpc_core::BackOff backoff_; |
||||
std::atomic<bool> thread_running_{false}; |
||||
}; |
||||
|
||||
const size_t reserve_threads_; |
||||
ThreadCount thread_count_; |
||||
TheftRegistry theft_registry_; |
||||
BasicWorkQueue queue_; |
||||
// Track shutdown and fork bits separately.
|
||||
// It's possible for a ThreadPool to initiate shut down while fork handlers
|
||||
// are running, and similarly possible for a fork event to occur during
|
||||
// shutdown.
|
||||
std::atomic<bool> shutdown_{false}; |
||||
std::atomic<bool> forking_{false}; |
||||
std::atomic<bool> quiesced_{false}; |
||||
std::atomic<uint64_t> last_started_thread_{0}; |
||||
// After pool creation we use this to rate limit creation of threads to one
|
||||
// at a time.
|
||||
std::atomic<bool> throttled_{false}; |
||||
WorkSignal work_signal_; |
||||
Lifeguard lifeguard_; |
||||
}; |
||||
|
||||
class ThreadState { |
||||
public: |
||||
explicit ThreadState(std::shared_ptr<WorkStealingThreadPoolImpl> pool); |
||||
void ThreadBody(); |
||||
void SleepIfRunning(); |
||||
bool Step(); |
||||
|
||||
private: |
||||
// pool_ must be the first member so that it is alive when the thread count
|
||||
// is decremented at time of destruction. This is necessary when this thread
|
||||
// state holds the last shared_ptr keeping the pool alive.
|
||||
std::shared_ptr<WorkStealingThreadPoolImpl> pool_; |
||||
// auto_thread_count_ must be the second member declared, so that the thread
|
||||
// count is decremented after all other state is cleaned up (preventing
|
||||
// leaks).
|
||||
ThreadCount::AutoThreadCount auto_thread_count_; |
||||
grpc_core::BackOff backoff_; |
||||
}; |
||||
|
||||
const std::shared_ptr<WorkStealingThreadPoolImpl> pool_; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_WORK_STEALING_THREAD_POOL_H
|
@ -1,184 +0,0 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/work_queue.h" |
||||
|
||||
#include <utility> |
||||
|
||||
#include "absl/functional/any_invocable.h" |
||||
|
||||
#include "src/core/lib/event_engine/common_closures.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
// ------ WorkQueue::Storage --------------------------------------------------
|
||||
|
||||
// Stores the given closure pointer and stamps the entry with the current time
// expressed as milliseconds after the process epoch.
WorkQueue::Storage::Storage(EventEngine::Closure* closure) noexcept
    : closure_(closure) {
  enqueued_ = grpc_core::Timestamp::Now().milliseconds_after_process_epoch();
}
||||
|
||||
// Wraps the callback in a SelfDeletingClosure, stores that closure, and stamps
// the entry with the current time in milliseconds after the process epoch.
WorkQueue::Storage::Storage(absl::AnyInvocable<void()> callback) noexcept {
  closure_ = SelfDeletingClosure::Create(std::move(callback));
  enqueued_ = grpc_core::Timestamp::Now().milliseconds_after_process_epoch();
}
||||
|
||||
// Move construction copies both fields; the source object is left unchanged.
WorkQueue::Storage::Storage(Storage&& other) noexcept {
  closure_ = other.closure_;
  enqueued_ = other.enqueued_;
}
||||
|
||||
// Move assignment trades the contents of the two objects: afterwards `other`
// holds what this object held before the call.
WorkQueue::Storage& WorkQueue::Storage::operator=(Storage&& other) noexcept {
  closure_ = std::exchange(other.closure_, closure_);
  enqueued_ = std::exchange(other.enqueued_, enqueued_);
  return *this;
}
||||
|
||||
// Returns the stored closure pointer.
EventEngine::Closure* WorkQueue::Storage::closure() {
  return closure_;
}
||||
|
||||
// ------ WorkQueue -----------------------------------------------------------
|
||||
|
||||
// Returns whether the queue is empty
|
||||
bool WorkQueue::Empty() const { |
||||
return (most_recent_element_enqueue_timestamp_.load( |
||||
std::memory_order_relaxed) == kInvalidTimestamp && |
||||
oldest_enqueued_timestamp_.load(std::memory_order_relaxed) == |
||||
kInvalidTimestamp); |
||||
} |
||||
|
||||
// Reports the enqueue time of the oldest known element: the front of the deque
// when one is recorded, otherwise the most-recent-element slot, otherwise
// Timestamp::InfPast() when no timestamp is recorded at all.
grpc_core::Timestamp WorkQueue::OldestEnqueuedTimestamp() const {
  int64_t millis = oldest_enqueued_timestamp_.load(std::memory_order_relaxed);
  if (millis == kInvalidTimestamp) {
    // Nothing recorded for the deque; fall back to the most recent element.
    millis =
        most_recent_element_enqueue_timestamp_.load(std::memory_order_relaxed);
    if (millis == kInvalidTimestamp) return grpc_core::Timestamp::InfPast();
  }
  return grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(millis);
}
||||
|
||||
// Tries the deque's oldest element first, then falls back to the
// most-recent-element slot. Returns nullptr when nothing could be popped,
// which includes the case where a lock could not be taken without blocking.
EventEngine::Closure* WorkQueue::PopFront() ABSL_LOCKS_EXCLUDED(mu_) {
  const bool deque_has_work =
      oldest_enqueued_timestamp_.load(std::memory_order_relaxed) !=
      kInvalidTimestamp;
  if (deque_has_work) {
    EventEngine::Closure* oldest = TryLockAndPop(/*front=*/true);
    if (oldest != nullptr) return oldest;
  }
  const bool has_recent =
      most_recent_element_enqueue_timestamp_.load(std::memory_order_relaxed) !=
      kInvalidTimestamp;
  return has_recent ? TryPopMostRecentElement() : nullptr;
}
||||
|
||||
// Prefers the most-recent-element slot; only when that slot has no timestamp
// does it try the back of the deque. Returns nullptr when nothing could be
// popped, which includes the case where a lock could not be taken.
EventEngine::Closure* WorkQueue::PopBack() {
  const int64_t recent_ts =
      most_recent_element_enqueue_timestamp_.load(std::memory_order_relaxed);
  if (recent_ts != kInvalidTimestamp) return TryPopMostRecentElement();
  const int64_t oldest_ts =
      oldest_enqueued_timestamp_.load(std::memory_order_relaxed);
  if (oldest_ts == kInvalidTimestamp) return nullptr;
  return TryLockAndPop(/*front=*/false);
}
||||
|
||||
// Enqueues a caller-provided closure.
void WorkQueue::Add(EventEngine::Closure* closure) {
  Storage entry(closure);
  AddInternal(std::move(entry));
}
||||
|
||||
// Enqueues an AnyInvocable; Storage wraps it in a closure.
void WorkQueue::Add(absl::AnyInvocable<void()> invocable) {
  Storage entry(std::move(invocable));
  AddInternal(std::move(entry));
}
||||
|
||||
// Installs `storage` as the most recent element. If that slot already held an
// element with a valid timestamp, the displaced element is appended to the
// deque; when the deque was empty, its timestamp becomes the oldest-enqueued
// timestamp.
void WorkQueue::AddInternal(Storage&& storage) {
  absl::optional<Storage> displaced;
  int64_t displaced_ts;
  {
    // Swap the new element into the slot while holding only the slot's lock.
    grpc_core::MutexLock lock(&most_recent_element_lock_);
    displaced_ts = most_recent_element_enqueue_timestamp_.exchange(
        storage.enqueued(), std::memory_order_relaxed);
    displaced = std::exchange(most_recent_element_, std::move(storage));
  }
  // Nothing was displaced, so the deque does not need to be touched.
  if (!displaced.has_value() || displaced_ts == kInvalidTimestamp) return;
  grpc_core::MutexLock lock(&mu_);
  if (elements_.empty()) {
    oldest_enqueued_timestamp_.store(displaced_ts, std::memory_order_relaxed);
  }
  elements_.push_back(std::move(*displaced));
}
||||
|
||||
// Pops via PopLocked() only if mu_ can be acquired immediately; a worker is
// never blocked behind another worker that is already popping from this queue.
EventEngine::Closure* WorkQueue::TryLockAndPop(bool front)
    ABSL_LOCKS_EXCLUDED(mu_) {
  if (mu_.TryLock()) {
    EventEngine::Closure* popped = PopLocked(front);
    mu_.Unlock();
    return popped;
  }
  return nullptr;
}
||||
|
||||
// Pops from the deque while mu_ is held: the front element when `front` is
// true, otherwise the back element. When the deque is empty it falls back to
// the most-recent-element slot. The oldest-enqueued timestamp is refreshed
// whenever the front of the deque changes or the deque becomes empty.
EventEngine::Closure* WorkQueue::PopLocked(bool front)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
  if (GPR_UNLIKELY(elements_.empty())) {
    const bool has_recent = most_recent_element_enqueue_timestamp_.load(
                                std::memory_order_relaxed) != kInvalidTimestamp;
    return has_recent ? TryPopMostRecentElement() : nullptr;
  }
  // The deque has elements: take one, then update the timestamp.
  EventEngine::Closure* popped = nullptr;
  if (front) {
    popped = elements_.front().closure();
    elements_.pop_front();
  } else {
    popped = elements_.back().closure();
    elements_.pop_back();
  }
  if (elements_.empty()) {
    oldest_enqueued_timestamp_.store(kInvalidTimestamp,
                                     std::memory_order_relaxed);
  } else if (front) {
    // A back pop leaves the front untouched, so only front pops get here.
    oldest_enqueued_timestamp_.store(elements_.front().enqueued(),
                                     std::memory_order_relaxed);
  }
  return popped;
}
||||
|
||||
// Takes the element out of the most-recent-element slot if the slot's lock can
// be acquired immediately and the slot is occupied; otherwise returns nullptr.
EventEngine::Closure* WorkQueue::TryPopMostRecentElement() {
  if (!most_recent_element_lock_.TryLock()) return nullptr;
  if (GPR_UNLIKELY(!most_recent_element_.has_value())) {
    most_recent_element_lock_.Unlock();
    return nullptr;
  }
  most_recent_element_enqueue_timestamp_.store(kInvalidTimestamp,
                                               std::memory_order_relaxed);
  EventEngine::Closure* popped = most_recent_element_->closure();
  most_recent_element_.reset();
  most_recent_element_lock_.Unlock();
  return popped;
}
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
@ -1,121 +0,0 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_H |
||||
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include <atomic> |
||||
#include <deque> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/functional/any_invocable.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
// A fast work queue based lightly on an internal Google implementation.
|
||||
//
|
||||
// This uses atomics to access the most recent element in the queue, making it
|
||||
// fast for LIFO operations. Accessing the oldest (next) element requires taking
|
||||
// a mutex lock.
|
||||
class WorkQueue { |
||||
public: |
||||
// Sentinel meaning "no timestamp recorded"; comparable to
// Timestamp::milliseconds_after_process_epoch()
|
||||
static const int64_t kInvalidTimestamp = -1; |
||||
|
||||
WorkQueue() = default; |
||||
// Returns whether the queue is empty
|
||||
bool Empty() const; |
||||
// Returns the Timestamp at which the oldest element in the queue was
|
||||
// enqueued, or Timestamp::InfPast() if no enqueue time is recorded.
|
||||
grpc_core::Timestamp OldestEnqueuedTimestamp() const; |
||||
// Returns the next (oldest) element from the queue, or nullptr if the queue
// is empty or a lock could not be acquired without blocking
|
||||
EventEngine::Closure* PopFront() ABSL_LOCKS_EXCLUDED(mu_); |
||||
// Returns the most recent element from the queue, or nullptr if the queue is
// empty or a lock could not be acquired without blocking
|
||||
EventEngine::Closure* PopBack(); |
||||
// Adds a closure to the back of the queue
|
||||
void Add(EventEngine::Closure* closure); |
||||
// Wraps an AnyInvocable and adds it to the back of the queue
|
||||
void Add(absl::AnyInvocable<void()> invocable); |
||||
|
||||
private: |
||||
class Storage { |
||||
public: |
||||
Storage() = default; |
||||
// Take a non-owned Closure*
|
||||
// Requires an exec_ctx on the stack
|
||||
// TODO(ctiller): replace with an alternative time source
|
||||
explicit Storage(EventEngine::Closure* closure) noexcept; |
||||
// Wrap an AnyInvocable into a Closure.
|
||||
// The closure must be executed or explicitly deleted to prevent memory
|
||||
// leaks. Requires an exec_ctx on the stack
|
||||
// TODO(ctiller): replace with an alternative time source
|
||||
explicit Storage(absl::AnyInvocable<void()> callback) noexcept; |
||||
~Storage() = default; |
||||
// not copyable
|
||||
Storage(const Storage&) = delete; |
||||
Storage& operator=(const Storage&) = delete; |
||||
// moveable
|
||||
Storage(Storage&& other) noexcept; |
||||
Storage& operator=(Storage&& other) noexcept; |
||||
// Enqueue time in milliseconds after the process epoch, or kInvalidTimestamp
// for a default-constructed Storage
|
||||
int64_t enqueued() const { return enqueued_; } |
||||
// Get the stored closure, or wrapped AnyInvocable
|
||||
EventEngine::Closure* closure(); |
||||
|
||||
private: |
||||
EventEngine::Closure* closure_ = nullptr; |
||||
int64_t enqueued_ = kInvalidTimestamp; |
||||
}; |
||||
|
||||
// Attempts to pop from the front (oldest) or the back of elements_,
// depending on `front`.
|
||||
// This will return nullptr if the queue is empty, or if other workers
|
||||
// are already attempting to pop from this queue.
|
||||
EventEngine::Closure* TryLockAndPop(bool front) ABSL_LOCKS_EXCLUDED(mu_); |
||||
// Internal implementation, helps with thread safety analysis in TryLockAndPop
|
||||
EventEngine::Closure* PopLocked(bool front) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
||||
// Attempts to take most_recent_element_ (the newest element).
|
||||
// This will return nullptr if that slot is empty, or if other workers
|
||||
// currently hold most_recent_element_lock_.
|
||||
EventEngine::Closure* TryPopMostRecentElement(); |
||||
// Common code for the Add methods
|
||||
void AddInternal(Storage&& storage); |
||||
|
||||
// The managed items in the queue
|
||||
std::deque<Storage> elements_ ABSL_GUARDED_BY(mu_); |
||||
// The most recently enqueued element. This is reserved from work stealing
|
||||
absl::optional<Storage> most_recent_element_ |
||||
ABSL_GUARDED_BY(most_recent_element_lock_); |
||||
grpc_core::Mutex ABSL_ACQUIRED_AFTER(mu_) most_recent_element_lock_; |
||||
// TODO(hork): consider ABSL_CACHELINE_ALIGNED
|
||||
std::atomic<int64_t> most_recent_element_enqueue_timestamp_{ |
||||
kInvalidTimestamp}; |
||||
std::atomic<int64_t> oldest_enqueued_timestamp_{kInvalidTimestamp}; |
||||
grpc_core::Mutex mu_; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_H
|
@ -0,0 +1,63 @@ |
||||
// Copyright 2023 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/work_queue/basic_work_queue.h" |
||||
|
||||
#include <utility> |
||||
|
||||
#include "src/core/lib/event_engine/common_closures.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
bool BasicWorkQueue::Empty() const { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
return q_.empty(); |
||||
} |
||||
|
||||
size_t BasicWorkQueue::Size() const { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
return q_.size(); |
||||
} |
||||
|
||||
// Removes and returns the closure at the back of the deque (the newest one),
// or returns nullptr when the deque is empty.
EventEngine::Closure* BasicWorkQueue::PopMostRecent() {
  grpc_core::MutexLock lock(&mu_);
  EventEngine::Closure* newest = nullptr;
  if (!q_.empty()) {
    newest = q_.back();
    q_.pop_back();
  }
  return newest;
}
||||
|
||||
// Removes and returns the closure at the front of the deque (the oldest one),
// or returns nullptr when the deque is empty.
EventEngine::Closure* BasicWorkQueue::PopOldest() {
  grpc_core::MutexLock lock(&mu_);
  EventEngine::Closure* oldest = nullptr;
  if (!q_.empty()) {
    oldest = q_.front();
    q_.pop_front();
  }
  return oldest;
}
||||
|
||||
// Appends the closure to the back of the deque while holding the queue mutex.
void BasicWorkQueue::Add(EventEngine::Closure* closure) {
  grpc_core::MutexLock lock(&mu_);
  q_.emplace_back(closure);
}
||||
|
||||
// Wraps the invocable in a SelfDeletingClosure and appends it to the back of
// the deque.
//
// The wrapper is created before the mutex is taken so that its construction
// does not lengthen the critical section shared with every other producer and
// consumer of this queue.
void BasicWorkQueue::Add(absl::AnyInvocable<void()> invocable) {
  EventEngine::Closure* wrapped =
      SelfDeletingClosure::Create(std::move(invocable));
  grpc_core::MutexLock lock(&mu_);
  q_.push_back(wrapped);
}
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
@ -0,0 +1,71 @@ |
||||
// Copyright 2023 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_BASIC_WORK_QUEUE_H |
||||
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_BASIC_WORK_QUEUE_H |
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <deque> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/functional/any_invocable.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/event_engine/work_queue/work_queue.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
// A basic WorkQueue implementation that guards an std::deque with a Mutex
|
||||
//
|
||||
// Implementation note: q_.back is the most recent. q_.front is the oldest. New
|
||||
// closures are added to the back.
|
||||
class BasicWorkQueue : public WorkQueue { |
||||
public: |
||||
BasicWorkQueue() = default; |
||||
// Returns whether the queue is empty
|
||||
bool Empty() const override ABSL_LOCKS_EXCLUDED(mu_); |
||||
// Returns the size of the queue.
|
||||
size_t Size() const override ABSL_LOCKS_EXCLUDED(mu_); |
||||
// Returns the most recent element from the queue, or nullptr if the queue is
|
||||
// empty. This is the fastest way to retrieve
|
||||
// elements from the queue.
|
||||
//
|
||||
// NOTE(review): the definition in basic_work_queue.cc takes mu_ with a
// blocking MutexLock, so contention delays the call rather than producing
// nullptr. The WorkQueue interface still permits nullptr for a non-empty
// queue, so callers should keep checking the result.
|
||||
EventEngine::Closure* PopMostRecent() override ABSL_LOCKS_EXCLUDED(mu_); |
||||
// Returns the oldest element from the queue, or nullptr if the queue is
|
||||
// empty.
|
||||
// This is expected to be the slower of the two ways to retrieve closures from
|
||||
// the queue.
|
||||
//
|
||||
// NOTE(review): as with PopMostRecent, the definition blocks on mu_ instead
// of returning nullptr under contention; callers should still check for
// nullptr as the WorkQueue interface allows it.
|
||||
EventEngine::Closure* PopOldest() override ABSL_LOCKS_EXCLUDED(mu_); |
||||
// Adds a closure to the queue.
|
||||
void Add(EventEngine::Closure* closure) override ABSL_LOCKS_EXCLUDED(mu_); |
||||
// Wraps an AnyInvocable and adds it to the queue.
|
||||
void Add(absl::AnyInvocable<void()> invocable) override |
||||
ABSL_LOCKS_EXCLUDED(mu_); |
||||
|
||||
private: |
||||
mutable grpc_core::Mutex mu_; |
||||
std::deque<EventEngine::Closure*> q_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_BASIC_WORK_QUEUE_H
|
@ -0,0 +1,62 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_WORK_QUEUE_H |
||||
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_WORK_QUEUE_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include "absl/functional/any_invocable.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
// An interface for thread-safe EventEngine callback work queues.
|
||||
//
|
||||
// Implementations should be optimized for LIFO operations using PopMostRecent.
|
||||
// All methods must be guaranteed thread-safe.
|
||||
class WorkQueue { |
||||
public: |
||||
virtual ~WorkQueue() = default; |
||||
// Returns whether the queue is empty.
|
||||
virtual bool Empty() const = 0; |
||||
// Returns the size of the queue.
|
||||
virtual size_t Size() const = 0; |
||||
// Returns the most recent element from the queue. This is the fastest way to
|
||||
// retrieve elements from the queue.
|
||||
//
|
||||
// Implementations are permitted to return nullptr even if the queue is not
|
||||
// empty. This is to support potential optimizations.
|
||||
virtual EventEngine::Closure* PopMostRecent() = 0; |
||||
// Returns the oldest element from the queue, or nullptr if either empty
|
||||
// or the queue is under contention.
|
||||
// This is expected to be the slower of the two ways to retrieve closures from
|
||||
// the queue.
|
||||
//
|
||||
// Implementations are permitted to return nullptr even if the queue is not
|
||||
// empty. This is to support potential optimizations.
|
||||
virtual EventEngine::Closure* PopOldest() = 0; |
||||
// Adds a closure to the queue.
|
||||
virtual void Add(EventEngine::Closure* closure) = 0; |
||||
// Wraps an AnyInvocable and adds it to the queue.
|
||||
virtual void Add(absl::AnyInvocable<void()> invocable) = 0; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WORK_QUEUE_WORK_QUEUE_H
|
@ -0,0 +1,209 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <deque> |
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/event_engine/common_closures.h" |
||||
#include "src/core/lib/event_engine/work_queue/basic_work_queue.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
namespace { |
||||
|
||||
using ::grpc_event_engine::experimental::AnyInvocableClosure; |
||||
using ::grpc_event_engine::experimental::BasicWorkQueue; |
||||
using ::grpc_event_engine::experimental::EventEngine; |
||||
|
||||
grpc_core::Mutex globalMu; |
||||
BasicWorkQueue globalWorkQueue; |
||||
std::deque<EventEngine::Closure*> globalDeque; |
||||
|
||||
// --- Multithreaded Tests ---------------------------------------------------
|
||||
|
||||
// Shared registration settings for the multithreaded benchmarks: argument
// range 1..512, real-time and process-CPU-time measurement, and runs with one
// thread, four threads, and one thread per CPU.
void MultithreadedTestArguments(benchmark::internal::Benchmark* b) {
  b->Range(1, 512);
  b->UseRealTime();
  b->MeasureProcessCPUTime();
  b->Threads(1);
  b->Threads(4);
  b->ThreadPerCpu();
}
||||
|
||||
void BM_MultithreadedWorkQueuePopOldest(benchmark::State& state) { |
||||
AnyInvocableClosure closure([] {}); |
||||
int element_count = state.range(0); |
||||
double pop_attempts = 0; |
||||
for (auto _ : state) { |
||||
for (int i = 0; i < element_count; i++) globalWorkQueue.Add(&closure); |
||||
int cnt = 0; |
||||
do { |
||||
if (++pop_attempts && globalWorkQueue.PopOldest() != nullptr) ++cnt; |
||||
} while (cnt < element_count); |
||||
} |
||||
state.counters["added"] = element_count * state.iterations(); |
||||
state.counters["pop_rate"] = benchmark::Counter( |
||||
element_count * state.iterations(), benchmark::Counter::kIsRate); |
||||
state.counters["pop_attempts"] = pop_attempts; |
||||
// Rough measurement of queue contention.
|
||||
// WorkQueue::Pop* may return nullptr when the queue is non-empty, usually
|
||||
// when under thread contention. hit_rate is the ratio of pop attempts to
|
||||
// closure executions.
|
||||
state.counters["hit_rate"] = |
||||
benchmark::Counter(element_count * state.iterations() / pop_attempts, |
||||
benchmark::Counter::kAvgThreads); |
||||
if (state.thread_index() == 0) { |
||||
GPR_ASSERT(globalWorkQueue.Empty()); |
||||
} |
||||
} |
||||
BENCHMARK(BM_MultithreadedWorkQueuePopOldest) |
||||
->Apply(MultithreadedTestArguments); |
||||
|
||||
void BM_MultithreadedWorkQueuePopMostRecent(benchmark::State& state) { |
||||
AnyInvocableClosure closure([] {}); |
||||
int element_count = state.range(0); |
||||
double pop_attempts = 0; |
||||
for (auto _ : state) { |
||||
for (int i = 0; i < element_count; i++) globalWorkQueue.Add(&closure); |
||||
int cnt = 0; |
||||
do { |
||||
if (++pop_attempts && globalWorkQueue.PopMostRecent() != nullptr) ++cnt; |
||||
} while (cnt < element_count); |
||||
} |
||||
state.counters["added"] = element_count * state.iterations(); |
||||
state.counters["pop_rate"] = benchmark::Counter( |
||||
element_count * state.iterations(), benchmark::Counter::kIsRate); |
||||
state.counters["pop_attempts"] = pop_attempts; |
||||
state.counters["hit_rate"] = |
||||
benchmark::Counter(element_count * state.iterations() / pop_attempts, |
||||
benchmark::Counter::kAvgThreads); |
||||
if (state.thread_index() == 0) { |
||||
GPR_ASSERT(globalWorkQueue.Empty()); |
||||
} |
||||
} |
||||
BENCHMARK(BM_MultithreadedWorkQueuePopMostRecent) |
||||
->Apply(MultithreadedTestArguments); |
||||
|
||||
void BM_MultithreadedStdDequeLIFO(benchmark::State& state) { |
||||
int element_count = state.range(0); |
||||
AnyInvocableClosure closure([] {}); |
||||
for (auto _ : state) { |
||||
for (int i = 0; i < element_count; i++) { |
||||
grpc_core::MutexLock lock(&globalMu); |
||||
globalDeque.push_back(&closure); |
||||
} |
||||
for (int i = 0; i < element_count; i++) { |
||||
grpc_core::MutexLock lock(&globalMu); |
||||
EventEngine::Closure* popped = globalDeque.back(); |
||||
globalDeque.pop_back(); |
||||
GPR_ASSERT(popped != nullptr); |
||||
} |
||||
} |
||||
state.counters["added"] = element_count * state.iterations(); |
||||
state.counters["pop_attempts"] = state.counters["added"]; |
||||
state.counters["pop_rate"] = benchmark::Counter( |
||||
element_count * state.iterations(), benchmark::Counter::kIsRate); |
||||
state.counters["hit_rate"] = |
||||
benchmark::Counter(1, benchmark::Counter::kAvgThreads); |
||||
} |
||||
BENCHMARK(BM_MultithreadedStdDequeLIFO)->Apply(MultithreadedTestArguments); |
||||
|
||||
// --- Basic Functionality Tests ---------------------------------------------
|
||||
|
||||
void BM_WorkQueueIntptrPopMostRecent(benchmark::State& state) { |
||||
BasicWorkQueue queue; |
||||
grpc_event_engine::experimental::AnyInvocableClosure closure([] {}); |
||||
int element_count = state.range(0); |
||||
for (auto _ : state) { |
||||
int cnt = 0; |
||||
for (int i = 0; i < element_count; i++) queue.Add(&closure); |
||||
do { |
||||
if (queue.PopMostRecent() != nullptr) ++cnt; |
||||
} while (cnt < element_count); |
||||
} |
||||
state.counters["Added"] = element_count * state.iterations(); |
||||
state.counters["Popped"] = state.counters["Added"]; |
||||
state.counters["Pop Rate"] = |
||||
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); |
||||
} |
||||
BENCHMARK(BM_WorkQueueIntptrPopMostRecent) |
||||
->Range(1, 512) |
||||
->UseRealTime() |
||||
->MeasureProcessCPUTime(); |
||||
|
||||
void BM_WorkQueueClosureExecution(benchmark::State& state) { |
||||
BasicWorkQueue queue; |
||||
int element_count = state.range(0); |
||||
int run_count = 0; |
||||
grpc_event_engine::experimental::AnyInvocableClosure closure( |
||||
[&run_count] { ++run_count; }); |
||||
for (auto _ : state) { |
||||
for (int i = 0; i < element_count; i++) queue.Add(&closure); |
||||
do { |
||||
queue.PopMostRecent()->Run(); |
||||
} while (run_count < element_count); |
||||
run_count = 0; |
||||
} |
||||
state.counters["Added"] = element_count * state.iterations(); |
||||
state.counters["Popped"] = state.counters["Added"]; |
||||
state.counters["Pop Rate"] = |
||||
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); |
||||
} |
||||
BENCHMARK(BM_WorkQueueClosureExecution) |
||||
->Range(8, 128) |
||||
->UseRealTime() |
||||
->MeasureProcessCPUTime(); |
||||
|
||||
void BM_WorkQueueAnyInvocableExecution(benchmark::State& state) { |
||||
BasicWorkQueue queue; |
||||
int element_count = state.range(0); |
||||
int run_count = 0; |
||||
for (auto _ : state) { |
||||
for (int i = 0; i < element_count; i++) { |
||||
queue.Add([&run_count] { ++run_count; }); |
||||
} |
||||
do { |
||||
queue.PopMostRecent()->Run(); |
||||
} while (run_count < element_count); |
||||
run_count = 0; |
||||
} |
||||
state.counters["Added"] = element_count * state.iterations(); |
||||
state.counters["Popped"] = state.counters["Added"]; |
||||
state.counters["Pop Rate"] = |
||||
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); |
||||
} |
||||
BENCHMARK(BM_WorkQueueAnyInvocableExecution) |
||||
->Range(8, 128) |
||||
->UseRealTime() |
||||
->MeasureProcessCPUTime(); |
||||
|
||||
} // namespace
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
// Benchmark entry point: constructs the gRPC test environment, hands the
// command line to the benchmark library, and runs the selected benchmarks.
int main(int argc, char** argv) {
  // Kept alive for the whole run; destroyed when main returns.
  grpc::testing::TestEnvironment env(&argc, argv);
  ::benchmark::Initialize(&argc, argv);
  ::benchmark::RunTheBenchmarksNamespaced();
  return 0;
}
@ -1,314 +0,0 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <cmath> |
||||
#include <deque> |
||||
#include <sstream> |
||||
|
||||
// ensure assert() is enabled
|
||||
#undef NDEBUG |
||||
#include <cassert> |
||||
|
||||
#include <benchmark/benchmark.h> |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/event_engine/common_closures.h" |
||||
#include "src/core/lib/event_engine/work_queue.h" |
||||
#include "src/core/lib/gprpp/crash.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
namespace { |
||||
|
||||
using ::grpc_event_engine::experimental::AnyInvocableClosure; |
||||
using ::grpc_event_engine::experimental::EventEngine; |
||||
using ::grpc_event_engine::experimental::WorkQueue; |
||||
|
||||
grpc_core::Mutex globalMu; |
||||
std::vector<WorkQueue*>* globalWorkQueueList; |
||||
std::vector<std::deque<EventEngine::Closure*>*>* globalDequeList; |
||||
std::vector<grpc_core::Mutex>* globalDequeMutexList; |
||||
|
||||
// Per-benchmark setup hook: allocates fresh global containers with one slot
// per benchmark thread.
//
// The pointer vectors are created with state.threads() null entries rather
// than merely reserve()d: reserve() only sets capacity, and indexing an
// element of a vector whose size is still zero is undefined behavior.
void GlobalSetup(const benchmark::State& state) {
  // called for every test, resets all state
  const size_t thread_count = static_cast<size_t>(state.threads());
  globalWorkQueueList = new std::vector<WorkQueue*>(thread_count);
  globalDequeList =
      new std::vector<std::deque<EventEngine::Closure*>*>(thread_count);
  // Construct the mutexes in place; no temporary vector is needed.
  globalDequeMutexList = new std::vector<grpc_core::Mutex>(thread_count);
}
||||
|
||||
// Per-benchmark teardown hook: frees the global containers allocated by
// GlobalSetup and clears the pointers so that nothing is left dangling between
// benchmark runs.
void GlobalTeardown(const benchmark::State& /* state */) {
  // called for every test, resets all state
  delete globalWorkQueueList;
  globalWorkQueueList = nullptr;
  delete globalDequeList;
  globalDequeList = nullptr;
  delete globalDequeMutexList;
  globalDequeMutexList = nullptr;
}
||||
|
||||
void BM_WorkQueueIntptrPopFront(benchmark::State& state) { |
||||
WorkQueue queue; |
||||
grpc_event_engine::experimental::AnyInvocableClosure closure([] {}); |
||||
int element_count = state.range(0); |
||||
for (auto _ : state) { |
||||
int cnt = 0; |
||||
for (int i = 0; i < element_count; i++) queue.Add(&closure); |
||||
absl::optional<EventEngine::Closure*> popped; |
||||
cnt = 0; |
||||
do { |
||||
popped = queue.PopFront(); |
||||
if (popped.has_value()) ++cnt; |
||||
} while (cnt < element_count); |
||||
} |
||||
state.counters["Added"] = element_count * state.iterations(); |
||||
state.counters["Popped"] = state.counters["Added"]; |
||||
state.counters["Steal Rate"] = |
||||
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); |
||||
} |
||||
BENCHMARK(BM_WorkQueueIntptrPopFront) |
||||
->Setup(GlobalSetup) |
||||
->Teardown(GlobalTeardown) |
||||
->Range(1, 512) |
||||
->UseRealTime() |
||||
->MeasureProcessCPUTime(); |
||||
|
||||
void BM_MultithreadedWorkQueuePopBack(benchmark::State& state) { |
||||
if (state.thread_index() == 0) (*globalWorkQueueList)[0] = new WorkQueue(); |
||||
AnyInvocableClosure closure([] {}); |
||||
int element_count = state.range(0); |
||||
for (auto _ : state) { |
||||
int cnt = 0; |
||||
auto* queue = (*globalWorkQueueList)[0]; |
||||
for (int i = 0; i < element_count; i++) queue->Add(&closure); |
||||
absl::optional<EventEngine::Closure*> popped; |
||||
cnt = 0; |
||||
do { |
||||
popped = queue->PopBack(); |
||||
if (popped.has_value()) ++cnt; |
||||
} while (cnt < element_count); |
||||
} |
||||
state.counters["Added"] = element_count * state.iterations(); |
||||
state.counters["Popped"] = state.counters["Added"]; |
||||
state.counters["Steal Rate"] = |
||||
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); |
||||
if (state.thread_index() == 0) { |
||||
delete (*globalWorkQueueList)[0]; |
||||
} |
||||
} |
||||
BENCHMARK(BM_MultithreadedWorkQueuePopBack) |
||||
->Setup(GlobalSetup) |
||||
->Teardown(GlobalTeardown) |
||||
->Range(1, 512) |
||||
->UseRealTime() |
||||
->MeasureProcessCPUTime() |
||||
->Threads(1) |
||||
->Threads(4) |
||||
->ThreadPerCpu(); |
||||
|
||||
void BM_WorkQueueClosureExecution(benchmark::State& state) { |
||||
WorkQueue queue; |
||||
int element_count = state.range(0); |
||||
int run_count = 0; |
||||
grpc_event_engine::experimental::AnyInvocableClosure closure( |
||||
[&run_count] { ++run_count; }); |
||||
for (auto _ : state) { |
||||
for (int i = 0; i < element_count; i++) queue.Add(&closure); |
||||
do { |
||||
queue.PopFront()->Run(); |
||||
} while (run_count < element_count); |
||||
run_count = 0; |
||||
} |
||||
state.counters["Added"] = element_count * state.iterations(); |
||||
state.counters["Popped"] = state.counters["Added"]; |
||||
state.counters["Steal Rate"] = |
||||
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); |
||||
} |
||||
BENCHMARK(BM_WorkQueueClosureExecution) |
||||
->Range(8, 128) |
||||
->UseRealTime() |
||||
->MeasureProcessCPUTime(); |
||||
|
||||
void BM_WorkQueueAnyInvocableExecution(benchmark::State& state) { |
||||
WorkQueue queue; |
||||
int element_count = state.range(0); |
||||
int run_count = 0; |
||||
for (auto _ : state) { |
||||
for (int i = 0; i < element_count; i++) { |
||||
queue.Add([&run_count] { ++run_count; }); |
||||
} |
||||
do { |
||||
queue.PopFront()->Run(); |
||||
} while (run_count < element_count); |
||||
run_count = 0; |
||||
} |
||||
state.counters["Added"] = element_count * state.iterations(); |
||||
state.counters["Popped"] = state.counters["Added"]; |
||||
state.counters["Steal Rate"] = |
||||
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); |
||||
} |
||||
BENCHMARK(BM_WorkQueueAnyInvocableExecution) |
||||
->Range(8, 128) |
||||
->UseRealTime() |
||||
->MeasureProcessCPUTime(); |
||||
|
||||
void BM_StdDequeLIFO(benchmark::State& state) { |
||||
if (state.thread_index() == 0) { |
||||
(*globalDequeList)[0] = new std::deque<EventEngine::Closure*>(); |
||||
} |
||||
auto& mu = (*globalDequeMutexList)[0]; |
||||
int element_count = state.range(0); |
||||
AnyInvocableClosure closure([] {}); |
||||
for (auto _ : state) { |
||||
auto* queue = (*globalDequeList)[0]; |
||||
for (int i = 0; i < element_count; i++) { |
||||
grpc_core::MutexLock lock(&mu); |
||||
queue->emplace_back(&closure); |
||||
} |
||||
for (int i = 0; i < element_count; i++) { |
||||
grpc_core::MutexLock lock(&mu); |
||||
EventEngine::Closure* popped = queue->back(); |
||||
queue->pop_back(); |
||||
assert(popped != nullptr); |
||||
} |
||||
} |
||||
state.counters["Added"] = element_count * state.iterations(); |
||||
state.counters["Popped"] = state.counters["Added"]; |
||||
state.counters["Steal Rate"] = |
||||
benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate); |
||||
if (state.thread_index() == 0) { |
||||
delete (*globalDequeList)[0]; |
||||
} |
||||
} |
||||
BENCHMARK(BM_StdDequeLIFO) |
||||
->Setup(GlobalSetup) |
||||
->Teardown(GlobalTeardown) |
||||
->Range(1, 512) |
||||
->UseRealTime() |
||||
->MeasureProcessCPUTime() |
||||
->Threads(1) |
||||
->Threads(4) |
||||
->ThreadPerCpu(); |
||||
|
||||
// Shared registration for the per-thread benchmarks. Installs the global
// setup/teardown hooks, sweeps every combination of the pop_attempts and
// pct_fill values, and registers Threads(10) and ThreadPerCpu() variants.
void PerThreadArguments(benchmark::internal::Benchmark* b) {
  b->Setup(GlobalSetup);
  b->Teardown(GlobalTeardown);
  b->ArgsProduct({/*pop_attempts=*/{10, 50, 250},
                  /*pct_fill=*/{2, 10, 50}});
  b->UseRealTime();
  b->MeasureProcessCPUTime();
  b->Threads(10);
  b->ThreadPerCpu();
}
||||
|
||||
void BM_WorkQueuePerThread(benchmark::State& state) { |
||||
WorkQueue local_queue; |
||||
{ |
||||
grpc_core::MutexLock lock(&globalMu); |
||||
(*globalWorkQueueList)[state.thread_index()] = &local_queue; |
||||
} |
||||
AnyInvocableClosure closure([] {}); |
||||
int element_count = state.range(0); |
||||
float pct_fill = state.range(1) / 100.0; |
||||
for (auto _ : state) { |
||||
// sparsely populate a queue
|
||||
for (int i = 0; i < std::ceil(element_count * pct_fill); i++) { |
||||
local_queue.Add(&closure); |
||||
} |
||||
// attempt to pop from all thread queues `element_count` times
|
||||
int pop_attempts = 0; |
||||
auto iq = globalWorkQueueList->begin(); |
||||
while (pop_attempts++ < element_count) { |
||||
// may not get a value if the queue being looked at from another thread
|
||||
(*iq)->PopBack(); |
||||
if (iq == globalWorkQueueList->end()) { |
||||
iq = globalWorkQueueList->begin(); |
||||
} else { |
||||
iq++; |
||||
}; |
||||
} |
||||
} |
||||
state.counters["Added"] = |
||||
std::ceil(element_count * pct_fill) * state.iterations(); |
||||
state.counters["Steal Attempts"] = element_count * state.iterations(); |
||||
state.counters["Steal Rate"] = benchmark::Counter( |
||||
state.counters["Steal Attempts"], benchmark::Counter::kIsRate); |
||||
if (state.thread_index() == 0) { |
||||
for (auto* queue : *globalWorkQueueList) { |
||||
assert(queue->Empty()); |
||||
} |
||||
} |
||||
} |
||||
BENCHMARK(BM_WorkQueuePerThread)->Apply(PerThreadArguments); |
||||
|
||||
void BM_StdDequePerThread(benchmark::State& state) { |
||||
std::deque<EventEngine::Closure*> local_queue; |
||||
(*globalDequeList)[state.thread_index()] = &local_queue; |
||||
int element_count = state.range(0); |
||||
float pct_fill = state.range(1) / 100.0; |
||||
AnyInvocableClosure closure([] {}); |
||||
auto& local_mu = (*globalDequeMutexList)[state.thread_index()]; |
||||
for (auto _ : state) { |
||||
// sparsely populate a queue
|
||||
for (int i = 0; i < std::ceil(element_count * pct_fill); i++) { |
||||
grpc_core::MutexLock lock(&local_mu); |
||||
local_queue.emplace_back(&closure); |
||||
} |
||||
int pop_attempts = 0; |
||||
auto iq = globalDequeList->begin(); |
||||
auto mu = globalDequeMutexList->begin(); |
||||
while (pop_attempts++ < element_count) { |
||||
{ |
||||
grpc_core::MutexLock lock(&*mu); |
||||
if (!(*iq)->empty()) { |
||||
assert((*iq)->back() != nullptr); |
||||
(*iq)->pop_back(); |
||||
} |
||||
} |
||||
if (iq == globalDequeList->end()) { |
||||
iq = globalDequeList->begin(); |
||||
mu = globalDequeMutexList->begin(); |
||||
} else { |
||||
++iq; |
||||
++mu; |
||||
}; |
||||
} |
||||
} |
||||
state.counters["Added"] = |
||||
std::ceil(element_count * pct_fill) * state.iterations(); |
||||
state.counters["Steal Attempts"] = element_count * state.iterations(); |
||||
state.counters["Steal Rate"] = benchmark::Counter( |
||||
state.counters["Steal Attempts"], benchmark::Counter::kIsRate); |
||||
if (state.thread_index() == 0) { |
||||
for (auto* queue : *globalDequeList) { |
||||
assert(queue->empty()); |
||||
} |
||||
} |
||||
} |
||||
BENCHMARK(BM_StdDequePerThread)->Apply(PerThreadArguments); |
||||
|
||||
} // namespace
|
||||
|
||||
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
|
||||
// and others do not. This allows us to support both modes.
|
||||
namespace benchmark { |
||||
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } |
||||
} // namespace benchmark
|
||||
|
||||
// Benchmark entry point: constructs the gRPC test environment, lets the
// benchmark library initialize from the command-line arguments, then runs the
// registered benchmarks.
int main(int argc, char** argv) {
  // `env` is a local, so it stays alive until main returns.
  grpc::testing::TestEnvironment env(&argc, argv);
  ::benchmark::Initialize(&argc, argv);
  ::benchmark::RunTheBenchmarksNamespaced();
  return 0;
}
Loading…
Reference in new issue