mirror of https://github.com/grpc/grpc.git
[event-engine] Fork timer code (#29816)
* move files * [event-engine] Fork timer code from iomgr * progress * thread-pool * x * fixes * tests * Automated change: Fix sanity tests * x * wip * Automated change: Fix sanity tests * timer-heap-test * flesh-things-out * wip * Automated change: Fix sanity tests * fix-test * fix * Automated change: Fix sanity tests * fix * fix windows * Automated change: Fix sanity tests * fix mac * fix * review feedback * fix * Automated change: Fix sanity tests * fixes * Automated change: Fix sanity tests * review feedback * Automated change: Fix sanity tests * fix * annotate * Automated change: Fix sanity tests Co-authored-by: ctiller <ctiller@users.noreply.github.com>pull/30017/head
parent
d0751191a9
commit
977ebbef09
36 changed files with 3037 additions and 188 deletions
@ -0,0 +1,123 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/thread_pool.h" |
||||
|
||||
#include "src/core/lib/gprpp/thd.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
ThreadPool::Thread::Thread(ThreadPool* pool) |
||||
: pool_(pool), |
||||
thd_( |
||||
"iomgr_eventengine_pool", |
||||
[](void* th) { static_cast<ThreadPool::Thread*>(th)->ThreadFunc(); }, |
||||
this) { |
||||
thd_.Start(); |
||||
} |
||||
ThreadPool::Thread::~Thread() { thd_.Join(); } |
||||
|
||||
void ThreadPool::Thread::ThreadFunc() { |
||||
pool_->ThreadFunc(); |
||||
// Now that we have killed ourselves, we should reduce the thread count
|
||||
grpc_core::MutexLock lock(&pool_->mu_); |
||||
pool_->nthreads_--; |
||||
// Move ourselves to dead list
|
||||
pool_->dead_threads_.push_back(this); |
||||
|
||||
if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) { |
||||
pool_->shutdown_cv_.Signal(); |
||||
} |
||||
} |
||||
|
||||
void ThreadPool::ThreadFunc() { |
||||
for (;;) { |
||||
// Wait until work is available or we are shutting down.
|
||||
grpc_core::ReleasableMutexLock lock(&mu_); |
||||
if (!shutdown_ && callbacks_.empty()) { |
||||
// If there are too many threads waiting, then quit this thread
|
||||
if (threads_waiting_ >= reserve_threads_) { |
||||
break; |
||||
} |
||||
threads_waiting_++; |
||||
cv_.Wait(&mu_); |
||||
threads_waiting_--; |
||||
} |
||||
// Drain callbacks before considering shutdown to ensure all work
|
||||
// gets completed.
|
||||
if (!callbacks_.empty()) { |
||||
auto cb = callbacks_.front(); |
||||
callbacks_.pop(); |
||||
lock.Release(); |
||||
cb(); |
||||
} else if (shutdown_) { |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
|
||||
ThreadPool::ThreadPool(int reserve_threads) |
||||
: shutdown_(false), |
||||
reserve_threads_(reserve_threads), |
||||
nthreads_(0), |
||||
threads_waiting_(0) { |
||||
for (int i = 0; i < reserve_threads_; i++) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
nthreads_++; |
||||
new Thread(this); |
||||
} |
||||
} |
||||
|
||||
void ThreadPool::ReapThreads(std::vector<Thread*>* tlist) { |
||||
for (auto* t : *tlist) delete t; |
||||
tlist->clear(); |
||||
} |
||||
|
||||
ThreadPool::~ThreadPool() { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
shutdown_ = true; |
||||
cv_.SignalAll(); |
||||
while (nthreads_ != 0) { |
||||
shutdown_cv_.Wait(&mu_); |
||||
} |
||||
ReapThreads(&dead_threads_); |
||||
} |
||||
|
||||
void ThreadPool::Add(const std::function<void()>& callback) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
// Add works to the callbacks list
|
||||
callbacks_.push(callback); |
||||
// Increase pool size or notify as needed
|
||||
if (threads_waiting_ == 0) { |
||||
// Kick off a new thread
|
||||
nthreads_++; |
||||
new Thread(this); |
||||
} else { |
||||
cv_.Signal(); |
||||
} |
||||
// Also use this chance to harvest dead threads
|
||||
if (!dead_threads_.empty()) { |
||||
ReapThreads(&dead_threads_); |
||||
} |
||||
} |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
@ -0,0 +1,70 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_THREAD_POOL_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_THREAD_POOL_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <functional> |
||||
#include <queue> |
||||
#include <vector> |
||||
|
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/thd.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
class ThreadPool final { |
||||
public: |
||||
explicit ThreadPool(int reserve_threads); |
||||
~ThreadPool(); |
||||
|
||||
void Add(const std::function<void()>& callback); |
||||
|
||||
private: |
||||
class Thread { |
||||
public: |
||||
explicit Thread(ThreadPool* pool); |
||||
~Thread(); |
||||
|
||||
private: |
||||
ThreadPool* pool_; |
||||
grpc_core::Thread thd_; |
||||
void ThreadFunc(); |
||||
}; |
||||
|
||||
void ThreadFunc(); |
||||
static void ReapThreads(std::vector<Thread*>* tlist); |
||||
|
||||
grpc_core::Mutex mu_; |
||||
grpc_core::CondVar cv_; |
||||
grpc_core::CondVar shutdown_cv_; |
||||
bool shutdown_; |
||||
std::queue<std::function<void()>> callbacks_; |
||||
int reserve_threads_; |
||||
int nthreads_; |
||||
int threads_waiting_; |
||||
std::vector<Thread*> dead_threads_; |
||||
}; |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_THREAD_POOL_H
|
@ -0,0 +1,62 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/time_averaged_stats.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
TimeAveragedStats::TimeAveragedStats(double init_avg, double regress_weight, |
||||
double persistence_factor) |
||||
: init_avg_(init_avg), |
||||
regress_weight_(regress_weight), |
||||
persistence_factor_(persistence_factor) {} |
||||
|
||||
void TimeAveragedStats::AddSample(double value) { |
||||
batch_total_value_ += value; |
||||
++batch_num_samples_; |
||||
} |
||||
|
||||
double TimeAveragedStats::UpdateAverage() { |
||||
/* Start with the current batch: */ |
||||
double weighted_sum = batch_total_value_; |
||||
double total_weight = batch_num_samples_; |
||||
if (regress_weight_ > 0) { |
||||
/* Add in the regression towards init_avg_: */ |
||||
weighted_sum += regress_weight_ * init_avg_; |
||||
total_weight += regress_weight_; |
||||
} |
||||
if (persistence_factor_ > 0) { |
||||
/* Add in the persistence: */ |
||||
const double prev_sample_weight = |
||||
persistence_factor_ * aggregate_total_weight_; |
||||
weighted_sum += prev_sample_weight * aggregate_weighted_avg_; |
||||
total_weight += prev_sample_weight; |
||||
} |
||||
aggregate_weighted_avg_ = |
||||
(total_weight > 0) ? (weighted_sum / total_weight) : init_avg_; |
||||
aggregate_total_weight_ = total_weight; |
||||
batch_num_samples_ = 0; |
||||
batch_total_value_ = 0; |
||||
return aggregate_weighted_avg_; |
||||
} |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
@ -0,0 +1,81 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIME_AVERAGED_STATS_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIME_AVERAGED_STATS_H |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
/* This tracks a time-decaying weighted average. It works by collecting
|
||||
batches of samples and then mixing their average into a time-decaying |
||||
weighted mean. It is designed for batch operations where we do many adds |
||||
before updating the average. */ |
||||
|
||||
class TimeAveragedStats { |
||||
public: |
||||
TimeAveragedStats(double init_avg, double regress_weight, |
||||
double persistence_factor); |
||||
|
||||
/* Add a sample to the current batch. */ |
||||
void AddSample(double value); |
||||
/* Complete a batch and compute the new estimate of the average sample
|
||||
value. */ |
||||
double UpdateAverage(); |
||||
|
||||
double aggregate_weighted_avg() const { return aggregate_weighted_avg_; } |
||||
double aggregate_total_weight() const { return aggregate_total_weight_; } |
||||
|
||||
private: |
||||
/* The initial average value. This is the reported average until the first
|
||||
grpc_time_averaged_stats_update_average call. If a positive regress_weight |
||||
is used, we also regress towards this value on each update. */ |
||||
const double init_avg_; |
||||
/* The sample weight of "init_avg" that is mixed in with each call to
|
||||
grpc_time_averaged_stats_update_average. If the calls to |
||||
grpc_time_averaged_stats_add_sample stop, this will cause the average to |
||||
regress back to the mean. This should be non-negative. Set it to 0 to |
||||
disable the bias. A value of 1 has the effect of adding in 1 bonus sample |
||||
with value init_avg to each sample period. */ |
||||
const double regress_weight_; |
||||
/* This determines the rate of decay of the time-averaging from one period
|
||||
to the next by scaling the aggregate_total_weight of samples from prior |
||||
periods when combining with the latest period. It should be in the range |
||||
[0,1]. A higher value adapts more slowly. With a value of 0.5, if the |
||||
batches each have k samples, the samples_in_avg_ will grow to 2 k, so the |
||||
weighting of the time average will eventually be 1/3 new batch and 2/3 |
||||
old average. */ |
||||
const double persistence_factor_; |
||||
|
||||
/* The total value of samples since the last UpdateAverage(). */ |
||||
double batch_total_value_ = 0; |
||||
/* The number of samples since the last UpdateAverage(). */ |
||||
double batch_num_samples_ = 0; |
||||
/* The time-decayed sum of batch_num_samples_ over previous batches. This is
|
||||
the "weight" of the old aggregate_weighted_avg_ when updating the |
||||
average. */ |
||||
double aggregate_total_weight_ = 0; |
||||
/* A time-decayed average of the (batch_total_value_ / batch_num_samples_),
|
||||
computed by decaying the samples_in_avg_ weight in the weighted average. */ |
||||
double aggregate_weighted_avg_ = init_avg_; |
||||
}; |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif /* GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIME_AVERAGED_STATS_H */ |
@ -0,0 +1,312 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/timer.h" |
||||
|
||||
#include <algorithm> |
||||
#include <atomic> |
||||
#include <limits> |
||||
#include <type_traits> |
||||
#include <utility> |
||||
|
||||
#include <grpc/support/cpu.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/timer_heap.h" |
||||
#include "src/core/lib/gpr/useful.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
static const size_t kInvalidHeapIndex = std::numeric_limits<size_t>::max(); |
||||
static const double kAddDeadlineScale = 0.33; |
||||
static const double kMinQueueWindowDuration = 0.01; |
||||
static const double kMaxQueueWindowDuration = 1.0; |
||||
|
||||
grpc_core::Timestamp TimerList::Shard::ComputeMinDeadline() { |
||||
return heap.is_empty() |
||||
? queue_deadline_cap + grpc_core::Duration::Epsilon() |
||||
: grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
||||
heap.Top()->deadline); |
||||
} |
||||
|
||||
TimerList::Shard::Shard() : stats(1.0 / kAddDeadlineScale, 0.1, 0.5) {} |
||||
|
||||
TimerList::TimerList(TimerListHost* host) |
||||
: host_(host), |
||||
num_shards_(grpc_core::Clamp(2 * gpr_cpu_num_cores(), 1u, 32u)), |
||||
min_timer_(host_->Now().milliseconds_after_process_epoch()), |
||||
shards_(new Shard[num_shards_]), |
||||
shard_queue_(new Shard*[num_shards_]) { |
||||
for (size_t i = 0; i < num_shards_; i++) { |
||||
Shard& shard = shards_[i]; |
||||
shard.queue_deadline_cap = |
||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
||||
min_timer_.load(std::memory_order_relaxed)); |
||||
shard.shard_queue_index = i; |
||||
shard.list.next = shard.list.prev = &shard.list; |
||||
shard.min_deadline = shard.ComputeMinDeadline(); |
||||
shard_queue_[i] = &shard; |
||||
} |
||||
} |
||||
|
||||
namespace { |
||||
/* returns true if the first element in the list */ |
||||
void ListJoin(Timer* head, Timer* timer) { |
||||
timer->next = head; |
||||
timer->prev = head->prev; |
||||
timer->next->prev = timer->prev->next = timer; |
||||
} |
||||
|
||||
void ListRemove(Timer* timer) { |
||||
timer->next->prev = timer->prev; |
||||
timer->prev->next = timer->next; |
||||
} |
||||
} // namespace
|
||||
|
||||
void TimerList::SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index) { |
||||
Shard* temp; |
||||
temp = shard_queue_[first_shard_queue_index]; |
||||
shard_queue_[first_shard_queue_index] = |
||||
shard_queue_[first_shard_queue_index + 1]; |
||||
shard_queue_[first_shard_queue_index + 1] = temp; |
||||
shard_queue_[first_shard_queue_index]->shard_queue_index = |
||||
first_shard_queue_index; |
||||
shard_queue_[first_shard_queue_index + 1]->shard_queue_index = |
||||
first_shard_queue_index + 1; |
||||
} |
||||
|
||||
void TimerList::NoteDeadlineChange(Shard* shard) { |
||||
while (shard->shard_queue_index > 0 && |
||||
shard->min_deadline < |
||||
shard_queue_[shard->shard_queue_index - 1]->min_deadline) { |
||||
SwapAdjacentShardsInQueue(shard->shard_queue_index - 1); |
||||
} |
||||
while (shard->shard_queue_index < num_shards_ - 1 && |
||||
shard->min_deadline > |
||||
shard_queue_[shard->shard_queue_index + 1]->min_deadline) { |
||||
SwapAdjacentShardsInQueue(shard->shard_queue_index); |
||||
} |
||||
} |
||||
|
||||
void TimerList::TimerInit(Timer* timer, grpc_core::Timestamp deadline, |
||||
experimental::EventEngine::Closure* closure) { |
||||
bool is_first_timer = false; |
||||
Shard* shard = &shards_[grpc_core::HashPointer(timer, num_shards_)]; |
||||
timer->closure = closure; |
||||
timer->deadline = deadline.milliseconds_after_process_epoch(); |
||||
|
||||
#ifndef NDEBUG |
||||
timer->hash_table_next = nullptr; |
||||
#endif |
||||
|
||||
{ |
||||
grpc_core::MutexLock lock(&shard->mu); |
||||
timer->pending = true; |
||||
grpc_core::Timestamp now = host_->Now(); |
||||
if (deadline <= now) { |
||||
deadline = now; |
||||
} |
||||
|
||||
shard->stats.AddSample((deadline - now).millis() / 1000.0); |
||||
|
||||
if (deadline < shard->queue_deadline_cap) { |
||||
is_first_timer = shard->heap.Add(timer); |
||||
} else { |
||||
timer->heap_index = kInvalidHeapIndex; |
||||
ListJoin(&shard->list, timer); |
||||
} |
||||
} |
||||
|
||||
/* Deadline may have decreased, we need to adjust the main queue. Note
|
||||
that there is a potential racy unlocked region here. There could be a |
||||
reordering of multiple TimerInit calls, at this point, but the < test |
||||
below should ensure that we err on the side of caution. There could |
||||
also be a race with TimerCheck, which might beat us to the lock. In |
||||
that case, it is possible that the timer that we added will have already |
||||
run by the time we hold the lock, but that too is a safe error. |
||||
Finally, it's possible that the TimerCheck that intervened failed to |
||||
trigger the new timer because the min_deadline hadn't yet been reduced. |
||||
In that case, the timer will simply have to wait for the next |
||||
TimerCheck. */ |
||||
if (is_first_timer) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
if (deadline < shard->min_deadline) { |
||||
grpc_core::Timestamp old_min_deadline = shard_queue_[0]->min_deadline; |
||||
shard->min_deadline = deadline; |
||||
NoteDeadlineChange(shard); |
||||
if (shard->shard_queue_index == 0 && deadline < old_min_deadline) { |
||||
min_timer_.store(deadline.milliseconds_after_process_epoch(), |
||||
std::memory_order_relaxed); |
||||
host_->Kick(); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
bool TimerList::TimerCancel(Timer* timer) { |
||||
Shard* shard = &shards_[grpc_core::HashPointer(timer, num_shards_)]; |
||||
grpc_core::MutexLock lock(&shard->mu); |
||||
|
||||
if (timer->pending) { |
||||
timer->pending = false; |
||||
if (timer->heap_index == kInvalidHeapIndex) { |
||||
ListRemove(timer); |
||||
} else { |
||||
shard->heap.Remove(timer); |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
return false; |
||||
} |
||||
|
||||
/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving
|
||||
all relevant timers in shard->list (i.e timers with deadlines earlier than |
||||
'queue_deadline_cap') into into shard->heap. |
||||
Returns 'true' if shard->heap has at least ONE element */ |
||||
bool TimerList::Shard::RefillHeap(grpc_core::Timestamp now) { |
||||
/* Compute the new queue window width and bound by the limits: */ |
||||
double computed_deadline_delta = stats.UpdateAverage() * kAddDeadlineScale; |
||||
double deadline_delta = |
||||
grpc_core::Clamp(computed_deadline_delta, kMinQueueWindowDuration, |
||||
kMaxQueueWindowDuration); |
||||
Timer *timer, *next; |
||||
|
||||
/* Compute the new cap and put all timers under it into the queue: */ |
||||
queue_deadline_cap = std::max(now, queue_deadline_cap) + |
||||
grpc_core::Duration::FromSecondsAsDouble(deadline_delta); |
||||
|
||||
for (timer = list.next; timer != &list; timer = next) { |
||||
next = timer->next; |
||||
auto timer_deadline = |
||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
||||
timer->deadline); |
||||
|
||||
if (timer_deadline < queue_deadline_cap) { |
||||
ListRemove(timer); |
||||
heap.Add(timer); |
||||
} |
||||
} |
||||
return !heap.is_empty(); |
||||
} |
||||
|
||||
/* This pops the next non-cancelled timer with deadline <= now from the
|
||||
queue, or returns NULL if there isn't one. */ |
||||
Timer* TimerList::Shard::PopOne(grpc_core::Timestamp now) { |
||||
Timer* timer; |
||||
for (;;) { |
||||
if (heap.is_empty()) { |
||||
if (now < queue_deadline_cap) return nullptr; |
||||
if (!RefillHeap(now)) return nullptr; |
||||
} |
||||
timer = heap.Top(); |
||||
auto timer_deadline = |
||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
||||
timer->deadline); |
||||
if (timer_deadline > now) return nullptr; |
||||
timer->pending = false; |
||||
heap.Pop(); |
||||
return timer; |
||||
} |
||||
} |
||||
|
||||
void TimerList::Shard::PopTimers( |
||||
grpc_core::Timestamp now, grpc_core::Timestamp* new_min_deadline, |
||||
std::vector<experimental::EventEngine::Closure*>* out) { |
||||
grpc_core::MutexLock lock(&mu); |
||||
while (Timer* timer = PopOne(now)) { |
||||
out->push_back(timer->closure); |
||||
} |
||||
*new_min_deadline = ComputeMinDeadline(); |
||||
} |
||||
|
||||
std::vector<experimental::EventEngine::Closure*> TimerList::FindExpiredTimers( |
||||
grpc_core::Timestamp now, grpc_core::Timestamp* next) { |
||||
grpc_core::Timestamp min_timer = |
||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
||||
min_timer_.load(std::memory_order_relaxed)); |
||||
|
||||
std::vector<experimental::EventEngine::Closure*> done; |
||||
if (now < min_timer) { |
||||
if (next != nullptr) *next = std::min(*next, min_timer); |
||||
return done; |
||||
} |
||||
|
||||
grpc_core::MutexLock lock(&mu_); |
||||
|
||||
while (shard_queue_[0]->min_deadline < now || |
||||
(now != grpc_core::Timestamp::InfFuture() && |
||||
shard_queue_[0]->min_deadline == now)) { |
||||
grpc_core::Timestamp new_min_deadline; |
||||
|
||||
/* For efficiency, we pop as many available timers as we can from the
|
||||
shard. This may violate perfect timer deadline ordering, but that |
||||
shouldn't be a big deal because we don't make ordering guarantees. */ |
||||
shard_queue_[0]->PopTimers(now, &new_min_deadline, &done); |
||||
|
||||
/* An TimerInit() on the shard could intervene here, adding a new
|
||||
timer that is earlier than new_min_deadline. However, |
||||
TimerInit() will block on the mutex before it can call |
||||
set_min_deadline, so this one will complete first and then the Addtimer |
||||
will reduce the min_deadline (perhaps unnecessarily). */ |
||||
shard_queue_[0]->min_deadline = new_min_deadline; |
||||
NoteDeadlineChange(shard_queue_[0]); |
||||
} |
||||
|
||||
if (next) { |
||||
*next = std::min(*next, shard_queue_[0]->min_deadline); |
||||
} |
||||
|
||||
min_timer_.store( |
||||
shard_queue_[0]->min_deadline.milliseconds_after_process_epoch(), |
||||
std::memory_order_relaxed); |
||||
|
||||
return done; |
||||
} |
||||
|
||||
absl::optional<std::vector<experimental::EventEngine::Closure*>> |
||||
TimerList::TimerCheck(grpc_core::Timestamp* next) { |
||||
// prelude
|
||||
grpc_core::Timestamp now = host_->Now(); |
||||
|
||||
/* fetch from a thread-local first: this avoids contention on a globally
|
||||
mutable cacheline in the common case */ |
||||
grpc_core::Timestamp min_timer = |
||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
||||
min_timer_.load(std::memory_order_relaxed)); |
||||
|
||||
if (now < min_timer) { |
||||
if (next != nullptr) { |
||||
*next = std::min(*next, min_timer); |
||||
} |
||||
return std::vector<experimental::EventEngine::Closure*>(); |
||||
} |
||||
|
||||
if (!checker_mu_.TryLock()) return absl::nullopt; |
||||
std::vector<experimental::EventEngine::Closure*> run = |
||||
FindExpiredTimers(now, next); |
||||
checker_mu_.Unlock(); |
||||
|
||||
return std::move(run); |
||||
} |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
@ -0,0 +1,193 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <atomic> |
||||
#include <cstdint> |
||||
#include <memory> |
||||
#include <vector> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/time_averaged_stats.h" |
||||
#include "src/core/lib/event_engine/iomgr_engine/timer_heap.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
struct Timer { |
||||
int64_t deadline; |
||||
// kInvalidHeapIndex if not in heap.
|
||||
size_t heap_index; |
||||
bool pending; |
||||
struct Timer* next; |
||||
struct Timer* prev; |
||||
experimental::EventEngine::Closure* closure; |
||||
#ifndef NDEBUG |
||||
struct Timer* hash_table_next; |
||||
#endif |
||||
|
||||
grpc_event_engine::experimental::EventEngine::TaskHandle task_handle; |
||||
}; |
||||
|
||||
// Dependency injection: allow tests and/or TimerManager to inject
|
||||
// their own implementations of Now, Kick.
|
||||
class TimerListHost { |
||||
public: |
||||
// Return the current timestamp.
|
||||
// Abstracted so that tests can be run deterministically.
|
||||
virtual grpc_core::Timestamp Now() = 0; |
||||
// Wake up a thread to check for timers.
|
||||
virtual void Kick() = 0; |
||||
|
||||
protected: |
||||
~TimerListHost() = default; |
||||
}; |
||||
|
||||
class TimerList { |
||||
public: |
||||
explicit TimerList(TimerListHost* host); |
||||
|
||||
TimerList(const TimerList&) = delete; |
||||
TimerList& operator=(const TimerList&) = delete; |
||||
|
||||
/* Initialize *timer. When expired or canceled, closure will be called with
|
||||
error set to indicate if it expired (GRPC_ERROR_NONE) or was canceled |
||||
(GRPC_ERROR_CANCELLED). *closure is guaranteed to be called exactly once, and |
||||
application code should check the error to determine how it was invoked. The |
||||
application callback is also responsible for maintaining information about |
||||
when to free up any user-level state. Behavior is undefined for a deadline of |
||||
grpc_core::Timestamp::InfFuture(). */ |
||||
void TimerInit(Timer* timer, grpc_core::Timestamp deadline, |
||||
experimental::EventEngine::Closure* closure); |
||||
|
||||
/* Note that there is no timer destroy function. This is because the
|
||||
timer is a one-time occurrence with a guarantee that the callback will |
||||
be called exactly once, either at expiration or cancellation. Thus, all |
||||
the internal timer event management state is destroyed just before |
||||
that callback is invoked. If the user has additional state associated with |
||||
the timer, the user is responsible for determining when it is safe to |
||||
destroy that state. */ |
||||
|
||||
/* Cancel an *timer.
|
||||
There are three cases: |
||||
1. We normally cancel the timer |
||||
2. The timer has already run |
||||
3. We can't cancel the timer because it is "in flight". |
||||
|
||||
In all of these cases, the cancellation is still considered successful. |
||||
They are essentially distinguished in that the timer_cb will be run |
||||
exactly once from either the cancellation (with error GRPC_ERROR_CANCELLED) |
||||
or from the activation (with error GRPC_ERROR_NONE). |
||||
|
||||
Note carefully that the callback function MAY occur in the same callstack |
||||
as grpc_timer_cancel. It's expected that most timers will be cancelled |
||||
(their primary use is to implement deadlines), and so this code is |
||||
optimized such that cancellation costs as little as possible. Making |
||||
callbacks run inline matches this aim. |
||||
|
||||
Requires: cancel() must happen after init() on a given timer */ |
||||
bool TimerCancel(Timer* timer) GRPC_MUST_USE_RESULT; |
||||
|
||||
/* iomgr internal api for dealing with timers */ |
||||
|
||||
/* Check for timers to be run, and return them.
|
||||
Return nullopt if timers could not be checked due to contention with |
||||
another thread checking. |
||||
Return a vector of closures that *must* be run otherwise. |
||||
If next is non-null, TRY to update *next with the next running timer |
||||
IF that timer occurs before *next current value. |
||||
*next is never guaranteed to be updated on any given execution; however, |
||||
with high probability at least one thread in the system will see an update |
||||
at any time slice. */ |
||||
absl::optional<std::vector<experimental::EventEngine::Closure*>> TimerCheck( |
||||
grpc_core::Timestamp* next); |
||||
|
||||
private: |
||||
/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
|
||||
* deadlines earlier than 'queue_deadline_cap' are maintained in the heap and |
||||
* others are maintained in the list (unordered). This helps to keep the |
||||
* number of elements in the heap low. |
||||
* |
||||
* The 'queue_deadline_cap' gets recomputed periodically based on the timer |
||||
* stats maintained in 'stats' and the relevant timers are then moved from the |
||||
* 'list' to 'heap'. |
||||
*/ |
||||
struct Shard { |
||||
Shard(); |
||||
|
||||
grpc_core::Timestamp ComputeMinDeadline() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu); |
||||
bool RefillHeap(grpc_core::Timestamp now) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu); |
||||
Timer* PopOne(grpc_core::Timestamp now) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu); |
||||
void PopTimers(grpc_core::Timestamp now, |
||||
grpc_core::Timestamp* new_min_deadline, |
||||
std::vector<experimental::EventEngine::Closure*>* out) |
||||
ABSL_LOCKS_EXCLUDED(mu); |
||||
|
||||
grpc_core::Mutex mu; |
||||
TimeAveragedStats stats ABSL_GUARDED_BY(mu); |
||||
/* All and only timers with deadlines < this will be in the heap. */ |
||||
grpc_core::Timestamp queue_deadline_cap ABSL_GUARDED_BY(mu); |
||||
/* The deadline of the next timer due in this shard. */ |
||||
grpc_core::Timestamp min_deadline ABSL_GUARDED_BY(&TimerList::mu_); |
||||
/* Index of this timer_shard in the g_shard_queue. */ |
||||
uint32_t shard_queue_index ABSL_GUARDED_BY(&TimerList::mu_); |
||||
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
|
||||
list have the top bit of their deadline set to 0. */ |
||||
TimerHeap heap ABSL_GUARDED_BY(mu); |
||||
/* This holds timers whose deadline is >= queue_deadline_cap. */ |
||||
Timer list ABSL_GUARDED_BY(mu); |
||||
}; |
||||
|
||||
void SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
||||
void NoteDeadlineChange(Shard* shard) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
||||
std::vector<experimental::EventEngine::Closure*> FindExpiredTimers( |
||||
grpc_core::Timestamp now, grpc_core::Timestamp* next); |
||||
|
||||
TimerListHost* const host_; |
||||
const size_t num_shards_; |
||||
grpc_core::Mutex mu_; |
||||
/* The deadline of the next timer due across all timer shards */ |
||||
std::atomic<uint64_t> min_timer_; |
||||
/* Allow only one FindExpiredTimers at once (used as a TryLock, protects no
|
||||
* fields but ensures limits on concurrency) */ |
||||
grpc_core::Mutex checker_mu_; |
||||
/* Array of timer shards. Whenever a timer (Timer *) is added, its address
|
||||
* is hashed to select the timer shard to add the timer to */ |
||||
const std::unique_ptr<Shard[]> shards_; |
||||
/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
|
||||
* the deadline of the next timer in each shard). */ |
||||
const std::unique_ptr<Shard*[]> shard_queue_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif /* GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_H */ |
@ -0,0 +1,107 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/timer_heap.h" |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include <algorithm> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/timer.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
/* Adjusts a heap so as to move a hole at position i closer to the root,
|
||||
until a suitable position is found for element t. Then, copies t into that |
||||
position. This functor is called each time immediately after modifying a |
||||
value in the underlying container, with the offset of the modified element as |
||||
its argument. */ |
||||
void TimerHeap::AdjustUpwards(size_t i, Timer* t) { |
||||
while (i > 0) { |
||||
size_t parent = (i - 1) / 2; |
||||
if (timers_[parent]->deadline <= t->deadline) break; |
||||
timers_[i] = timers_[parent]; |
||||
timers_[i]->heap_index = i; |
||||
i = parent; |
||||
} |
||||
timers_[i] = t; |
||||
t->heap_index = i; |
||||
} |
||||
|
||||
/* Adjusts a heap so as to move a hole at position i farther away from the root,
|
||||
until a suitable position is found for element t. Then, copies t into that |
||||
position. */ |
||||
void TimerHeap::AdjustDownwards(size_t i, Timer* t) { |
||||
for (;;) { |
||||
size_t left_child = 1 + 2 * i; |
||||
if (left_child >= timers_.size()) break; |
||||
size_t right_child = left_child + 1; |
||||
size_t next_i = |
||||
right_child < timers_.size() && |
||||
timers_[left_child]->deadline > timers_[right_child]->deadline |
||||
? right_child |
||||
: left_child; |
||||
if (t->deadline <= timers_[next_i]->deadline) break; |
||||
timers_[i] = timers_[next_i]; |
||||
timers_[i]->heap_index = i; |
||||
i = next_i; |
||||
} |
||||
timers_[i] = t; |
||||
t->heap_index = i; |
||||
} |
||||
|
||||
void TimerHeap::NoteChangedPriority(Timer* timer) { |
||||
uint32_t i = timer->heap_index; |
||||
uint32_t parent = static_cast<uint32_t>((static_cast<int>(i) - 1) / 2); |
||||
if (timers_[parent]->deadline > timer->deadline) { |
||||
AdjustUpwards(i, timer); |
||||
} else { |
||||
AdjustDownwards(i, timer); |
||||
} |
||||
} |
||||
|
||||
bool TimerHeap::Add(Timer* timer) { |
||||
timer->heap_index = timers_.size(); |
||||
timers_.push_back(timer); |
||||
AdjustUpwards(timer->heap_index, timer); |
||||
return timer->heap_index == 0; |
||||
} |
||||
|
||||
void TimerHeap::Remove(Timer* timer) { |
||||
uint32_t i = timer->heap_index; |
||||
if (i == timers_.size() - 1) { |
||||
timers_.pop_back(); |
||||
return; |
||||
} |
||||
timers_[i] = timers_[timers_.size() - 1]; |
||||
timers_[i]->heap_index = i; |
||||
timers_.pop_back(); |
||||
NoteChangedPriority(timers_[i]); |
||||
} |
||||
|
||||
bool TimerHeap::is_empty() { return timers_.empty(); } |
||||
|
||||
Timer* TimerHeap::Top() { return timers_[0]; } |
||||
|
||||
void TimerHeap::Pop() { Remove(Top()); } |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
@ -0,0 +1,56 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_HEAP_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_HEAP_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <cstddef> |
||||
#include <vector> |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
struct Timer; |
||||
|
||||
class TimerHeap { |
||||
public: |
||||
/* return true if the new timer is the first timer in the heap */ |
||||
bool Add(Timer* timer); |
||||
|
||||
void Remove(Timer* timer); |
||||
Timer* Top(); |
||||
void Pop(); |
||||
|
||||
bool is_empty(); |
||||
|
||||
const std::vector<Timer*>& TestOnlyGetTimers() const { return timers_; } |
||||
|
||||
private: |
||||
void AdjustUpwards(size_t i, Timer* t); |
||||
void AdjustDownwards(size_t i, Timer* t); |
||||
void NoteChangedPriority(Timer* timer); |
||||
|
||||
std::vector<Timer*> timers_; |
||||
}; |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif /* GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_HEAP_H */ |
@ -0,0 +1,254 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/timer_manager.h" |
||||
|
||||
#include <algorithm> |
||||
#include <memory> |
||||
#include <utility> |
||||
|
||||
#include "absl/memory/memory.h" |
||||
#include "absl/time/time.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/impl/codegen/gpr_types.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/time.h> |
||||
|
||||
#include "src/core/lib/gprpp/thd.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
namespace { |
||||
class ThreadCollector { |
||||
public: |
||||
ThreadCollector() = default; |
||||
~ThreadCollector(); |
||||
|
||||
void Collect(std::vector<grpc_core::Thread> threads) { |
||||
GPR_ASSERT(threads_.empty()); |
||||
threads_ = std::move(threads); |
||||
} |
||||
|
||||
private: |
||||
std::vector<grpc_core::Thread> threads_; |
||||
}; |
||||
|
||||
ThreadCollector::~ThreadCollector() { |
||||
for (auto& t : threads_) t.Join(); |
||||
} |
||||
} // namespace
|
||||
|
||||
void TimerManager::StartThread() { |
||||
++waiter_count_; |
||||
++thread_count_; |
||||
auto* thread = new RunThreadArgs(); |
||||
thread->self = this; |
||||
thread->thread = |
||||
grpc_core::Thread("timer_manager", &TimerManager::RunThread, thread); |
||||
thread->thread.Start(); |
||||
} |
||||
|
||||
void TimerManager::RunSomeTimers( |
||||
std::vector<experimental::EventEngine::Closure*> timers) { |
||||
// if there's something to execute...
|
||||
ThreadCollector collector; |
||||
{ |
||||
grpc_core::MutexLock lock(&mu_); |
||||
// remove a waiter from the pool, and start another thread if necessary
|
||||
--waiter_count_; |
||||
if (waiter_count_ == 0) { |
||||
// The number of timer threads is always increasing until all the threads
|
||||
// are stopped. In rare cases, if a large number of timers fire
|
||||
// simultaneously, we may end up using a large number of threads.
|
||||
// TODO(ctiller): We could avoid this by exiting threads in WaitUntil().
|
||||
StartThread(); |
||||
} else { |
||||
// if there's no thread waiting with a timeout, kick an existing untimed
|
||||
// waiter so that the next deadline is not missed
|
||||
if (!has_timed_waiter_) { |
||||
cv_.Signal(); |
||||
} |
||||
} |
||||
} |
||||
for (auto* timer : timers) { |
||||
timer->Run(); |
||||
} |
||||
{ |
||||
grpc_core::MutexLock lock(&mu_); |
||||
collector.Collect(std::move(completed_threads_)); |
||||
// get ready to wait again
|
||||
++waiter_count_; |
||||
} |
||||
} |
||||
|
||||
// wait until 'next' (or forever if there is already a timed waiter in the pool)
|
||||
// returns true if the thread should continue executing (false if it should
|
||||
// shutdown)
|
||||
bool TimerManager::WaitUntil(grpc_core::Timestamp next) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
|
||||
if (shutdown_) { |
||||
return false; |
||||
} |
||||
|
||||
// TODO(ctiller): if there are too many waiting threads, this would be a good
|
||||
// place to exit the current thread.
|
||||
|
||||
// If kicked_ is true at this point, it means there was a kick from the timer
|
||||
// system that the timer-manager threads here missed. We cannot trust 'next'
|
||||
// here any longer (since there might be an earlier deadline). So if kicked_
|
||||
// is true at this point, we should quickly exit this and get the next
|
||||
// deadline from the timer system
|
||||
|
||||
if (!kicked_) { |
||||
// if there's no timed waiter, we should become one: that waiter waits
|
||||
// only until the next timer should expire. All other timers wait forever
|
||||
//
|
||||
// 'timed_waiter_generation_' is a global generation counter. The idea here
|
||||
// is that the thread becoming a timed-waiter increments and stores this
|
||||
// global counter locally in 'my_timed_waiter_generation' before going to
|
||||
// sleep. After waking up, if my_timed_waiter_generation ==
|
||||
// timed_waiter_generation_, it can be sure that it was the timed_waiter
|
||||
// thread (and that no other thread took over while this was asleep)
|
||||
//
|
||||
// Initialize my_timed_waiter_generation to some value that is NOT equal to
|
||||
// timed_waiter_generation_
|
||||
uint64_t my_timed_waiter_generation = timed_waiter_generation_ - 1; |
||||
|
||||
/* If there's no timed waiter, we should become one: that waiter waits only
|
||||
until the next timer should expire. All other timer threads wait forever |
||||
unless their 'next' is earlier than the current timed-waiter's deadline |
||||
(in which case the thread with earlier 'next' takes over as the new timed |
||||
waiter) */ |
||||
if (next != grpc_core::Timestamp::InfFuture()) { |
||||
if (!has_timed_waiter_ || (next < timed_waiter_deadline_)) { |
||||
my_timed_waiter_generation = ++timed_waiter_generation_; |
||||
has_timed_waiter_ = true; |
||||
timed_waiter_deadline_ = next; |
||||
} else { // timed_waiter_ == true && next >= timed_waiter_deadline_
|
||||
next = grpc_core::Timestamp::InfFuture(); |
||||
} |
||||
} |
||||
|
||||
cv_.WaitWithTimeout(&mu_, |
||||
absl::Milliseconds((next - host_.Now()).millis())); |
||||
|
||||
// if this was the timed waiter, then we need to check timers, and flag
|
||||
// that there's now no timed waiter... we'll look for a replacement if
|
||||
// there's work to do after checking timers (code above)
|
||||
if (my_timed_waiter_generation == timed_waiter_generation_) { |
||||
++wakeups_; |
||||
has_timed_waiter_ = false; |
||||
timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture(); |
||||
} |
||||
} |
||||
|
||||
kicked_ = false; |
||||
|
||||
return true; |
||||
} |
||||
|
||||
void TimerManager::MainLoop() { |
||||
for (;;) { |
||||
grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture(); |
||||
absl::optional<std::vector<experimental::EventEngine::Closure*>> |
||||
check_result = timer_list_->TimerCheck(&next); |
||||
if (check_result.has_value()) { |
||||
if (!check_result->empty()) { |
||||
RunSomeTimers(std::move(*check_result)); |
||||
continue; |
||||
} |
||||
} else { |
||||
/* This case only happens under contention, meaning more than one timer
|
||||
manager thread checked timers concurrently. |
||||
|
||||
If that happens, we're guaranteed that some other thread has just |
||||
checked timers, and this will avalanche into some other thread seeing |
||||
empty timers and doing a timed sleep. |
||||
|
||||
Consequently, we can just sleep forever here and be happy at some |
||||
saved wakeup cycles. */ |
||||
next = grpc_core::Timestamp::InfFuture(); |
||||
} |
||||
if (!WaitUntil(next)) return; |
||||
} |
||||
} |
||||
|
||||
void TimerManager::RunThread(void* arg) { |
||||
std::unique_ptr<RunThreadArgs> thread(static_cast<RunThreadArgs*>(arg)); |
||||
thread->self->MainLoop(); |
||||
{ |
||||
grpc_core::MutexLock lock(&thread->self->mu_); |
||||
thread->self->thread_count_--; |
||||
thread->self->completed_threads_.push_back(std::move(thread->thread)); |
||||
} |
||||
thread->self->cv_.Signal(); |
||||
} |
||||
|
||||
TimerManager::TimerManager() : host_(this) { |
||||
timer_list_ = absl::make_unique<TimerList>(&host_); |
||||
grpc_core::MutexLock lock(&mu_); |
||||
StartThread(); |
||||
} |
||||
|
||||
grpc_core::Timestamp TimerManager::Host::Now() { |
||||
return grpc_core::Timestamp::FromTimespecRoundDown( |
||||
gpr_now(GPR_CLOCK_MONOTONIC)); |
||||
} |
||||
|
||||
void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline, |
||||
experimental::EventEngine::Closure* closure) { |
||||
timer_list_->TimerInit(timer, deadline, closure); |
||||
} |
||||
|
||||
bool TimerManager::TimerCancel(Timer* timer) { |
||||
return timer_list_->TimerCancel(timer); |
||||
} |
||||
|
||||
TimerManager::~TimerManager() { |
||||
{ |
||||
grpc_core::MutexLock lock(&mu_); |
||||
shutdown_ = true; |
||||
cv_.SignalAll(); |
||||
} |
||||
while (true) { |
||||
ThreadCollector collector; |
||||
grpc_core::MutexLock lock(&mu_); |
||||
collector.Collect(std::move(completed_threads_)); |
||||
if (thread_count_ == 0) break; |
||||
cv_.Wait(&mu_); |
||||
} |
||||
} |
||||
|
||||
void TimerManager::Host::Kick() { timer_manager_->Kick(); } |
||||
|
||||
void TimerManager::Kick() { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
has_timed_waiter_ = false; |
||||
timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture(); |
||||
++timed_waiter_generation_; |
||||
kicked_ = true; |
||||
cv_.Signal(); |
||||
} |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
@ -0,0 +1,111 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_MANAGER_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_MANAGER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
#include <stdint.h> |
||||
|
||||
#include <memory> |
||||
#include <vector> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/timer.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/thd.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
// Timer Manager tries to keep only one thread waiting for the next timeout at
|
||||
// all times, and thus effectively preventing the thundering herd problem.
|
||||
// TODO(ctiller): consider unifying this thread pool and the one in
|
||||
// thread_pool.{h,cc}.
|
||||
class TimerManager final { |
||||
public: |
||||
TimerManager(); |
||||
~TimerManager(); |
||||
|
||||
grpc_core::Timestamp Now() { return host_.Now(); } |
||||
|
||||
void TimerInit(Timer* timer, grpc_core::Timestamp deadline, |
||||
experimental::EventEngine::Closure* closure); |
||||
bool TimerCancel(Timer* timer); |
||||
|
||||
private: |
||||
struct RunThreadArgs { |
||||
TimerManager* self; |
||||
grpc_core::Thread thread; |
||||
}; |
||||
|
||||
class Host final : public TimerListHost { |
||||
public: |
||||
explicit Host(TimerManager* timer_manager) |
||||
: timer_manager_(timer_manager) {} |
||||
|
||||
void Kick() override; |
||||
grpc_core::Timestamp Now() override; |
||||
|
||||
private: |
||||
TimerManager* const timer_manager_; |
||||
}; |
||||
|
||||
void StartThread() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
||||
static void RunThread(void* arg); |
||||
void MainLoop(); |
||||
void RunSomeTimers(std::vector<experimental::EventEngine::Closure*> timers); |
||||
bool WaitUntil(grpc_core::Timestamp next); |
||||
void Kick(); |
||||
|
||||
grpc_core::Mutex mu_; |
||||
grpc_core::CondVar cv_; |
||||
Host host_; |
||||
// number of threads in the system
|
||||
size_t thread_count_ ABSL_GUARDED_BY(mu_) = 0; |
||||
// number of threads sitting around waiting
|
||||
size_t waiter_count_ ABSL_GUARDED_BY(mu_) = 0; |
||||
// Threads waiting to be joined
|
||||
std::vector<grpc_core::Thread> completed_threads_ ABSL_GUARDED_BY(mu_); |
||||
// is there a thread waiting until the next timer should fire?
|
||||
bool has_timed_waiter_ ABSL_GUARDED_BY(mu_) = false; |
||||
// are we shutting down?
|
||||
bool shutdown_ ABSL_GUARDED_BY(mu_) = false; |
||||
// are we shutting down?
|
||||
bool kicked_ ABSL_GUARDED_BY(mu_) = false; |
||||
// the deadline of the current timed waiter thread (only relevant if
|
||||
// has_timed_waiter_ is true)
|
||||
grpc_core::Timestamp timed_waiter_deadline_ ABSL_GUARDED_BY(mu_); |
||||
// generation counter to track which thread is waiting for the next timer
|
||||
uint64_t timed_waiter_generation_ ABSL_GUARDED_BY(mu_) = 0; |
||||
// number of timer wakeups
|
||||
uint64_t wakeups_ ABSL_GUARDED_BY(mu_) = 0; |
||||
// actual timer implementation
|
||||
std::unique_ptr<TimerList> timer_list_; |
||||
}; |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif /* GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_MANAGER_H */ |
@ -0,0 +1,62 @@ |
||||
# Copyright 2017 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") |
||||
|
||||
licenses(["notice"]) |
||||
|
||||
grpc_package( |
||||
name = "test/core/event_engine/iomgr_event_engine", |
||||
visibility = "public", |
||||
) # Useful for third party devs to test their io manager implementation. |
||||
|
||||
grpc_cc_test( |
||||
name = "time_averaged_stats_test", |
||||
srcs = ["time_averaged_stats_test.cc"], |
||||
external_deps = ["gtest"], |
||||
language = "C++", |
||||
uses_event_engine = False, |
||||
uses_polling = False, |
||||
deps = [ |
||||
"//:iomgr_ee_time_averaged_stats", |
||||
"//test/core/util:grpc_suppressions", |
||||
], |
||||
) |
||||
|
||||
grpc_cc_test( |
||||
name = "timer_heap_test", |
||||
srcs = ["timer_heap_test.cc"], |
||||
external_deps = ["gtest"], |
||||
language = "C++", |
||||
uses_event_engine = False, |
||||
uses_polling = False, |
||||
deps = [ |
||||
"//:bitset", |
||||
"//:iomgr_ee_timer", |
||||
"//test/core/util:grpc_suppressions", |
||||
], |
||||
) |
||||
|
||||
grpc_cc_test( |
||||
name = "timer_list_test", |
||||
srcs = ["timer_list_test.cc"], |
||||
external_deps = ["gtest"], |
||||
language = "C++", |
||||
uses_event_engine = False, |
||||
uses_polling = False, |
||||
deps = [ |
||||
"//:iomgr_ee_timer", |
||||
"//test/core/util:grpc_suppressions", |
||||
], |
||||
) |
@ -0,0 +1,179 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/time_averaged_stats.h" |
||||
|
||||
#include <math.h> |
||||
|
||||
#include <gtest/gtest.h> |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
TEST(TimeAveragedStatsTest, NoRegressNoPersistTest1) { |
||||
TimeAveragedStats tas(1000, 0, 0.0); |
||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(0, tas.aggregate_total_weight()); |
||||
|
||||
/* Should have no effect */ |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(0, tas.aggregate_total_weight()); |
||||
|
||||
/* Should replace old average */ |
||||
tas.AddSample(2000); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(2000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(1, tas.aggregate_total_weight()); |
||||
} |
||||
|
||||
TEST(TimeAveragedStatsTest, NoRegressNoPersistTest2) { |
||||
TimeAveragedStats tas(1000, 0, 0.0); |
||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
||||
/* Should replace init value */ |
||||
tas.AddSample(2000); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(2000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(1, tas.aggregate_total_weight()); |
||||
|
||||
tas.AddSample(3000); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(3000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(1, tas.aggregate_total_weight()); |
||||
} |
||||
|
||||
TEST(TimeAveragedStatsTest, NoRegressNoPersistTest3) { |
||||
TimeAveragedStats tas(1000, 0, 0.0); |
||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
||||
/* Should replace init value */ |
||||
tas.AddSample(2500); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(2500, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(1, tas.aggregate_total_weight()); |
||||
|
||||
tas.AddSample(3500); |
||||
tas.AddSample(4500); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(4000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(2, tas.aggregate_total_weight()); |
||||
} |
||||
|
||||
TEST(TimeAveragedStatsTest, SomeRegressNoPersistTest) { |
||||
TimeAveragedStats tas(1000, 0.5, 0.0); |
||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(0, tas.aggregate_total_weight()); |
||||
tas.AddSample(2000); |
||||
tas.AddSample(2000); |
||||
tas.UpdateAverage(); |
||||
/* (2 * 2000 + 0.5 * 1000) / 2.5 */ |
||||
EXPECT_DOUBLE_EQ(1800, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(2.5, tas.aggregate_total_weight()); |
||||
} |
||||
|
||||
TEST(TimeAveragedStatsTest, SomeDecayTest) { |
||||
TimeAveragedStats tas(1000, 1, 0.0); |
||||
EXPECT_EQ(1000, tas.aggregate_weighted_avg()); |
||||
/* Should avg with init value */ |
||||
tas.AddSample(2000); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(1500, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(2, tas.aggregate_total_weight()); |
||||
|
||||
tas.AddSample(2000); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(1500, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(2, tas.aggregate_total_weight()); |
||||
|
||||
tas.AddSample(2000); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(1500, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(2, tas.aggregate_total_weight()); |
||||
} |
||||
|
||||
TEST(TimeAveragedStatsTest, NoRegressFullPersistTest) { |
||||
TimeAveragedStats tas(1000, 0, 1.0); |
||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(0, tas.aggregate_total_weight()); |
||||
|
||||
/* Should replace init value */ |
||||
tas.AddSample(2000); |
||||
tas.UpdateAverage(); |
||||
EXPECT_EQ(2000, tas.aggregate_weighted_avg()); |
||||
EXPECT_EQ(1, tas.aggregate_total_weight()); |
||||
|
||||
/* Will result in average of the 3 samples. */ |
||||
tas.AddSample(2300); |
||||
tas.AddSample(2300); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(2200, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(3, tas.aggregate_total_weight()); |
||||
} |
||||
|
||||
TEST(TimeAveragedStatsTest, NoRegressSomePersistTest) { |
||||
TimeAveragedStats tas(1000, 0, 0.5); |
||||
/* Should replace init value */ |
||||
tas.AddSample(2000); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(2000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(1, tas.aggregate_total_weight()); |
||||
|
||||
tas.AddSample(2500); |
||||
tas.AddSample(4000); |
||||
tas.UpdateAverage(); |
||||
EXPECT_DOUBLE_EQ(3000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(2.5, tas.aggregate_total_weight()); |
||||
} |
||||
|
||||
TEST(TimeAveragedStatsTest, SomeRegressSomePersistTest) { |
||||
TimeAveragedStats tas(1000, 0.4, 0.6); |
||||
/* Sample weight = 0 */ |
||||
EXPECT_EQ(1000, tas.aggregate_weighted_avg()); |
||||
EXPECT_EQ(0, tas.aggregate_total_weight()); |
||||
|
||||
tas.UpdateAverage(); |
||||
/* (0.6 * 0 * 1000 + 0.4 * 1000 / 0.4) */ |
||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(0.4, tas.aggregate_total_weight()); |
||||
|
||||
tas.AddSample(2640); |
||||
tas.UpdateAverage(); |
||||
/* (1 * 2640 + 0.6 * 0.4 * 1000 + 0.4 * 1000 / (1 + 0.6 * 0.4 + 0.4) */ |
||||
EXPECT_DOUBLE_EQ(2000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(1.64, tas.aggregate_total_weight()); |
||||
|
||||
tas.AddSample(2876.8); |
||||
tas.UpdateAverage(); |
||||
/* (1 * 2876.8 + 0.6 * 1.64 * 2000 + 0.4 * 1000 / (1 + 0.6 * 1.64 + 0.4) */ |
||||
EXPECT_DOUBLE_EQ(2200, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(2.384, tas.aggregate_total_weight()); |
||||
|
||||
tas.AddSample(4944.32); |
||||
tas.UpdateAverage(); |
||||
/* (1 * 4944.32 + 0.6 * 2.384 * 2200 + 0.4 * 1000) /
|
||||
(1 + 0.6 * 2.384 + 0.4) */ |
||||
EXPECT_DOUBLE_EQ(3000, tas.aggregate_weighted_avg()); |
||||
EXPECT_DOUBLE_EQ(2.8304, tas.aggregate_total_weight()); |
||||
} |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
@ -0,0 +1,202 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/timer_heap.h" |
||||
|
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <gmock/gmock.h> |
||||
#include <gtest/gtest.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/timer.h" |
||||
#include "src/core/lib/gpr/useful.h" |
||||
#include "src/core/lib/gprpp/bitset.h" |
||||
|
||||
using testing::Contains; |
||||
using testing::Not; |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
namespace { |
||||
int64_t RandomDeadline(void) { return rand(); } |
||||
|
||||
std::vector<Timer> CreateTestElements(size_t num_elements) { |
||||
std::vector<Timer> elems(num_elements); |
||||
for (size_t i = 0; i < num_elements; i++) { |
||||
elems[i].deadline = RandomDeadline(); |
||||
} |
||||
return elems; |
||||
} |
||||
|
||||
void CheckValid(TimerHeap* pq) { |
||||
const std::vector<Timer*>& timers = pq->TestOnlyGetTimers(); |
||||
for (size_t i = 0; i < timers.size(); ++i) { |
||||
size_t left_child = 1u + 2u * i; |
||||
size_t right_child = left_child + 1u; |
||||
if (left_child < timers.size()) { |
||||
EXPECT_LE(timers[i]->deadline, timers[left_child]->deadline); |
||||
} |
||||
if (right_child < timers.size()) { |
||||
EXPECT_LE(timers[i]->deadline, timers[right_child]->deadline); |
||||
} |
||||
} |
||||
} |
||||
|
||||
TEST(TimerHeapTest, Basics) { |
||||
TimerHeap pq; |
||||
const size_t num_test_elements = 200; |
||||
const size_t num_test_operations = 10000; |
||||
size_t i; |
||||
std::vector<Timer> test_elements = CreateTestElements(num_test_elements); |
||||
grpc_core::BitSet<num_test_elements> inpq; |
||||
|
||||
EXPECT_TRUE(pq.is_empty()); |
||||
CheckValid(&pq); |
||||
for (i = 0; i < num_test_elements; ++i) { |
||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Not(Contains(&test_elements[i]))); |
||||
pq.Add(&test_elements[i]); |
||||
CheckValid(&pq); |
||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Contains(&test_elements[i])); |
||||
inpq.set(i); |
||||
} |
||||
for (i = 0; i < num_test_elements; ++i) { |
||||
/* Test that check still succeeds even for element that wasn't just
|
||||
inserted. */ |
||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Contains(&test_elements[i])); |
||||
} |
||||
|
||||
EXPECT_EQ(pq.TestOnlyGetTimers().size(), num_test_elements); |
||||
CheckValid(&pq); |
||||
|
||||
for (i = 0; i < num_test_operations; ++i) { |
||||
size_t elem_num = static_cast<size_t>(rand()) % num_test_elements; |
||||
Timer* el = &test_elements[elem_num]; |
||||
if (!inpq.is_set(elem_num)) { /* not in pq */ |
||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Not(Contains(el))); |
||||
el->deadline = RandomDeadline(); |
||||
pq.Add(el); |
||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Contains(el)); |
||||
inpq.set(elem_num); |
||||
CheckValid(&pq); |
||||
} else { |
||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Contains(el)); |
||||
pq.Remove(el); |
||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Not(Contains(el))); |
||||
inpq.clear(elem_num); |
||||
CheckValid(&pq); |
||||
} |
||||
} |
||||
} |
||||
|
||||
struct ElemStruct { |
||||
Timer elem; |
||||
bool inserted = false; |
||||
}; |
||||
|
||||
ElemStruct* SearchElems(std::vector<ElemStruct>& elems, bool inserted) { |
||||
std::vector<size_t> search_order; |
||||
for (size_t i = 0; i < elems.size(); i++) { |
||||
search_order.push_back(i); |
||||
} |
||||
for (size_t i = 0; i < elems.size() * 2; i++) { |
||||
size_t a = static_cast<size_t>(rand()) % elems.size(); |
||||
size_t b = static_cast<size_t>(rand()) % elems.size(); |
||||
std::swap(search_order[a], search_order[b]); |
||||
} |
||||
ElemStruct* out = nullptr; |
||||
for (size_t i = 0; out == nullptr && i < elems.size(); i++) { |
||||
if (elems[search_order[i]].inserted == inserted) { |
||||
out = &elems[search_order[i]]; |
||||
} |
||||
} |
||||
return out; |
||||
} |
||||
|
||||
// TODO(ctiller): this should be an actual fuzzer
|
||||
TEST(TimerHeapTest, RandomMutations) { |
||||
TimerHeap pq; |
||||
|
||||
static const size_t elems_size = 1000; |
||||
std::vector<ElemStruct> elems(elems_size); |
||||
size_t num_inserted = 0; |
||||
|
||||
for (size_t round = 0; round < 10000; round++) { |
||||
int r = rand() % 1000; |
||||
if (r <= 550) { |
||||
/* 55% of the time we try to add something */ |
||||
ElemStruct* el = SearchElems(elems, false); |
||||
if (el != nullptr) { |
||||
el->elem.deadline = RandomDeadline(); |
||||
pq.Add(&el->elem); |
||||
el->inserted = true; |
||||
num_inserted++; |
||||
CheckValid(&pq); |
||||
} |
||||
} else if (r <= 650) { |
||||
/* 10% of the time we try to remove something */ |
||||
ElemStruct* el = SearchElems(elems, true); |
||||
if (el != nullptr) { |
||||
pq.Remove(&el->elem); |
||||
el->inserted = false; |
||||
num_inserted--; |
||||
CheckValid(&pq); |
||||
} |
||||
} else { |
||||
/* the remaining times we pop */ |
||||
if (num_inserted > 0) { |
||||
Timer* top = pq.Top(); |
||||
pq.Pop(); |
||||
for (size_t i = 0; i < elems_size; i++) { |
||||
if (top == &elems[i].elem) { |
||||
GPR_ASSERT(elems[i].inserted); |
||||
elems[i].inserted = false; |
||||
} |
||||
} |
||||
num_inserted--; |
||||
CheckValid(&pq); |
||||
} |
||||
} |
||||
|
||||
if (num_inserted) { |
||||
int64_t* min_deadline = nullptr; |
||||
for (size_t i = 0; i < elems_size; i++) { |
||||
if (elems[i].inserted) { |
||||
if (min_deadline == nullptr) { |
||||
min_deadline = &elems[i].elem.deadline; |
||||
} else { |
||||
if (elems[i].elem.deadline < *min_deadline) { |
||||
min_deadline = &elems[i].elem.deadline; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
GPR_ASSERT(pq.Top()->deadline == *min_deadline); |
||||
} |
||||
} |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
@ -0,0 +1,246 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <cstdint> |
||||
#include <limits> |
||||
|
||||
#include <gmock/gmock.h> |
||||
#include <gtest/gtest.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine/timer.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
using testing::Mock; |
||||
using testing::Return; |
||||
using testing::StrictMock; |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace iomgr_engine { |
||||
|
||||
namespace { |
||||
const int64_t kHoursIn25Days = 25 * 24; |
||||
const grpc_core::Duration k25Days = grpc_core::Duration::Hours(kHoursIn25Days); |
||||
|
||||
class MockClosure : public experimental::EventEngine::Closure { |
||||
public: |
||||
MOCK_METHOD(void, Run, ()); |
||||
}; |
||||
|
||||
class MockHost : public TimerListHost { |
||||
public: |
||||
MOCK_METHOD(grpc_core::Timestamp, Now, ()); |
||||
MOCK_METHOD(void, Kick, ()); |
||||
}; |
||||
|
||||
enum class CheckResult { kTimersFired, kCheckedAndEmpty, kNotChecked }; |
||||
|
||||
CheckResult FinishCheck( |
||||
absl::optional<std::vector<experimental::EventEngine::Closure*>> result) { |
||||
if (!result.has_value()) return CheckResult::kNotChecked; |
||||
if (result->empty()) return CheckResult::kCheckedAndEmpty; |
||||
for (auto closure : *result) { |
||||
closure->Run(); |
||||
} |
||||
return CheckResult::kTimersFired; |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
TEST(TimerListTest, Add) { |
||||
Timer timers[20]; |
||||
StrictMock<MockClosure> closures[20]; |
||||
|
||||
const auto kStart = |
||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(100); |
||||
|
||||
StrictMock<MockHost> host; |
||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
||||
TimerList timer_list(&host); |
||||
|
||||
/* 10 ms timers. will expire in the current epoch */ |
||||
for (int i = 0; i < 10; i++) { |
||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
||||
timer_list.TimerInit(&timers[i], |
||||
kStart + grpc_core::Duration::Milliseconds(10), |
||||
&closures[i]); |
||||
} |
||||
|
||||
/* 1010 ms timers. will expire in the next epoch */ |
||||
for (int i = 10; i < 20; i++) { |
||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
||||
timer_list.TimerInit(&timers[i], |
||||
kStart + grpc_core::Duration::Milliseconds(1010), |
||||
&closures[i]); |
||||
} |
||||
|
||||
/* collect timers. Only the first batch should be ready. */ |
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce(Return(kStart + grpc_core::Duration::Milliseconds(500))); |
||||
for (int i = 0; i < 10; i++) { |
||||
EXPECT_CALL(closures[i], Run()); |
||||
} |
||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
||||
CheckResult::kTimersFired); |
||||
for (int i = 0; i < 10; i++) { |
||||
Mock::VerifyAndClearExpectations(&closures[i]); |
||||
} |
||||
|
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce(Return(kStart + grpc_core::Duration::Milliseconds(600))); |
||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
||||
CheckResult::kCheckedAndEmpty); |
||||
|
||||
/* collect the rest of the timers */ |
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce(Return(kStart + grpc_core::Duration::Milliseconds(1500))); |
||||
for (int i = 10; i < 20; i++) { |
||||
EXPECT_CALL(closures[i], Run()); |
||||
} |
||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
||||
CheckResult::kTimersFired); |
||||
for (int i = 10; i < 20; i++) { |
||||
Mock::VerifyAndClearExpectations(&closures[i]); |
||||
} |
||||
|
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce(Return(kStart + grpc_core::Duration::Milliseconds(1600))); |
||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
||||
CheckResult::kCheckedAndEmpty); |
||||
} |
||||
|
||||
/* Cleaning up a list with pending timers. */ |
||||
TEST(TimerListTest, Destruction) { |
||||
Timer timers[5]; |
||||
StrictMock<MockClosure> closures[5]; |
||||
|
||||
StrictMock<MockHost> host; |
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce( |
||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
||||
TimerList timer_list(&host); |
||||
|
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce( |
||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
||||
timer_list.TimerInit( |
||||
&timers[0], grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(100), |
||||
&closures[0]); |
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce( |
||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
||||
timer_list.TimerInit( |
||||
&timers[1], grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(3), |
||||
&closures[1]); |
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce( |
||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
||||
timer_list.TimerInit( |
||||
&timers[2], grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(100), |
||||
&closures[2]); |
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce( |
||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
||||
timer_list.TimerInit( |
||||
&timers[3], grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(3), |
||||
&closures[3]); |
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce( |
||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
||||
timer_list.TimerInit( |
||||
&timers[4], grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(1), |
||||
&closures[4]); |
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce( |
||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(2))); |
||||
EXPECT_CALL(closures[4], Run()); |
||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
||||
CheckResult::kTimersFired); |
||||
Mock::VerifyAndClearExpectations(&closures[4]); |
||||
EXPECT_FALSE(timer_list.TimerCancel(&timers[4])); |
||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[0])); |
||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[3])); |
||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[1])); |
||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[2])); |
||||
} |
||||
|
||||
/* Cleans up a list with pending timers that simulate long-running-services.
|
||||
This test does the following: |
||||
1) Simulates grpc server start time to 25 days in the past (completed in |
||||
`main` using TestOnlyGlobalInit()) |
||||
2) Creates 4 timers - one with a deadline 25 days in the future, one just |
||||
3 milliseconds in future, one way out in the future, and one using the |
||||
Timestamp::FromTimespecRoundUp function to compute a deadline of 25 |
||||
days in the future |
||||
3) Simulates 4 milliseconds of elapsed time by changing `now` (cached at |
||||
step 1) to `now+4` |
||||
4) Shuts down the timer list |
||||
https://github.com/grpc/grpc/issues/15904 */
|
||||
TEST(TimerListTest, LongRunningServiceCleanup) { |
||||
Timer timers[4]; |
||||
StrictMock<MockClosure> closures[4]; |
||||
|
||||
const auto kStart = |
||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(k25Days.millis()); |
||||
|
||||
StrictMock<MockHost> host; |
||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
||||
TimerList timer_list(&host); |
||||
|
||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
||||
timer_list.TimerInit(&timers[0], kStart + k25Days, &closures[0]); |
||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
||||
timer_list.TimerInit( |
||||
&timers[1], kStart + grpc_core::Duration::Milliseconds(3), &closures[1]); |
||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
||||
timer_list.TimerInit(&timers[2], |
||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
||||
std::numeric_limits<int64_t>::max() - 1), |
||||
&closures[2]); |
||||
|
||||
gpr_timespec deadline_spec = |
||||
(kStart + k25Days).as_timespec(gpr_clock_type::GPR_CLOCK_MONOTONIC); |
||||
|
||||
/* Timestamp::FromTimespecRoundUp is how users usually compute a millisecond
|
||||
input value into grpc_timer_init, so we mimic that behavior here */ |
||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
||||
timer_list.TimerInit(&timers[3], |
||||
grpc_core::Timestamp::FromTimespecRoundUp(deadline_spec), |
||||
&closures[3]); |
||||
|
||||
EXPECT_CALL(host, Now()) |
||||
.WillOnce(Return(kStart + grpc_core::Duration::Milliseconds(4))); |
||||
EXPECT_CALL(closures[1], Run()); |
||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
||||
CheckResult::kTimersFired); |
||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[0])); |
||||
EXPECT_FALSE(timer_list.TimerCancel(&timers[1])); |
||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[2])); |
||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[3])); |
||||
} |
||||
|
||||
} // namespace iomgr_engine
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue