mirror of https://github.com/grpc/grpc.git
This reverts commit 977ebbef09
.
pull/30025/head
parent
fd8ff7d682
commit
cda2127776
36 changed files with 188 additions and 3037 deletions
@ -1,123 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/thread_pool.h" |
|
||||||
|
|
||||||
#include "src/core/lib/gprpp/thd.h" |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
ThreadPool::Thread::Thread(ThreadPool* pool) |
|
||||||
: pool_(pool), |
|
||||||
thd_( |
|
||||||
"iomgr_eventengine_pool", |
|
||||||
[](void* th) { static_cast<ThreadPool::Thread*>(th)->ThreadFunc(); }, |
|
||||||
this) { |
|
||||||
thd_.Start(); |
|
||||||
} |
|
||||||
ThreadPool::Thread::~Thread() { thd_.Join(); } |
|
||||||
|
|
||||||
void ThreadPool::Thread::ThreadFunc() { |
|
||||||
pool_->ThreadFunc(); |
|
||||||
// Now that we have killed ourselves, we should reduce the thread count
|
|
||||||
grpc_core::MutexLock lock(&pool_->mu_); |
|
||||||
pool_->nthreads_--; |
|
||||||
// Move ourselves to dead list
|
|
||||||
pool_->dead_threads_.push_back(this); |
|
||||||
|
|
||||||
if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) { |
|
||||||
pool_->shutdown_cv_.Signal(); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void ThreadPool::ThreadFunc() { |
|
||||||
for (;;) { |
|
||||||
// Wait until work is available or we are shutting down.
|
|
||||||
grpc_core::ReleasableMutexLock lock(&mu_); |
|
||||||
if (!shutdown_ && callbacks_.empty()) { |
|
||||||
// If there are too many threads waiting, then quit this thread
|
|
||||||
if (threads_waiting_ >= reserve_threads_) { |
|
||||||
break; |
|
||||||
} |
|
||||||
threads_waiting_++; |
|
||||||
cv_.Wait(&mu_); |
|
||||||
threads_waiting_--; |
|
||||||
} |
|
||||||
// Drain callbacks before considering shutdown to ensure all work
|
|
||||||
// gets completed.
|
|
||||||
if (!callbacks_.empty()) { |
|
||||||
auto cb = callbacks_.front(); |
|
||||||
callbacks_.pop(); |
|
||||||
lock.Release(); |
|
||||||
cb(); |
|
||||||
} else if (shutdown_) { |
|
||||||
break; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
ThreadPool::ThreadPool(int reserve_threads) |
|
||||||
: shutdown_(false), |
|
||||||
reserve_threads_(reserve_threads), |
|
||||||
nthreads_(0), |
|
||||||
threads_waiting_(0) { |
|
||||||
for (int i = 0; i < reserve_threads_; i++) { |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
nthreads_++; |
|
||||||
new Thread(this); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void ThreadPool::ReapThreads(std::vector<Thread*>* tlist) { |
|
||||||
for (auto* t : *tlist) delete t; |
|
||||||
tlist->clear(); |
|
||||||
} |
|
||||||
|
|
||||||
ThreadPool::~ThreadPool() { |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
shutdown_ = true; |
|
||||||
cv_.SignalAll(); |
|
||||||
while (nthreads_ != 0) { |
|
||||||
shutdown_cv_.Wait(&mu_); |
|
||||||
} |
|
||||||
ReapThreads(&dead_threads_); |
|
||||||
} |
|
||||||
|
|
||||||
void ThreadPool::Add(const std::function<void()>& callback) { |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
// Add works to the callbacks list
|
|
||||||
callbacks_.push(callback); |
|
||||||
// Increase pool size or notify as needed
|
|
||||||
if (threads_waiting_ == 0) { |
|
||||||
// Kick off a new thread
|
|
||||||
nthreads_++; |
|
||||||
new Thread(this); |
|
||||||
} else { |
|
||||||
cv_.Signal(); |
|
||||||
} |
|
||||||
// Also use this chance to harvest dead threads
|
|
||||||
if (!dead_threads_.empty()) { |
|
||||||
ReapThreads(&dead_threads_); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
@ -1,70 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_THREAD_POOL_H |
|
||||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_THREAD_POOL_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <functional> |
|
||||||
#include <queue> |
|
||||||
#include <vector> |
|
||||||
|
|
||||||
#include "src/core/lib/gprpp/sync.h" |
|
||||||
#include "src/core/lib/gprpp/thd.h" |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
class ThreadPool final { |
|
||||||
public: |
|
||||||
explicit ThreadPool(int reserve_threads); |
|
||||||
~ThreadPool(); |
|
||||||
|
|
||||||
void Add(const std::function<void()>& callback); |
|
||||||
|
|
||||||
private: |
|
||||||
class Thread { |
|
||||||
public: |
|
||||||
explicit Thread(ThreadPool* pool); |
|
||||||
~Thread(); |
|
||||||
|
|
||||||
private: |
|
||||||
ThreadPool* pool_; |
|
||||||
grpc_core::Thread thd_; |
|
||||||
void ThreadFunc(); |
|
||||||
}; |
|
||||||
|
|
||||||
void ThreadFunc(); |
|
||||||
static void ReapThreads(std::vector<Thread*>* tlist); |
|
||||||
|
|
||||||
grpc_core::Mutex mu_; |
|
||||||
grpc_core::CondVar cv_; |
|
||||||
grpc_core::CondVar shutdown_cv_; |
|
||||||
bool shutdown_; |
|
||||||
std::queue<std::function<void()>> callbacks_; |
|
||||||
int reserve_threads_; |
|
||||||
int nthreads_; |
|
||||||
int threads_waiting_; |
|
||||||
std::vector<Thread*> dead_threads_; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
||||||
|
|
||||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_THREAD_POOL_H
|
|
@ -1,62 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/time_averaged_stats.h" |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
TimeAveragedStats::TimeAveragedStats(double init_avg, double regress_weight, |
|
||||||
double persistence_factor) |
|
||||||
: init_avg_(init_avg), |
|
||||||
regress_weight_(regress_weight), |
|
||||||
persistence_factor_(persistence_factor) {} |
|
||||||
|
|
||||||
void TimeAveragedStats::AddSample(double value) { |
|
||||||
batch_total_value_ += value; |
|
||||||
++batch_num_samples_; |
|
||||||
} |
|
||||||
|
|
||||||
double TimeAveragedStats::UpdateAverage() { |
|
||||||
/* Start with the current batch: */ |
|
||||||
double weighted_sum = batch_total_value_; |
|
||||||
double total_weight = batch_num_samples_; |
|
||||||
if (regress_weight_ > 0) { |
|
||||||
/* Add in the regression towards init_avg_: */ |
|
||||||
weighted_sum += regress_weight_ * init_avg_; |
|
||||||
total_weight += regress_weight_; |
|
||||||
} |
|
||||||
if (persistence_factor_ > 0) { |
|
||||||
/* Add in the persistence: */ |
|
||||||
const double prev_sample_weight = |
|
||||||
persistence_factor_ * aggregate_total_weight_; |
|
||||||
weighted_sum += prev_sample_weight * aggregate_weighted_avg_; |
|
||||||
total_weight += prev_sample_weight; |
|
||||||
} |
|
||||||
aggregate_weighted_avg_ = |
|
||||||
(total_weight > 0) ? (weighted_sum / total_weight) : init_avg_; |
|
||||||
aggregate_total_weight_ = total_weight; |
|
||||||
batch_num_samples_ = 0; |
|
||||||
batch_total_value_ = 0; |
|
||||||
return aggregate_weighted_avg_; |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
@ -1,81 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIME_AVERAGED_STATS_H |
|
||||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIME_AVERAGED_STATS_H |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
/* This tracks a time-decaying weighted average. It works by collecting
|
|
||||||
batches of samples and then mixing their average into a time-decaying |
|
||||||
weighted mean. It is designed for batch operations where we do many adds |
|
||||||
before updating the average. */ |
|
||||||
|
|
||||||
class TimeAveragedStats { |
|
||||||
public: |
|
||||||
TimeAveragedStats(double init_avg, double regress_weight, |
|
||||||
double persistence_factor); |
|
||||||
|
|
||||||
/* Add a sample to the current batch. */ |
|
||||||
void AddSample(double value); |
|
||||||
/* Complete a batch and compute the new estimate of the average sample
|
|
||||||
value. */ |
|
||||||
double UpdateAverage(); |
|
||||||
|
|
||||||
double aggregate_weighted_avg() const { return aggregate_weighted_avg_; } |
|
||||||
double aggregate_total_weight() const { return aggregate_total_weight_; } |
|
||||||
|
|
||||||
private: |
|
||||||
/* The initial average value. This is the reported average until the first
|
|
||||||
grpc_time_averaged_stats_update_average call. If a positive regress_weight |
|
||||||
is used, we also regress towards this value on each update. */ |
|
||||||
const double init_avg_; |
|
||||||
/* The sample weight of "init_avg" that is mixed in with each call to
|
|
||||||
grpc_time_averaged_stats_update_average. If the calls to |
|
||||||
grpc_time_averaged_stats_add_sample stop, this will cause the average to |
|
||||||
regress back to the mean. This should be non-negative. Set it to 0 to |
|
||||||
disable the bias. A value of 1 has the effect of adding in 1 bonus sample |
|
||||||
with value init_avg to each sample period. */ |
|
||||||
const double regress_weight_; |
|
||||||
/* This determines the rate of decay of the time-averaging from one period
|
|
||||||
to the next by scaling the aggregate_total_weight of samples from prior |
|
||||||
periods when combining with the latest period. It should be in the range |
|
||||||
[0,1]. A higher value adapts more slowly. With a value of 0.5, if the |
|
||||||
batches each have k samples, the samples_in_avg_ will grow to 2 k, so the |
|
||||||
weighting of the time average will eventually be 1/3 new batch and 2/3 |
|
||||||
old average. */ |
|
||||||
const double persistence_factor_; |
|
||||||
|
|
||||||
/* The total value of samples since the last UpdateAverage(). */ |
|
||||||
double batch_total_value_ = 0; |
|
||||||
/* The number of samples since the last UpdateAverage(). */ |
|
||||||
double batch_num_samples_ = 0; |
|
||||||
/* The time-decayed sum of batch_num_samples_ over previous batches. This is
|
|
||||||
the "weight" of the old aggregate_weighted_avg_ when updating the |
|
||||||
average. */ |
|
||||||
double aggregate_total_weight_ = 0; |
|
||||||
/* A time-decayed average of the (batch_total_value_ / batch_num_samples_),
|
|
||||||
computed by decaying the samples_in_avg_ weight in the weighted average. */ |
|
||||||
double aggregate_weighted_avg_ = init_avg_; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
||||||
|
|
||||||
#endif /* GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIME_AVERAGED_STATS_H */ |
|
@ -1,312 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/timer.h" |
|
||||||
|
|
||||||
#include <algorithm> |
|
||||||
#include <atomic> |
|
||||||
#include <limits> |
|
||||||
#include <type_traits> |
|
||||||
#include <utility> |
|
||||||
|
|
||||||
#include <grpc/support/cpu.h> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/timer_heap.h" |
|
||||||
#include "src/core/lib/gpr/useful.h" |
|
||||||
#include "src/core/lib/gprpp/time.h" |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
static const size_t kInvalidHeapIndex = std::numeric_limits<size_t>::max(); |
|
||||||
static const double kAddDeadlineScale = 0.33; |
|
||||||
static const double kMinQueueWindowDuration = 0.01; |
|
||||||
static const double kMaxQueueWindowDuration = 1.0; |
|
||||||
|
|
||||||
grpc_core::Timestamp TimerList::Shard::ComputeMinDeadline() { |
|
||||||
return heap.is_empty() |
|
||||||
? queue_deadline_cap + grpc_core::Duration::Epsilon() |
|
||||||
: grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
|
||||||
heap.Top()->deadline); |
|
||||||
} |
|
||||||
|
|
||||||
TimerList::Shard::Shard() : stats(1.0 / kAddDeadlineScale, 0.1, 0.5) {} |
|
||||||
|
|
||||||
TimerList::TimerList(TimerListHost* host) |
|
||||||
: host_(host), |
|
||||||
num_shards_(grpc_core::Clamp(2 * gpr_cpu_num_cores(), 1u, 32u)), |
|
||||||
min_timer_(host_->Now().milliseconds_after_process_epoch()), |
|
||||||
shards_(new Shard[num_shards_]), |
|
||||||
shard_queue_(new Shard*[num_shards_]) { |
|
||||||
for (size_t i = 0; i < num_shards_; i++) { |
|
||||||
Shard& shard = shards_[i]; |
|
||||||
shard.queue_deadline_cap = |
|
||||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
|
||||||
min_timer_.load(std::memory_order_relaxed)); |
|
||||||
shard.shard_queue_index = i; |
|
||||||
shard.list.next = shard.list.prev = &shard.list; |
|
||||||
shard.min_deadline = shard.ComputeMinDeadline(); |
|
||||||
shard_queue_[i] = &shard; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
namespace { |
|
||||||
/* returns true if the first element in the list */ |
|
||||||
void ListJoin(Timer* head, Timer* timer) { |
|
||||||
timer->next = head; |
|
||||||
timer->prev = head->prev; |
|
||||||
timer->next->prev = timer->prev->next = timer; |
|
||||||
} |
|
||||||
|
|
||||||
void ListRemove(Timer* timer) { |
|
||||||
timer->next->prev = timer->prev; |
|
||||||
timer->prev->next = timer->next; |
|
||||||
} |
|
||||||
} // namespace
|
|
||||||
|
|
||||||
void TimerList::SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index) { |
|
||||||
Shard* temp; |
|
||||||
temp = shard_queue_[first_shard_queue_index]; |
|
||||||
shard_queue_[first_shard_queue_index] = |
|
||||||
shard_queue_[first_shard_queue_index + 1]; |
|
||||||
shard_queue_[first_shard_queue_index + 1] = temp; |
|
||||||
shard_queue_[first_shard_queue_index]->shard_queue_index = |
|
||||||
first_shard_queue_index; |
|
||||||
shard_queue_[first_shard_queue_index + 1]->shard_queue_index = |
|
||||||
first_shard_queue_index + 1; |
|
||||||
} |
|
||||||
|
|
||||||
void TimerList::NoteDeadlineChange(Shard* shard) { |
|
||||||
while (shard->shard_queue_index > 0 && |
|
||||||
shard->min_deadline < |
|
||||||
shard_queue_[shard->shard_queue_index - 1]->min_deadline) { |
|
||||||
SwapAdjacentShardsInQueue(shard->shard_queue_index - 1); |
|
||||||
} |
|
||||||
while (shard->shard_queue_index < num_shards_ - 1 && |
|
||||||
shard->min_deadline > |
|
||||||
shard_queue_[shard->shard_queue_index + 1]->min_deadline) { |
|
||||||
SwapAdjacentShardsInQueue(shard->shard_queue_index); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void TimerList::TimerInit(Timer* timer, grpc_core::Timestamp deadline, |
|
||||||
experimental::EventEngine::Closure* closure) { |
|
||||||
bool is_first_timer = false; |
|
||||||
Shard* shard = &shards_[grpc_core::HashPointer(timer, num_shards_)]; |
|
||||||
timer->closure = closure; |
|
||||||
timer->deadline = deadline.milliseconds_after_process_epoch(); |
|
||||||
|
|
||||||
#ifndef NDEBUG |
|
||||||
timer->hash_table_next = nullptr; |
|
||||||
#endif |
|
||||||
|
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&shard->mu); |
|
||||||
timer->pending = true; |
|
||||||
grpc_core::Timestamp now = host_->Now(); |
|
||||||
if (deadline <= now) { |
|
||||||
deadline = now; |
|
||||||
} |
|
||||||
|
|
||||||
shard->stats.AddSample((deadline - now).millis() / 1000.0); |
|
||||||
|
|
||||||
if (deadline < shard->queue_deadline_cap) { |
|
||||||
is_first_timer = shard->heap.Add(timer); |
|
||||||
} else { |
|
||||||
timer->heap_index = kInvalidHeapIndex; |
|
||||||
ListJoin(&shard->list, timer); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
/* Deadline may have decreased, we need to adjust the main queue. Note
|
|
||||||
that there is a potential racy unlocked region here. There could be a |
|
||||||
reordering of multiple TimerInit calls, at this point, but the < test |
|
||||||
below should ensure that we err on the side of caution. There could |
|
||||||
also be a race with TimerCheck, which might beat us to the lock. In |
|
||||||
that case, it is possible that the timer that we added will have already |
|
||||||
run by the time we hold the lock, but that too is a safe error. |
|
||||||
Finally, it's possible that the TimerCheck that intervened failed to |
|
||||||
trigger the new timer because the min_deadline hadn't yet been reduced. |
|
||||||
In that case, the timer will simply have to wait for the next |
|
||||||
TimerCheck. */ |
|
||||||
if (is_first_timer) { |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
if (deadline < shard->min_deadline) { |
|
||||||
grpc_core::Timestamp old_min_deadline = shard_queue_[0]->min_deadline; |
|
||||||
shard->min_deadline = deadline; |
|
||||||
NoteDeadlineChange(shard); |
|
||||||
if (shard->shard_queue_index == 0 && deadline < old_min_deadline) { |
|
||||||
min_timer_.store(deadline.milliseconds_after_process_epoch(), |
|
||||||
std::memory_order_relaxed); |
|
||||||
host_->Kick(); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
bool TimerList::TimerCancel(Timer* timer) { |
|
||||||
Shard* shard = &shards_[grpc_core::HashPointer(timer, num_shards_)]; |
|
||||||
grpc_core::MutexLock lock(&shard->mu); |
|
||||||
|
|
||||||
if (timer->pending) { |
|
||||||
timer->pending = false; |
|
||||||
if (timer->heap_index == kInvalidHeapIndex) { |
|
||||||
ListRemove(timer); |
|
||||||
} else { |
|
||||||
shard->heap.Remove(timer); |
|
||||||
} |
|
||||||
return true; |
|
||||||
} |
|
||||||
|
|
||||||
return false; |
|
||||||
} |
|
||||||
|
|
||||||
/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving
|
|
||||||
all relevant timers in shard->list (i.e timers with deadlines earlier than |
|
||||||
'queue_deadline_cap') into into shard->heap. |
|
||||||
Returns 'true' if shard->heap has at least ONE element */ |
|
||||||
bool TimerList::Shard::RefillHeap(grpc_core::Timestamp now) { |
|
||||||
/* Compute the new queue window width and bound by the limits: */ |
|
||||||
double computed_deadline_delta = stats.UpdateAverage() * kAddDeadlineScale; |
|
||||||
double deadline_delta = |
|
||||||
grpc_core::Clamp(computed_deadline_delta, kMinQueueWindowDuration, |
|
||||||
kMaxQueueWindowDuration); |
|
||||||
Timer *timer, *next; |
|
||||||
|
|
||||||
/* Compute the new cap and put all timers under it into the queue: */ |
|
||||||
queue_deadline_cap = std::max(now, queue_deadline_cap) + |
|
||||||
grpc_core::Duration::FromSecondsAsDouble(deadline_delta); |
|
||||||
|
|
||||||
for (timer = list.next; timer != &list; timer = next) { |
|
||||||
next = timer->next; |
|
||||||
auto timer_deadline = |
|
||||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
|
||||||
timer->deadline); |
|
||||||
|
|
||||||
if (timer_deadline < queue_deadline_cap) { |
|
||||||
ListRemove(timer); |
|
||||||
heap.Add(timer); |
|
||||||
} |
|
||||||
} |
|
||||||
return !heap.is_empty(); |
|
||||||
} |
|
||||||
|
|
||||||
/* This pops the next non-cancelled timer with deadline <= now from the
|
|
||||||
queue, or returns NULL if there isn't one. */ |
|
||||||
Timer* TimerList::Shard::PopOne(grpc_core::Timestamp now) { |
|
||||||
Timer* timer; |
|
||||||
for (;;) { |
|
||||||
if (heap.is_empty()) { |
|
||||||
if (now < queue_deadline_cap) return nullptr; |
|
||||||
if (!RefillHeap(now)) return nullptr; |
|
||||||
} |
|
||||||
timer = heap.Top(); |
|
||||||
auto timer_deadline = |
|
||||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
|
||||||
timer->deadline); |
|
||||||
if (timer_deadline > now) return nullptr; |
|
||||||
timer->pending = false; |
|
||||||
heap.Pop(); |
|
||||||
return timer; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void TimerList::Shard::PopTimers( |
|
||||||
grpc_core::Timestamp now, grpc_core::Timestamp* new_min_deadline, |
|
||||||
std::vector<experimental::EventEngine::Closure*>* out) { |
|
||||||
grpc_core::MutexLock lock(&mu); |
|
||||||
while (Timer* timer = PopOne(now)) { |
|
||||||
out->push_back(timer->closure); |
|
||||||
} |
|
||||||
*new_min_deadline = ComputeMinDeadline(); |
|
||||||
} |
|
||||||
|
|
||||||
std::vector<experimental::EventEngine::Closure*> TimerList::FindExpiredTimers( |
|
||||||
grpc_core::Timestamp now, grpc_core::Timestamp* next) { |
|
||||||
grpc_core::Timestamp min_timer = |
|
||||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
|
||||||
min_timer_.load(std::memory_order_relaxed)); |
|
||||||
|
|
||||||
std::vector<experimental::EventEngine::Closure*> done; |
|
||||||
if (now < min_timer) { |
|
||||||
if (next != nullptr) *next = std::min(*next, min_timer); |
|
||||||
return done; |
|
||||||
} |
|
||||||
|
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
|
|
||||||
while (shard_queue_[0]->min_deadline < now || |
|
||||||
(now != grpc_core::Timestamp::InfFuture() && |
|
||||||
shard_queue_[0]->min_deadline == now)) { |
|
||||||
grpc_core::Timestamp new_min_deadline; |
|
||||||
|
|
||||||
/* For efficiency, we pop as many available timers as we can from the
|
|
||||||
shard. This may violate perfect timer deadline ordering, but that |
|
||||||
shouldn't be a big deal because we don't make ordering guarantees. */ |
|
||||||
shard_queue_[0]->PopTimers(now, &new_min_deadline, &done); |
|
||||||
|
|
||||||
/* An TimerInit() on the shard could intervene here, adding a new
|
|
||||||
timer that is earlier than new_min_deadline. However, |
|
||||||
TimerInit() will block on the mutex before it can call |
|
||||||
set_min_deadline, so this one will complete first and then the Addtimer |
|
||||||
will reduce the min_deadline (perhaps unnecessarily). */ |
|
||||||
shard_queue_[0]->min_deadline = new_min_deadline; |
|
||||||
NoteDeadlineChange(shard_queue_[0]); |
|
||||||
} |
|
||||||
|
|
||||||
if (next) { |
|
||||||
*next = std::min(*next, shard_queue_[0]->min_deadline); |
|
||||||
} |
|
||||||
|
|
||||||
min_timer_.store( |
|
||||||
shard_queue_[0]->min_deadline.milliseconds_after_process_epoch(), |
|
||||||
std::memory_order_relaxed); |
|
||||||
|
|
||||||
return done; |
|
||||||
} |
|
||||||
|
|
||||||
absl::optional<std::vector<experimental::EventEngine::Closure*>> |
|
||||||
TimerList::TimerCheck(grpc_core::Timestamp* next) { |
|
||||||
// prelude
|
|
||||||
grpc_core::Timestamp now = host_->Now(); |
|
||||||
|
|
||||||
/* fetch from a thread-local first: this avoids contention on a globally
|
|
||||||
mutable cacheline in the common case */ |
|
||||||
grpc_core::Timestamp min_timer = |
|
||||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
|
||||||
min_timer_.load(std::memory_order_relaxed)); |
|
||||||
|
|
||||||
if (now < min_timer) { |
|
||||||
if (next != nullptr) { |
|
||||||
*next = std::min(*next, min_timer); |
|
||||||
} |
|
||||||
return std::vector<experimental::EventEngine::Closure*>(); |
|
||||||
} |
|
||||||
|
|
||||||
if (!checker_mu_.TryLock()) return absl::nullopt; |
|
||||||
std::vector<experimental::EventEngine::Closure*> run = |
|
||||||
FindExpiredTimers(now, next); |
|
||||||
checker_mu_.Unlock(); |
|
||||||
|
|
||||||
return std::move(run); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
@ -1,193 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_H |
|
||||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <stddef.h> |
|
||||||
|
|
||||||
#include <atomic> |
|
||||||
#include <cstdint> |
|
||||||
#include <memory> |
|
||||||
#include <vector> |
|
||||||
|
|
||||||
#include "absl/base/thread_annotations.h" |
|
||||||
#include "absl/types/optional.h" |
|
||||||
|
|
||||||
#include <grpc/event_engine/event_engine.h> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/time_averaged_stats.h" |
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/timer_heap.h" |
|
||||||
#include "src/core/lib/gprpp/sync.h" |
|
||||||
#include "src/core/lib/gprpp/time.h" |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
struct Timer { |
|
||||||
int64_t deadline; |
|
||||||
// kInvalidHeapIndex if not in heap.
|
|
||||||
size_t heap_index; |
|
||||||
bool pending; |
|
||||||
struct Timer* next; |
|
||||||
struct Timer* prev; |
|
||||||
experimental::EventEngine::Closure* closure; |
|
||||||
#ifndef NDEBUG |
|
||||||
struct Timer* hash_table_next; |
|
||||||
#endif |
|
||||||
|
|
||||||
grpc_event_engine::experimental::EventEngine::TaskHandle task_handle; |
|
||||||
}; |
|
||||||
|
|
||||||
// Dependency injection: allow tests and/or TimerManager to inject
|
|
||||||
// their own implementations of Now, Kick.
|
|
||||||
class TimerListHost { |
|
||||||
public: |
|
||||||
// Return the current timestamp.
|
|
||||||
// Abstracted so that tests can be run deterministically.
|
|
||||||
virtual grpc_core::Timestamp Now() = 0; |
|
||||||
// Wake up a thread to check for timers.
|
|
||||||
virtual void Kick() = 0; |
|
||||||
|
|
||||||
protected: |
|
||||||
~TimerListHost() = default; |
|
||||||
}; |
|
||||||
|
|
||||||
class TimerList { |
|
||||||
public: |
|
||||||
explicit TimerList(TimerListHost* host); |
|
||||||
|
|
||||||
TimerList(const TimerList&) = delete; |
|
||||||
TimerList& operator=(const TimerList&) = delete; |
|
||||||
|
|
||||||
/* Initialize *timer. When expired or canceled, closure will be called with
|
|
||||||
error set to indicate if it expired (GRPC_ERROR_NONE) or was canceled |
|
||||||
(GRPC_ERROR_CANCELLED). *closure is guaranteed to be called exactly once, and |
|
||||||
application code should check the error to determine how it was invoked. The |
|
||||||
application callback is also responsible for maintaining information about |
|
||||||
when to free up any user-level state. Behavior is undefined for a deadline of |
|
||||||
grpc_core::Timestamp::InfFuture(). */ |
|
||||||
void TimerInit(Timer* timer, grpc_core::Timestamp deadline, |
|
||||||
experimental::EventEngine::Closure* closure); |
|
||||||
|
|
||||||
/* Note that there is no timer destroy function. This is because the
|
|
||||||
timer is a one-time occurrence with a guarantee that the callback will |
|
||||||
be called exactly once, either at expiration or cancellation. Thus, all |
|
||||||
the internal timer event management state is destroyed just before |
|
||||||
that callback is invoked. If the user has additional state associated with |
|
||||||
the timer, the user is responsible for determining when it is safe to |
|
||||||
destroy that state. */ |
|
||||||
|
|
||||||
/* Cancel an *timer.
|
|
||||||
There are three cases: |
|
||||||
1. We normally cancel the timer |
|
||||||
2. The timer has already run |
|
||||||
3. We can't cancel the timer because it is "in flight". |
|
||||||
|
|
||||||
In all of these cases, the cancellation is still considered successful. |
|
||||||
They are essentially distinguished in that the timer_cb will be run |
|
||||||
exactly once from either the cancellation (with error GRPC_ERROR_CANCELLED) |
|
||||||
or from the activation (with error GRPC_ERROR_NONE). |
|
||||||
|
|
||||||
Note carefully that the callback function MAY occur in the same callstack |
|
||||||
as grpc_timer_cancel. It's expected that most timers will be cancelled |
|
||||||
(their primary use is to implement deadlines), and so this code is |
|
||||||
optimized such that cancellation costs as little as possible. Making |
|
||||||
callbacks run inline matches this aim. |
|
||||||
|
|
||||||
Requires: cancel() must happen after init() on a given timer */ |
|
||||||
bool TimerCancel(Timer* timer) GRPC_MUST_USE_RESULT; |
|
||||||
|
|
||||||
/* iomgr internal api for dealing with timers */ |
|
||||||
|
|
||||||
/* Check for timers to be run, and return them.
|
|
||||||
Return nullopt if timers could not be checked due to contention with |
|
||||||
another thread checking. |
|
||||||
Return a vector of closures that *must* be run otherwise. |
|
||||||
If next is non-null, TRY to update *next with the next running timer |
|
||||||
IF that timer occurs before *next current value. |
|
||||||
*next is never guaranteed to be updated on any given execution; however, |
|
||||||
with high probability at least one thread in the system will see an update |
|
||||||
at any time slice. */ |
|
||||||
absl::optional<std::vector<experimental::EventEngine::Closure*>> TimerCheck( |
|
||||||
grpc_core::Timestamp* next); |
|
||||||
|
|
||||||
private: |
|
||||||
/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
|
|
||||||
* deadlines earlier than 'queue_deadline_cap' are maintained in the heap and |
|
||||||
* others are maintained in the list (unordered). This helps to keep the |
|
||||||
* number of elements in the heap low. |
|
||||||
* |
|
||||||
* The 'queue_deadline_cap' gets recomputed periodically based on the timer |
|
||||||
* stats maintained in 'stats' and the relevant timers are then moved from the |
|
||||||
* 'list' to 'heap'. |
|
||||||
*/ |
|
||||||
struct Shard { |
|
||||||
Shard(); |
|
||||||
|
|
||||||
grpc_core::Timestamp ComputeMinDeadline() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu); |
|
||||||
bool RefillHeap(grpc_core::Timestamp now) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu); |
|
||||||
Timer* PopOne(grpc_core::Timestamp now) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu); |
|
||||||
void PopTimers(grpc_core::Timestamp now, |
|
||||||
grpc_core::Timestamp* new_min_deadline, |
|
||||||
std::vector<experimental::EventEngine::Closure*>* out) |
|
||||||
ABSL_LOCKS_EXCLUDED(mu); |
|
||||||
|
|
||||||
grpc_core::Mutex mu; |
|
||||||
TimeAveragedStats stats ABSL_GUARDED_BY(mu); |
|
||||||
/* All and only timers with deadlines < this will be in the heap. */ |
|
||||||
grpc_core::Timestamp queue_deadline_cap ABSL_GUARDED_BY(mu); |
|
||||||
/* The deadline of the next timer due in this shard. */ |
|
||||||
grpc_core::Timestamp min_deadline ABSL_GUARDED_BY(&TimerList::mu_); |
|
||||||
/* Index of this timer_shard in the g_shard_queue. */ |
|
||||||
uint32_t shard_queue_index ABSL_GUARDED_BY(&TimerList::mu_); |
|
||||||
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
|
|
||||||
list have the top bit of their deadline set to 0. */ |
|
||||||
TimerHeap heap ABSL_GUARDED_BY(mu); |
|
||||||
/* This holds timers whose deadline is >= queue_deadline_cap. */ |
|
||||||
Timer list ABSL_GUARDED_BY(mu); |
|
||||||
}; |
|
||||||
|
|
||||||
void SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index) |
|
||||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
|
||||||
void NoteDeadlineChange(Shard* shard) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
|
||||||
std::vector<experimental::EventEngine::Closure*> FindExpiredTimers( |
|
||||||
grpc_core::Timestamp now, grpc_core::Timestamp* next); |
|
||||||
|
|
||||||
TimerListHost* const host_; |
|
||||||
const size_t num_shards_; |
|
||||||
grpc_core::Mutex mu_; |
|
||||||
/* The deadline of the next timer due across all timer shards */ |
|
||||||
std::atomic<uint64_t> min_timer_; |
|
||||||
/* Allow only one FindExpiredTimers at once (used as a TryLock, protects no
|
|
||||||
* fields but ensures limits on concurrency) */ |
|
||||||
grpc_core::Mutex checker_mu_; |
|
||||||
/* Array of timer shards. Whenever a timer (Timer *) is added, its address
|
|
||||||
* is hashed to select the timer shard to add the timer to */ |
|
||||||
const std::unique_ptr<Shard[]> shards_; |
|
||||||
/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
|
|
||||||
* the deadline of the next timer in each shard). */ |
|
||||||
const std::unique_ptr<Shard*[]> shard_queue_ ABSL_GUARDED_BY(mu_); |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
||||||
|
|
||||||
#endif /* GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_H */ |
|
@ -1,107 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/timer_heap.h" |
|
||||||
|
|
||||||
#include <stdint.h> |
|
||||||
|
|
||||||
#include <algorithm> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/timer.h" |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
/* Adjusts a heap so as to move a hole at position i closer to the root,
|
|
||||||
until a suitable position is found for element t. Then, copies t into that |
|
||||||
position. This functor is called each time immediately after modifying a |
|
||||||
value in the underlying container, with the offset of the modified element as |
|
||||||
its argument. */ |
|
||||||
void TimerHeap::AdjustUpwards(size_t i, Timer* t) { |
|
||||||
while (i > 0) { |
|
||||||
size_t parent = (i - 1) / 2; |
|
||||||
if (timers_[parent]->deadline <= t->deadline) break; |
|
||||||
timers_[i] = timers_[parent]; |
|
||||||
timers_[i]->heap_index = i; |
|
||||||
i = parent; |
|
||||||
} |
|
||||||
timers_[i] = t; |
|
||||||
t->heap_index = i; |
|
||||||
} |
|
||||||
|
|
||||||
/* Adjusts a heap so as to move a hole at position i farther away from the root,
|
|
||||||
until a suitable position is found for element t. Then, copies t into that |
|
||||||
position. */ |
|
||||||
void TimerHeap::AdjustDownwards(size_t i, Timer* t) { |
|
||||||
for (;;) { |
|
||||||
size_t left_child = 1 + 2 * i; |
|
||||||
if (left_child >= timers_.size()) break; |
|
||||||
size_t right_child = left_child + 1; |
|
||||||
size_t next_i = |
|
||||||
right_child < timers_.size() && |
|
||||||
timers_[left_child]->deadline > timers_[right_child]->deadline |
|
||||||
? right_child |
|
||||||
: left_child; |
|
||||||
if (t->deadline <= timers_[next_i]->deadline) break; |
|
||||||
timers_[i] = timers_[next_i]; |
|
||||||
timers_[i]->heap_index = i; |
|
||||||
i = next_i; |
|
||||||
} |
|
||||||
timers_[i] = t; |
|
||||||
t->heap_index = i; |
|
||||||
} |
|
||||||
|
|
||||||
void TimerHeap::NoteChangedPriority(Timer* timer) { |
|
||||||
uint32_t i = timer->heap_index; |
|
||||||
uint32_t parent = static_cast<uint32_t>((static_cast<int>(i) - 1) / 2); |
|
||||||
if (timers_[parent]->deadline > timer->deadline) { |
|
||||||
AdjustUpwards(i, timer); |
|
||||||
} else { |
|
||||||
AdjustDownwards(i, timer); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
bool TimerHeap::Add(Timer* timer) { |
|
||||||
timer->heap_index = timers_.size(); |
|
||||||
timers_.push_back(timer); |
|
||||||
AdjustUpwards(timer->heap_index, timer); |
|
||||||
return timer->heap_index == 0; |
|
||||||
} |
|
||||||
|
|
||||||
void TimerHeap::Remove(Timer* timer) { |
|
||||||
uint32_t i = timer->heap_index; |
|
||||||
if (i == timers_.size() - 1) { |
|
||||||
timers_.pop_back(); |
|
||||||
return; |
|
||||||
} |
|
||||||
timers_[i] = timers_[timers_.size() - 1]; |
|
||||||
timers_[i]->heap_index = i; |
|
||||||
timers_.pop_back(); |
|
||||||
NoteChangedPriority(timers_[i]); |
|
||||||
} |
|
||||||
|
|
||||||
bool TimerHeap::is_empty() { return timers_.empty(); } |
|
||||||
|
|
||||||
Timer* TimerHeap::Top() { return timers_[0]; } |
|
||||||
|
|
||||||
void TimerHeap::Pop() { Remove(Top()); } |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
@ -1,56 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_HEAP_H |
|
||||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_HEAP_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <cstddef> |
|
||||||
#include <vector> |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
struct Timer; |
|
||||||
|
|
||||||
class TimerHeap { |
|
||||||
public: |
|
||||||
/* return true if the new timer is the first timer in the heap */ |
|
||||||
bool Add(Timer* timer); |
|
||||||
|
|
||||||
void Remove(Timer* timer); |
|
||||||
Timer* Top(); |
|
||||||
void Pop(); |
|
||||||
|
|
||||||
bool is_empty(); |
|
||||||
|
|
||||||
const std::vector<Timer*>& TestOnlyGetTimers() const { return timers_; } |
|
||||||
|
|
||||||
private: |
|
||||||
void AdjustUpwards(size_t i, Timer* t); |
|
||||||
void AdjustDownwards(size_t i, Timer* t); |
|
||||||
void NoteChangedPriority(Timer* timer); |
|
||||||
|
|
||||||
std::vector<Timer*> timers_; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
||||||
|
|
||||||
#endif /* GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_HEAP_H */ |
|
@ -1,254 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2017 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/timer_manager.h" |
|
||||||
|
|
||||||
#include <algorithm> |
|
||||||
#include <memory> |
|
||||||
#include <utility> |
|
||||||
|
|
||||||
#include "absl/memory/memory.h" |
|
||||||
#include "absl/time/time.h" |
|
||||||
#include "absl/types/optional.h" |
|
||||||
|
|
||||||
#include <grpc/impl/codegen/gpr_types.h> |
|
||||||
#include <grpc/support/log.h> |
|
||||||
#include <grpc/support/time.h> |
|
||||||
|
|
||||||
#include "src/core/lib/gprpp/thd.h" |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
namespace { |
|
||||||
class ThreadCollector { |
|
||||||
public: |
|
||||||
ThreadCollector() = default; |
|
||||||
~ThreadCollector(); |
|
||||||
|
|
||||||
void Collect(std::vector<grpc_core::Thread> threads) { |
|
||||||
GPR_ASSERT(threads_.empty()); |
|
||||||
threads_ = std::move(threads); |
|
||||||
} |
|
||||||
|
|
||||||
private: |
|
||||||
std::vector<grpc_core::Thread> threads_; |
|
||||||
}; |
|
||||||
|
|
||||||
ThreadCollector::~ThreadCollector() { |
|
||||||
for (auto& t : threads_) t.Join(); |
|
||||||
} |
|
||||||
} // namespace
|
|
||||||
|
|
||||||
void TimerManager::StartThread() { |
|
||||||
++waiter_count_; |
|
||||||
++thread_count_; |
|
||||||
auto* thread = new RunThreadArgs(); |
|
||||||
thread->self = this; |
|
||||||
thread->thread = |
|
||||||
grpc_core::Thread("timer_manager", &TimerManager::RunThread, thread); |
|
||||||
thread->thread.Start(); |
|
||||||
} |
|
||||||
|
|
||||||
void TimerManager::RunSomeTimers( |
|
||||||
std::vector<experimental::EventEngine::Closure*> timers) { |
|
||||||
// if there's something to execute...
|
|
||||||
ThreadCollector collector; |
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
// remove a waiter from the pool, and start another thread if necessary
|
|
||||||
--waiter_count_; |
|
||||||
if (waiter_count_ == 0) { |
|
||||||
// The number of timer threads is always increasing until all the threads
|
|
||||||
// are stopped. In rare cases, if a large number of timers fire
|
|
||||||
// simultaneously, we may end up using a large number of threads.
|
|
||||||
// TODO(ctiller): We could avoid this by exiting threads in WaitUntil().
|
|
||||||
StartThread(); |
|
||||||
} else { |
|
||||||
// if there's no thread waiting with a timeout, kick an existing untimed
|
|
||||||
// waiter so that the next deadline is not missed
|
|
||||||
if (!has_timed_waiter_) { |
|
||||||
cv_.Signal(); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
for (auto* timer : timers) { |
|
||||||
timer->Run(); |
|
||||||
} |
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
collector.Collect(std::move(completed_threads_)); |
|
||||||
// get ready to wait again
|
|
||||||
++waiter_count_; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// wait until 'next' (or forever if there is already a timed waiter in the pool)
|
|
||||||
// returns true if the thread should continue executing (false if it should
|
|
||||||
// shutdown)
|
|
||||||
bool TimerManager::WaitUntil(grpc_core::Timestamp next) { |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
|
|
||||||
if (shutdown_) { |
|
||||||
return false; |
|
||||||
} |
|
||||||
|
|
||||||
// TODO(ctiller): if there are too many waiting threads, this would be a good
|
|
||||||
// place to exit the current thread.
|
|
||||||
|
|
||||||
// If kicked_ is true at this point, it means there was a kick from the timer
|
|
||||||
// system that the timer-manager threads here missed. We cannot trust 'next'
|
|
||||||
// here any longer (since there might be an earlier deadline). So if kicked_
|
|
||||||
// is true at this point, we should quickly exit this and get the next
|
|
||||||
// deadline from the timer system
|
|
||||||
|
|
||||||
if (!kicked_) { |
|
||||||
// if there's no timed waiter, we should become one: that waiter waits
|
|
||||||
// only until the next timer should expire. All other timers wait forever
|
|
||||||
//
|
|
||||||
// 'timed_waiter_generation_' is a global generation counter. The idea here
|
|
||||||
// is that the thread becoming a timed-waiter increments and stores this
|
|
||||||
// global counter locally in 'my_timed_waiter_generation' before going to
|
|
||||||
// sleep. After waking up, if my_timed_waiter_generation ==
|
|
||||||
// timed_waiter_generation_, it can be sure that it was the timed_waiter
|
|
||||||
// thread (and that no other thread took over while this was asleep)
|
|
||||||
//
|
|
||||||
// Initialize my_timed_waiter_generation to some value that is NOT equal to
|
|
||||||
// timed_waiter_generation_
|
|
||||||
uint64_t my_timed_waiter_generation = timed_waiter_generation_ - 1; |
|
||||||
|
|
||||||
/* If there's no timed waiter, we should become one: that waiter waits only
|
|
||||||
until the next timer should expire. All other timer threads wait forever |
|
||||||
unless their 'next' is earlier than the current timed-waiter's deadline |
|
||||||
(in which case the thread with earlier 'next' takes over as the new timed |
|
||||||
waiter) */ |
|
||||||
if (next != grpc_core::Timestamp::InfFuture()) { |
|
||||||
if (!has_timed_waiter_ || (next < timed_waiter_deadline_)) { |
|
||||||
my_timed_waiter_generation = ++timed_waiter_generation_; |
|
||||||
has_timed_waiter_ = true; |
|
||||||
timed_waiter_deadline_ = next; |
|
||||||
} else { // timed_waiter_ == true && next >= timed_waiter_deadline_
|
|
||||||
next = grpc_core::Timestamp::InfFuture(); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
cv_.WaitWithTimeout(&mu_, |
|
||||||
absl::Milliseconds((next - host_.Now()).millis())); |
|
||||||
|
|
||||||
// if this was the timed waiter, then we need to check timers, and flag
|
|
||||||
// that there's now no timed waiter... we'll look for a replacement if
|
|
||||||
// there's work to do after checking timers (code above)
|
|
||||||
if (my_timed_waiter_generation == timed_waiter_generation_) { |
|
||||||
++wakeups_; |
|
||||||
has_timed_waiter_ = false; |
|
||||||
timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture(); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
kicked_ = false; |
|
||||||
|
|
||||||
return true; |
|
||||||
} |
|
||||||
|
|
||||||
void TimerManager::MainLoop() { |
|
||||||
for (;;) { |
|
||||||
grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture(); |
|
||||||
absl::optional<std::vector<experimental::EventEngine::Closure*>> |
|
||||||
check_result = timer_list_->TimerCheck(&next); |
|
||||||
if (check_result.has_value()) { |
|
||||||
if (!check_result->empty()) { |
|
||||||
RunSomeTimers(std::move(*check_result)); |
|
||||||
continue; |
|
||||||
} |
|
||||||
} else { |
|
||||||
/* This case only happens under contention, meaning more than one timer
|
|
||||||
manager thread checked timers concurrently. |
|
||||||
|
|
||||||
If that happens, we're guaranteed that some other thread has just |
|
||||||
checked timers, and this will avalanche into some other thread seeing |
|
||||||
empty timers and doing a timed sleep. |
|
||||||
|
|
||||||
Consequently, we can just sleep forever here and be happy at some |
|
||||||
saved wakeup cycles. */ |
|
||||||
next = grpc_core::Timestamp::InfFuture(); |
|
||||||
} |
|
||||||
if (!WaitUntil(next)) return; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void TimerManager::RunThread(void* arg) { |
|
||||||
std::unique_ptr<RunThreadArgs> thread(static_cast<RunThreadArgs*>(arg)); |
|
||||||
thread->self->MainLoop(); |
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&thread->self->mu_); |
|
||||||
thread->self->thread_count_--; |
|
||||||
thread->self->completed_threads_.push_back(std::move(thread->thread)); |
|
||||||
} |
|
||||||
thread->self->cv_.Signal(); |
|
||||||
} |
|
||||||
|
|
||||||
TimerManager::TimerManager() : host_(this) { |
|
||||||
timer_list_ = absl::make_unique<TimerList>(&host_); |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
StartThread(); |
|
||||||
} |
|
||||||
|
|
||||||
grpc_core::Timestamp TimerManager::Host::Now() { |
|
||||||
return grpc_core::Timestamp::FromTimespecRoundDown( |
|
||||||
gpr_now(GPR_CLOCK_MONOTONIC)); |
|
||||||
} |
|
||||||
|
|
||||||
void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline, |
|
||||||
experimental::EventEngine::Closure* closure) { |
|
||||||
timer_list_->TimerInit(timer, deadline, closure); |
|
||||||
} |
|
||||||
|
|
||||||
bool TimerManager::TimerCancel(Timer* timer) { |
|
||||||
return timer_list_->TimerCancel(timer); |
|
||||||
} |
|
||||||
|
|
||||||
TimerManager::~TimerManager() { |
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
shutdown_ = true; |
|
||||||
cv_.SignalAll(); |
|
||||||
} |
|
||||||
while (true) { |
|
||||||
ThreadCollector collector; |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
collector.Collect(std::move(completed_threads_)); |
|
||||||
if (thread_count_ == 0) break; |
|
||||||
cv_.Wait(&mu_); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void TimerManager::Host::Kick() { timer_manager_->Kick(); } |
|
||||||
|
|
||||||
void TimerManager::Kick() { |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
has_timed_waiter_ = false; |
|
||||||
timed_waiter_deadline_ = grpc_core::Timestamp::InfFuture(); |
|
||||||
++timed_waiter_generation_; |
|
||||||
kicked_ = true; |
|
||||||
cv_.Signal(); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
@ -1,111 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2017 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_MANAGER_H |
|
||||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_MANAGER_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <stddef.h> |
|
||||||
#include <stdint.h> |
|
||||||
|
|
||||||
#include <memory> |
|
||||||
#include <vector> |
|
||||||
|
|
||||||
#include "absl/base/thread_annotations.h" |
|
||||||
|
|
||||||
#include <grpc/event_engine/event_engine.h> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/timer.h" |
|
||||||
#include "src/core/lib/gprpp/sync.h" |
|
||||||
#include "src/core/lib/gprpp/thd.h" |
|
||||||
#include "src/core/lib/gprpp/time.h" |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
// Timer Manager tries to keep only one thread waiting for the next timeout at
|
|
||||||
// all times, and thus effectively preventing the thundering herd problem.
|
|
||||||
// TODO(ctiller): consider unifying this thread pool and the one in
|
|
||||||
// thread_pool.{h,cc}.
|
|
||||||
class TimerManager final { |
|
||||||
public: |
|
||||||
TimerManager(); |
|
||||||
~TimerManager(); |
|
||||||
|
|
||||||
grpc_core::Timestamp Now() { return host_.Now(); } |
|
||||||
|
|
||||||
void TimerInit(Timer* timer, grpc_core::Timestamp deadline, |
|
||||||
experimental::EventEngine::Closure* closure); |
|
||||||
bool TimerCancel(Timer* timer); |
|
||||||
|
|
||||||
private: |
|
||||||
struct RunThreadArgs { |
|
||||||
TimerManager* self; |
|
||||||
grpc_core::Thread thread; |
|
||||||
}; |
|
||||||
|
|
||||||
class Host final : public TimerListHost { |
|
||||||
public: |
|
||||||
explicit Host(TimerManager* timer_manager) |
|
||||||
: timer_manager_(timer_manager) {} |
|
||||||
|
|
||||||
void Kick() override; |
|
||||||
grpc_core::Timestamp Now() override; |
|
||||||
|
|
||||||
private: |
|
||||||
TimerManager* const timer_manager_; |
|
||||||
}; |
|
||||||
|
|
||||||
void StartThread() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
|
||||||
static void RunThread(void* arg); |
|
||||||
void MainLoop(); |
|
||||||
void RunSomeTimers(std::vector<experimental::EventEngine::Closure*> timers); |
|
||||||
bool WaitUntil(grpc_core::Timestamp next); |
|
||||||
void Kick(); |
|
||||||
|
|
||||||
grpc_core::Mutex mu_; |
|
||||||
grpc_core::CondVar cv_; |
|
||||||
Host host_; |
|
||||||
// number of threads in the system
|
|
||||||
size_t thread_count_ ABSL_GUARDED_BY(mu_) = 0; |
|
||||||
// number of threads sitting around waiting
|
|
||||||
size_t waiter_count_ ABSL_GUARDED_BY(mu_) = 0; |
|
||||||
// Threads waiting to be joined
|
|
||||||
std::vector<grpc_core::Thread> completed_threads_ ABSL_GUARDED_BY(mu_); |
|
||||||
// is there a thread waiting until the next timer should fire?
|
|
||||||
bool has_timed_waiter_ ABSL_GUARDED_BY(mu_) = false; |
|
||||||
// are we shutting down?
|
|
||||||
bool shutdown_ ABSL_GUARDED_BY(mu_) = false; |
|
||||||
// are we shutting down?
|
|
||||||
bool kicked_ ABSL_GUARDED_BY(mu_) = false; |
|
||||||
// the deadline of the current timed waiter thread (only relevant if
|
|
||||||
// has_timed_waiter_ is true)
|
|
||||||
grpc_core::Timestamp timed_waiter_deadline_ ABSL_GUARDED_BY(mu_); |
|
||||||
// generation counter to track which thread is waiting for the next timer
|
|
||||||
uint64_t timed_waiter_generation_ ABSL_GUARDED_BY(mu_) = 0; |
|
||||||
// number of timer wakeups
|
|
||||||
uint64_t wakeups_ ABSL_GUARDED_BY(mu_) = 0; |
|
||||||
// actual timer implementation
|
|
||||||
std::unique_ptr<TimerList> timer_list_; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
||||||
|
|
||||||
#endif /* GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_TIMER_MANAGER_H */ |
|
@ -1,62 +0,0 @@ |
|||||||
# Copyright 2017 gRPC authors. |
|
||||||
# |
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
# you may not use this file except in compliance with the License. |
|
||||||
# You may obtain a copy of the License at |
|
||||||
# |
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0 |
|
||||||
# |
|
||||||
# Unless required by applicable law or agreed to in writing, software |
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
# See the License for the specific language governing permissions and |
|
||||||
# limitations under the License. |
|
||||||
|
|
||||||
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") |
|
||||||
|
|
||||||
licenses(["notice"]) |
|
||||||
|
|
||||||
grpc_package( |
|
||||||
name = "test/core/event_engine/iomgr_event_engine", |
|
||||||
visibility = "public", |
|
||||||
) # Useful for third party devs to test their io manager implementation. |
|
||||||
|
|
||||||
grpc_cc_test( |
|
||||||
name = "time_averaged_stats_test", |
|
||||||
srcs = ["time_averaged_stats_test.cc"], |
|
||||||
external_deps = ["gtest"], |
|
||||||
language = "C++", |
|
||||||
uses_event_engine = False, |
|
||||||
uses_polling = False, |
|
||||||
deps = [ |
|
||||||
"//:iomgr_ee_time_averaged_stats", |
|
||||||
"//test/core/util:grpc_suppressions", |
|
||||||
], |
|
||||||
) |
|
||||||
|
|
||||||
grpc_cc_test( |
|
||||||
name = "timer_heap_test", |
|
||||||
srcs = ["timer_heap_test.cc"], |
|
||||||
external_deps = ["gtest"], |
|
||||||
language = "C++", |
|
||||||
uses_event_engine = False, |
|
||||||
uses_polling = False, |
|
||||||
deps = [ |
|
||||||
"//:bitset", |
|
||||||
"//:iomgr_ee_timer", |
|
||||||
"//test/core/util:grpc_suppressions", |
|
||||||
], |
|
||||||
) |
|
||||||
|
|
||||||
grpc_cc_test( |
|
||||||
name = "timer_list_test", |
|
||||||
srcs = ["timer_list_test.cc"], |
|
||||||
external_deps = ["gtest"], |
|
||||||
language = "C++", |
|
||||||
uses_event_engine = False, |
|
||||||
uses_polling = False, |
|
||||||
deps = [ |
|
||||||
"//:iomgr_ee_timer", |
|
||||||
"//test/core/util:grpc_suppressions", |
|
||||||
], |
|
||||||
) |
|
@ -1,179 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/time_averaged_stats.h" |
|
||||||
|
|
||||||
#include <math.h> |
|
||||||
|
|
||||||
#include <gtest/gtest.h> |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
TEST(TimeAveragedStatsTest, NoRegressNoPersistTest1) { |
|
||||||
TimeAveragedStats tas(1000, 0, 0.0); |
|
||||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(0, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
/* Should have no effect */ |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(0, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
/* Should replace old average */ |
|
||||||
tas.AddSample(2000); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(2000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(1, tas.aggregate_total_weight()); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(TimeAveragedStatsTest, NoRegressNoPersistTest2) { |
|
||||||
TimeAveragedStats tas(1000, 0, 0.0); |
|
||||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
|
||||||
/* Should replace init value */ |
|
||||||
tas.AddSample(2000); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(2000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(1, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
tas.AddSample(3000); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(3000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(1, tas.aggregate_total_weight()); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(TimeAveragedStatsTest, NoRegressNoPersistTest3) { |
|
||||||
TimeAveragedStats tas(1000, 0, 0.0); |
|
||||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
|
||||||
/* Should replace init value */ |
|
||||||
tas.AddSample(2500); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(2500, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(1, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
tas.AddSample(3500); |
|
||||||
tas.AddSample(4500); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(4000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(2, tas.aggregate_total_weight()); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(TimeAveragedStatsTest, SomeRegressNoPersistTest) { |
|
||||||
TimeAveragedStats tas(1000, 0.5, 0.0); |
|
||||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(0, tas.aggregate_total_weight()); |
|
||||||
tas.AddSample(2000); |
|
||||||
tas.AddSample(2000); |
|
||||||
tas.UpdateAverage(); |
|
||||||
/* (2 * 2000 + 0.5 * 1000) / 2.5 */ |
|
||||||
EXPECT_DOUBLE_EQ(1800, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(2.5, tas.aggregate_total_weight()); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(TimeAveragedStatsTest, SomeDecayTest) { |
|
||||||
TimeAveragedStats tas(1000, 1, 0.0); |
|
||||||
EXPECT_EQ(1000, tas.aggregate_weighted_avg()); |
|
||||||
/* Should avg with init value */ |
|
||||||
tas.AddSample(2000); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(1500, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(2, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
tas.AddSample(2000); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(1500, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(2, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
tas.AddSample(2000); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(1500, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(2, tas.aggregate_total_weight()); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(TimeAveragedStatsTest, NoRegressFullPersistTest) { |
|
||||||
TimeAveragedStats tas(1000, 0, 1.0); |
|
||||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(0, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
/* Should replace init value */ |
|
||||||
tas.AddSample(2000); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_EQ(2000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_EQ(1, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
/* Will result in average of the 3 samples. */ |
|
||||||
tas.AddSample(2300); |
|
||||||
tas.AddSample(2300); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(2200, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(3, tas.aggregate_total_weight()); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(TimeAveragedStatsTest, NoRegressSomePersistTest) { |
|
||||||
TimeAveragedStats tas(1000, 0, 0.5); |
|
||||||
/* Should replace init value */ |
|
||||||
tas.AddSample(2000); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(2000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(1, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
tas.AddSample(2500); |
|
||||||
tas.AddSample(4000); |
|
||||||
tas.UpdateAverage(); |
|
||||||
EXPECT_DOUBLE_EQ(3000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(2.5, tas.aggregate_total_weight()); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(TimeAveragedStatsTest, SomeRegressSomePersistTest) { |
|
||||||
TimeAveragedStats tas(1000, 0.4, 0.6); |
|
||||||
/* Sample weight = 0 */ |
|
||||||
EXPECT_EQ(1000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_EQ(0, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
tas.UpdateAverage(); |
|
||||||
/* (0.6 * 0 * 1000 + 0.4 * 1000 / 0.4) */ |
|
||||||
EXPECT_DOUBLE_EQ(1000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(0.4, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
tas.AddSample(2640); |
|
||||||
tas.UpdateAverage(); |
|
||||||
/* (1 * 2640 + 0.6 * 0.4 * 1000 + 0.4 * 1000 / (1 + 0.6 * 0.4 + 0.4) */ |
|
||||||
EXPECT_DOUBLE_EQ(2000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(1.64, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
tas.AddSample(2876.8); |
|
||||||
tas.UpdateAverage(); |
|
||||||
/* (1 * 2876.8 + 0.6 * 1.64 * 2000 + 0.4 * 1000 / (1 + 0.6 * 1.64 + 0.4) */ |
|
||||||
EXPECT_DOUBLE_EQ(2200, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(2.384, tas.aggregate_total_weight()); |
|
||||||
|
|
||||||
tas.AddSample(4944.32); |
|
||||||
tas.UpdateAverage(); |
|
||||||
/* (1 * 4944.32 + 0.6 * 2.384 * 2200 + 0.4 * 1000) /
|
|
||||||
(1 + 0.6 * 2.384 + 0.4) */ |
|
||||||
EXPECT_DOUBLE_EQ(3000, tas.aggregate_weighted_avg()); |
|
||||||
EXPECT_DOUBLE_EQ(2.8304, tas.aggregate_total_weight()); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
||||||
|
|
||||||
int main(int argc, char** argv) { |
|
||||||
::testing::InitGoogleTest(&argc, argv); |
|
||||||
return RUN_ALL_TESTS(); |
|
||||||
} |
|
@ -1,202 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/timer_heap.h" |
|
||||||
|
|
||||||
#include <stdlib.h> |
|
||||||
#include <string.h> |
|
||||||
|
|
||||||
#include <gmock/gmock.h> |
|
||||||
#include <gtest/gtest.h> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/timer.h" |
|
||||||
#include "src/core/lib/gpr/useful.h" |
|
||||||
#include "src/core/lib/gprpp/bitset.h" |
|
||||||
|
|
||||||
using testing::Contains; |
|
||||||
using testing::Not; |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
namespace { |
|
||||||
int64_t RandomDeadline(void) { return rand(); } |
|
||||||
|
|
||||||
std::vector<Timer> CreateTestElements(size_t num_elements) { |
|
||||||
std::vector<Timer> elems(num_elements); |
|
||||||
for (size_t i = 0; i < num_elements; i++) { |
|
||||||
elems[i].deadline = RandomDeadline(); |
|
||||||
} |
|
||||||
return elems; |
|
||||||
} |
|
||||||
|
|
||||||
void CheckValid(TimerHeap* pq) { |
|
||||||
const std::vector<Timer*>& timers = pq->TestOnlyGetTimers(); |
|
||||||
for (size_t i = 0; i < timers.size(); ++i) { |
|
||||||
size_t left_child = 1u + 2u * i; |
|
||||||
size_t right_child = left_child + 1u; |
|
||||||
if (left_child < timers.size()) { |
|
||||||
EXPECT_LE(timers[i]->deadline, timers[left_child]->deadline); |
|
||||||
} |
|
||||||
if (right_child < timers.size()) { |
|
||||||
EXPECT_LE(timers[i]->deadline, timers[right_child]->deadline); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
TEST(TimerHeapTest, Basics) { |
|
||||||
TimerHeap pq; |
|
||||||
const size_t num_test_elements = 200; |
|
||||||
const size_t num_test_operations = 10000; |
|
||||||
size_t i; |
|
||||||
std::vector<Timer> test_elements = CreateTestElements(num_test_elements); |
|
||||||
grpc_core::BitSet<num_test_elements> inpq; |
|
||||||
|
|
||||||
EXPECT_TRUE(pq.is_empty()); |
|
||||||
CheckValid(&pq); |
|
||||||
for (i = 0; i < num_test_elements; ++i) { |
|
||||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Not(Contains(&test_elements[i]))); |
|
||||||
pq.Add(&test_elements[i]); |
|
||||||
CheckValid(&pq); |
|
||||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Contains(&test_elements[i])); |
|
||||||
inpq.set(i); |
|
||||||
} |
|
||||||
for (i = 0; i < num_test_elements; ++i) { |
|
||||||
/* Test that check still succeeds even for element that wasn't just
|
|
||||||
inserted. */ |
|
||||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Contains(&test_elements[i])); |
|
||||||
} |
|
||||||
|
|
||||||
EXPECT_EQ(pq.TestOnlyGetTimers().size(), num_test_elements); |
|
||||||
CheckValid(&pq); |
|
||||||
|
|
||||||
for (i = 0; i < num_test_operations; ++i) { |
|
||||||
size_t elem_num = static_cast<size_t>(rand()) % num_test_elements; |
|
||||||
Timer* el = &test_elements[elem_num]; |
|
||||||
if (!inpq.is_set(elem_num)) { /* not in pq */ |
|
||||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Not(Contains(el))); |
|
||||||
el->deadline = RandomDeadline(); |
|
||||||
pq.Add(el); |
|
||||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Contains(el)); |
|
||||||
inpq.set(elem_num); |
|
||||||
CheckValid(&pq); |
|
||||||
} else { |
|
||||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Contains(el)); |
|
||||||
pq.Remove(el); |
|
||||||
EXPECT_THAT(pq.TestOnlyGetTimers(), Not(Contains(el))); |
|
||||||
inpq.clear(elem_num); |
|
||||||
CheckValid(&pq); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
struct ElemStruct { |
|
||||||
Timer elem; |
|
||||||
bool inserted = false; |
|
||||||
}; |
|
||||||
|
|
||||||
ElemStruct* SearchElems(std::vector<ElemStruct>& elems, bool inserted) { |
|
||||||
std::vector<size_t> search_order; |
|
||||||
for (size_t i = 0; i < elems.size(); i++) { |
|
||||||
search_order.push_back(i); |
|
||||||
} |
|
||||||
for (size_t i = 0; i < elems.size() * 2; i++) { |
|
||||||
size_t a = static_cast<size_t>(rand()) % elems.size(); |
|
||||||
size_t b = static_cast<size_t>(rand()) % elems.size(); |
|
||||||
std::swap(search_order[a], search_order[b]); |
|
||||||
} |
|
||||||
ElemStruct* out = nullptr; |
|
||||||
for (size_t i = 0; out == nullptr && i < elems.size(); i++) { |
|
||||||
if (elems[search_order[i]].inserted == inserted) { |
|
||||||
out = &elems[search_order[i]]; |
|
||||||
} |
|
||||||
} |
|
||||||
return out; |
|
||||||
} |
|
||||||
|
|
||||||
// TODO(ctiller): this should be an actual fuzzer
|
|
||||||
TEST(TimerHeapTest, RandomMutations) { |
|
||||||
TimerHeap pq; |
|
||||||
|
|
||||||
static const size_t elems_size = 1000; |
|
||||||
std::vector<ElemStruct> elems(elems_size); |
|
||||||
size_t num_inserted = 0; |
|
||||||
|
|
||||||
for (size_t round = 0; round < 10000; round++) { |
|
||||||
int r = rand() % 1000; |
|
||||||
if (r <= 550) { |
|
||||||
/* 55% of the time we try to add something */ |
|
||||||
ElemStruct* el = SearchElems(elems, false); |
|
||||||
if (el != nullptr) { |
|
||||||
el->elem.deadline = RandomDeadline(); |
|
||||||
pq.Add(&el->elem); |
|
||||||
el->inserted = true; |
|
||||||
num_inserted++; |
|
||||||
CheckValid(&pq); |
|
||||||
} |
|
||||||
} else if (r <= 650) { |
|
||||||
/* 10% of the time we try to remove something */ |
|
||||||
ElemStruct* el = SearchElems(elems, true); |
|
||||||
if (el != nullptr) { |
|
||||||
pq.Remove(&el->elem); |
|
||||||
el->inserted = false; |
|
||||||
num_inserted--; |
|
||||||
CheckValid(&pq); |
|
||||||
} |
|
||||||
} else { |
|
||||||
/* the remaining times we pop */ |
|
||||||
if (num_inserted > 0) { |
|
||||||
Timer* top = pq.Top(); |
|
||||||
pq.Pop(); |
|
||||||
for (size_t i = 0; i < elems_size; i++) { |
|
||||||
if (top == &elems[i].elem) { |
|
||||||
GPR_ASSERT(elems[i].inserted); |
|
||||||
elems[i].inserted = false; |
|
||||||
} |
|
||||||
} |
|
||||||
num_inserted--; |
|
||||||
CheckValid(&pq); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
if (num_inserted) { |
|
||||||
int64_t* min_deadline = nullptr; |
|
||||||
for (size_t i = 0; i < elems_size; i++) { |
|
||||||
if (elems[i].inserted) { |
|
||||||
if (min_deadline == nullptr) { |
|
||||||
min_deadline = &elems[i].elem.deadline; |
|
||||||
} else { |
|
||||||
if (elems[i].elem.deadline < *min_deadline) { |
|
||||||
min_deadline = &elems[i].elem.deadline; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
GPR_ASSERT(pq.Top()->deadline == *min_deadline); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
||||||
|
|
||||||
int main(int argc, char** argv) { |
|
||||||
::testing::InitGoogleTest(&argc, argv); |
|
||||||
return RUN_ALL_TESTS(); |
|
||||||
} |
|
@ -1,246 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include <string.h> |
|
||||||
|
|
||||||
#include <cstdint> |
|
||||||
#include <limits> |
|
||||||
|
|
||||||
#include <gmock/gmock.h> |
|
||||||
#include <gtest/gtest.h> |
|
||||||
|
|
||||||
#include <grpc/grpc.h> |
|
||||||
|
|
||||||
#include "src/core/lib/event_engine/iomgr_engine/timer.h" |
|
||||||
#include "src/core/lib/gprpp/time.h" |
|
||||||
|
|
||||||
using testing::Mock; |
|
||||||
using testing::Return; |
|
||||||
using testing::StrictMock; |
|
||||||
|
|
||||||
namespace grpc_event_engine { |
|
||||||
namespace iomgr_engine { |
|
||||||
|
|
||||||
namespace { |
|
||||||
const int64_t kHoursIn25Days = 25 * 24; |
|
||||||
const grpc_core::Duration k25Days = grpc_core::Duration::Hours(kHoursIn25Days); |
|
||||||
|
|
||||||
class MockClosure : public experimental::EventEngine::Closure { |
|
||||||
public: |
|
||||||
MOCK_METHOD(void, Run, ()); |
|
||||||
}; |
|
||||||
|
|
||||||
class MockHost : public TimerListHost { |
|
||||||
public: |
|
||||||
MOCK_METHOD(grpc_core::Timestamp, Now, ()); |
|
||||||
MOCK_METHOD(void, Kick, ()); |
|
||||||
}; |
|
||||||
|
|
||||||
enum class CheckResult { kTimersFired, kCheckedAndEmpty, kNotChecked }; |
|
||||||
|
|
||||||
CheckResult FinishCheck( |
|
||||||
absl::optional<std::vector<experimental::EventEngine::Closure*>> result) { |
|
||||||
if (!result.has_value()) return CheckResult::kNotChecked; |
|
||||||
if (result->empty()) return CheckResult::kCheckedAndEmpty; |
|
||||||
for (auto closure : *result) { |
|
||||||
closure->Run(); |
|
||||||
} |
|
||||||
return CheckResult::kTimersFired; |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
TEST(TimerListTest, Add) { |
|
||||||
Timer timers[20]; |
|
||||||
StrictMock<MockClosure> closures[20]; |
|
||||||
|
|
||||||
const auto kStart = |
|
||||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(100); |
|
||||||
|
|
||||||
StrictMock<MockHost> host; |
|
||||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
|
||||||
TimerList timer_list(&host); |
|
||||||
|
|
||||||
/* 10 ms timers. will expire in the current epoch */ |
|
||||||
for (int i = 0; i < 10; i++) { |
|
||||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
|
||||||
timer_list.TimerInit(&timers[i], |
|
||||||
kStart + grpc_core::Duration::Milliseconds(10), |
|
||||||
&closures[i]); |
|
||||||
} |
|
||||||
|
|
||||||
/* 1010 ms timers. will expire in the next epoch */ |
|
||||||
for (int i = 10; i < 20; i++) { |
|
||||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
|
||||||
timer_list.TimerInit(&timers[i], |
|
||||||
kStart + grpc_core::Duration::Milliseconds(1010), |
|
||||||
&closures[i]); |
|
||||||
} |
|
||||||
|
|
||||||
/* collect timers. Only the first batch should be ready. */ |
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce(Return(kStart + grpc_core::Duration::Milliseconds(500))); |
|
||||||
for (int i = 0; i < 10; i++) { |
|
||||||
EXPECT_CALL(closures[i], Run()); |
|
||||||
} |
|
||||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
|
||||||
CheckResult::kTimersFired); |
|
||||||
for (int i = 0; i < 10; i++) { |
|
||||||
Mock::VerifyAndClearExpectations(&closures[i]); |
|
||||||
} |
|
||||||
|
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce(Return(kStart + grpc_core::Duration::Milliseconds(600))); |
|
||||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
|
||||||
CheckResult::kCheckedAndEmpty); |
|
||||||
|
|
||||||
/* collect the rest of the timers */ |
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce(Return(kStart + grpc_core::Duration::Milliseconds(1500))); |
|
||||||
for (int i = 10; i < 20; i++) { |
|
||||||
EXPECT_CALL(closures[i], Run()); |
|
||||||
} |
|
||||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
|
||||||
CheckResult::kTimersFired); |
|
||||||
for (int i = 10; i < 20; i++) { |
|
||||||
Mock::VerifyAndClearExpectations(&closures[i]); |
|
||||||
} |
|
||||||
|
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce(Return(kStart + grpc_core::Duration::Milliseconds(1600))); |
|
||||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
|
||||||
CheckResult::kCheckedAndEmpty); |
|
||||||
} |
|
||||||
|
|
||||||
/* Cleaning up a list with pending timers. */ |
|
||||||
TEST(TimerListTest, Destruction) { |
|
||||||
Timer timers[5]; |
|
||||||
StrictMock<MockClosure> closures[5]; |
|
||||||
|
|
||||||
StrictMock<MockHost> host; |
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce( |
|
||||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
|
||||||
TimerList timer_list(&host); |
|
||||||
|
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce( |
|
||||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
|
||||||
timer_list.TimerInit( |
|
||||||
&timers[0], grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(100), |
|
||||||
&closures[0]); |
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce( |
|
||||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
|
||||||
timer_list.TimerInit( |
|
||||||
&timers[1], grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(3), |
|
||||||
&closures[1]); |
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce( |
|
||||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
|
||||||
timer_list.TimerInit( |
|
||||||
&timers[2], grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(100), |
|
||||||
&closures[2]); |
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce( |
|
||||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
|
||||||
timer_list.TimerInit( |
|
||||||
&timers[3], grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(3), |
|
||||||
&closures[3]); |
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce( |
|
||||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(0))); |
|
||||||
timer_list.TimerInit( |
|
||||||
&timers[4], grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(1), |
|
||||||
&closures[4]); |
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce( |
|
||||||
Return(grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(2))); |
|
||||||
EXPECT_CALL(closures[4], Run()); |
|
||||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
|
||||||
CheckResult::kTimersFired); |
|
||||||
Mock::VerifyAndClearExpectations(&closures[4]); |
|
||||||
EXPECT_FALSE(timer_list.TimerCancel(&timers[4])); |
|
||||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[0])); |
|
||||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[3])); |
|
||||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[1])); |
|
||||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[2])); |
|
||||||
} |
|
||||||
|
|
||||||
/* Cleans up a list with pending timers that simulate long-running-services.
|
|
||||||
This test does the following: |
|
||||||
1) Simulates grpc server start time to 25 days in the past (completed in |
|
||||||
`main` using TestOnlyGlobalInit()) |
|
||||||
2) Creates 4 timers - one with a deadline 25 days in the future, one just |
|
||||||
3 milliseconds in future, one way out in the future, and one using the |
|
||||||
Timestamp::FromTimespecRoundUp function to compute a deadline of 25 |
|
||||||
days in the future |
|
||||||
3) Simulates 4 milliseconds of elapsed time by changing `now` (cached at |
|
||||||
step 1) to `now+4` |
|
||||||
4) Shuts down the timer list |
|
||||||
https://github.com/grpc/grpc/issues/15904 */
|
|
||||||
TEST(TimerListTest, LongRunningServiceCleanup) { |
|
||||||
Timer timers[4]; |
|
||||||
StrictMock<MockClosure> closures[4]; |
|
||||||
|
|
||||||
const auto kStart = |
|
||||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(k25Days.millis()); |
|
||||||
|
|
||||||
StrictMock<MockHost> host; |
|
||||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
|
||||||
TimerList timer_list(&host); |
|
||||||
|
|
||||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
|
||||||
timer_list.TimerInit(&timers[0], kStart + k25Days, &closures[0]); |
|
||||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
|
||||||
timer_list.TimerInit( |
|
||||||
&timers[1], kStart + grpc_core::Duration::Milliseconds(3), &closures[1]); |
|
||||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
|
||||||
timer_list.TimerInit(&timers[2], |
|
||||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
|
||||||
std::numeric_limits<int64_t>::max() - 1), |
|
||||||
&closures[2]); |
|
||||||
|
|
||||||
gpr_timespec deadline_spec = |
|
||||||
(kStart + k25Days).as_timespec(gpr_clock_type::GPR_CLOCK_MONOTONIC); |
|
||||||
|
|
||||||
/* Timestamp::FromTimespecRoundUp is how users usually compute a millisecond
|
|
||||||
input value into grpc_timer_init, so we mimic that behavior here */ |
|
||||||
EXPECT_CALL(host, Now()).WillOnce(Return(kStart)); |
|
||||||
timer_list.TimerInit(&timers[3], |
|
||||||
grpc_core::Timestamp::FromTimespecRoundUp(deadline_spec), |
|
||||||
&closures[3]); |
|
||||||
|
|
||||||
EXPECT_CALL(host, Now()) |
|
||||||
.WillOnce(Return(kStart + grpc_core::Duration::Milliseconds(4))); |
|
||||||
EXPECT_CALL(closures[1], Run()); |
|
||||||
EXPECT_EQ(FinishCheck(timer_list.TimerCheck(nullptr)), |
|
||||||
CheckResult::kTimersFired); |
|
||||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[0])); |
|
||||||
EXPECT_FALSE(timer_list.TimerCancel(&timers[1])); |
|
||||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[2])); |
|
||||||
EXPECT_TRUE(timer_list.TimerCancel(&timers[3])); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace iomgr_engine
|
|
||||||
} // namespace grpc_event_engine
|
|
||||||
|
|
||||||
int main(int argc, char** argv) { |
|
||||||
::testing::InitGoogleTest(&argc, argv); |
|
||||||
return RUN_ALL_TESTS(); |
|
||||||
} |
|
Loading…
Reference in new issue