[event_engine] Use durations for scheduling things (#30023)

* [event_engine] Use durations for scheduling things

* fix

* Automated change: Fix sanity tests

* run-after

* fix

* Automated change: Fix sanity tests

* rename

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
revert-30023-duration
Craig Tiller 2 years ago committed by GitHub
parent 2d0d1775a9
commit d9f64437b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 2
      CMakeLists.txt
  3. 2
      build_autogenerated.yaml
  4. 24
      include/grpc/event_engine/event_engine.h
  5. 7
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  6. 1
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  7. 9
      src/core/ext/filters/client_channel/subchannel.cc
  8. 1
      src/core/lib/channel/channel_args_preconditioning.cc
  9. 47
      src/core/lib/event_engine/iomgr_engine.cc
  10. 17
      src/core/lib/event_engine/iomgr_engine.h
  11. 8
      src/core/lib/gprpp/time.cc
  12. 4
      src/core/lib/gprpp/time.h
  13. 17
      src/core/lib/promise/sleep.cc
  14. 5
      src/cpp/server/orca/orca_service.cc
  15. 28
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  16. 23
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
  17. 10
      test/core/event_engine/test_suite/fuzzing_event_engine_test.cc
  18. 37
      test/core/event_engine/test_suite/timer_test.cc

@ -1224,7 +1224,6 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/status",
"absl/time",
],
tags = ["grpc-autodeps"],
deps = [
@ -2084,6 +2083,7 @@ grpc_cc_library(
external_deps = ["absl/strings:str_format"],
tags = ["grpc-autodeps"],
deps = [
"event_engine_base_hdrs",
"gpr_base",
"gpr_codegen",
"gpr_platform",

2
CMakeLists.txt generated

@ -13814,6 +13814,7 @@ target_include_directories(periodic_update_test
target_link_libraries(periodic_update_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::statusor
gpr
upb
)
@ -16383,6 +16384,7 @@ target_link_libraries(test_core_gprpp_time_test
absl::memory
absl::random_random
absl::status
absl::statusor
absl::cord
absl::str_format
absl::strings

@ -7165,6 +7165,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/resource_quota/periodic_update_test.cc
deps:
- absl/status:statusor
- gpr
- upb
uses_polling: false
@ -8252,6 +8253,7 @@ targets:
- absl/memory:memory
- absl/random:random
- absl/status:status
- absl/status:statusor
- absl/strings:cord
- absl/strings:str_format
- absl/strings:strings

@ -21,7 +21,6 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/time/time.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/memory_allocator.h>
@ -74,6 +73,11 @@ namespace experimental {
////////////////////////////////////////////////////////////////////////////////
class EventEngine {
public:
/// A duration between two events.
///
/// Throughout the EventEngine API durations are used to express how long
/// until an action should be performed.
using Duration = std::chrono::duration<int64_t, std::nano>;
/// A custom closure type for EventEngine task execution.
///
/// Throughout the EventEngine API, \a Closure ownership is retained by the
@ -270,7 +274,7 @@ class EventEngine {
const ResolvedAddress& addr,
const EndpointConfig& args,
MemoryAllocator memory_allocator,
absl::Time deadline) = 0;
Duration timeout) = 0;
/// Request cancellation of a connection attempt.
///
@ -328,21 +332,21 @@ class EventEngine {
virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view name,
absl::string_view default_port,
absl::Time deadline) = 0;
Duration timeout) = 0;
/// Asynchronously perform an SRV record lookup.
///
/// \a on_resolve has the same meaning and expectations as \a
/// LookupHostname's \a on_resolve callback.
virtual LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name,
absl::Time deadline) = 0;
Duration timeout) = 0;
/// Asynchronously perform a TXT record lookup.
///
/// \a on_resolve has the same meaning and expectations as \a
/// LookupHostname's \a on_resolve callback.
virtual LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name,
absl::Time deadline) = 0;
Duration timeout) = 0;
/// Cancel an asynchronous lookup operation.
///
/// This shares the same semantics with \a EventEngine::Cancel: successfully
@ -384,13 +388,13 @@ class EventEngine {
/// in some scenarios. This overload is useful in situations where performance
/// is not a critical concern.
virtual void Run(std::function<void()> closure) = 0;
/// Synonymous with scheduling an alarm to run at time \a when.
/// Synonymous with scheduling an alarm to run after duration \a when.
///
/// The \a closure will execute when time \a when arrives unless it has been
/// cancelled via the \a Cancel method. If cancelled, the closure will not be
/// run, nor will it be deleted. Ownership remains with the caller.
virtual TaskHandle RunAt(absl::Time when, Closure* closure) = 0;
/// Synonymous with scheduling an alarm to run at time \a when.
virtual TaskHandle RunAfter(Duration when, Closure* closure) = 0;
/// Synonymous with scheduling an alarm to run after duration \a when.
///
/// The \a closure will execute when time \a when arrives unless it has been
/// cancelled via the \a Cancel method. If cancelled, the closure will not be
@ -398,10 +402,10 @@ class EventEngine {
/// version's \a closure will be deleted by the EventEngine after the closure
/// has been run, or upon cancellation.
///
/// This version of \a RunAt may be less performant than the \a Closure
/// This version of \a RunAfter may be less performant than the \a Closure
/// version in some scenarios. This overload is useful in situations where
/// performance is not a critical concern.
virtual TaskHandle RunAt(absl::Time when, std::function<void()> closure) = 0;
virtual TaskHandle RunAfter(Duration when, std::function<void()> closure) = 0;
/// Request cancellation of a task.
///
/// If the associated closure has already been scheduled to run, it will not

@ -78,8 +78,6 @@
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "upb/upb.hpp"
@ -996,9 +994,8 @@ void GrpcLb::BalancerCallState::StartQuery() {
}
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
client_load_report_handle_ = GetDefaultEventEngine()->RunAt(
absl::Now() + absl::Milliseconds(client_stats_report_interval_.millis()),
[this] {
client_load_report_handle_ =
GetDefaultEventEngine()->RunAfter(client_stats_report_interval_, [this] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
MaybeSendClientLoadReport();

@ -24,6 +24,7 @@
#include <memory>
#include <set>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

@ -28,8 +28,6 @@
#include <utility>
#include "absl/status/statusor.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/grpc.h>
#include <grpc/slice.h>
@ -943,8 +941,6 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
const Duration time_until_next_attempt =
next_attempt_time_ - ExecCtx::Get()->Now();
auto ee_deadline =
absl::Now() + absl::Milliseconds(time_until_next_attempt.millis());
gpr_log(GPR_INFO,
"subchannel %p %s: connect failed (%s), backing off for %" PRId64
" ms",
@ -952,8 +948,9 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
time_until_next_attempt.millis());
SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_to_absl_status(error));
retry_timer_handle_ = GetDefaultEventEngine()->RunAt(
ee_deadline, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
retry_timer_handle_ = GetDefaultEventEngine()->RunAfter(
time_until_next_attempt,
[self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
{
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;

@ -16,6 +16,7 @@
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include <algorithm>
#include <utility>
namespace grpc_core {

@ -15,6 +15,7 @@
#include "src/core/lib/event_engine/iomgr_engine.h"
#include <algorithm>
#include <string>
#include <type_traits>
#include <utility>
@ -22,7 +23,6 @@
#include "absl/cleanup/cleanup.h"
#include "absl/container/flat_hash_set.h"
#include "absl/strings/str_cat.h"
#include "absl/time/clock.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
@ -50,15 +50,12 @@ struct ClosureData {
EventEngine::TaskHandle handle;
};
// Timer limits due to quirks in the iomgr implementation.
// If deadline <= Now, the callback will be run inline, which can result in lock
// issues. And absl::InfiniteFuture yields UB.
absl::Time Clamp(absl::Time when) {
absl::Time max = absl::Now() + absl::Hours(8766);
absl::Time min = absl::Now() + absl::Milliseconds(2);
if (when > max) return max;
if (when < min) return min;
return when;
grpc_core::Timestamp ToTimestamp(EventEngine::Duration when) {
grpc_core::ExecCtx::Get()->InvalidateNow();
return grpc_core::ExecCtx::Get()->Now() +
std::max(grpc_core::Duration::Milliseconds(1),
grpc_core::Duration::NanosecondsRoundUp(when.count())) +
grpc_core::Duration::Milliseconds(1);
}
std::string HandleToString(EventEngine::TaskHandle handle) {
@ -92,14 +89,14 @@ bool IomgrEventEngine::Cancel(EventEngine::TaskHandle handle) {
return true;
}
EventEngine::TaskHandle IomgrEventEngine::RunAt(absl::Time when,
std::function<void()> closure) {
return RunAtInternal(when, std::move(closure));
EventEngine::TaskHandle IomgrEventEngine::RunAfter(
Duration when, std::function<void()> closure) {
return RunAfterInternal(when, std::move(closure));
}
EventEngine::TaskHandle IomgrEventEngine::RunAt(absl::Time when,
EventEngine::Closure* closure) {
return RunAtInternal(when, closure);
EventEngine::TaskHandle IomgrEventEngine::RunAfter(
Duration when, EventEngine::Closure* closure) {
return RunAfterInternal(when, closure);
}
void IomgrEventEngine::Run(std::function<void()> closure) {
@ -110,10 +107,10 @@ void IomgrEventEngine::Run(EventEngine::Closure* closure) {
RunInternal(closure);
}
EventEngine::TaskHandle IomgrEventEngine::RunAtInternal(
absl::Time when,
EventEngine::TaskHandle IomgrEventEngine::RunAfterInternal(
Duration when,
absl::variant<std::function<void()>, EventEngine::Closure*> cb) {
when = Clamp(when);
auto when_ts = ToTimestamp(when);
auto* cd = new ClosureData;
cd->cb = std::move(cb);
cd->engine = this;
@ -134,14 +131,6 @@ EventEngine::TaskHandle IomgrEventEngine::RunAtInternal(
[](std::function<void()> fn) { fn(); });
},
cd, nullptr);
// kludge to deal with realtime/monotonic clock conversion
absl::Time absl_now = absl::Now();
grpc_core::Duration duration = grpc_core::Duration::Milliseconds(
absl::ToInt64Milliseconds(when - absl_now) + 1);
grpc_core::ExecCtx::Get()->InvalidateNow();
grpc_core::Timestamp when_internal = grpc_core::ExecCtx::Get()->Now() +
duration +
grpc_core::Duration::Milliseconds(1);
EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
aba_token_.fetch_add(1)};
grpc_core::MutexLock lock(&mu_);
@ -149,7 +138,7 @@ EventEngine::TaskHandle IomgrEventEngine::RunAtInternal(
cd->handle = handle;
GRPC_EVENT_ENGINE_TRACE("IomgrEventEngine:%p scheduling callback:%s", this,
HandleToString(handle).c_str());
grpc_timer_init(&cd->timer, when_internal, &cd->closure);
grpc_timer_init(&cd->timer, when_ts, &cd->closure);
return handle;
}
@ -188,7 +177,7 @@ bool IomgrEventEngine::CancelConnect(EventEngine::ConnectionHandle /*handle*/) {
EventEngine::ConnectionHandle IomgrEventEngine::Connect(
OnConnectCallback /*on_connect*/, const ResolvedAddress& /*addr*/,
const EndpointConfig& /*args*/, MemoryAllocator /*memory_allocator*/,
absl::Time /*deadline*/) {
Duration /*deadline*/) {
GPR_ASSERT(false && "unimplemented");
}

@ -25,7 +25,6 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/endpoint_config.h>
@ -65,13 +64,13 @@ class IomgrEventEngine final : public EventEngine {
LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view name,
absl::string_view default_port,
absl::Time deadline) override;
Duration timeout) override;
LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name,
absl::Time deadline) override;
Duration timeout) override;
LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name,
absl::Time deadline) override;
Duration timeout) override;
bool CancelLookup(LookupTaskHandle handle) override;
};
@ -89,7 +88,7 @@ class IomgrEventEngine final : public EventEngine {
const ResolvedAddress& addr,
const EndpointConfig& args,
MemoryAllocator memory_allocator,
absl::Time deadline) override;
Duration timeout) override;
bool CancelConnect(ConnectionHandle handle) override;
bool IsWorkerThread() override;
@ -97,13 +96,13 @@ class IomgrEventEngine final : public EventEngine {
const DNSResolver::ResolverOptions& options) override;
void Run(Closure* closure) override;
void Run(std::function<void()> closure) override;
TaskHandle RunAt(absl::Time when, Closure* closure) override;
TaskHandle RunAt(absl::Time when, std::function<void()> closure) override;
TaskHandle RunAfter(Duration when, Closure* closure) override;
TaskHandle RunAfter(Duration when, std::function<void()> closure) override;
bool Cancel(TaskHandle handle) override;
private:
EventEngine::TaskHandle RunAtInternal(
absl::Time when,
EventEngine::TaskHandle RunAfterInternal(
Duration when,
absl::variant<std::function<void()>, EventEngine::Closure*> cb);
void RunInternal(

@ -17,6 +17,7 @@
#include "src/core/lib/gprpp/time.h"
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <string>
@ -190,6 +191,13 @@ std::string Duration::ToJsonString() const {
return absl::StrFormat("%d.%09ds", ts.tv_sec, ts.tv_nsec);
}
Duration::operator grpc_event_engine::experimental::EventEngine::Duration()
const {
return std::chrono::milliseconds(
Clamp(millis_, std::numeric_limits<int64_t>::min() / GPR_NS_PER_MS,
std::numeric_limits<int64_t>::max() / GPR_NS_PER_MS));
}
void TestOnlySetProcessEpoch(gpr_timespec epoch) {
g_process_epoch_seconds.store(
gpr_convert_clock_type(epoch, GPR_CLOCK_MONOTONIC).tv_sec);

@ -23,6 +23,7 @@
#include <ostream>
#include <string>
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/support/time.h>
@ -207,6 +208,9 @@ class Duration {
constexpr int64_t millis() const { return millis_; }
double seconds() const { return static_cast<double>(millis_) / 1000.0; }
// NOLINTNEXTLINE: google-explicit-constructor
operator grpc_event_engine::experimental::EventEngine::Duration() const;
gpr_timespec as_timespec() const;
std::string ToString() const;

@ -16,9 +16,6 @@
#include "src/core/lib/promise/sleep.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/event_engine_factory.h"
@ -57,16 +54,6 @@ void Sleep::OnTimer() {
tmp_waker.Wakeup();
}
// TODO(hork): refactor gpr_base to allow a separate time_util target.
namespace {
absl::Time ToAbslTime(Timestamp timestamp) {
if (timestamp == Timestamp::InfFuture()) return absl::InfiniteFuture();
if (timestamp == Timestamp::InfPast()) return absl::InfinitePast();
return absl::Now() +
absl::Milliseconds((timestamp - ExecCtx::Get()->Now()).millis());
}
} // namespace
Poll<absl::Status> Sleep::operator()() {
MutexLock lock(&mu_);
switch (stage_) {
@ -75,8 +62,8 @@ Poll<absl::Status> Sleep::operator()() {
return absl::OkStatus();
}
stage_ = Stage::kStarted;
timer_handle_ =
GetDefaultEventEngine()->RunAt(ToAbslTime(deadline_), [this] {
timer_handle_ = GetDefaultEventEngine()->RunAfter(
deadline_ - ExecCtx::Get()->Now(), [this] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
OnTimer();

@ -21,7 +21,6 @@
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "google/protobuf/duration.upb.h"
@ -124,8 +123,8 @@ class OrcaService::Reactor : public ServerWriteReactor<ByteBuffer>,
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc::internal::MutexLock lock(&timer_mu_);
timer_handle_ = GetDefaultEventEngine()->RunAt(
absl::Now() + absl::Milliseconds(report_interval_.millis()),
timer_handle_ = GetDefaultEventEngine()->RunAfter(
report_interval_,
[self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); });
}

@ -14,6 +14,8 @@
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include <chrono>
namespace grpc_event_engine {
namespace experimental {
@ -24,10 +26,10 @@ const intptr_t kTaskHandleSalt = 12345;
FuzzingEventEngine::FuzzingEventEngine(Options options)
: final_tick_length_(options.final_tick_length) {
for (const auto& delay : options.actions.tick_lengths()) {
tick_increments_[delay.id()] += absl::Microseconds(delay.delay_us());
tick_increments_[delay.id()] += std::chrono::microseconds(delay.delay_us());
}
for (const auto& delay : options.actions.run_delay()) {
task_delays_[delay.id()] += absl::Microseconds(delay.delay_us());
task_delays_[delay.id()] += std::chrono::microseconds(delay.delay_us());
}
}
@ -56,7 +58,7 @@ void FuzzingEventEngine::Tick() {
}
}
absl::Time FuzzingEventEngine::Now() {
FuzzingEventEngine::Time FuzzingEventEngine::Now() {
grpc_core::MutexLock lock(&mu_);
return now_;
}
@ -71,7 +73,7 @@ FuzzingEventEngine::CreateListener(Listener::AcceptCallback,
EventEngine::ConnectionHandle FuzzingEventEngine::Connect(
OnConnectCallback, const ResolvedAddress&, const EndpointConfig&,
MemoryAllocator, absl::Time) {
MemoryAllocator, Duration) {
abort();
}
@ -84,19 +86,21 @@ std::unique_ptr<EventEngine::DNSResolver> FuzzingEventEngine::GetDNSResolver(
abort();
}
void FuzzingEventEngine::Run(Closure* closure) { RunAt(Now(), closure); }
void FuzzingEventEngine::Run(Closure* closure) {
RunAfter(Duration::zero(), closure);
}
void FuzzingEventEngine::Run(std::function<void()> closure) {
RunAt(Now(), closure);
RunAfter(Duration::zero(), closure);
}
EventEngine::TaskHandle FuzzingEventEngine::RunAt(absl::Time when,
Closure* closure) {
return RunAt(when, [closure]() { closure->Run(); });
EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
Closure* closure) {
return RunAfter(when, [closure]() { closure->Run(); });
}
EventEngine::TaskHandle FuzzingEventEngine::RunAt(
absl::Time when, std::function<void()> closure) {
EventEngine::TaskHandle FuzzingEventEngine::RunAfter(
Duration when, std::function<void()> closure) {
grpc_core::MutexLock lock(&mu_);
const intptr_t id = next_task_id_;
++next_task_id_;
@ -108,7 +112,7 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAt(
}
auto task = std::make_shared<Task>(id, std::move(closure));
tasks_by_id_.emplace(id, task);
tasks_by_time_.emplace(when, std::move(task));
tasks_by_time_.emplace(now_ + when, std::move(task));
return TaskHandle{id, kTaskHandleSalt};
}

@ -15,6 +15,7 @@
#ifndef GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H
#define GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H
#include <chrono>
#include <cstdint>
#include <map>
@ -32,7 +33,7 @@ class FuzzingEventEngine : public EventEngine {
struct Options {
// After all scheduled tick lengths are completed, this is the amount of
// time Now() will be incremented each tick.
absl::Duration final_tick_length = absl::Seconds(1);
Duration final_tick_length = std::chrono::seconds(1);
fuzzing_event_engine::Actions actions;
};
explicit FuzzingEventEngine(Options options);
@ -49,7 +50,7 @@ class FuzzingEventEngine : public EventEngine {
const ResolvedAddress& addr,
const EndpointConfig& args,
MemoryAllocator memory_allocator,
absl::Time deadline) override;
Duration timeout) override;
bool CancelConnect(ConnectionHandle handle) override;
@ -60,11 +61,13 @@ class FuzzingEventEngine : public EventEngine {
void Run(Closure* closure) override;
void Run(std::function<void()> closure) override;
TaskHandle RunAt(absl::Time when, Closure* closure) override;
TaskHandle RunAt(absl::Time when, std::function<void()> closure) override;
TaskHandle RunAfter(Duration when, Closure* closure) override;
TaskHandle RunAfter(Duration when, std::function<void()> closure) override;
bool Cancel(TaskHandle handle) override;
absl::Time Now() ABSL_LOCKS_EXCLUDED(mu_);
using Time = std::chrono::time_point<FuzzingEventEngine, Duration>;
Time Now() ABSL_LOCKS_EXCLUDED(mu_);
private:
struct Task {
@ -74,17 +77,17 @@ class FuzzingEventEngine : public EventEngine {
std::function<void()> closure;
};
const absl::Duration final_tick_length_;
const Duration final_tick_length_;
grpc_core::Mutex mu_;
intptr_t next_task_id_ ABSL_GUARDED_BY(mu_) = 1;
intptr_t current_tick_ ABSL_GUARDED_BY(mu_) = 0;
absl::Time now_ ABSL_GUARDED_BY(mu_) = absl::Now();
std::map<intptr_t, absl::Duration> tick_increments_ ABSL_GUARDED_BY(mu_);
std::map<intptr_t, absl::Duration> task_delays_ ABSL_GUARDED_BY(mu_);
Time now_ ABSL_GUARDED_BY(mu_) = Time::min();
std::map<intptr_t, Duration> tick_increments_ ABSL_GUARDED_BY(mu_);
std::map<intptr_t, Duration> task_delays_ ABSL_GUARDED_BY(mu_);
std::map<intptr_t, std::shared_ptr<Task>> tasks_by_id_ ABSL_GUARDED_BY(mu_);
std::multimap<absl::Time, std::shared_ptr<Task>> tasks_by_time_
std::multimap<Time, std::shared_ptr<Task>> tasks_by_time_
ABSL_GUARDED_BY(mu_);
};

@ -14,8 +14,11 @@
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include <chrono>
#include <thread>
#include "absl/time/clock.h"
#include <grpc/grpc.h>
#include "test/core/event_engine/test_suite/event_engine_test.h"
@ -29,12 +32,15 @@ class ThreadedFuzzingEventEngine : public FuzzingEventEngine {
ThreadedFuzzingEventEngine()
: FuzzingEventEngine([]() {
Options options;
options.final_tick_length = absl::Milliseconds(10);
options.final_tick_length = std::chrono::milliseconds(10);
return options;
}()),
main_([this]() {
while (!done_.load()) {
absl::SleepFor(absl::Milliseconds(10));
auto tick_start = absl::Now();
while (absl::Now() - tick_start < absl::Milliseconds(10)) {
absl::SleepFor(absl::Milliseconds(1));
}
Tick();
}
}) {}

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <chrono>
#include <random>
#include <thread>
@ -29,6 +30,7 @@
#include "test/core/event_engine/test_suite/event_engine_test.h"
using ::testing::ElementsAre;
using namespace std::chrono_literals;
class EventEngineTimerTest : public EventEngineTest {
public:
@ -45,7 +47,7 @@ TEST_F(EventEngineTimerTest, ImmediateCallbackIsExecutedQuickly) {
grpc_core::ExecCtx exec_ctx;
auto engine = this->NewEventEngine();
grpc_core::MutexLock lock(&mu_);
engine->RunAt(absl::Now(), [this]() {
engine->RunAfter(0ms, [this]() {
grpc_core::MutexLock lock(&mu_);
signaled_ = true;
cv_.Signal();
@ -57,7 +59,7 @@ TEST_F(EventEngineTimerTest, ImmediateCallbackIsExecutedQuickly) {
TEST_F(EventEngineTimerTest, SupportsCancellation) {
grpc_core::ExecCtx exec_ctx;
auto engine = this->NewEventEngine();
auto handle = engine->RunAt(absl::InfiniteFuture(), []() {});
auto handle = engine->RunAfter(24h, []() {});
ASSERT_TRUE(engine->Cancel(handle));
}
@ -65,7 +67,7 @@ TEST_F(EventEngineTimerTest, CancelledCallbackIsNotExecuted) {
grpc_core::ExecCtx exec_ctx;
{
auto engine = this->NewEventEngine();
auto handle = engine->RunAt(absl::InfiniteFuture(), [this]() {
auto handle = engine->RunAfter(24h, [this]() {
grpc_core::MutexLock lock(&mu_);
signaled_ = true;
});
@ -78,20 +80,20 @@ TEST_F(EventEngineTimerTest, CancelledCallbackIsNotExecuted) {
TEST_F(EventEngineTimerTest, TimersRespectScheduleOrdering) {
grpc_core::ExecCtx exec_ctx;
// Note: this is a brittle test if the first call to `RunAt` takes longer than
// the second callback's wait time.
// Note: this is a brittle test if the first call to `RunAfter` takes longer
// than the second callback's wait time.
std::vector<uint8_t> ordered;
uint8_t count = 0;
grpc_core::MutexLock lock(&mu_);
{
auto engine = this->NewEventEngine();
engine->RunAt(absl::Now() + absl::Milliseconds(100), [&]() {
engine->RunAfter(100ms, [&]() {
grpc_core::MutexLock lock(&mu_);
ordered.push_back(2);
++count;
cv_.Signal();
});
engine->RunAt(absl::Now(), [&]() {
engine->RunAfter(0ms, [&]() {
grpc_core::MutexLock lock(&mu_);
ordered.push_back(1);
++count;
@ -110,7 +112,7 @@ TEST_F(EventEngineTimerTest, CancellingExecutedCallbackIsNoopAndReturnsFalse) {
grpc_core::ExecCtx exec_ctx;
auto engine = this->NewEventEngine();
grpc_core::MutexLock lock(&mu_);
auto handle = engine->RunAt(absl::Now(), [this]() {
auto handle = engine->RunAfter(0ms, [this]() {
grpc_core::MutexLock lock(&mu_);
signaled_ = true;
cv_.Signal();
@ -130,10 +132,9 @@ void EventEngineTimerTest::ScheduleCheckCB(absl::Time when,
// to the lowest common denominator until EventEngines can compare relative
// times with supported resolution.
grpc_core::ExecCtx exec_ctx;
int64_t now_millis = absl::ToUnixMillis(absl::Now());
int64_t when_millis = absl::ToUnixMillis(when);
EXPECT_LE(when_millis, now_millis);
if (when_millis > now_millis) ++(*fail_count);
auto now = absl::Now();
EXPECT_LE(when, now);
if (when > now) ++(*fail_count);
if (++(*call_count) == total_expected) {
grpc_core::MutexLock lock(&mu_);
signaled_ = true;
@ -160,11 +161,13 @@ TEST_F(EventEngineTimerTest, StressTestTimersNotCalledBeforeScheduled) {
std::uniform_real_distribution<> dis(timeout_min_seconds,
timeout_max_seconds);
for (int call_n = 0; call_n < call_count_per_thread; ++call_n) {
absl::Time when = absl::Now() + absl::Seconds(dis(gen));
engine->RunAt(
when, absl::bind_front(&EventEngineTimerTest::ScheduleCheckCB, this,
when, &call_count, &failed_call_count,
thread_count * call_count_per_thread));
const auto dur = static_cast<int64_t>(1e9 * dis(gen));
auto deadline = absl::Now() + absl::Nanoseconds(dur);
engine->RunAfter(
std::chrono::nanoseconds(dur),
absl::bind_front(&EventEngineTimerTest::ScheduleCheckCB, this,
deadline, &call_count, &failed_call_count,
thread_count * call_count_per_thread));
}
});
}

Loading…
Cancel
Save