From d9f64437b09d97bd9aebc6bfa46d5a61e7477c41 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 16 Jun 2022 09:39:31 -0700 Subject: [PATCH] [event_engine] Use durations for scheduling things (#30023) * [event_engine] Use durations for scheduling things * fix * Automated change: Fix sanity tests * run-after * fix * Automated change: Fix sanity tests * rename Co-authored-by: ctiller --- BUILD | 2 +- CMakeLists.txt | 2 + build_autogenerated.yaml | 2 + include/grpc/event_engine/event_engine.h | 24 ++++++---- .../client_channel/lb_policy/grpclb/grpclb.cc | 7 +-- .../lb_policy/xds/xds_cluster_resolver.cc | 1 + .../ext/filters/client_channel/subchannel.cc | 9 ++-- .../channel/channel_args_preconditioning.cc | 1 + src/core/lib/event_engine/iomgr_engine.cc | 47 +++++++------------ src/core/lib/event_engine/iomgr_engine.h | 17 ++++--- src/core/lib/gprpp/time.cc | 8 ++++ src/core/lib/gprpp/time.h | 4 ++ src/core/lib/promise/sleep.cc | 17 +------ src/cpp/server/orca/orca_service.cc | 5 +- .../fuzzing_event_engine.cc | 28 ++++++----- .../fuzzing_event_engine.h | 23 +++++---- .../test_suite/fuzzing_event_engine_test.cc | 10 +++- .../event_engine/test_suite/timer_test.cc | 37 ++++++++------- 18 files changed, 125 insertions(+), 119 deletions(-) diff --git a/BUILD b/BUILD index d0a6c546f2b..d07df3ee332 100644 --- a/BUILD +++ b/BUILD @@ -1224,7 +1224,6 @@ grpc_cc_library( external_deps = [ "absl/base:core_headers", "absl/status", - "absl/time", ], tags = ["grpc-autodeps"], deps = [ @@ -2084,6 +2083,7 @@ grpc_cc_library( external_deps = ["absl/strings:str_format"], tags = ["grpc-autodeps"], deps = [ + "event_engine_base_hdrs", "gpr_base", "gpr_codegen", "gpr_platform", diff --git a/CMakeLists.txt b/CMakeLists.txt index e1f610546bf..dedbe0981e5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13814,6 +13814,7 @@ target_include_directories(periodic_update_test target_link_libraries(periodic_update_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::statusor gpr upb ) @@ -16383,6 +16384,7 @@ target_link_libraries(test_core_gprpp_time_test absl::memory absl::random_random absl::status + absl::statusor absl::cord absl::str_format absl::strings diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 191f69a7802..1c6fb7a829c 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -7165,6 +7165,7 @@ targets: - src/core/lib/slice/slice_string_helpers.cc - test/core/resource_quota/periodic_update_test.cc deps: + - absl/status:statusor - gpr - upb uses_polling: false @@ -8252,6 +8253,7 @@ targets: - absl/memory:memory - absl/random:random - absl/status:status + - absl/status:statusor - absl/strings:cord - absl/strings:str_format - absl/strings:strings diff --git a/include/grpc/event_engine/event_engine.h b/include/grpc/event_engine/event_engine.h index dfd6e7f5c20..a34ce7df771 100644 --- a/include/grpc/event_engine/event_engine.h +++ b/include/grpc/event_engine/event_engine.h @@ -21,7 +21,6 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" -#include "absl/time/time.h" #include #include @@ -74,6 +73,11 @@ namespace experimental { //////////////////////////////////////////////////////////////////////////////// class EventEngine { public: + /// A duration between two events. + /// + /// Throughout the EventEngine API durations are used to express how long + /// until an action should be performed. + using Duration = std::chrono::duration; /// A custom closure type for EventEngine task execution. /// /// Throughout the EventEngine API, \a Closure ownership is retained by the @@ -270,7 +274,7 @@ class EventEngine { const ResolvedAddress& addr, const EndpointConfig& args, MemoryAllocator memory_allocator, - absl::Time deadline) = 0; + Duration timeout) = 0; /// Request cancellation of a connection attempt. /// @@ -328,21 +332,21 @@ class EventEngine { virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve, absl::string_view name, absl::string_view default_port, - absl::Time deadline) = 0; + Duration timeout) = 0; /// Asynchronously perform an SRV record lookup. /// /// \a on_resolve has the same meaning and expectations as \a /// LookupHostname's \a on_resolve callback. virtual LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve, absl::string_view name, - absl::Time deadline) = 0; + Duration timeout) = 0; /// Asynchronously perform a TXT record lookup. /// /// \a on_resolve has the same meaning and expectations as \a /// LookupHostname's \a on_resolve callback. virtual LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve, absl::string_view name, - absl::Time deadline) = 0; + Duration timeout) = 0; /// Cancel an asynchronous lookup operation. /// /// This shares the same semantics with \a EventEngine::Cancel: successfully @@ -384,13 +388,13 @@ class EventEngine { /// in some scenarios. This overload is useful in situations where performance /// is not a critical concern. virtual void Run(std::function closure) = 0; - /// Synonymous with scheduling an alarm to run at time \a when. + /// Synonymous with scheduling an alarm to run after duration \a when. /// /// The \a closure will execute when time \a when arrives unless it has been /// cancelled via the \a Cancel method. If cancelled, the closure will not be /// run, nor will it be deleted. Ownership remains with the caller. - virtual TaskHandle RunAt(absl::Time when, Closure* closure) = 0; - /// Synonymous with scheduling an alarm to run at time \a when. + virtual TaskHandle RunAfter(Duration when, Closure* closure) = 0; + /// Synonymous with scheduling an alarm to run after duration \a when. /// /// The \a closure will execute when time \a when arrives unless it has been /// cancelled via the \a Cancel method. If cancelled, the closure will not be @@ -398,10 +402,10 @@ class EventEngine { /// version's \a closure will be deleted by the EventEngine after the closure /// has been run, or upon cancellation. /// - /// This version of \a RunAt may be less performant than the \a Closure + /// This version of \a RunAfter may be less performant than the \a Closure /// version in some scenarios. This overload is useful in situations where /// performance is not a critical concern. - virtual TaskHandle RunAt(absl::Time when, std::function closure) = 0; + virtual TaskHandle RunAfter(Duration when, std::function closure) = 0; /// Request cancellation of a task. /// /// If the associated closure has already been scheduled to run, it will not diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 883b81a3be3..a37e01a1537 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -78,8 +78,6 @@ #include "absl/strings/str_join.h" #include "absl/strings/string_view.h" #include "absl/strings/strip.h" -#include "absl/time/clock.h" -#include "absl/time/time.h" #include "absl/types/optional.h" #include "absl/types/variant.h" #include "upb/upb.hpp" @@ -996,9 +994,8 @@ void GrpcLb::BalancerCallState::StartQuery() { } void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { - client_load_report_handle_ = GetDefaultEventEngine()->RunAt( - absl::Now() + absl::Milliseconds(client_stats_report_interval_.millis()), - [this] { + client_load_report_handle_ = + GetDefaultEventEngine()->RunAfter(client_stats_report_interval_, [this] { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; MaybeSendClientLoadReport(); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index 07edc3ea2b3..31df610de1b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include #include diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 1c75d6949a1..f357f9f121d 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -28,8 +28,6 @@ #include #include "absl/status/statusor.h" -#include "absl/time/clock.h" -#include "absl/time/time.h" #include #include @@ -943,8 +941,6 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { if (connecting_result_.transport == nullptr || !PublishTransportLocked()) { const Duration time_until_next_attempt = next_attempt_time_ - ExecCtx::Get()->Now(); - auto ee_deadline = - absl::Now() + absl::Milliseconds(time_until_next_attempt.millis()); gpr_log(GPR_INFO, "subchannel %p %s: connect failed (%s), backing off for %" PRId64 " ms", @@ -952,8 +948,9 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { time_until_next_attempt.millis()); SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error)); - retry_timer_handle_ = GetDefaultEventEngine()->RunAt( - ee_deadline, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { + retry_timer_handle_ = GetDefaultEventEngine()->RunAfter( + time_until_next_attempt, + [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; diff --git a/src/core/lib/channel/channel_args_preconditioning.cc b/src/core/lib/channel/channel_args_preconditioning.cc index 1fe51ad6879..a251be38a10 100644 --- a/src/core/lib/channel/channel_args_preconditioning.cc +++ b/src/core/lib/channel/channel_args_preconditioning.cc @@ -16,6 +16,7 @@ #include "src/core/lib/channel/channel_args_preconditioning.h" +#include #include namespace grpc_core { diff --git a/src/core/lib/event_engine/iomgr_engine.cc b/src/core/lib/event_engine/iomgr_engine.cc index 9f56955e8a4..20f93f141bd 100644 --- a/src/core/lib/event_engine/iomgr_engine.cc +++ b/src/core/lib/event_engine/iomgr_engine.cc @@ -15,6 +15,7 @@ #include "src/core/lib/event_engine/iomgr_engine.h" +#include #include #include #include @@ -22,7 +23,6 @@ #include "absl/cleanup/cleanup.h" #include "absl/container/flat_hash_set.h" #include "absl/strings/str_cat.h" -#include "absl/time/clock.h" #include #include @@ -50,15 +50,12 @@ struct ClosureData { EventEngine::TaskHandle handle; }; -// Timer limits due to quirks in the iomgr implementation. -// If deadline <= Now, the callback will be run inline, which can result in lock -// issues. And absl::InfiniteFuture yields UB. -absl::Time Clamp(absl::Time when) { - absl::Time max = absl::Now() + absl::Hours(8766); - absl::Time min = absl::Now() + absl::Milliseconds(2); - if (when > max) return max; - if (when < min) return min; - return when; +grpc_core::Timestamp ToTimestamp(EventEngine::Duration when) { + grpc_core::ExecCtx::Get()->InvalidateNow(); + return grpc_core::ExecCtx::Get()->Now() + + std::max(grpc_core::Duration::Milliseconds(1), + grpc_core::Duration::NanosecondsRoundUp(when.count())) + + grpc_core::Duration::Milliseconds(1); } std::string HandleToString(EventEngine::TaskHandle handle) { @@ -92,14 +89,14 @@ bool IomgrEventEngine::Cancel(EventEngine::TaskHandle handle) { return true; } -EventEngine::TaskHandle IomgrEventEngine::RunAt(absl::Time when, - std::function closure) { - return RunAtInternal(when, std::move(closure)); +EventEngine::TaskHandle IomgrEventEngine::RunAfter( + Duration when, std::function closure) { + return RunAfterInternal(when, std::move(closure)); } -EventEngine::TaskHandle IomgrEventEngine::RunAt(absl::Time when, - EventEngine::Closure* closure) { - return RunAtInternal(when, closure); +EventEngine::TaskHandle IomgrEventEngine::RunAfter( + Duration when, EventEngine::Closure* closure) { + return RunAfterInternal(when, closure); } void IomgrEventEngine::Run(std::function closure) { @@ -110,10 +107,10 @@ void IomgrEventEngine::Run(EventEngine::Closure* closure) { RunInternal(closure); } -EventEngine::TaskHandle IomgrEventEngine::RunAtInternal( - absl::Time when, +EventEngine::TaskHandle IomgrEventEngine::RunAfterInternal( + Duration when, absl::variant, EventEngine::Closure*> cb) { - when = Clamp(when); + auto when_ts = ToTimestamp(when); auto* cd = new ClosureData; cd->cb = std::move(cb); cd->engine = this; @@ -134,14 +131,6 @@ EventEngine::TaskHandle IomgrEventEngine::RunAtInternal( [](std::function fn) { fn(); }); }, cd, nullptr); - // kludge to deal with realtime/monotonic clock conversion - absl::Time absl_now = absl::Now(); - grpc_core::Duration duration = grpc_core::Duration::Milliseconds( - absl::ToInt64Milliseconds(when - absl_now) + 1); - grpc_core::ExecCtx::Get()->InvalidateNow(); - grpc_core::Timestamp when_internal = grpc_core::ExecCtx::Get()->Now() + - duration + - grpc_core::Duration::Milliseconds(1); EventEngine::TaskHandle handle{reinterpret_cast(cd), aba_token_.fetch_add(1)}; grpc_core::MutexLock lock(&mu_); @@ -149,7 +138,7 @@ EventEngine::TaskHandle IomgrEventEngine::RunAtInternal( cd->handle = handle; GRPC_EVENT_ENGINE_TRACE("IomgrEventEngine:%p scheduling callback:%s", this, HandleToString(handle).c_str()); - grpc_timer_init(&cd->timer, when_internal, &cd->closure); + grpc_timer_init(&cd->timer, when_ts, &cd->closure); return handle; } @@ -188,7 +177,7 @@ bool IomgrEventEngine::CancelConnect(EventEngine::ConnectionHandle /*handle*/) { EventEngine::ConnectionHandle IomgrEventEngine::Connect( OnConnectCallback /*on_connect*/, const ResolvedAddress& /*addr*/, const EndpointConfig& /*args*/, MemoryAllocator /*memory_allocator*/, - absl::Time /*deadline*/) { + Duration /*deadline*/) { GPR_ASSERT(false && "unimplemented"); } diff --git a/src/core/lib/event_engine/iomgr_engine.h b/src/core/lib/event_engine/iomgr_engine.h index e66e9b91c89..ebc239eb638 100644 --- a/src/core/lib/event_engine/iomgr_engine.h +++ b/src/core/lib/event_engine/iomgr_engine.h @@ -25,7 +25,6 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" -#include "absl/time/time.h" #include "absl/types/variant.h" #include @@ -65,13 +64,13 @@ class IomgrEventEngine final : public EventEngine { LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve, absl::string_view name, absl::string_view default_port, - absl::Time deadline) override; + Duration timeout) override; LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve, absl::string_view name, - absl::Time deadline) override; + Duration timeout) override; LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve, absl::string_view name, - absl::Time deadline) override; + Duration timeout) override; bool CancelLookup(LookupTaskHandle handle) override; }; @@ -89,7 +88,7 @@ class IomgrEventEngine final : public EventEngine { const ResolvedAddress& addr, const EndpointConfig& args, MemoryAllocator memory_allocator, - absl::Time deadline) override; + Duration timeout) override; bool CancelConnect(ConnectionHandle handle) override; bool IsWorkerThread() override; @@ -97,13 +96,13 @@ class IomgrEventEngine final : public EventEngine { const DNSResolver::ResolverOptions& options) override; void Run(Closure* closure) override; void Run(std::function closure) override; - TaskHandle RunAt(absl::Time when, Closure* closure) override; - TaskHandle RunAt(absl::Time when, std::function closure) override; + TaskHandle RunAfter(Duration when, Closure* closure) override; + TaskHandle RunAfter(Duration when, std::function closure) override; bool Cancel(TaskHandle handle) override; private: - EventEngine::TaskHandle RunAtInternal( - absl::Time when, + EventEngine::TaskHandle RunAfterInternal( + Duration when, absl::variant, EventEngine::Closure*> cb); void RunInternal( diff --git a/src/core/lib/gprpp/time.cc b/src/core/lib/gprpp/time.cc index bab63d79581..cddc4da2563 100644 --- a/src/core/lib/gprpp/time.cc +++ b/src/core/lib/gprpp/time.cc @@ -17,6 +17,7 @@ #include "src/core/lib/gprpp/time.h" #include +#include #include #include #include @@ -190,6 +191,13 @@ std::string Duration::ToJsonString() const { return absl::StrFormat("%d.%09ds", ts.tv_sec, ts.tv_nsec); } +Duration::operator grpc_event_engine::experimental::EventEngine::Duration() + const { + return std::chrono::milliseconds( + Clamp(millis_, std::numeric_limits::min() / GPR_NS_PER_MS, + std::numeric_limits::max() / GPR_NS_PER_MS)); +} + void TestOnlySetProcessEpoch(gpr_timespec epoch) { g_process_epoch_seconds.store( gpr_convert_clock_type(epoch, GPR_CLOCK_MONOTONIC).tv_sec); diff --git a/src/core/lib/gprpp/time.h b/src/core/lib/gprpp/time.h index 716798048eb..2c5cb42060d 100644 --- a/src/core/lib/gprpp/time.h +++ b/src/core/lib/gprpp/time.h @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -207,6 +208,9 @@ class Duration { constexpr int64_t millis() const { return millis_; } double seconds() const { return static_cast(millis_) / 1000.0; } + // NOLINTNEXTLINE: google-explicit-constructor + operator grpc_event_engine::experimental::EventEngine::Duration() const; + gpr_timespec as_timespec() const; std::string ToString() const; diff --git a/src/core/lib/promise/sleep.cc b/src/core/lib/promise/sleep.cc index 08482f4d9e8..0c10c406f66 100644 --- a/src/core/lib/promise/sleep.cc +++ b/src/core/lib/promise/sleep.cc @@ -16,9 +16,6 @@ #include "src/core/lib/promise/sleep.h" -#include "absl/time/clock.h" -#include "absl/time/time.h" - #include #include "src/core/lib/event_engine/event_engine_factory.h" @@ -57,16 +54,6 @@ void Sleep::OnTimer() { tmp_waker.Wakeup(); } -// TODO(hork): refactor gpr_base to allow a separate time_util target. -namespace { -absl::Time ToAbslTime(Timestamp timestamp) { - if (timestamp == Timestamp::InfFuture()) return absl::InfiniteFuture(); - if (timestamp == Timestamp::InfPast()) return absl::InfinitePast(); - return absl::Now() + - absl::Milliseconds((timestamp - ExecCtx::Get()->Now()).millis()); -} -} // namespace - Poll Sleep::operator()() { MutexLock lock(&mu_); switch (stage_) { @@ -75,8 +62,8 @@ Poll Sleep::operator()() { return absl::OkStatus(); } stage_ = Stage::kStarted; - timer_handle_ = - GetDefaultEventEngine()->RunAt(ToAbslTime(deadline_), [this] { + timer_handle_ = GetDefaultEventEngine()->RunAfter( + deadline_ - ExecCtx::Get()->Now(), [this] { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; OnTimer(); diff --git a/src/cpp/server/orca/orca_service.cc b/src/cpp/server/orca/orca_service.cc index 22c402d1123..d25990fa21c 100644 --- a/src/cpp/server/orca/orca_service.cc +++ b/src/cpp/server/orca/orca_service.cc @@ -21,7 +21,6 @@ #include #include "absl/base/thread_annotations.h" -#include "absl/time/clock.h" #include "absl/time/time.h" #include "absl/types/optional.h" #include "google/protobuf/duration.upb.h" @@ -124,8 +123,8 @@ class OrcaService::Reactor : public ServerWriteReactor, grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; grpc::internal::MutexLock lock(&timer_mu_); - timer_handle_ = GetDefaultEventEngine()->RunAt( - absl::Now() + absl::Milliseconds(report_interval_.millis()), + timer_handle_ = GetDefaultEventEngine()->RunAfter( + report_interval_, [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); }); } diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc index f4b2e45e505..1449872124a 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc @@ -14,6 +14,8 @@ #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" +#include + namespace grpc_event_engine { namespace experimental { @@ -24,10 +26,10 @@ const intptr_t kTaskHandleSalt = 12345; FuzzingEventEngine::FuzzingEventEngine(Options options) : final_tick_length_(options.final_tick_length) { for (const auto& delay : options.actions.tick_lengths()) { - tick_increments_[delay.id()] += absl::Microseconds(delay.delay_us()); + tick_increments_[delay.id()] += std::chrono::microseconds(delay.delay_us()); } for (const auto& delay : options.actions.run_delay()) { - task_delays_[delay.id()] += absl::Microseconds(delay.delay_us()); + task_delays_[delay.id()] += std::chrono::microseconds(delay.delay_us()); } } @@ -56,7 +58,7 @@ void FuzzingEventEngine::Tick() { } } -absl::Time FuzzingEventEngine::Now() { +FuzzingEventEngine::Time FuzzingEventEngine::Now() { grpc_core::MutexLock lock(&mu_); return now_; } @@ -71,7 +73,7 @@ FuzzingEventEngine::CreateListener(Listener::AcceptCallback, EventEngine::ConnectionHandle FuzzingEventEngine::Connect( OnConnectCallback, const ResolvedAddress&, const EndpointConfig&, - MemoryAllocator, absl::Time) { + MemoryAllocator, Duration) { abort(); } @@ -84,19 +86,21 @@ std::unique_ptr FuzzingEventEngine::GetDNSResolver( abort(); } -void FuzzingEventEngine::Run(Closure* closure) { RunAt(Now(), closure); } +void FuzzingEventEngine::Run(Closure* closure) { + RunAfter(Duration::zero(), closure); +} void FuzzingEventEngine::Run(std::function closure) { - RunAt(Now(), closure); + RunAfter(Duration::zero(), closure); } -EventEngine::TaskHandle FuzzingEventEngine::RunAt(absl::Time when, - Closure* closure) { - return RunAt(when, [closure]() { closure->Run(); }); +EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when, + Closure* closure) { + return RunAfter(when, [closure]() { closure->Run(); }); } -EventEngine::TaskHandle FuzzingEventEngine::RunAt( - absl::Time when, std::function closure) { +EventEngine::TaskHandle FuzzingEventEngine::RunAfter( + Duration when, std::function closure) { grpc_core::MutexLock lock(&mu_); const intptr_t id = next_task_id_; ++next_task_id_; @@ -108,7 +112,7 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAt( } auto task = std::make_shared(id, std::move(closure)); tasks_by_id_.emplace(id, task); - tasks_by_time_.emplace(when, std::move(task)); + tasks_by_time_.emplace(now_ + when, std::move(task)); return TaskHandle{id, kTaskHandleSalt}; } diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h index 6dbed40ebf4..38d9efeb8fc 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h @@ -15,6 +15,7 @@ #ifndef GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H #define GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H +#include #include #include @@ -32,7 +33,7 @@ class FuzzingEventEngine : public EventEngine { struct Options { // After all scheduled tick lengths are completed, this is the amount of // time Now() will be incremented each tick. - absl::Duration final_tick_length = absl::Seconds(1); + Duration final_tick_length = std::chrono::seconds(1); fuzzing_event_engine::Actions actions; }; explicit FuzzingEventEngine(Options options); @@ -49,7 +50,7 @@ class FuzzingEventEngine : public EventEngine { const ResolvedAddress& addr, const EndpointConfig& args, MemoryAllocator memory_allocator, - absl::Time deadline) override; + Duration timeout) override; bool CancelConnect(ConnectionHandle handle) override; @@ -60,11 +61,13 @@ class FuzzingEventEngine : public EventEngine { void Run(Closure* closure) override; void Run(std::function closure) override; - TaskHandle RunAt(absl::Time when, Closure* closure) override; - TaskHandle RunAt(absl::Time when, std::function closure) override; + TaskHandle RunAfter(Duration when, Closure* closure) override; + TaskHandle RunAfter(Duration when, std::function closure) override; bool Cancel(TaskHandle handle) override; - absl::Time Now() ABSL_LOCKS_EXCLUDED(mu_); + using Time = std::chrono::time_point; + + Time Now() ABSL_LOCKS_EXCLUDED(mu_); private: struct Task { @@ -74,17 +77,17 @@ class FuzzingEventEngine : public EventEngine { std::function closure; }; - const absl::Duration final_tick_length_; + const Duration final_tick_length_; grpc_core::Mutex mu_; intptr_t next_task_id_ ABSL_GUARDED_BY(mu_) = 1; intptr_t current_tick_ ABSL_GUARDED_BY(mu_) = 0; - absl::Time now_ ABSL_GUARDED_BY(mu_) = absl::Now(); - std::map tick_increments_ ABSL_GUARDED_BY(mu_); - std::map task_delays_ ABSL_GUARDED_BY(mu_); + Time now_ ABSL_GUARDED_BY(mu_) = Time::min(); + std::map tick_increments_ ABSL_GUARDED_BY(mu_); + std::map task_delays_ ABSL_GUARDED_BY(mu_); std::map> tasks_by_id_ ABSL_GUARDED_BY(mu_); - std::multimap> tasks_by_time_ + std::multimap> tasks_by_time_ ABSL_GUARDED_BY(mu_); }; diff --git a/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc b/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc index 1d392841815..7f42579aaec 100644 --- a/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc +++ b/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc @@ -14,8 +14,11 @@ #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" +#include #include +#include "absl/time/clock.h" + #include #include "test/core/event_engine/test_suite/event_engine_test.h" @@ -29,12 +32,15 @@ class ThreadedFuzzingEventEngine : public FuzzingEventEngine { ThreadedFuzzingEventEngine() : FuzzingEventEngine([]() { Options options; - options.final_tick_length = absl::Milliseconds(10); + options.final_tick_length = std::chrono::milliseconds(10); return options; }()), main_([this]() { while (!done_.load()) { - absl::SleepFor(absl::Milliseconds(10)); + auto tick_start = absl::Now(); + while (absl::Now() - tick_start < absl::Milliseconds(10)) { + absl::SleepFor(absl::Milliseconds(1)); + } Tick(); } }) {} diff --git a/test/core/event_engine/test_suite/timer_test.cc b/test/core/event_engine/test_suite/timer_test.cc index fcc3895dcdc..35a977c8a5b 100644 --- a/test/core/event_engine/test_suite/timer_test.cc +++ b/test/core/event_engine/test_suite/timer_test.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -29,6 +30,7 @@ #include "test/core/event_engine/test_suite/event_engine_test.h" using ::testing::ElementsAre; +using namespace std::chrono_literals; class EventEngineTimerTest : public EventEngineTest { public: @@ -45,7 +47,7 @@ TEST_F(EventEngineTimerTest, ImmediateCallbackIsExecutedQuickly) { grpc_core::ExecCtx exec_ctx; auto engine = this->NewEventEngine(); grpc_core::MutexLock lock(&mu_); - engine->RunAt(absl::Now(), [this]() { + engine->RunAfter(0ms, [this]() { grpc_core::MutexLock lock(&mu_); signaled_ = true; cv_.Signal(); @@ -57,7 +59,7 @@ TEST_F(EventEngineTimerTest, ImmediateCallbackIsExecutedQuickly) { TEST_F(EventEngineTimerTest, SupportsCancellation) { grpc_core::ExecCtx exec_ctx; auto engine = this->NewEventEngine(); - auto handle = engine->RunAt(absl::InfiniteFuture(), []() {}); + auto handle = engine->RunAfter(24h, []() {}); ASSERT_TRUE(engine->Cancel(handle)); } @@ -65,7 +67,7 @@ TEST_F(EventEngineTimerTest, CancelledCallbackIsNotExecuted) { grpc_core::ExecCtx exec_ctx; { auto engine = this->NewEventEngine(); - auto handle = engine->RunAt(absl::InfiniteFuture(), [this]() { + auto handle = engine->RunAfter(24h, [this]() { grpc_core::MutexLock lock(&mu_); signaled_ = true; }); @@ -78,20 +80,20 @@ TEST_F(EventEngineTimerTest, CancelledCallbackIsNotExecuted) { TEST_F(EventEngineTimerTest, TimersRespectScheduleOrdering) { grpc_core::ExecCtx exec_ctx; - // Note: this is a brittle test if the first call to `RunAt` takes longer than - // the second callback's wait time. + // Note: this is a brittle test if the first call to `RunAfter` takes longer + // than the second callback's wait time. std::vector ordered; uint8_t count = 0; grpc_core::MutexLock lock(&mu_); { auto engine = this->NewEventEngine(); - engine->RunAt(absl::Now() + absl::Milliseconds(100), [&]() { + engine->RunAfter(100ms, [&]() { grpc_core::MutexLock lock(&mu_); ordered.push_back(2); ++count; cv_.Signal(); }); - engine->RunAt(absl::Now(), [&]() { + engine->RunAfter(0ms, [&]() { grpc_core::MutexLock lock(&mu_); ordered.push_back(1); ++count; @@ -110,7 +112,7 @@ TEST_F(EventEngineTimerTest, CancellingExecutedCallbackIsNoopAndReturnsFalse) { grpc_core::ExecCtx exec_ctx; auto engine = this->NewEventEngine(); grpc_core::MutexLock lock(&mu_); - auto handle = engine->RunAt(absl::Now(), [this]() { + auto handle = engine->RunAfter(0ms, [this]() { grpc_core::MutexLock lock(&mu_); signaled_ = true; cv_.Signal(); @@ -130,10 +132,9 @@ void EventEngineTimerTest::ScheduleCheckCB(absl::Time when, // to the lowest common denominator until EventEngines can compare relative // times with supported resolution. grpc_core::ExecCtx exec_ctx; - int64_t now_millis = absl::ToUnixMillis(absl::Now()); - int64_t when_millis = absl::ToUnixMillis(when); - EXPECT_LE(when_millis, now_millis); - if (when_millis > now_millis) ++(*fail_count); + auto now = absl::Now(); + EXPECT_LE(when, now); + if (when > now) ++(*fail_count); if (++(*call_count) == total_expected) { grpc_core::MutexLock lock(&mu_); signaled_ = true; @@ -160,11 +161,13 @@ TEST_F(EventEngineTimerTest, StressTestTimersNotCalledBeforeScheduled) { std::uniform_real_distribution<> dis(timeout_min_seconds, timeout_max_seconds); for (int call_n = 0; call_n < call_count_per_thread; ++call_n) { - absl::Time when = absl::Now() + absl::Seconds(dis(gen)); - engine->RunAt( - when, absl::bind_front(&EventEngineTimerTest::ScheduleCheckCB, this, - when, &call_count, &failed_call_count, - thread_count * call_count_per_thread)); + const auto dur = static_cast(1e9 * dis(gen)); + auto deadline = absl::Now() + absl::Nanoseconds(dur); + engine->RunAfter( + std::chrono::nanoseconds(dur), + absl::bind_front(&EventEngineTimerTest::ScheduleCheckCB, this, + deadline, &call_count, &failed_call_count, + thread_count * call_count_per_thread)); } }); }