From 96f5cddb5f878ae33584868dfaa1652e69180c65 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 15 Sep 2022 21:41:04 -0700 Subject: [PATCH] [time] Introduce time sources (#30815) * [time] Introduce time sources * make import trivial * Automated change: Fix sanity tests * fix * Automated change: Fix sanity tests * fix * Automated change: Fix sanity tests * Automated change: Fix sanity tests * review feedback * fix * Automated change: Fix sanity tests * ios fix * fix Co-authored-by: ctiller --- BUILD | 14 +++-- .../channel_idle/channel_idle_filter.cc | 6 +-- .../filters/client_channel/backup_poller.cc | 7 ++- .../client_channel/lb_policy/grpclb/grpclb.cc | 8 +-- .../outlier_detection/outlier_detection.cc | 10 ++-- .../lb_policy/priority/priority.cc | 5 +- .../client_channel/lb_policy/rls/rls.cc | 22 ++++---- .../lb_policy/xds/xds_cluster_manager.cc | 3 +- .../resolver/dns/c_ares/grpc_ares_wrapper.cc | 4 +- .../google_c2p/google_c2p_resolver.cc | 13 +++-- .../resolver/polling_resolver.cc | 10 ++-- .../filters/client_channel/retry_filter.cc | 7 ++- .../ext/filters/client_channel/subchannel.cc | 6 +-- .../subchannel_stream_client.cc | 3 +- .../fault_injection/fault_injection_filter.cc | 3 +- .../transport/chttp2/server/chttp2_server.cc | 4 +- .../chttp2/transport/chttp2_transport.cc | 13 +++-- .../chttp2/transport/flow_control.cc | 5 +- .../transport/chttp2/transport/frame_ping.cc | 3 +- .../chttp2/transport/hpack_encoder.cc | 3 +- .../ext/transport/chttp2/transport/writing.cc | 2 +- src/core/ext/xds/xds_client.cc | 7 +-- src/core/ext/xds/xds_client.h | 3 +- src/core/lib/backoff/backoff.cc | 6 +-- src/core/lib/channel/channel_trace.cc | 7 ++- src/core/lib/gpr/time_precise.h | 2 +- src/core/lib/gprpp/time.cc | 21 ++++++++ src/core/lib/gprpp/time.h | 49 +++++++++++++++++ src/core/lib/iomgr/ev_epoll1_linux.cc | 2 +- src/core/lib/iomgr/ev_poll_posix.cc | 4 +- src/core/lib/iomgr/exec_ctx.cc | 8 --- src/core/lib/iomgr/exec_ctx.h | 32 +++-------- src/core/lib/iomgr/iocp_windows.cc | 2 +- src/core/lib/iomgr/tcp_posix.cc | 2 +- src/core/lib/iomgr/timer_generic.cc | 8 +-- src/core/lib/iomgr/timer_manager.cc | 3 +- src/core/lib/promise/sleep.cc | 4 +- .../lib/resource_quota/periodic_update.cc | 5 +- .../google_default_credentials.cc | 2 +- .../security/credentials/jwt/jwt_verifier.cc | 4 +- .../credentials/oauth2/oauth2_credentials.cc | 3 +- src/core/lib/surface/completion_queue.cc | 8 +-- src/core/lib/transport/bdp_estimator.cc | 4 +- src/core/lib/transport/metadata_batch.cc | 5 +- src/core/lib/transport/status_conversion.cc | 4 +- test/core/backoff/backoff_test.cc | 10 ++-- .../resolvers/dns_resolver_cooldown_test.cc | 10 ++-- .../end2end/fixtures/http_proxy_fixture.cc | 4 +- test/core/end2end/fuzzers/api_fuzzer.cc | 8 ++- test/core/end2end/fuzzers/server_fuzzer.cc | 6 +-- .../event_engine/posix/timer_manager_test.cc | 4 +- test/core/iomgr/endpoint_tests.cc | 4 +- test/core/iomgr/resolve_address_posix_test.cc | 3 +- test/core/iomgr/resolve_address_test.cc | 3 +- test/core/iomgr/tcp_server_posix_test.cc | 2 +- test/core/iomgr/timer_list_test.cc | 4 +- test/core/promise/sleep_test.cc | 17 +++--- .../resource_quota/periodic_update_test.cc | 20 +++---- test/core/security/ssl_server_fuzzer.cc | 2 +- .../surface/concurrent_connectivity_test.cc | 4 +- .../binder/end2end/fuzzers/server_fuzzer.cc | 6 +-- .../transport/chttp2/flow_control_fuzzer.cc | 4 +- .../transport/chttp2/settings_timeout_test.cc | 14 ++--- test/core/util/passthru_endpoint.cc | 2 +- test/core/util/port_server_client.cc | 10 ++-- test/cpp/common/time_jump_test.cc | 15 +++--- test/cpp/common/timer_test.cc | 53 +++++++++---------- .../end2end/connection_attempt_injector.cc | 2 +- test/cpp/microbenchmarks/bm_chttp2_hpack.cc | 2 +- test/cpp/naming/cancel_ares_query_test.cc | 4 +- 70 files changed, 286 insertions(+), 273 deletions(-) diff --git a/BUILD b/BUILD index 0482ed710ca..b5c788b70e6 100644 --- a/BUILD +++ b/BUILD @@ -2024,7 +2024,6 @@ grpc_cc_library( ], external_deps = ["absl/functional:function_ref"], deps = [ - "exec_ctx", "gpr_platform", "time", "useful", @@ -2199,10 +2198,15 @@ grpc_cc_library( hdrs = [ "src/core/lib/gprpp/time.h", ], - external_deps = ["absl/strings:str_format"], + external_deps = [ + "absl/strings:str_format", + "absl/types:optional", + ], deps = [ "event_engine_base_hdrs", "gpr", + "gpr_tls", + "no_destruct", "useful", ], ) @@ -3003,7 +3007,6 @@ grpc_cc_library( ], hdrs = ["src/core/lib/transport/bdp_estimator.h"], deps = [ - "exec_ctx", "gpr", "grpc_trace", "time", @@ -3053,7 +3056,6 @@ grpc_cc_library( language = "c++", visibility = ["@grpc:alt_grpc_base_legacy"], deps = [ - "exec_ctx", "gpr_platform", "time", ], @@ -4974,7 +4976,6 @@ grpc_cc_library( "closure", "config", "debug_location", - "exec_ctx", "gpr", "grpc_base", "grpc_client_channel", @@ -5184,7 +5185,6 @@ grpc_cc_library( "config", "debug_location", "env", - "exec_ctx", "gpr", "grpc_base", "grpc_client_channel", @@ -5227,7 +5227,6 @@ grpc_cc_library( "closure", "config", "debug_location", - "exec_ctx", "gpr", "grpc_base", "grpc_client_channel", @@ -6819,7 +6818,6 @@ grpc_cc_library( ], deps = [ "bdp_estimator", - "exec_ctx", "experiments", "gpr", "grpc_trace", diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.cc b/src/core/ext/filters/channel_idle/channel_idle_filter.cc index c2218ed5c46..a19ae4f3c05 100644 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.cc +++ b/src/core/ext/filters/channel_idle/channel_idle_filter.cc @@ -167,7 +167,7 @@ void MaxAgeFilter::PostInit() { max_age_activity_.Set(MakeActivity( TrySeq( // First sleep until the max connection age - Sleep(ExecCtx::Get()->Now() + max_connection_age_), + Sleep(Timestamp::Now() + max_connection_age_), // Then send a goaway. [this] { GRPC_CHANNEL_STACK_REF(this->channel_stack(), @@ -192,7 +192,7 @@ void MaxAgeFilter::PostInit() { }, // Sleep for the grace period [this] { - return Sleep(ExecCtx::Get()->Now() + max_connection_age_grace_); + return Sleep(Timestamp::Now() + max_connection_age_grace_); }), ExecCtxWakeupScheduler(), [channel_stack, this](absl::Status status) { // OnDone -- close the connection if the promise completed @@ -246,7 +246,7 @@ void ChannelIdleFilter::StartIdleTimer() { auto channel_stack = channel_stack_->Ref(); auto timeout = client_idle_timeout_; auto promise = Loop([timeout, idle_filter_state]() { - return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout), + return TrySeq(Sleep(Timestamp::Now() + timeout), [idle_filter_state]() -> Poll> { if (idle_filter_state->CheckTimer()) { return Continue{}; diff --git a/src/core/ext/filters/client_channel/backup_poller.cc b/src/core/ext/filters/client_channel/backup_poller.cc index 063706c9d2d..e59fe87c552 100644 --- a/src/core/ext/filters/client_channel/backup_poller.cc +++ b/src/core/ext/filters/client_channel/backup_poller.cc @@ -33,7 +33,6 @@ #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" @@ -132,11 +131,11 @@ static void run_poller(void* arg, grpc_error_handle error) { return; } grpc_error_handle err = - grpc_pollset_work(p->pollset, nullptr, grpc_core::ExecCtx::Get()->Now()); + grpc_pollset_work(p->pollset, nullptr, grpc_core::Timestamp::Now()); gpr_mu_unlock(p->pollset_mu); GRPC_LOG_IF_ERROR("Run client channel backup poller", err); grpc_timer_init(&p->polling_timer, - grpc_core::ExecCtx::Get()->Now() + g_poll_interval, + grpc_core::Timestamp::Now() + g_poll_interval, &p->run_poller_closure); } @@ -153,7 +152,7 @@ static void g_poller_init_locked() { GRPC_CLOSURE_INIT(&g_poller->run_poller_closure, run_poller, g_poller, grpc_schedule_on_exec_ctx); grpc_timer_init(&g_poller->polling_timer, - grpc_core::ExecCtx::Get()->Now() + g_poll_interval, + grpc_core::Timestamp::Now() + g_poll_interval, &g_poller->run_poller_closure); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index c9b8b5dd7a4..6ef41a6304f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -868,7 +868,7 @@ GrpcLb::BalancerCallState::BalancerCallState( const Timestamp deadline = grpclb_policy()->lb_call_timeout_ == Duration::Zero() ? Timestamp::InfFuture() - : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_; + : Timestamp::Now() + grpclb_policy()->lb_call_timeout_; lb_call_ = grpc_channel_create_pollset_set_call( grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, grpclb_policy_->interested_parties(), @@ -1548,7 +1548,7 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { if (is_initial_update) { fallback_at_startup_checks_pending_ = true; // Start timer. - Timestamp deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_; + Timestamp deadline = Timestamp::Now() + fallback_at_startup_timeout_; Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); // Start watching the channel's connectivity state. If the channel @@ -1643,7 +1643,7 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() { Timestamp next_try = lb_call_backoff_.NextAttemptTime(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this); - Duration timeout = next_try - ExecCtx::Get()->Now(); + Duration timeout = next_try - Timestamp::Now(); if (timeout > Duration::Zero()) { gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.", this, timeout.millis()); @@ -1813,7 +1813,7 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { void GrpcLb::CacheDeletedSubchannelLocked( RefCountedPtr subchannel) { - Timestamp deletion_time = ExecCtx::Get()->Now() + subchannel_cache_interval_; + Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_; cached_subchannels_[deletion_time].push_back(std::move(subchannel)); if (!subchannel_cache_timer_pending_) { Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release(); diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc index ebf85fe0bc0..f13f84beda1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc +++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc @@ -58,7 +58,6 @@ #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/timer.h" @@ -296,7 +295,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy { base_ejection_time_in_millis * multiplier_, std::max(base_ejection_time_in_millis, max_ejection_time_in_millis))); - if (change_time < ExecCtx::Get()->Now()) { + if (change_time < Timestamp::Now()) { Uneject(); return true; } @@ -615,8 +614,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] starting timer", this); } - ejection_timer_ = - MakeOrphanable(Ref(), ExecCtx::Get()->Now()); + ejection_timer_ = MakeOrphanable(Ref(), Timestamp::Now()); for (const auto& p : subchannel_state_map_) { p.second->RotateBucket(); // Reset call counters. } @@ -835,7 +833,7 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) { std::map failure_percentage_ejection_candidates; size_t ejected_host_count = 0; double success_rate_sum = 0; - auto time_now = ExecCtx::Get()->Now(); + auto time_now = Timestamp::Now(); auto& config = parent_->config_->outlier_detection_config(); for (auto& state : parent_->subchannel_state_map_) { auto* subchannel_state = state.second.get(); @@ -1006,7 +1004,7 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) { } timer_pending_ = false; parent_->ejection_timer_ = - MakeOrphanable(parent_, ExecCtx::Get()->Now()); + MakeOrphanable(parent_, Timestamp::Now()); } Unref(DEBUG_LOCATION, "Timer"); GRPC_ERROR_UNREF(error); diff --git a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc index 4172ecda1d0..25294992735 100644 --- a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc +++ b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc @@ -51,7 +51,6 @@ #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" @@ -532,7 +531,7 @@ PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer( } GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr); Ref(DEBUG_LOCATION, "Timer").release(); - grpc_timer_init(&timer_, ExecCtx::Get()->Now() + kChildRetentionInterval, + grpc_timer_init(&timer_, Timestamp::Now() + kChildRetentionInterval, &on_timer_); } @@ -594,7 +593,7 @@ PriorityLb::ChildPriority::FailoverTimer::FailoverTimer( Ref(DEBUG_LOCATION, "Timer").release(); grpc_timer_init( &timer_, - ExecCtx::Get()->Now() + + Timestamp::Now() + child_priority_->priority_policy_->child_failover_timeout_, &on_timer_); } diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc index ceea9a2a874..ed8460213c1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc +++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc @@ -1014,7 +1014,7 @@ LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) { gpr_log(GPR_INFO, "[rlslb %p] picker=%p: request keys: %s", lb_policy_.get(), this, key.ToString().c_str()); } - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); MutexLock lock(&lb_policy_->mu_); if (lb_policy_->is_shutdown_) { return PickResult::Fail( @@ -1164,7 +1164,7 @@ RlsLb::Cache::Entry::Entry(RefCountedPtr lb_policy, GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "CacheEntry" : nullptr), lb_policy_(std::move(lb_policy)), backoff_state_(MakeCacheEntryBackoff()), - min_expiration_time_(ExecCtx::Get()->Now() + kMinExpirationTime), + min_expiration_time_(Timestamp::Now() + kMinExpirationTime), lru_iterator_(lb_policy_->cache_.lru_list_.insert( lb_policy_->cache_.lru_list_.end(), key)) {} @@ -1242,12 +1242,12 @@ void RlsLb::Cache::Entry::ResetBackoff() { } bool RlsLb::Cache::Entry::ShouldRemove() const { - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); return data_expiration_time_ < now && backoff_expiration_time_ < now; } bool RlsLb::Cache::Entry::CanEvict() const { - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); return min_expiration_time_ < now; } @@ -1273,7 +1273,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked( backoff_state_ = MakeCacheEntryBackoff(); } backoff_time_ = backoff_state_->NextAttemptTime(); - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); backoff_expiration_time_ = now + (backoff_time_ - now) * 2; backoff_timer_ = MakeOrphanable( Ref(DEBUG_LOCATION, "BackoffTimer"), backoff_time_); @@ -1282,7 +1282,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked( } // Request succeeded, so store the result. header_data_ = std::move(response.header_data); - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); data_expiration_time_ = now + lb_policy_->config_->max_age(); stale_time_ = now + lb_policy_->config_->stale_age(); status_ = absl::OkStatus(); @@ -1348,7 +1348,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked( // RlsLb::Cache::Cache(RlsLb* lb_policy) : lb_policy_(lb_policy) { - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); lb_policy_->Ref(DEBUG_LOCATION, "CacheCleanupTimer").release(); GRPC_CLOSURE_INIT(&timer_callback_, OnCleanupTimer, this, nullptr); grpc_timer_init(&cleanup_timer_, now + kCacheCleanupTimerInterval, @@ -1431,7 +1431,7 @@ void RlsLb::Cache::OnCleanupTimer(void* arg, grpc_error_handle error) { ++it; } } - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); lb_policy.release(); grpc_timer_init(&cache->cleanup_timer_, now + kCacheCleanupTimerInterval, @@ -1500,7 +1500,7 @@ void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange( // bool RlsLb::RlsChannel::Throttle::ShouldThrottle() { - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); while (!requests_.empty() && now - requests_.front() > window_size_) { requests_.pop_front(); } @@ -1528,7 +1528,7 @@ bool RlsLb::RlsChannel::Throttle::ShouldThrottle() { } void RlsLb::RlsChannel::Throttle::RegisterResponse(bool success) { - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); requests_.push_back(now); if (!success) failures_.push_back(now); } @@ -1708,7 +1708,7 @@ void RlsLb::RlsRequest::StartCallLocked() { MutexLock lock(&lb_policy_->mu_); if (lb_policy_->is_shutdown_) return; } - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); deadline_ = now + lb_policy_->config_->lookup_service_timeout(); grpc_metadata_array_init(&recv_initial_metadata_); grpc_metadata_array_init(&recv_trailing_metadata_); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc index bb7f4a768d8..31a89339476 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc @@ -50,7 +50,6 @@ #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" @@ -530,7 +529,7 @@ void XdsClusterManagerLb::ClusterChild::DeactivateLocked() { // Start a timer to delete the child. Ref(DEBUG_LOCATION, "ClusterChild+timer").release(); grpc_timer_init(&delayed_removal_timer_, - ExecCtx::Get()->Now() + + Timestamp::Now() + Duration::Milliseconds( GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS), &on_delayed_removal_timer_); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index ade784f6ae2..6ec73812633 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -284,7 +284,7 @@ static grpc_core::Timestamp calculate_next_ares_backup_poll_alarm( "request:%p ev_driver=%p. next ares process poll time in " "%" PRId64 " ms", driver->request, driver, until_next_ares_backup_poll_alarm.millis()); - return grpc_core::ExecCtx::Get()->Now() + until_next_ares_backup_poll_alarm; + return grpc_core::Timestamp::Now() + until_next_ares_backup_poll_alarm; } static void on_timeout(void* arg, grpc_error_handle error) { @@ -496,7 +496,7 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) GRPC_CLOSURE_INIT(&ev_driver->on_timeout_locked, on_timeout, ev_driver, grpc_schedule_on_exec_ctx); grpc_timer_init(&ev_driver->query_timeout, - grpc_core::ExecCtx::Get()->Now() + timeout, + grpc_core::Timestamp::Now() + timeout, &ev_driver->on_timeout_locked); // Initialize the backup poll alarm grpc_core::Timestamp next_ares_backup_poll_alarm = diff --git a/src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc b/src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc index b852fd07621..be26b7a8c8e 100644 --- a/src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc @@ -52,7 +52,6 @@ #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/json/json.h" #include "src/core/lib/resolver/resolver.h" @@ -167,12 +166,12 @@ GoogleCloud2ProdResolver::MetadataQuery::MetadataQuery( const_cast(GRPC_ARG_RESOURCE_QUOTA), resolver_->resource_quota_.get(), grpc_resource_quota_arg_vtable()); grpc_channel_args args = {1, &resource_quota_arg}; - http_request_ = HttpRequest::Get( - std::move(*uri), &args, pollent, &request, - ExecCtx::Get()->Now() + Duration::Seconds(10), // 10s timeout - &on_done_, &response_, - RefCountedPtr( - grpc_insecure_credentials_create())); + http_request_ = + HttpRequest::Get(std::move(*uri), &args, pollent, &request, + Timestamp::Now() + Duration::Seconds(10), // 10s timeout + &on_done_, &response_, + RefCountedPtr( + grpc_insecure_credentials_create())); http_request_->Start(); } diff --git a/src/core/ext/filters/client_channel/resolver/polling_resolver.cc b/src/core/ext/filters/client_channel/resolver/polling_resolver.cc index 4b7dd24baae..fdffb51fcbb 100644 --- a/src/core/ext/filters/client_channel/resolver/polling_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/polling_resolver.cc @@ -190,7 +190,7 @@ void PollingResolver::GetResultStatus(absl::Status status) { // Also see https://github.com/grpc/grpc/issues/26079. ExecCtx::Get()->InvalidateNow(); Timestamp next_try = backoff_.NextAttemptTime(); - Duration timeout = next_try - ExecCtx::Get()->Now(); + Duration timeout = next_try - Timestamp::Now(); GPR_ASSERT(!have_next_resolution_timer_); have_next_resolution_timer_ = true; if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { @@ -223,11 +223,11 @@ void PollingResolver::MaybeStartResolvingLocked() { const Timestamp earliest_next_resolution = *last_resolution_timestamp_ + min_time_between_resolutions_; const Duration time_until_next_resolution = - earliest_next_resolution - ExecCtx::Get()->Now(); + earliest_next_resolution - Timestamp::Now(); if (time_until_next_resolution > Duration::Zero()) { if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { const Duration last_resolution_ago = - ExecCtx::Get()->Now() - *last_resolution_timestamp_; + Timestamp::Now() - *last_resolution_timestamp_; gpr_log(GPR_INFO, "[polling resolver %p] in cooldown from last resolution " "(from %" PRId64 " ms ago); will resolve again in %" PRId64 @@ -239,7 +239,7 @@ void PollingResolver::MaybeStartResolvingLocked() { Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release(); GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr); grpc_timer_init(&next_resolution_timer_, - ExecCtx::Get()->Now() + time_until_next_resolution, + Timestamp::Now() + time_until_next_resolution, &on_next_resolution_); return; } @@ -249,7 +249,7 @@ void PollingResolver::MaybeStartResolvingLocked() { void PollingResolver::StartResolvingLocked() { request_ = StartRequest(); - last_resolution_timestamp_ = ExecCtx::Get()->Now(); + last_resolution_timestamp_ = Timestamp::Now(); if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { gpr_log(GPR_INFO, "[polling resolver %p] starting resolution, request_=%p", this, request_.get()); diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index abb6f508018..efea61e65e6 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -725,8 +725,7 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld, if (calld->retry_policy_ != nullptr && calld->retry_policy_->per_attempt_recv_timeout().has_value()) { Timestamp per_attempt_recv_deadline = - ExecCtx::Get()->Now() + - *calld->retry_policy_->per_attempt_recv_timeout(); + Timestamp::Now() + *calld->retry_policy_->per_attempt_recv_timeout(); if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: per-attempt timeout in %" PRId64 @@ -2604,7 +2603,7 @@ void RetryFilter::CallData::StartRetryTimer( Timestamp next_attempt_time; if (server_pushback.has_value()) { GPR_ASSERT(*server_pushback >= Duration::Zero()); - next_attempt_time = ExecCtx::Get()->Now() + *server_pushback; + next_attempt_time = Timestamp::Now() + *server_pushback; retry_backoff_.Reset(); } else { next_attempt_time = retry_backoff_.NextAttemptTime(); @@ -2612,7 +2611,7 @@ void RetryFilter::CallData::StartRetryTimer( if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand_, - this, (next_attempt_time - ExecCtx::Get()->Now()).millis()); + this, (next_attempt_time - Timestamp::Now()).millis()); } // Schedule retry after computed delay. GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimer, this, nullptr); diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 2b3e35ca010..570cfe51f8a 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -767,7 +767,7 @@ void Subchannel::ResetBackoff() { GetDefaultEventEngine()->Cancel(retry_timer_handle_)) { OnRetryTimerLocked(); } else if (state_ == GRPC_CHANNEL_CONNECTING) { - next_attempt_time_ = ExecCtx::Get()->Now(); + next_attempt_time_ = Timestamp::Now(); } } @@ -878,7 +878,7 @@ void Subchannel::OnRetryTimerLocked() { void Subchannel::StartConnectingLocked() { // Set next attempt time. - const Timestamp min_deadline = min_connect_timeout_ + ExecCtx::Get()->Now(); + const Timestamp min_deadline = min_connect_timeout_ + Timestamp::Now(); next_attempt_time_ = backoff_.NextAttemptTime(); // Report CONNECTING. SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus()); @@ -913,7 +913,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { // transition back to IDLE. if (connecting_result_.transport == nullptr || !PublishTransportLocked()) { const Duration time_until_next_attempt = - next_attempt_time_ - ExecCtx::Get()->Now(); + next_attempt_time_ - Timestamp::Now(); gpr_log(GPR_INFO, "subchannel %p %s: connect failed (%s), backing off for %" PRId64 " ms", diff --git a/src/core/ext/filters/client_channel/subchannel_stream_client.cc b/src/core/ext/filters/client_channel/subchannel_stream_client.cc index 2d0301c0bfd..396d8d2b570 100644 --- a/src/core/ext/filters/client_channel/subchannel_stream_client.cc +++ b/src/core/ext/filters/client_channel/subchannel_stream_client.cc @@ -32,7 +32,6 @@ #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/transport/error_utils.h" @@ -128,7 +127,7 @@ void SubchannelStreamClient::StartRetryTimerLocked() { if (GPR_UNLIKELY(tracer_ != nullptr)) { gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient health check call lost...", tracer_, this); - Duration timeout = next_try - ExecCtx::Get()->Now(); + Duration timeout = next_try - Timestamp::Now(); if (timeout > Duration::Zero()) { gpr_log(GPR_INFO, "%s %p: ... will retry in %" PRId64 "ms.", tracer_, this, timeout.millis()); diff --git a/src/core/ext/filters/fault_injection/fault_injection_filter.cc b/src/core/ext/filters/fault_injection/fault_injection_filter.cc index e1174ace8ad..567b5ab1a6b 100644 --- a/src/core/ext/filters/fault_injection/fault_injection_filter.cc +++ b/src/core/ext/filters/fault_injection/fault_injection_filter.cc @@ -42,7 +42,6 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/time.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/sleep.h" #include "src/core/lib/promise/try_seq.h" @@ -251,7 +250,7 @@ bool FaultInjectionFilter::InjectionDecision::HaveActiveFaultsQuota() const { Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil() { if (delay_time_ != Duration::Zero() && HaveActiveFaultsQuota()) { active_fault_ = FaultHandle{true}; - return ExecCtx::Get()->Now() + delay_time_; + return Timestamp::Now() + delay_time_; } return Timestamp::InfPast(); } diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 0049ab89c14..51495038fcb 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -358,7 +358,7 @@ void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() { // Timestamp GetConnectionDeadline(const ChannelArgs& args) { - return ExecCtx::Get()->Now() + + return Timestamp::Now() + std::max( Duration::Milliseconds(1), args.GetDurationFromIntMillis(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS) @@ -587,7 +587,7 @@ void Chttp2ServerListener::ActiveConnection::SendGoAway() { this, nullptr); grpc_timer_init( &drain_grace_timer_, - ExecCtx::Get()->Now() + + Timestamp::Now() + std::max( Duration::Zero(), listener_->args_ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 980bbea2c4b..8341bd9ab32 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -442,7 +442,7 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, grpc_schedule_on_exec_ctx); grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::Timestamp::Now() + t->keepalive_time, &t->init_keepalive_ping_locked); } else { // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no @@ -1602,8 +1602,7 @@ class GracefulGoaway : public grpc_core::RefCounted { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); Ref().release(); // Ref for the timer grpc_timer_init( - &timer_, - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(20), + &timer_, grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(20), GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr)); } @@ -2647,7 +2646,7 @@ static void init_keepalive_ping_locked(void* arg, grpc_error_handle error) { GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, grpc_schedule_on_exec_ctx); grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::Timestamp::Now() + t->keepalive_time, &t->init_keepalive_ping_locked); } } else if (error == GRPC_ERROR_CANCELLED) { @@ -2661,7 +2660,7 @@ static void init_keepalive_ping_locked(void* arg, grpc_error_handle error) { GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, grpc_schedule_on_exec_ctx); grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::Timestamp::Now() + t->keepalive_time, &t->init_keepalive_ping_locked); } GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); @@ -2690,7 +2689,7 @@ static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) { GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx); grpc_timer_init(&t->keepalive_watchdog_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, + grpc_core::Timestamp::Now() + t->keepalive_timeout, &t->keepalive_watchdog_fired_locked); t->keepalive_ping_started = true; } @@ -2726,7 +2725,7 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) { GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, grpc_schedule_on_exec_ctx); grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::Timestamp::Now() + t->keepalive_time, &t->init_keepalive_ping_locked); } } diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index 062d0f6272d..61f6854722e 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -38,7 +38,6 @@ #include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/resource_quota/memory_quota.h" grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl"); @@ -114,7 +113,7 @@ TransportFlowControl::TransportFlowControl(const char* name, .set_min_control_value(-1) .set_max_control_value(25) .set_integral_range(10)), - last_pid_update_(ExecCtx::Get()->Now()) {} + last_pid_update_(Timestamp::Now()) {} uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) { const uint32_t target_announced_window = @@ -207,7 +206,7 @@ double TransportFlowControl::TargetLogBdp() { } double TransportFlowControl::SmoothLogBdp(double value) { - Timestamp now = ExecCtx::Get()->Now(); + Timestamp now = Timestamp::Now(); double bdp_error = value - pid_controller_.last_control_value(); const double dt = (now - last_pid_update_).seconds(); last_pid_update_ = now; diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.cc b/src/core/ext/transport/chttp2/transport/frame_ping.cc index 3c53444963c..7e07528df5b 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.cc +++ b/src/core/ext/transport/chttp2/transport/frame_ping.cc @@ -32,7 +32,6 @@ #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/stream_map.h" #include "src/core/lib/gprpp/time.h" -#include "src/core/lib/iomgr/exec_ctx.h" static bool g_disable_ping_ack = false; @@ -95,7 +94,7 @@ grpc_error_handle grpc_chttp2_ping_parser_parse(void* parser, grpc_chttp2_ack_ping(t, p->opaque_8bytes); } else { if (!t->is_client) { - grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now(); + grpc_core::Timestamp now = grpc_core::Timestamp::Now(); grpc_core::Timestamp next_allowed_ping = t->ping_recv_state.last_ping_recv_time + t->ping_policy.min_recv_ping_interval_without_data; diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc index 97bebe1edee..788417417db 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc @@ -33,7 +33,6 @@ #include "src/core/ext/transport/chttp2/transport/hpack_constants.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder_table.h" #include "src/core/ext/transport/chttp2/transport/varint.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/surface/validate_metadata.h" #include "src/core/lib/transport/timeout_encoding.h" @@ -519,7 +518,7 @@ void HPackCompressor::Framer::EncodeRepeatingSliceValue( } void HPackCompressor::Framer::Encode(GrpcTimeoutMetadata, Timestamp deadline) { - Timeout timeout = Timeout::FromDuration(deadline - ExecCtx::Get()->Now()); + Timeout timeout = Timeout::FromDuration(deadline - Timestamp::Now()); for (auto it = compressor_->previous_timeouts_.begin(); it != compressor_->previous_timeouts_.end(); ++it) { double ratio = timeout.RatioVersus(it->timeout); diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index a103d0f7ca4..8aa2149ffbd 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -111,7 +111,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { // in a loop while draining the currently-held combiner. Also see // https://github.com/grpc/grpc/issues/26079. grpc_core::ExecCtx::Get()->InvalidateNow(); - grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now(); + grpc_core::Timestamp now = grpc_core::Timestamp::Now(); grpc_core::Duration next_allowed_ping_interval = grpc_core::Duration::Zero(); if (t->is_client) { diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 3f3bc8e33fd..6818a2e1cdb 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -46,6 +46,7 @@ #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/uri/uri_parser.h" #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1 @@ -162,7 +163,7 @@ class XdsClient::ChannelState::AdsCallState XdsClient* xds_client() const { return ads_call_state_->xds_client(); } AdsCallState* ads_call_state_; - const Timestamp update_time_ = ExecCtx::Get()->Now(); + const Timestamp update_time_ = Timestamp::Now(); Result result_; }; @@ -626,7 +627,7 @@ void XdsClient::ChannelState::RetryableCall::StartRetryTimerLocked() { if (shutting_down_) return; const Timestamp next_attempt_time = backoff_.NextAttemptTime(); const Duration timeout = - std::max(next_attempt_time - ExecCtx::Get()->Now(), Duration::Zero()); + std::max(next_attempt_time - Timestamp::Now(), Duration::Zero()); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] xds server %s: call attempt failed; " @@ -1958,7 +1959,7 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( } } // Compute load report interval. - const Timestamp now = ExecCtx::Get()->Now(); + const Timestamp now = Timestamp::Now(); snapshot.load_report_interval = now - load_report.last_report_time; load_report.last_report_time = now; // Record snapshot. diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index b936e9ac82c..67229be1fca 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -45,7 +45,6 @@ #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/work_serializer.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/uri/uri_parser.h" namespace grpc_core { @@ -252,7 +251,7 @@ class XdsClient : public DualRefCounted { std::map, LocalityState, XdsLocalityName::Less> locality_stats; - Timestamp last_report_time = ExecCtx::Get()->Now(); + Timestamp last_report_time = Timestamp::Now(); }; // Load report data. diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc index 8238e5cfae4..bec0af3ea8d 100644 --- a/src/core/lib/backoff/backoff.cc +++ b/src/core/lib/backoff/backoff.cc @@ -22,8 +22,6 @@ #include -#include "src/core/lib/iomgr/exec_ctx.h" - namespace grpc_core { BackOff::BackOff(const Options& options) : options_(options) { Reset(); } @@ -31,14 +29,14 @@ BackOff::BackOff(const Options& options) : options_(options) { Reset(); } Timestamp BackOff::NextAttemptTime() { if (initial_) { initial_ = false; - return current_backoff_ + ExecCtx::Get()->Now(); + return current_backoff_ + Timestamp::Now(); } current_backoff_ = std::min(current_backoff_ * options_.multiplier(), options_.max_backoff()); const Duration jitter = Duration::FromSecondsAsDouble( absl::Uniform(rand_gen_, -options_.jitter() * current_backoff_.seconds(), options_.jitter() * current_backoff_.seconds())); - return ExecCtx::Get()->Now() + current_backoff_ + jitter; + return Timestamp::Now() + current_backoff_ + jitter; } void BackOff::Reset() { diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc index 09d9af5660a..7d767b90290 100644 --- a/src/core/lib/channel/channel_trace.cc +++ b/src/core/lib/channel/channel_trace.cc @@ -30,7 +30,6 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/time.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_refcount.h" @@ -41,7 +40,7 @@ ChannelTrace::TraceEvent::TraceEvent(Severity severity, const grpc_slice& data, RefCountedPtr referenced_entity) : severity_(severity), data_(data), - timestamp_(ExecCtx::Get()->Now().as_timespec(GPR_CLOCK_REALTIME)), + timestamp_(Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME)), next_(nullptr), referenced_entity_(std::move(referenced_entity)), memory_usage_(sizeof(TraceEvent) + grpc_slice_memory_usage(data)) {} @@ -49,7 +48,7 @@ ChannelTrace::TraceEvent::TraceEvent(Severity severity, const grpc_slice& data, ChannelTrace::TraceEvent::TraceEvent(Severity severity, const grpc_slice& data) : severity_(severity), data_(data), - timestamp_(ExecCtx::Get()->Now().as_timespec(GPR_CLOCK_REALTIME)), + timestamp_(Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME)), next_(nullptr), memory_usage_(sizeof(TraceEvent) + grpc_slice_memory_usage(data)) {} @@ -65,7 +64,7 @@ ChannelTrace::ChannelTrace(size_t max_event_memory) return; // tracing is disabled if max_event_memory_ == 0 } gpr_mu_init(&tracer_mu_); - time_created_ = ExecCtx::Get()->Now().as_timespec(GPR_CLOCK_REALTIME); + time_created_ = Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME); } ChannelTrace::~ChannelTrace() { diff --git a/src/core/lib/gpr/time_precise.h b/src/core/lib/gpr/time_precise.h index 1dcfa3b57e6..a41047afb0b 100644 --- a/src/core/lib/gpr/time_precise.h +++ b/src/core/lib/gpr/time_precise.h @@ -28,7 +28,7 @@ // low as a usec. Use other clock sources or gpr_precise_clock_now(), // where you need high resolution clocks. // -// Using gpr_get_cycle_counter() is preferred to using ExecCtx::Get()->Now() +// Using gpr_get_cycle_counter() is preferred to using Timestamp::Now() // whenever possible. #if GPR_CYCLE_COUNTER_CUSTOM diff --git a/src/core/lib/gprpp/time.cc b/src/core/lib/gprpp/time.cc index cddc4da2563..e588b128a8d 100644 --- a/src/core/lib/gprpp/time.cc +++ b/src/core/lib/gprpp/time.cc @@ -28,6 +28,8 @@ #include #include +#include "src/core/lib/gprpp/no_destruct.h" + namespace grpc_core { namespace { @@ -35,6 +37,13 @@ namespace { std::atomic g_process_epoch_seconds; std::atomic g_process_epoch_cycles; +class GprNowTimeSource final : public Timestamp::Source { + public: + Timestamp Now() override { + return Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC)); + } +}; + GPR_ATTRIBUTE_NOINLINE std::pair InitTime() { gpr_cycle_counter cycles_start = 0; gpr_cycle_counter cycles_end = 0; @@ -133,6 +142,18 @@ int64_t TimespanToMillisRoundDown(gpr_timespec ts) { } // namespace +GPR_THREAD_LOCAL(Timestamp::Source*) +Timestamp::thread_local_time_source_{ + NoDestructSingleton::Get()}; + +Timestamp ScopedTimeCache::Now() { + if (!cached_time_.has_value()) { + previous()->InvalidateCache(); + cached_time_ = previous()->Now(); + } + return cached_time_.value(); +} + Timestamp Timestamp::FromTimespecRoundUp(gpr_timespec ts) { return FromMillisecondsAfterProcessEpoch(TimespanToMillisRoundUp(gpr_time_sub( gpr_convert_clock_type(ts, GPR_CLOCK_MONOTONIC), StartTime()))); diff --git a/src/core/lib/gprpp/time.h b/src/core/lib/gprpp/time.h index a8e07790c34..b695c4b373a 100644 --- a/src/core/lib/gprpp/time.h +++ b/src/core/lib/gprpp/time.h @@ -23,11 +23,14 @@ #include #include +#include "absl/types/optional.h" + #include #include #include #include "src/core/lib/gpr/time_precise.h" +#include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" namespace grpc_core { @@ -61,6 +64,35 @@ class Duration; // Timestamp represents a discrete point in time. class Timestamp { public: + // Base interface for time providers. + class Source { + public: + // Return the current time. + virtual Timestamp Now() = 0; + virtual void InvalidateCache() {} + + protected: + // We don't delete through this interface, so non-virtual dtor is fine. + ~Source() = default; + }; + + class ScopedSource : public Source { + public: + ScopedSource() : previous_(thread_local_time_source_) { + thread_local_time_source_ = this; + } + ScopedSource(const ScopedSource&) = delete; + ScopedSource& operator=(const ScopedSource&) = delete; + void InvalidateCache() override { previous_->InvalidateCache(); } + + protected: + ~ScopedSource() { thread_local_time_source_ = previous_; } + Source* previous() const { return previous_; } + + private: + Source* const previous_; + }; + constexpr Timestamp() = default; // Constructs a Timestamp from a gpr_timespec. static Timestamp FromTimespecRoundDown(gpr_timespec t); @@ -70,6 +102,8 @@ class Timestamp { static Timestamp FromCycleCounterRoundUp(gpr_cycle_counter c); static Timestamp FromCycleCounterRoundDown(gpr_cycle_counter c); + static Timestamp Now() { return thread_local_time_source_->Now(); } + static constexpr Timestamp FromMillisecondsAfterProcessEpoch(int64_t millis) { return Timestamp(millis); } @@ -116,6 +150,21 @@ class Timestamp { explicit constexpr Timestamp(int64_t millis) : millis_(millis) {} int64_t millis_ = 0; + static GPR_THREAD_LOCAL(Timestamp::Source*) thread_local_time_source_; +}; + +class ScopedTimeCache final : public Timestamp::ScopedSource { + public: + Timestamp Now() override; + + void InvalidateCache() override { + cached_time_ = absl::nullopt; + Timestamp::ScopedSource::InvalidateCache(); + } + void TestOnlySetNow(Timestamp now) { cached_time_ = now; } + + private: + absl::optional cached_time_; }; // Duration represents a span of time. diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 164df39fa40..2bb095a4165 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -633,7 +633,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { static int poll_deadline_to_millis_timeout(grpc_core::Timestamp millis) { if (millis == grpc_core::Timestamp::InfFuture()) return -1; - int64_t delta = (millis - grpc_core::ExecCtx::Get()->Now()).millis(); + int64_t delta = (millis - grpc_core::Timestamp::Now()).millis(); if (delta > INT_MAX) { return INT_MAX; } else if (delta < 0) { diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 8206097ddae..968428c4857 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -947,7 +947,7 @@ static grpc_error_handle pollset_work(grpc_pollset* pollset, while (keep_polling) { keep_polling = 0; if (!pollset->kicked_without_pollers || - deadline <= grpc_core::ExecCtx::Get()->Now()) { + deadline <= grpc_core::Timestamp::Now()) { if (!added_worker) { push_front_worker(pollset, &worker); added_worker = 1; @@ -1145,7 +1145,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { static int poll_deadline_to_millis_timeout(grpc_core::Timestamp deadline) { if (deadline == grpc_core::Timestamp::InfFuture()) return -1; if (deadline.is_process_epoch()) return 0; - int64_t n = (deadline - grpc_core::ExecCtx::Get()->Now()).millis(); + int64_t n = (deadline - grpc_core::Timestamp::Now()).millis(); if (n < 0) return 0; if (n > INT_MAX) return -1; return static_cast(n); diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index a7b72dd5e4b..5a37133ce05 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -77,14 +77,6 @@ bool ExecCtx::Flush() { return did_something; } -Timestamp ExecCtx::Now() { - if (!now_is_valid_) { - now_ = Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC)); - now_is_valid_ = true; - } - return now_; -} - void ExecCtx::Run(const DebugLocation& location, grpc_closure* closure, grpc_error_handle error) { (void)location; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index db8fec8a64f..08aeb230838 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -176,30 +176,14 @@ class ExecCtx { } } - /** Returns the stored current time relative to start if valid, - * otherwise refreshes the stored time, sets it valid and returns the new - * value. - */ - Timestamp Now(); - - /** Invalidates the stored time value. A new time value will be set on calling - * Now(). - */ - void InvalidateNow() { now_is_valid_ = false; } - - /** To be used only by shutdown code in iomgr */ + Timestamp Now() { return Timestamp::Now(); } + void InvalidateNow() { time_cache_.InvalidateCache(); } void SetNowIomgrShutdown() { - now_ = Timestamp::InfFuture(); - now_is_valid_ = true; - } - - /** To be used only for testing. - * Sets the now value. - */ - void TestOnlySetNow(Timestamp new_val) { - now_ = new_val; - now_is_valid_ = true; + // We get to do a test only set now on this path just because iomgr + // is getting removed and no point adding more interfaces for it. + time_cache_.TestOnlySetNow(Timestamp::InfFuture()); } + void TestOnlySetNow(Timestamp now) { time_cache_.TestOnlySetNow(now); } /** Gets pointer to current exec_ctx. */ static ExecCtx* Get() { return exec_ctx_; } @@ -226,9 +210,7 @@ class ExecCtx { unsigned starting_cpu_ = std::numeric_limits::max(); - bool now_is_valid_ = false; - Timestamp now_; - + ScopedTimeCache time_cache_; static GPR_THREAD_LOCAL(ExecCtx*) exec_ctx_; ExecCtx* last_exec_ctx_ = Get(); }; diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc index 11980776cac..13a2e9acb6c 100644 --- a/src/core/lib/iomgr/iocp_windows.cc +++ b/src/core/lib/iomgr/iocp_windows.cc @@ -48,7 +48,7 @@ static DWORD deadline_to_millis_timeout(grpc_core::Timestamp deadline) { if (deadline == grpc_core::Timestamp::InfFuture()) { return INFINITE; } - grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now(); + grpc_core::Timestamp now = grpc_core::Timestamp::Now(); if (deadline < now) return 0; grpc_core::Duration timeout = deadline - now; if (timeout.millis() > std::numeric_limits::max()) return INFINITE; diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index e55c288dea6..268854f392c 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -582,7 +582,7 @@ static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) { } gpr_mu_lock(p->pollset_mu); grpc_core::Timestamp deadline = - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(10); + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(10); GRPC_LOG_IF_ERROR( "backup_poller:pollset_work", grpc_pollset_work(BACKUP_POLLER_POLLSET(p), nullptr, deadline)); diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index 779ce0929dd..72525e4dc12 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -253,7 +253,7 @@ static void timer_list_init() { g_shared_mutables.initialized = true; g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER; gpr_mu_init(&g_shared_mutables.mu); - g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now(); + g_shared_mutables.min_timer = grpc_core::Timestamp::Now(); g_last_seen_min_timer = 0; @@ -343,7 +343,7 @@ static void timer_init(grpc_timer* timer, grpc_core::Timestamp deadline, if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) { gpr_log(GPR_INFO, "TIMER %p: SET %" PRId64 " now %" PRId64 " call %p[%p]", timer, deadline.milliseconds_after_process_epoch(), - grpc_core::ExecCtx::Get()->Now().milliseconds_after_process_epoch(), + grpc_core::Timestamp::Now().milliseconds_after_process_epoch(), closure, closure->cb); } @@ -358,7 +358,7 @@ static void timer_init(grpc_timer* timer, grpc_core::Timestamp deadline, gpr_mu_lock(&shard->mu); timer->pending = true; - grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now(); + grpc_core::Timestamp now = grpc_core::Timestamp::Now(); if (deadline <= now) { timer->pending = false; grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE); @@ -665,7 +665,7 @@ static grpc_timer_check_result run_some_expired_timers( static grpc_timer_check_result timer_check(grpc_core::Timestamp* next) { // prelude - grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now(); + grpc_core::Timestamp now = grpc_core::Timestamp::Now(); /* fetch from a thread-local first: this avoids contention on a globally mutable cacheline in the common case */ diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index 5cb236c913f..49b3bbedb54 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -184,8 +184,7 @@ static bool wait_until(grpc_core::Timestamp next) { g_timed_waiter_deadline = next; if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) { - grpc_core::Duration wait_time = - next - grpc_core::ExecCtx::Get()->Now(); + grpc_core::Duration wait_time = next - grpc_core::Timestamp::Now(); gpr_log(GPR_INFO, "sleep for a %" PRId64 " milliseconds", wait_time.millis()); } diff --git a/src/core/lib/promise/sleep.cc b/src/core/lib/promise/sleep.cc index abada25478e..44d5651d781 100644 --- a/src/core/lib/promise/sleep.cc +++ b/src/core/lib/promise/sleep.cc @@ -41,7 +41,7 @@ Poll Sleep::operator()() { // TODO(ctiller): the following can be safely removed when we remove ExecCtx. ExecCtx::Get()->InvalidateNow(); // If the deadline is earlier than now we can just return. - if (deadline_ <= ExecCtx::Get()->Now()) return absl::OkStatus(); + if (deadline_ <= Timestamp::Now()) return absl::OkStatus(); if (closure_ == nullptr) { // TODO(ctiller): it's likely we'll want a pool of closures - probably per // cpu? - to avoid allocating/deallocating on fast paths. @@ -54,7 +54,7 @@ Poll Sleep::operator()() { Sleep::ActiveClosure::ActiveClosure(Timestamp deadline) : waker_(Activity::current()->MakeOwningWaker()), timer_handle_(GetDefaultEventEngine()->RunAfter( - deadline - ExecCtx::Get()->Now(), this)) {} + deadline - Timestamp::Now(), this)) {} void Sleep::ActiveClosure::Run() { ApplicationCallbackExecCtx callback_exec_ctx; diff --git a/src/core/lib/resource_quota/periodic_update.cc b/src/core/lib/resource_quota/periodic_update.cc index 4a3faaed736..78d424ccca9 100644 --- a/src/core/lib/resource_quota/periodic_update.cc +++ b/src/core/lib/resource_quota/periodic_update.cc @@ -19,13 +19,12 @@ #include #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/iomgr/exec_ctx.h" namespace grpc_core { bool PeriodicUpdate::MaybeEndPeriod(absl::FunctionRef f) { if (period_start_ == Timestamp::ProcessEpoch()) { - period_start_ = ExecCtx::Get()->Now(); + period_start_ = Timestamp::Now(); updates_remaining_.store(1, std::memory_order_release); return false; } @@ -34,7 +33,7 @@ bool PeriodicUpdate::MaybeEndPeriod(absl::FunctionRef f) { // We can now safely mutate any non-atomic mutable variables (we've got a // guarantee that no other thread will), and by the time this function returns // we must store a postive number into updates_remaining_. - auto now = ExecCtx::Get()->Now(); + auto now = Timestamp::Now(); Duration time_so_far = now - period_start_; if (time_so_far < period_) { // At most double the number of updates remaining until the next period. diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.cc b/src/core/lib/security/credentials/google_default/google_default_credentials.cc index 058ccd16977..671fb2e16e1 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.cc +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.cc @@ -218,7 +218,7 @@ static int is_metadata_server_reachable() { GPR_ASSERT(uri.ok()); // params are hardcoded auto http_request = grpc_core::HttpRequest::Get( std::move(*uri), nullptr /* channel args */, &detector.pollent, &request, - grpc_core::ExecCtx::Get()->Now() + max_detection_delay, + grpc_core::Timestamp::Now() + max_detection_delay, GRPC_CLOSURE_CREATE(on_metadata_server_detection_http_response, &detector, grpc_schedule_on_exec_ctx), &detector.response, diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.cc b/src/core/lib/security/credentials/jwt/jwt_verifier.cc index 9a4a4712f5a..421bebb141e 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.cc +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.cc @@ -733,7 +733,7 @@ static void on_openid_config_retrieved(void* user_data, } ctx->http_request = grpc_core::HttpRequest::Get( std::move(*uri), nullptr /* channel args */, &ctx->pollent, &req, - grpc_core::ExecCtx::Get()->Now() + grpc_jwt_verifier_max_delay, + grpc_core::Timestamp::Now() + grpc_jwt_verifier_max_delay, GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx), &ctx->responses[HTTP_RESPONSE_KEYS], grpc_core::CreateHttpRequestSSLCredentials()); @@ -864,7 +864,7 @@ static void retrieve_key_and_verify(verifier_cb_ctx* ctx) { } ctx->http_request = grpc_core::HttpRequest::Get( std::move(*uri), nullptr /* channel args */, &ctx->pollent, &req, - grpc_core::ExecCtx::Get()->Now() + grpc_jwt_verifier_max_delay, http_cb, + grpc_core::Timestamp::Now() + grpc_jwt_verifier_max_delay, http_cb, &ctx->responses[rsp_idx], grpc_core::CreateHttpRequestSSLCredentials()); ctx->http_request->Start(); gpr_free(host); diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc index 4c17745ebb8..ea92745a002 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc @@ -49,7 +49,6 @@ #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/http/httpcli_ssl_credentials.h" #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/load_file.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/json/json.h" @@ -335,7 +334,7 @@ grpc_oauth2_token_fetcher_credentials::GetRequestMetadata( if (start_fetch) { fetch_oauth2(new grpc_credentials_metadata_request(Ref()), &pollent_, on_oauth2_token_fetcher_http_response, - grpc_core::ExecCtx::Get()->Now() + refresh_threshold); + grpc_core::Timestamp::Now() + refresh_threshold); } return [pending_request]() diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index b114bc53178..325c08b1c23 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -929,7 +929,7 @@ class ExecCtxNext : public grpc_core::ExecCtx { return true; } } - return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now(); + return !a->first_loop && a->deadline < grpc_core::Timestamp::Now(); } private: @@ -1033,7 +1033,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, } if (!is_finished_arg.first_loop && - grpc_core::ExecCtx::Get()->Now() >= deadline_millis) { + grpc_core::Timestamp::Now() >= deadline_millis) { ret.type = GRPC_QUEUE_TIMEOUT; ret.success = 0; dump_pending_tags(cq); @@ -1188,7 +1188,7 @@ class ExecCtxPluck : public grpc_core::ExecCtx { } gpr_mu_unlock(cq->mu); } - return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now(); + return !a->first_loop && a->deadline < grpc_core::Timestamp::Now(); } private: @@ -1279,7 +1279,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, break; } if (!is_finished_arg.first_loop && - grpc_core::ExecCtx::Get()->Now() >= deadline_millis) { + grpc_core::Timestamp::Now() >= deadline_millis) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); ret.type = GRPC_QUEUE_TIMEOUT; diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc index 83e2165ac3a..cc3ede26e2d 100644 --- a/src/core/lib/transport/bdp_estimator.cc +++ b/src/core/lib/transport/bdp_estimator.cc @@ -25,8 +25,6 @@ #include -#include "src/core/lib/iomgr/exec_ctx.h" - grpc_core::TraceFlag grpc_bdp_estimator_trace(false, "bdp_estimator"); namespace grpc_core { @@ -82,7 +80,7 @@ Timestamp BdpEstimator::CompletePing() { } ping_state_ = PingState::UNSCHEDULED; accumulator_ = 0; - return ExecCtx::Get()->Now() + inter_ping_delay_; + return Timestamp::Now() + inter_ping_delay_; } } // namespace grpc_core diff --git a/src/core/lib/transport/metadata_batch.cc b/src/core/lib/transport/metadata_batch.cc index f26d131e816..b0cfca78186 100644 --- a/src/core/lib/transport/metadata_batch.cc +++ b/src/core/lib/transport/metadata_batch.cc @@ -24,7 +24,6 @@ #include "absl/strings/match.h" #include "absl/strings/str_cat.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/timeout_encoding.h" namespace grpc_core { @@ -120,11 +119,11 @@ GrpcTimeoutMetadata::ValueType GrpcTimeoutMetadata::MementoToValue( if (timeout == Duration::Infinity()) { return Timestamp::InfFuture(); } - return ExecCtx::Get()->Now() + timeout; + return Timestamp::Now() + timeout; } Slice GrpcTimeoutMetadata::Encode(ValueType x) { - return Timeout::FromDuration(x - ExecCtx::Get()->Now()).Encode(); + return Timeout::FromDuration(x - Timestamp::Now()).Encode(); } TeMetadata::MementoType TeMetadata::ParseMemento( diff --git a/src/core/lib/transport/status_conversion.cc b/src/core/lib/transport/status_conversion.cc index 3b11a85fbdc..d301c2a36bb 100644 --- a/src/core/lib/transport/status_conversion.cc +++ b/src/core/lib/transport/status_conversion.cc @@ -20,8 +20,6 @@ #include "src/core/lib/transport/status_conversion.h" -#include "src/core/lib/iomgr/exec_ctx.h" - grpc_http2_error_code grpc_status_to_http2_error(grpc_status_code status) { switch (status) { case GRPC_STATUS_OK: @@ -50,7 +48,7 @@ grpc_status_code grpc_http2_error_to_grpc_status( case GRPC_HTTP2_CANCEL: /* http2 cancel translates to STATUS_CANCELLED iff deadline hasn't been * exceeded */ - return grpc_core::ExecCtx::Get()->Now() > deadline + return grpc_core::Timestamp::Now() > deadline ? GRPC_STATUS_DEADLINE_EXCEEDED : GRPC_STATUS_CANCELLED; case GRPC_HTTP2_ENHANCE_YOUR_CALM: diff --git a/test/core/backoff/backoff_test.cc b/test/core/backoff/backoff_test.cc index 198213d5842..dad640872b7 100644 --- a/test/core/backoff/backoff_test.cc +++ b/test/core/backoff/backoff_test.cc @@ -46,11 +46,11 @@ TEST(BackOffTest, ConstantBackOff) { BackOff backoff(options); grpc_core::Timestamp next_attempt_start_time = backoff.NextAttemptTime(); - EXPECT_EQ(next_attempt_start_time - grpc_core::ExecCtx::Get()->Now(), + EXPECT_EQ(next_attempt_start_time - grpc_core::Timestamp::Now(), initial_backoff); for (int i = 0; i < 10000; i++) { next_attempt_start_time = backoff.NextAttemptTime(); - EXPECT_EQ(next_attempt_start_time - grpc_core::ExecCtx::Get()->Now(), + EXPECT_EQ(next_attempt_start_time - grpc_core::Timestamp::Now(), initial_backoff); } } @@ -68,7 +68,7 @@ TEST(BackOffTest, MinConnect) { .set_max_backoff(max_backoff); BackOff backoff(options); grpc_core::Timestamp next = backoff.NextAttemptTime(); - EXPECT_EQ(next - grpc_core::ExecCtx::Get()->Now(), initial_backoff); + EXPECT_EQ(next - grpc_core::Timestamp::Now(), initial_backoff); } TEST(BackOffTest, NoJitterBackOff) { @@ -145,7 +145,7 @@ TEST(BackOffTest, JitterBackOff) { grpc_core::ExecCtx exec_ctx; grpc_core::Timestamp next = backoff.NextAttemptTime(); - EXPECT_EQ(next - grpc_core::ExecCtx::Get()->Now(), initial_backoff); + EXPECT_EQ(next - grpc_core::Timestamp::Now(), initial_backoff); auto expected_next_lower_bound = grpc_core::Duration::Milliseconds( static_cast(current_backoff.millis()) * (1 - jitter)); @@ -157,7 +157,7 @@ TEST(BackOffTest, JitterBackOff) { // next-now must be within (jitter*100)% of the current backoff (which // increases by * multiplier up to max_backoff). const grpc_core::Duration timeout_millis = - next - grpc_core::ExecCtx::Get()->Now(); + next - grpc_core::Timestamp::Now(); EXPECT_GE(timeout_millis, expected_next_lower_bound); EXPECT_LE(timeout_millis, expected_next_upper_bound); current_backoff = std::min( diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index 1b2a6a49dae..5a8f66559b9 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -94,11 +94,11 @@ class TestDNSResolver : public grpc_core::DNSResolver { last_resolution_time = now; } // For correct time diff comparisons, make sure that any subsequent calls - // to grpc_core::ExecCtx::Get()->Now() on this thread don't return a time + // to grpc_core::Timestamp::Now() on this thread don't return a time // which is earlier than that returned by the call(s) to // gpr_now(GPR_CLOCK_MONOTONIC) within this function. This is important // because the resolver's last_resolution_timestamp_ will be taken from - // grpc_core::ExecCtx::Get()->Now() right after this returns. + // grpc_core::Timestamp::Now() right after this returns. grpc_core::ExecCtx::Get()->InvalidateNow(); return result; } @@ -164,11 +164,11 @@ static grpc_ares_request* test_dns_lookup_ares( } last_resolution_time = now; // For correct time diff comparisons, make sure that any subsequent calls - // to grpc_core::ExecCtx::Get()->Now() on this thread don't return a time + // to grpc_core::Timestamp::Now() on this thread don't return a time // which is earlier than that returned by the call(s) to // gpr_now(GPR_CLOCK_MONOTONIC) within this function. This is important // because the resolver's last_resolution_timestamp_ will be taken from - // grpc_core::ExecCtx::Get()->Now() right after this returns. + // grpc_core::Timestamp::Now() right after this returns. grpc_core::ExecCtx::Get()->InvalidateNow(); return result; } @@ -217,7 +217,7 @@ static void poll_pollset_until_request_done(iomgr_args* args) { if (done) { break; } - grpc_core::Duration time_left = deadline - grpc_core::ExecCtx::Get()->Now(); + grpc_core::Duration time_left = deadline - grpc_core::Timestamp::Now(); gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64, done, time_left.millis()); ASSERT_GE(time_left, grpc_core::Duration::Zero()); grpc_pollset_worker* worker = nullptr; diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index 23acaa5058c..8be511f689d 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -552,7 +552,7 @@ static void on_read_request_done_locked(void* arg, grpc_error_handle error) { // Connect to requested address. // The connection callback inherits our reference to conn. const grpc_core::Timestamp deadline = - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(10); + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(10); GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn, grpc_schedule_on_exec_ctx); auto args = grpc_core::CoreConfiguration::Get() @@ -616,7 +616,7 @@ static void thread_main(void* arg) { gpr_mu_lock(proxy->mu); GRPC_LOG_IF_ERROR("grpc_pollset_work", grpc_pollset_work(proxy->pollset[0], &worker, - grpc_core::ExecCtx::Get()->Now() + + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(1))); gpr_mu_unlock(proxy->mu); grpc_core::ExecCtx::Get()->Flush(); diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 468a2916275..dfa3cc124dc 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -151,7 +151,7 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver { : name_(std::string(name)), on_done_(std::move(on_done)) { grpc_timer_init( &timer_, - grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(), + grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now(), GRPC_CLOSURE_CREATE(FinishResolve, this, grpc_schedule_on_exec_ctx)); } @@ -242,8 +242,7 @@ grpc_ares_request* my_dns_lookup_ares( r->on_done = on_done; r->addresses = addresses; grpc_timer_init( - &r->timer, - grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(), + &r->timer, grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now(), GRPC_CLOSURE_CREATE(finish_resolve, r, grpc_schedule_on_exec_ctx)); return nullptr; } @@ -308,8 +307,7 @@ static void sched_connect(grpc_closure* closure, grpc_endpoint** ep, fc->ep = ep; fc->deadline = deadline; grpc_timer_init( - &fc->timer, - grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(), + &fc->timer, grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now(), GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx)); } diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index c4fda22b9cf..6eec7469846 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -116,7 +116,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_server_shutdown_and_notify(server, cq, tag(0xdead)); grpc_server_cancel_all_calls(server); grpc_core::Timestamp deadline = - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(5); + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(5); for (int i = 0; i <= requested_calls; i++) { // A single grpc_completion_queue_next might not be sufficient for getting // the tag from shutdown, because we might potentially get blocked by @@ -132,7 +132,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { nullptr); grpc_core::ExecCtx::Get()->InvalidateNow(); } while (ev.type != GRPC_OP_COMPLETE && - grpc_core::ExecCtx::Get()->Now() < deadline); + grpc_core::Timestamp::Now() < deadline); GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); } grpc_completion_queue_shutdown(cq); @@ -142,7 +142,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { nullptr); grpc_core::ExecCtx::Get()->InvalidateNow(); } while (ev.type != GRPC_QUEUE_SHUTDOWN && - grpc_core::ExecCtx::Get()->Now() < deadline); + grpc_core::Timestamp::Now() < deadline); GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); } grpc_server_destroy(server); diff --git a/test/core/event_engine/posix/timer_manager_test.cc b/test/core/event_engine/posix/timer_manager_test.cc index ec3370a7610..77c744283ff 100644 --- a/test/core/event_engine/posix/timer_manager_test.cc +++ b/test/core/event_engine/posix/timer_manager_test.cc @@ -35,7 +35,7 @@ namespace posix_engine { TEST(TimerManagerTest, StressTest) { grpc_core::ExecCtx exec_ctx; - auto now = exec_ctx.Now(); + auto now = grpc_core::Timestamp::Now(); auto test_deadline = now + grpc_core::Duration::Seconds(15); std::vector timers; constexpr int kTimerCount = 500; @@ -58,7 +58,7 @@ TEST(TimerManagerTest, StressTest) { // Wait for all callbacks to have been called while (called.load(std::memory_order_relaxed) < kTimerCount) { exec_ctx.InvalidateNow(); - if (exec_ctx.Now() > test_deadline) { + if (grpc_core::Timestamp::Now() > test_deadline) { FAIL() << "Deadline exceeded. " << called.load(std::memory_order_relaxed) << "/" << kTimerCount << " callbacks executed"; diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc index a6199fdcbaa..13c68e7a8b6 100644 --- a/test/core/iomgr/endpoint_tests.cc +++ b/test/core/iomgr/endpoint_tests.cc @@ -265,7 +265,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, gpr_mu_lock(g_mu); while (!state.read_done || !state.write_done) { grpc_pollset_worker* worker = nullptr; - GPR_ASSERT(grpc_core::ExecCtx::Get()->Now() < deadline); + GPR_ASSERT(grpc_core::Timestamp::Now() < deadline); GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); } @@ -291,7 +291,7 @@ static void wait_for_fail_count(int* fail_count, int want_fail_count) { gpr_mu_lock(g_mu); grpc_core::Timestamp deadline = grpc_core::Timestamp::FromTimespecRoundUp( grpc_timeout_seconds_to_deadline(10)); - while (grpc_core::ExecCtx::Get()->Now() < deadline && + while (grpc_core::Timestamp::Now() < deadline && *fail_count < want_fail_count) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( diff --git a/test/core/iomgr/resolve_address_posix_test.cc b/test/core/iomgr/resolve_address_posix_test.cc index c00fe53488e..02009e45070 100644 --- a/test/core/iomgr/resolve_address_posix_test.cc +++ b/test/core/iomgr/resolve_address_posix_test.cc @@ -103,8 +103,7 @@ static void actually_poll(void* argsp) { if (args->done) { break; } - grpc_core::Duration time_left = - deadline - grpc_core::ExecCtx::Get()->Now(); + grpc_core::Duration time_left = deadline - grpc_core::Timestamp::Now(); gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64, args->done, time_left.millis()); ASSERT_GE(time_left, grpc_core::Duration::Zero()); diff --git a/test/core/iomgr/resolve_address_test.cc b/test/core/iomgr/resolve_address_test.cc index 8523c19fee4..7c258901f51 100644 --- a/test/core/iomgr/resolve_address_test.cc +++ b/test/core/iomgr/resolve_address_test.cc @@ -99,8 +99,7 @@ class ResolveAddressTest : public ::testing::Test { if (done_) { break; } - grpc_core::Duration time_left = - deadline - grpc_core::ExecCtx::Get()->Now(); + grpc_core::Duration time_left = deadline - grpc_core::Timestamp::Now(); gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64, done_, time_left.millis()); ASSERT_GE(time_left, grpc_core::Duration::Zero()); diff --git a/test/core/iomgr/tcp_server_posix_test.cc b/test/core/iomgr/tcp_server_posix_test.cc index 6bb83b3d979..607bfc49e1a 100644 --- a/test/core/iomgr/tcp_server_posix_test.cc +++ b/test/core/iomgr/tcp_server_posix_test.cc @@ -284,7 +284,7 @@ static grpc_error_handle tcp_connect(const test_addr* remote, } gpr_log(GPR_DEBUG, "wait"); while (g_nconnects == nconnects_before && - deadline > grpc_core::ExecCtx::Get()->Now()) { + deadline > grpc_core::Timestamp::Now()) { grpc_pollset_worker* worker = nullptr; grpc_error_handle err; if ((err = grpc_pollset_work(g_pollset, &worker, deadline)) != diff --git a/test/core/iomgr/timer_list_test.cc b/test/core/iomgr/timer_list_test.cc index 96be56a185e..89d5c62c3f7 100644 --- a/test/core/iomgr/timer_list_test.cc +++ b/test/core/iomgr/timer_list_test.cc @@ -58,7 +58,7 @@ static void add_test(void) { grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_check_trace); memset(cb_called, 0, sizeof(cb_called)); - grpc_core::Timestamp start = grpc_core::ExecCtx::Get()->Now(); + grpc_core::Timestamp start = grpc_core::Timestamp::Now(); /* 10 ms timers. will expire in the current epoch */ for (i = 0; i < 10; i++) { @@ -178,7 +178,7 @@ void long_running_service_cleanup_test(void) { gpr_log(GPR_INFO, "long_running_service_cleanup_test"); - grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now(); + grpc_core::Timestamp now = grpc_core::Timestamp::Now(); GPR_ASSERT(now.milliseconds_after_process_epoch() >= k25Days.millis()); grpc_timer_list_init(); grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_trace); diff --git a/test/core/promise/sleep_test.cc b/test/core/promise/sleep_test.cc index 3e97353ce3e..2bfd7effdde 100644 --- a/test/core/promise/sleep_test.cc +++ b/test/core/promise/sleep_test.cc @@ -36,7 +36,7 @@ namespace { TEST(Sleep, Zzzz) { ExecCtx exec_ctx; absl::Notification done; - Timestamp done_time = ExecCtx::Get()->Now() + Duration::Seconds(1); + Timestamp done_time = Timestamp::Now() + Duration::Seconds(1); // Sleep for one second then set done to true. auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(), [&done](absl::Status r) { @@ -45,13 +45,13 @@ TEST(Sleep, Zzzz) { }); done.WaitForNotification(); exec_ctx.InvalidateNow(); - EXPECT_GE(ExecCtx::Get()->Now(), done_time); + EXPECT_GE(Timestamp::Now(), done_time); } TEST(Sleep, AlreadyDone) { ExecCtx exec_ctx; absl::Notification done; - Timestamp done_time = ExecCtx::Get()->Now() - Duration::Seconds(1); + Timestamp done_time = Timestamp::Now() - Duration::Seconds(1); // Sleep for no time at all then set done to true. auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(), [&done](absl::Status r) { @@ -64,7 +64,7 @@ TEST(Sleep, AlreadyDone) { TEST(Sleep, Cancel) { ExecCtx exec_ctx; absl::Notification done; - Timestamp done_time = ExecCtx::Get()->Now() + Duration::Seconds(1); + Timestamp done_time = Timestamp::Now() + Duration::Seconds(1); // Sleep for one second but race it to complete immediately auto activity = MakeActivity( Race(Sleep(done_time), [] { return absl::CancelledError(); }), @@ -74,14 +74,14 @@ TEST(Sleep, Cancel) { }); done.WaitForNotification(); exec_ctx.InvalidateNow(); - EXPECT_LT(ExecCtx::Get()->Now(), done_time); + EXPECT_LT(Timestamp::Now(), done_time); } TEST(Sleep, MoveSemantics) { // ASAN should help determine if there are any memory leaks here ExecCtx exec_ctx; absl::Notification done; - Timestamp done_time = ExecCtx::Get()->Now() + Duration::Milliseconds(111); + Timestamp done_time = Timestamp::Now() + Duration::Milliseconds(111); Sleep donor(done_time); Sleep sleeper = std::move(donor); auto activity = MakeActivity(std::move(sleeper), InlineWakeupScheduler(), @@ -91,7 +91,7 @@ TEST(Sleep, MoveSemantics) { }); done.WaitForNotification(); exec_ctx.InvalidateNow(); - EXPECT_GE(ExecCtx::Get()->Now(), done_time); + EXPECT_GE(Timestamp::Now(), done_time); } TEST(Sleep, StressTest) { @@ -104,7 +104,8 @@ TEST(Sleep, StressTest) { for (int i = 0; i < kNumActivities; i++) { auto notification = std::make_shared(); auto activity = MakeActivity( - Sleep(exec_ctx.Now() + Duration::Seconds(1)), ExecCtxWakeupScheduler(), + Sleep(Timestamp::Now() + Duration::Seconds(1)), + ExecCtxWakeupScheduler(), [notification](absl::Status /*r*/) { notification->Notify(); }); notifications.push_back(std::move(notification)); activities.push_back(std::move(activity)); diff --git a/test/core/resource_quota/periodic_update_test.cc b/test/core/resource_quota/periodic_update_test.cc index 8d381e33255..81db560a255 100644 --- a/test/core/resource_quota/periodic_update_test.cc +++ b/test/core/resource_quota/periodic_update_test.cc @@ -40,14 +40,14 @@ TEST(PeriodicUpdateTest, SimpleTest) { { ExecCtx exec_ctx; upd = absl::make_unique(Duration::Seconds(1)); - start = exec_ctx.Now(); + start = Timestamp::Now(); } // Wait until the first period has elapsed. bool done = false; while (!done) { ExecCtx exec_ctx; upd->Tick([&](Duration elapsed) { - reset_start = ExecCtx::Get()->Now(); + reset_start = Timestamp::Now(); EXPECT_GE(elapsed, Duration::Seconds(1)); done = true; }); @@ -55,7 +55,7 @@ TEST(PeriodicUpdateTest, SimpleTest) { // Ensure that took at least 1 second. { ExecCtx exec_ctx; - EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(1)); + EXPECT_GE(Timestamp::Now() - start, Duration::Seconds(1)); start = reset_start; } // Do ten more update cycles @@ -64,8 +64,8 @@ TEST(PeriodicUpdateTest, SimpleTest) { while (!done) { ExecCtx exec_ctx; upd->Tick([&](Duration) { - reset_start = ExecCtx::Get()->Now(); - EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(1)); + reset_start = Timestamp::Now(); + EXPECT_GE(Timestamp::Now() - start, Duration::Seconds(1)); done = true; }); } @@ -73,8 +73,8 @@ TEST(PeriodicUpdateTest, SimpleTest) { // allowance for the presumed inaccuracy of this type. { ExecCtx exec_ctx; - EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(1)); - EXPECT_LE(exec_ctx.Now() - start, Duration::Seconds(3)); + EXPECT_GE(Timestamp::Now() - start, Duration::Seconds(1)); + EXPECT_LE(Timestamp::Now() - start, Duration::Seconds(3)); start = reset_start; } } @@ -88,7 +88,7 @@ TEST(PeriodicUpdate, ThreadTest) { { ExecCtx exec_ctx; upd = absl::make_unique(Duration::Seconds(1)); - start = exec_ctx.Now(); + start = Timestamp::Now(); } // Run ten threads all updating the counter continuously, for a total of ten // update cycles. @@ -113,8 +113,8 @@ TEST(PeriodicUpdate, ThreadTest) { // Ensure our ten cycles took at least 10 seconds, and no more than 30. { ExecCtx exec_ctx; - EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(10)); - EXPECT_LE(exec_ctx.Now() - start, Duration::Seconds(30)); + EXPECT_GE(Timestamp::Now() - start, Duration::Seconds(10)); + EXPECT_LE(Timestamp::Now() - start, Duration::Seconds(30)); } } diff --git a/test/core/security/ssl_server_fuzzer.cc b/test/core/security/ssl_server_fuzzer.cc index 6b84af77d68..9ab89d409b2 100644 --- a/test/core/security/ssl_server_fuzzer.cc +++ b/test/core/security/ssl_server_fuzzer.cc @@ -90,7 +90,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { creds->create_security_connector(grpc_core::ChannelArgs()); GPR_ASSERT(sc != nullptr); grpc_core::Timestamp deadline = - grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(); + grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now(); struct handshake_state state; state.done_callback_called = false; diff --git a/test/core/surface/concurrent_connectivity_test.cc b/test/core/surface/concurrent_connectivity_test.cc index d63a3a49a50..149c2dbad3a 100644 --- a/test/core/surface/concurrent_connectivity_test.cc +++ b/test/core/surface/concurrent_connectivity_test.cc @@ -159,8 +159,8 @@ void bad_server_thread(void* vargs) { gpr_mu_lock(args->mu); while (!args->stop.load(std::memory_order_acquire)) { - grpc_core::Timestamp deadline = grpc_core::ExecCtx::Get()->Now() + - grpc_core::Duration::Milliseconds(100); + grpc_core::Timestamp deadline = + grpc_core::Timestamp::Now() + grpc_core::Duration::Milliseconds(100); grpc_pollset_worker* worker = nullptr; if (!GRPC_LOG_IF_ERROR( diff --git a/test/core/transport/binder/end2end/fuzzers/server_fuzzer.cc b/test/core/transport/binder/end2end/fuzzers/server_fuzzer.cc index 2ecf781cc92..44d8aa168f1 100644 --- a/test/core/transport/binder/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/transport/binder/end2end/fuzzers/server_fuzzer.cc @@ -94,7 +94,7 @@ DEFINE_PROTO_FUZZER(const binder_transport_fuzzer::Input& input) { grpc_server_shutdown_and_notify(server, cq, tag(0xdead)); grpc_server_cancel_all_calls(server); grpc_core::Timestamp deadline = - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(5); + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(5); for (int i = 0; i <= requested_calls; i++) { // A single grpc_completion_queue_next might not be sufficient for getting // the tag from shutdown, because we might potentially get blocked by @@ -110,7 +110,7 @@ DEFINE_PROTO_FUZZER(const binder_transport_fuzzer::Input& input) { nullptr); grpc_core::ExecCtx::Get()->InvalidateNow(); } while (ev.type != GRPC_OP_COMPLETE && - grpc_core::ExecCtx::Get()->Now() < deadline); + grpc_core::Timestamp::Now() < deadline); GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); } grpc_completion_queue_shutdown(cq); @@ -120,7 +120,7 @@ DEFINE_PROTO_FUZZER(const binder_transport_fuzzer::Input& input) { nullptr); grpc_core::ExecCtx::Get()->InvalidateNow(); } while (ev.type != GRPC_QUEUE_SHUTDOWN && - grpc_core::ExecCtx::Get()->Now() < deadline); + grpc_core::Timestamp::Now() < deadline); GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); } grpc_server_destroy(server); diff --git a/test/core/transport/chttp2/flow_control_fuzzer.cc b/test/core/transport/chttp2/flow_control_fuzzer.cc index 2b1daa35767..ea5bcfd7695 100644 --- a/test/core/transport/chttp2/flow_control_fuzzer.cc +++ b/test/core/transport/chttp2/flow_control_fuzzer.cc @@ -162,7 +162,7 @@ void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) { kMaxAdvanceTimeMillis), GPR_TIMESPAN)); exec_ctx.InvalidateNow(); - if (exec_ctx.Now() >= next_bdp_ping_) { + if (Timestamp::Now() >= next_bdp_ping_) { scheduled_write_ = true; } } break; @@ -289,7 +289,7 @@ void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) { } if (scheduled_write_) { SendToRemote send; - if (exec_ctx.Now() >= next_bdp_ping_) { + if (Timestamp::Now() >= next_bdp_ping_) { if (auto* bdp = tfc_->bdp_estimator()) { bdp->SchedulePing(); bdp->StartPing(); diff --git a/test/core/transport/chttp2/settings_timeout_test.cc b/test/core/transport/chttp2/settings_timeout_test.cc index 3d3fd152062..9980c9c2710 100644 --- a/test/core/transport/chttp2/settings_timeout_test.cc +++ b/test/core/transport/chttp2/settings_timeout_test.cc @@ -146,7 +146,7 @@ class Client { grpc_tcp_client_connect( state.closure(), &endpoint_, pollset_set, grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), - addresses_or->data(), ExecCtx::Get()->Now() + Duration::Seconds(1)); + addresses_or->data(), Timestamp::Now() + Duration::Seconds(1)); ASSERT_TRUE(PollUntilDone(&state, Timestamp::InfFuture())); ASSERT_EQ(GRPC_ERROR_NONE, state.error()); grpc_pollset_set_destroy(pollset_set); @@ -162,7 +162,7 @@ class Client { bool retval = true; // Use a deadline of 3 seconds, which is a lot more than we should // need for a 1-second timeout, but this helps avoid flakes. - Timestamp deadline = ExecCtx::Get()->Now() + Duration::Seconds(3); + Timestamp deadline = Timestamp::Now() + Duration::Seconds(3); while (true) { EventState state; grpc_endpoint_read(endpoint_, &read_buffer, state.closure(), @@ -226,15 +226,15 @@ class Client { while (true) { grpc_pollset_worker* worker = nullptr; gpr_mu_lock(mu_); - GRPC_LOG_IF_ERROR("grpc_pollset_work", - grpc_pollset_work(pollset_, &worker, - ExecCtx::Get()->Now() + - Duration::Milliseconds(100))); + GRPC_LOG_IF_ERROR( + "grpc_pollset_work", + grpc_pollset_work(pollset_, &worker, + Timestamp::Now() + Duration::Milliseconds(100))); // Flushes any work scheduled before or during polling. ExecCtx::Get()->Flush(); gpr_mu_unlock(mu_); if (state != nullptr && state->done()) return true; - if (ExecCtx::Get()->Now() >= deadline) return false; + if (Timestamp::Now() >= deadline) return false; } } diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index c42d37f1fbd..047458b6457 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -499,7 +499,7 @@ static void sched_next_channel_action_locked(half* m) { grpc_timer_init(&m->parent->channel_effects->timer, grpc_core::Duration::Milliseconds( m->parent->channel_effects->actions[0].wait_ms) + - grpc_core::ExecCtx::Get()->Now(), + grpc_core::Timestamp::Now(), GRPC_CLOSURE_CREATE(do_next_sched_channel_action, m, grpc_schedule_on_exec_ctx)); } diff --git a/test/core/util/port_server_client.cc b/test/core/util/port_server_client.cc index 5643ea02627..790eaaac4ef 100644 --- a/test/core/util/port_server_client.cc +++ b/test/core/util/port_server_client.cc @@ -107,7 +107,7 @@ void grpc_free_port_using_server(int port) { GPR_ASSERT(uri.ok()); auto http_request = grpc_core::HttpRequest::Get( std::move(*uri), nullptr /* channel args */, &pr.pops, &req, - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(30), + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30), GRPC_CLOSURE_CREATE(freed_port_from_server, &pr, grpc_schedule_on_exec_ctx), &rsp, @@ -121,7 +121,7 @@ void grpc_free_port_using_server(int port) { if (!GRPC_LOG_IF_ERROR( "pollset_work", grpc_pollset_work(grpc_polling_entity_pollset(&pr.pops), &worker, - grpc_core::ExecCtx::Get()->Now() + + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(1)))) { pr.done = 1; } @@ -191,7 +191,7 @@ static void got_port_from_server(void* arg, grpc_error_handle error) { GPR_ASSERT(uri.ok()); pr->http_request = grpc_core::HttpRequest::Get( std::move(*uri), nullptr /* channel args */, &pr->pops, &req, - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(30), + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30), GRPC_CLOSURE_CREATE(got_port_from_server, pr, grpc_schedule_on_exec_ctx), &pr->response, @@ -238,7 +238,7 @@ int grpc_pick_port_using_server(void) { GPR_ASSERT(uri.ok()); auto http_request = grpc_core::HttpRequest::Get( std::move(*uri), nullptr /* channel args */, &pr.pops, &req, - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(30), + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30), GRPC_CLOSURE_CREATE(got_port_from_server, &pr, grpc_schedule_on_exec_ctx), &pr.response, @@ -252,7 +252,7 @@ int grpc_pick_port_using_server(void) { if (!GRPC_LOG_IF_ERROR( "pollset_work", grpc_pollset_work(grpc_polling_entity_pollset(&pr.pops), &worker, - grpc_core::ExecCtx::Get()->Now() + + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(1)))) { pr.port = 0; } diff --git a/test/cpp/common/time_jump_test.cc b/test/cpp/common/time_jump_test.cc index 97a839efa53..53037c4316d 100644 --- a/test/cpp/common/time_jump_test.cc +++ b/test/cpp/common/time_jump_test.cc @@ -90,14 +90,13 @@ INSTANTIATE_TEST_SUITE_P(TimeJump, TimeJumpTest, TEST_P(TimeJumpTest, TimerRunning) { grpc_core::ExecCtx exec_ctx; grpc_timer timer; - grpc_timer_init( - &timer, - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(3), - GRPC_CLOSURE_CREATE( - [](void*, grpc_error_handle error) { - GPR_ASSERT(error == GRPC_ERROR_CANCELLED); - }, - nullptr, grpc_schedule_on_exec_ctx)); + grpc_timer_init(&timer, + grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(3), + GRPC_CLOSURE_CREATE( + [](void*, grpc_error_handle error) { + GPR_ASSERT(error == GRPC_ERROR_CANCELLED); + }, + nullptr, grpc_schedule_on_exec_ctx)); gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100)); std::ostringstream cmd; cmd << "sudo date `date -v" << GetParam() << " \"+%m%d%H%M%y\"`"; diff --git a/test/cpp/common/timer_test.cc b/test/cpp/common/timer_test.cc index cb6a5f377ff..97f02028501 100644 --- a/test/cpp/common/timer_test.cc +++ b/test/cpp/common/timer_test.cc @@ -87,7 +87,7 @@ TEST_F(TimerTest, OneTimerExpires) { int timer_fired = 0; grpc_timer_init( &timer, - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Milliseconds(500), + grpc_core::Timestamp::Now() + grpc_core::Duration::Milliseconds(500), GRPC_CLOSURE_CREATE( [](void* arg, grpc_error_handle) { int* timer_fired = static_cast(arg); @@ -113,7 +113,7 @@ TEST_F(TimerTest, MultipleTimersExpire) { int timer_fired = 0; for (int i = 0; i < kNumTimers; ++i) { grpc_timer_init(&timers[i], - grpc_core::ExecCtx::Get()->Now() + + grpc_core::Timestamp::Now() + grpc_core::Duration::Milliseconds(500) + grpc_core::Duration::Milliseconds(i), GRPC_CLOSURE_CREATE( @@ -147,7 +147,7 @@ TEST_F(TimerTest, CancelSomeTimers) { // and set a small firing time for timers which need to execute. grpc_timer_init( &timers[i], - grpc_core::ExecCtx::Get()->Now() + + grpc_core::Timestamp::Now() + ((i < kNumTimers / 2) ? grpc_core ::Duration::Milliseconds(60000) : grpc_core ::Duration::Milliseconds(100) + grpc_core::Duration::Milliseconds(i)), @@ -183,8 +183,7 @@ TEST_F(TimerTest, DISABLED_TimerNotCanceled) { grpc_core::ExecCtx exec_ctx; grpc_timer timer; grpc_timer_init( - &timer, - grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(10), + &timer, grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(10), GRPC_CLOSURE_CREATE([](void*, grpc_error_handle) {}, nullptr, grpc_schedule_on_exec_ctx)); } @@ -198,17 +197,17 @@ TEST_F(TimerTest, DISABLED_CancelRace) { grpc_timer timers[kNumTimers]; for (int i = 0; i < kNumTimers; ++i) { grpc_timer* arg = (i != 0) ? &timers[i - 1] : nullptr; - grpc_timer_init(&timers[i], - grpc_core::ExecCtx::Get()->Now() + - grpc_core::Duration::Milliseconds(100), - GRPC_CLOSURE_CREATE( - [](void* arg, grpc_error_handle /*error*/) { - grpc_timer* timer = static_cast(arg); - if (timer) { - grpc_timer_cancel(timer); - } - }, - arg, grpc_schedule_on_exec_ctx)); + grpc_timer_init( + &timers[i], + grpc_core::Timestamp::Now() + grpc_core::Duration::Milliseconds(100), + GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error_handle /*error*/) { + grpc_timer* timer = static_cast(arg); + if (timer) { + grpc_timer_cancel(timer); + } + }, + arg, grpc_schedule_on_exec_ctx)); } gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100)); } @@ -230,17 +229,17 @@ TEST_F(TimerTest, DISABLED_CancelNextTimer) { if (i < kNumTimers - 1) { arg = &timers[i + 1]; } - grpc_timer_init(&timers[i], - grpc_core::ExecCtx::Get()->Now() + - grpc_core::Duration::Milliseconds(100), - GRPC_CLOSURE_CREATE( - [](void* arg, grpc_error_handle /*error*/) { - grpc_timer* timer = static_cast(arg); - if (timer) { - grpc_timer_cancel(timer); - } - }, - arg, grpc_schedule_on_exec_ctx)); + grpc_timer_init( + &timers[i], + grpc_core::Timestamp::Now() + grpc_core::Duration::Milliseconds(100), + GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error_handle /*error*/) { + grpc_timer* timer = static_cast(arg); + if (timer) { + grpc_timer_cancel(timer); + } + }, + arg, grpc_schedule_on_exec_ctx)); } grpc_timer_cancel(&timers[0]); gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100)); diff --git a/test/cpp/end2end/connection_attempt_injector.cc b/test/cpp/end2end/connection_attempt_injector.cc index 61f430b6618..21388f6a6a5 100644 --- a/test/cpp/end2end/connection_attempt_injector.cc +++ b/test/cpp/end2end/connection_attempt_injector.cc @@ -192,7 +192,7 @@ ConnectionAttemptInjector::InjectedDelay::InjectedDelay( const grpc_resolved_address* addr, grpc_core::Timestamp deadline) : attempt_(closure, ep, interested_parties, config, addr, deadline) { GRPC_CLOSURE_INIT(&timer_callback_, TimerCallback, this, nullptr); - grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now(); + grpc_core::Timestamp now = grpc_core::Timestamp::Now(); duration = std::min(duration, deadline - now); grpc_timer_init(&timer_, now + duration, &timer_callback_); } diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc index ca2cbce7718..1b6e73ab81d 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc @@ -73,7 +73,7 @@ BENCHMARK(BM_HpackEncoderInitDestroy); static void BM_HpackEncoderEncodeDeadline(benchmark::State& state) { TrackCounters track_counters; grpc_core::ExecCtx exec_ctx; - grpc_core::Timestamp saved_now = grpc_core::ExecCtx::Get()->Now(); + grpc_core::Timestamp saved_now = grpc_core::Timestamp::Now(); auto arena = grpc_core::MakeScopedArena(1024, g_memory_allocator); grpc_metadata_batch b(arena.get()); diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index d28784db0d1..51db720b9f0 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -221,7 +221,7 @@ void MaybePollArbitraryPollsetTwice() { gpr_mu_lock(mu); GRPC_LOG_IF_ERROR( "pollset_work", - grpc_pollset_work(pollset, &worker, grpc_core::ExecCtx::Get()->Now())); + grpc_pollset_work(pollset, &worker, grpc_core::Timestamp::Now())); gpr_mu_unlock(mu); grpc_core::ExecCtx::Get()->Flush(); // Make a second zero-timeout poll (in case the first one @@ -229,7 +229,7 @@ void MaybePollArbitraryPollsetTwice() { gpr_mu_lock(mu); GRPC_LOG_IF_ERROR( "pollset_work", - grpc_pollset_work(pollset, &worker, grpc_core::ExecCtx::Get()->Now())); + grpc_pollset_work(pollset, &worker, grpc_core::Timestamp::Now())); gpr_mu_unlock(mu); grpc_core::ExecCtx::Get()->Flush(); grpc_pollset_destroy(pollset);