[time] Introduce time sources (#30815)

* [time] Introduce time sources

* make import trivial

* Automated change: Fix sanity tests

* fix

* Automated change: Fix sanity tests

* fix

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* review feedback

* fix

* Automated change: Fix sanity tests

* ios fix

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/31008/head^2
Craig Tiller 2 years ago committed by GitHub
parent ead11434c3
commit 96f5cddb5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      BUILD
  2. 6
      src/core/ext/filters/channel_idle/channel_idle_filter.cc
  3. 7
      src/core/ext/filters/client_channel/backup_poller.cc
  4. 8
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 10
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  6. 5
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  7. 22
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  8. 3
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  9. 4
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  10. 13
      src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc
  11. 10
      src/core/ext/filters/client_channel/resolver/polling_resolver.cc
  12. 7
      src/core/ext/filters/client_channel/retry_filter.cc
  13. 6
      src/core/ext/filters/client_channel/subchannel.cc
  14. 3
      src/core/ext/filters/client_channel/subchannel_stream_client.cc
  15. 3
      src/core/ext/filters/fault_injection/fault_injection_filter.cc
  16. 4
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  17. 13
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  18. 5
      src/core/ext/transport/chttp2/transport/flow_control.cc
  19. 3
      src/core/ext/transport/chttp2/transport/frame_ping.cc
  20. 3
      src/core/ext/transport/chttp2/transport/hpack_encoder.cc
  21. 2
      src/core/ext/transport/chttp2/transport/writing.cc
  22. 7
      src/core/ext/xds/xds_client.cc
  23. 3
      src/core/ext/xds/xds_client.h
  24. 6
      src/core/lib/backoff/backoff.cc
  25. 7
      src/core/lib/channel/channel_trace.cc
  26. 2
      src/core/lib/gpr/time_precise.h
  27. 21
      src/core/lib/gprpp/time.cc
  28. 49
      src/core/lib/gprpp/time.h
  29. 2
      src/core/lib/iomgr/ev_epoll1_linux.cc
  30. 4
      src/core/lib/iomgr/ev_poll_posix.cc
  31. 8
      src/core/lib/iomgr/exec_ctx.cc
  32. 32
      src/core/lib/iomgr/exec_ctx.h
  33. 2
      src/core/lib/iomgr/iocp_windows.cc
  34. 2
      src/core/lib/iomgr/tcp_posix.cc
  35. 8
      src/core/lib/iomgr/timer_generic.cc
  36. 3
      src/core/lib/iomgr/timer_manager.cc
  37. 4
      src/core/lib/promise/sleep.cc
  38. 5
      src/core/lib/resource_quota/periodic_update.cc
  39. 2
      src/core/lib/security/credentials/google_default/google_default_credentials.cc
  40. 4
      src/core/lib/security/credentials/jwt/jwt_verifier.cc
  41. 3
      src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
  42. 8
      src/core/lib/surface/completion_queue.cc
  43. 4
      src/core/lib/transport/bdp_estimator.cc
  44. 5
      src/core/lib/transport/metadata_batch.cc
  45. 4
      src/core/lib/transport/status_conversion.cc
  46. 10
      test/core/backoff/backoff_test.cc
  47. 10
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  48. 4
      test/core/end2end/fixtures/http_proxy_fixture.cc
  49. 8
      test/core/end2end/fuzzers/api_fuzzer.cc
  50. 6
      test/core/end2end/fuzzers/server_fuzzer.cc
  51. 4
      test/core/event_engine/posix/timer_manager_test.cc
  52. 4
      test/core/iomgr/endpoint_tests.cc
  53. 3
      test/core/iomgr/resolve_address_posix_test.cc
  54. 3
      test/core/iomgr/resolve_address_test.cc
  55. 2
      test/core/iomgr/tcp_server_posix_test.cc
  56. 4
      test/core/iomgr/timer_list_test.cc
  57. 17
      test/core/promise/sleep_test.cc
  58. 20
      test/core/resource_quota/periodic_update_test.cc
  59. 2
      test/core/security/ssl_server_fuzzer.cc
  60. 4
      test/core/surface/concurrent_connectivity_test.cc
  61. 6
      test/core/transport/binder/end2end/fuzzers/server_fuzzer.cc
  62. 4
      test/core/transport/chttp2/flow_control_fuzzer.cc
  63. 14
      test/core/transport/chttp2/settings_timeout_test.cc
  64. 2
      test/core/util/passthru_endpoint.cc
  65. 10
      test/core/util/port_server_client.cc
  66. 15
      test/cpp/common/time_jump_test.cc
  67. 53
      test/cpp/common/timer_test.cc
  68. 2
      test/cpp/end2end/connection_attempt_injector.cc
  69. 2
      test/cpp/microbenchmarks/bm_chttp2_hpack.cc
  70. 4
      test/cpp/naming/cancel_ares_query_test.cc

14
BUILD

@ -2024,7 +2024,6 @@ grpc_cc_library(
],
external_deps = ["absl/functional:function_ref"],
deps = [
"exec_ctx",
"gpr_platform",
"time",
"useful",
@ -2199,10 +2198,15 @@ grpc_cc_library(
hdrs = [
"src/core/lib/gprpp/time.h",
],
external_deps = ["absl/strings:str_format"],
external_deps = [
"absl/strings:str_format",
"absl/types:optional",
],
deps = [
"event_engine_base_hdrs",
"gpr",
"gpr_tls",
"no_destruct",
"useful",
],
)
@ -3003,7 +3007,6 @@ grpc_cc_library(
],
hdrs = ["src/core/lib/transport/bdp_estimator.h"],
deps = [
"exec_ctx",
"gpr",
"grpc_trace",
"time",
@ -3053,7 +3056,6 @@ grpc_cc_library(
language = "c++",
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [
"exec_ctx",
"gpr_platform",
"time",
],
@ -4974,7 +4976,6 @@ grpc_cc_library(
"closure",
"config",
"debug_location",
"exec_ctx",
"gpr",
"grpc_base",
"grpc_client_channel",
@ -5184,7 +5185,6 @@ grpc_cc_library(
"config",
"debug_location",
"env",
"exec_ctx",
"gpr",
"grpc_base",
"grpc_client_channel",
@ -5227,7 +5227,6 @@ grpc_cc_library(
"closure",
"config",
"debug_location",
"exec_ctx",
"gpr",
"grpc_base",
"grpc_client_channel",
@ -6819,7 +6818,6 @@ grpc_cc_library(
],
deps = [
"bdp_estimator",
"exec_ctx",
"experiments",
"gpr",
"grpc_trace",

@ -167,7 +167,7 @@ void MaxAgeFilter::PostInit() {
max_age_activity_.Set(MakeActivity(
TrySeq(
// First sleep until the max connection age
Sleep(ExecCtx::Get()->Now() + max_connection_age_),
Sleep(Timestamp::Now() + max_connection_age_),
// Then send a goaway.
[this] {
GRPC_CHANNEL_STACK_REF(this->channel_stack(),
@ -192,7 +192,7 @@ void MaxAgeFilter::PostInit() {
},
// Sleep for the grace period
[this] {
return Sleep(ExecCtx::Get()->Now() + max_connection_age_grace_);
return Sleep(Timestamp::Now() + max_connection_age_grace_);
}),
ExecCtxWakeupScheduler(), [channel_stack, this](absl::Status status) {
// OnDone -- close the connection if the promise completed
@ -246,7 +246,7 @@ void ChannelIdleFilter::StartIdleTimer() {
auto channel_stack = channel_stack_->Ref();
auto timeout = client_idle_timeout_;
auto promise = Loop([timeout, idle_filter_state]() {
return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout),
return TrySeq(Sleep(Timestamp::Now() + timeout),
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> {
if (idle_filter_state->CheckTimer()) {
return Continue{};

@ -33,7 +33,6 @@
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
@ -132,11 +131,11 @@ static void run_poller(void* arg, grpc_error_handle error) {
return;
}
grpc_error_handle err =
grpc_pollset_work(p->pollset, nullptr, grpc_core::ExecCtx::Get()->Now());
grpc_pollset_work(p->pollset, nullptr, grpc_core::Timestamp::Now());
gpr_mu_unlock(p->pollset_mu);
GRPC_LOG_IF_ERROR("Run client channel backup poller", err);
grpc_timer_init(&p->polling_timer,
grpc_core::ExecCtx::Get()->Now() + g_poll_interval,
grpc_core::Timestamp::Now() + g_poll_interval,
&p->run_poller_closure);
}
@ -153,7 +152,7 @@ static void g_poller_init_locked() {
GRPC_CLOSURE_INIT(&g_poller->run_poller_closure, run_poller, g_poller,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&g_poller->polling_timer,
grpc_core::ExecCtx::Get()->Now() + g_poll_interval,
grpc_core::Timestamp::Now() + g_poll_interval,
&g_poller->run_poller_closure);
}
}

@ -868,7 +868,7 @@ GrpcLb::BalancerCallState::BalancerCallState(
const Timestamp deadline =
grpclb_policy()->lb_call_timeout_ == Duration::Zero()
? Timestamp::InfFuture()
: ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_;
: Timestamp::Now() + grpclb_policy()->lb_call_timeout_;
lb_call_ = grpc_channel_create_pollset_set_call(
grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
grpclb_policy_->interested_parties(),
@ -1548,7 +1548,7 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
if (is_initial_update) {
fallback_at_startup_checks_pending_ = true;
// Start timer.
Timestamp deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
Timestamp deadline = Timestamp::Now() + fallback_at_startup_timeout_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
// Start watching the channel's connectivity state. If the channel
@ -1643,7 +1643,7 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
Timestamp next_try = lb_call_backoff_.NextAttemptTime();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
Duration timeout = next_try - ExecCtx::Get()->Now();
Duration timeout = next_try - Timestamp::Now();
if (timeout > Duration::Zero()) {
gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
this, timeout.millis());
@ -1813,7 +1813,7 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
void GrpcLb::CacheDeletedSubchannelLocked(
RefCountedPtr<SubchannelInterface> subchannel) {
Timestamp deletion_time = ExecCtx::Get()->Now() + subchannel_cache_interval_;
Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_;
cached_subchannels_[deletion_time].push_back(std::move(subchannel));
if (!subchannel_cache_timer_pending_) {
Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release();

@ -58,7 +58,6 @@
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
@ -296,7 +295,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
base_ejection_time_in_millis * multiplier_,
std::max(base_ejection_time_in_millis,
max_ejection_time_in_millis)));
if (change_time < ExecCtx::Get()->Now()) {
if (change_time < Timestamp::Now()) {
Uneject();
return true;
}
@ -615,8 +614,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] starting timer", this);
}
ejection_timer_ =
MakeOrphanable<EjectionTimer>(Ref(), ExecCtx::Get()->Now());
ejection_timer_ = MakeOrphanable<EjectionTimer>(Ref(), Timestamp::Now());
for (const auto& p : subchannel_state_map_) {
p.second->RotateBucket(); // Reset call counters.
}
@ -835,7 +833,7 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) {
std::map<SubchannelState*, double> failure_percentage_ejection_candidates;
size_t ejected_host_count = 0;
double success_rate_sum = 0;
auto time_now = ExecCtx::Get()->Now();
auto time_now = Timestamp::Now();
auto& config = parent_->config_->outlier_detection_config();
for (auto& state : parent_->subchannel_state_map_) {
auto* subchannel_state = state.second.get();
@ -1006,7 +1004,7 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked(grpc_error_handle error) {
}
timer_pending_ = false;
parent_->ejection_timer_ =
MakeOrphanable<EjectionTimer>(parent_, ExecCtx::Get()->Now());
MakeOrphanable<EjectionTimer>(parent_, Timestamp::Now());
}
Unref(DEBUG_LOCATION, "Timer");
GRPC_ERROR_UNREF(error);

@ -51,7 +51,6 @@
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
@ -532,7 +531,7 @@ PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
}
GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr);
Ref(DEBUG_LOCATION, "Timer").release();
grpc_timer_init(&timer_, ExecCtx::Get()->Now() + kChildRetentionInterval,
grpc_timer_init(&timer_, Timestamp::Now() + kChildRetentionInterval,
&on_timer_);
}
@ -594,7 +593,7 @@ PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
Ref(DEBUG_LOCATION, "Timer").release();
grpc_timer_init(
&timer_,
ExecCtx::Get()->Now() +
Timestamp::Now() +
child_priority_->priority_policy_->child_failover_timeout_,
&on_timer_);
}

@ -1014,7 +1014,7 @@ LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
gpr_log(GPR_INFO, "[rlslb %p] picker=%p: request keys: %s",
lb_policy_.get(), this, key.ToString().c_str());
}
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
MutexLock lock(&lb_policy_->mu_);
if (lb_policy_->is_shutdown_) {
return PickResult::Fail(
@ -1164,7 +1164,7 @@ RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "CacheEntry" : nullptr),
lb_policy_(std::move(lb_policy)),
backoff_state_(MakeCacheEntryBackoff()),
min_expiration_time_(ExecCtx::Get()->Now() + kMinExpirationTime),
min_expiration_time_(Timestamp::Now() + kMinExpirationTime),
lru_iterator_(lb_policy_->cache_.lru_list_.insert(
lb_policy_->cache_.lru_list_.end(), key)) {}
@ -1242,12 +1242,12 @@ void RlsLb::Cache::Entry::ResetBackoff() {
}
bool RlsLb::Cache::Entry::ShouldRemove() const {
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
return data_expiration_time_ < now && backoff_expiration_time_ < now;
}
bool RlsLb::Cache::Entry::CanEvict() const {
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
return min_expiration_time_ < now;
}
@ -1273,7 +1273,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked(
backoff_state_ = MakeCacheEntryBackoff();
}
backoff_time_ = backoff_state_->NextAttemptTime();
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
backoff_expiration_time_ = now + (backoff_time_ - now) * 2;
backoff_timer_ = MakeOrphanable<BackoffTimer>(
Ref(DEBUG_LOCATION, "BackoffTimer"), backoff_time_);
@ -1282,7 +1282,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked(
}
// Request succeeded, so store the result.
header_data_ = std::move(response.header_data);
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
data_expiration_time_ = now + lb_policy_->config_->max_age();
stale_time_ = now + lb_policy_->config_->stale_age();
status_ = absl::OkStatus();
@ -1348,7 +1348,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked(
//
RlsLb::Cache::Cache(RlsLb* lb_policy) : lb_policy_(lb_policy) {
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
lb_policy_->Ref(DEBUG_LOCATION, "CacheCleanupTimer").release();
GRPC_CLOSURE_INIT(&timer_callback_, OnCleanupTimer, this, nullptr);
grpc_timer_init(&cleanup_timer_, now + kCacheCleanupTimerInterval,
@ -1431,7 +1431,7 @@ void RlsLb::Cache::OnCleanupTimer(void* arg, grpc_error_handle error) {
++it;
}
}
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
lb_policy.release();
grpc_timer_init(&cache->cleanup_timer_,
now + kCacheCleanupTimerInterval,
@ -1500,7 +1500,7 @@ void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
//
bool RlsLb::RlsChannel::Throttle::ShouldThrottle() {
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
while (!requests_.empty() && now - requests_.front() > window_size_) {
requests_.pop_front();
}
@ -1528,7 +1528,7 @@ bool RlsLb::RlsChannel::Throttle::ShouldThrottle() {
}
void RlsLb::RlsChannel::Throttle::RegisterResponse(bool success) {
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
requests_.push_back(now);
if (!success) failures_.push_back(now);
}
@ -1708,7 +1708,7 @@ void RlsLb::RlsRequest::StartCallLocked() {
MutexLock lock(&lb_policy_->mu_);
if (lb_policy_->is_shutdown_) return;
}
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
deadline_ = now + lb_policy_->config_->lookup_service_timeout();
grpc_metadata_array_init(&recv_initial_metadata_);
grpc_metadata_array_init(&recv_trailing_metadata_);

@ -50,7 +50,6 @@
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
@ -530,7 +529,7 @@ void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
// Start a timer to delete the child.
Ref(DEBUG_LOCATION, "ClusterChild+timer").release();
grpc_timer_init(&delayed_removal_timer_,
ExecCtx::Get()->Now() +
Timestamp::Now() +
Duration::Milliseconds(
GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS),
&on_delayed_removal_timer_);

@ -284,7 +284,7 @@ static grpc_core::Timestamp calculate_next_ares_backup_poll_alarm(
"request:%p ev_driver=%p. next ares process poll time in "
"%" PRId64 " ms",
driver->request, driver, until_next_ares_backup_poll_alarm.millis());
return grpc_core::ExecCtx::Get()->Now() + until_next_ares_backup_poll_alarm;
return grpc_core::Timestamp::Now() + until_next_ares_backup_poll_alarm;
}
static void on_timeout(void* arg, grpc_error_handle error) {
@ -496,7 +496,7 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver)
GRPC_CLOSURE_INIT(&ev_driver->on_timeout_locked, on_timeout, ev_driver,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&ev_driver->query_timeout,
grpc_core::ExecCtx::Get()->Now() + timeout,
grpc_core::Timestamp::Now() + timeout,
&ev_driver->on_timeout_locked);
// Initialize the backup poll alarm
grpc_core::Timestamp next_ares_backup_poll_alarm =

@ -52,7 +52,6 @@
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/resolver/resolver.h"
@ -167,12 +166,12 @@ GoogleCloud2ProdResolver::MetadataQuery::MetadataQuery(
const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA),
resolver_->resource_quota_.get(), grpc_resource_quota_arg_vtable());
grpc_channel_args args = {1, &resource_quota_arg};
http_request_ = HttpRequest::Get(
std::move(*uri), &args, pollent, &request,
ExecCtx::Get()->Now() + Duration::Seconds(10), // 10s timeout
&on_done_, &response_,
RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create()));
http_request_ =
HttpRequest::Get(std::move(*uri), &args, pollent, &request,
Timestamp::Now() + Duration::Seconds(10), // 10s timeout
&on_done_, &response_,
RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create()));
http_request_->Start();
}

@ -190,7 +190,7 @@ void PollingResolver::GetResultStatus(absl::Status status) {
// Also see https://github.com/grpc/grpc/issues/26079.
ExecCtx::Get()->InvalidateNow();
Timestamp next_try = backoff_.NextAttemptTime();
Duration timeout = next_try - ExecCtx::Get()->Now();
Duration timeout = next_try - Timestamp::Now();
GPR_ASSERT(!have_next_resolution_timer_);
have_next_resolution_timer_ = true;
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
@ -223,11 +223,11 @@ void PollingResolver::MaybeStartResolvingLocked() {
const Timestamp earliest_next_resolution =
*last_resolution_timestamp_ + min_time_between_resolutions_;
const Duration time_until_next_resolution =
earliest_next_resolution - ExecCtx::Get()->Now();
earliest_next_resolution - Timestamp::Now();
if (time_until_next_resolution > Duration::Zero()) {
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
const Duration last_resolution_ago =
ExecCtx::Get()->Now() - *last_resolution_timestamp_;
Timestamp::Now() - *last_resolution_timestamp_;
gpr_log(GPR_INFO,
"[polling resolver %p] in cooldown from last resolution "
"(from %" PRId64 " ms ago); will resolve again in %" PRId64
@ -239,7 +239,7 @@ void PollingResolver::MaybeStartResolvingLocked() {
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release();
GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr);
grpc_timer_init(&next_resolution_timer_,
ExecCtx::Get()->Now() + time_until_next_resolution,
Timestamp::Now() + time_until_next_resolution,
&on_next_resolution_);
return;
}
@ -249,7 +249,7 @@ void PollingResolver::MaybeStartResolvingLocked() {
void PollingResolver::StartResolvingLocked() {
request_ = StartRequest();
last_resolution_timestamp_ = ExecCtx::Get()->Now();
last_resolution_timestamp_ = Timestamp::Now();
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
gpr_log(GPR_INFO, "[polling resolver %p] starting resolution, request_=%p",
this, request_.get());

@ -725,8 +725,7 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
if (calld->retry_policy_ != nullptr &&
calld->retry_policy_->per_attempt_recv_timeout().has_value()) {
Timestamp per_attempt_recv_deadline =
ExecCtx::Get()->Now() +
*calld->retry_policy_->per_attempt_recv_timeout();
Timestamp::Now() + *calld->retry_policy_->per_attempt_recv_timeout();
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p attempt=%p: per-attempt timeout in %" PRId64
@ -2604,7 +2603,7 @@ void RetryFilter::CallData::StartRetryTimer(
Timestamp next_attempt_time;
if (server_pushback.has_value()) {
GPR_ASSERT(*server_pushback >= Duration::Zero());
next_attempt_time = ExecCtx::Get()->Now() + *server_pushback;
next_attempt_time = Timestamp::Now() + *server_pushback;
retry_backoff_.Reset();
} else {
next_attempt_time = retry_backoff_.NextAttemptTime();
@ -2612,7 +2611,7 @@ void RetryFilter::CallData::StartRetryTimer(
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand_,
this, (next_attempt_time - ExecCtx::Get()->Now()).millis());
this, (next_attempt_time - Timestamp::Now()).millis());
}
// Schedule retry after computed delay.
GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimer, this, nullptr);

@ -767,7 +767,7 @@ void Subchannel::ResetBackoff() {
GetDefaultEventEngine()->Cancel(retry_timer_handle_)) {
OnRetryTimerLocked();
} else if (state_ == GRPC_CHANNEL_CONNECTING) {
next_attempt_time_ = ExecCtx::Get()->Now();
next_attempt_time_ = Timestamp::Now();
}
}
@ -878,7 +878,7 @@ void Subchannel::OnRetryTimerLocked() {
void Subchannel::StartConnectingLocked() {
// Set next attempt time.
const Timestamp min_deadline = min_connect_timeout_ + ExecCtx::Get()->Now();
const Timestamp min_deadline = min_connect_timeout_ + Timestamp::Now();
next_attempt_time_ = backoff_.NextAttemptTime();
// Report CONNECTING.
SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus());
@ -913,7 +913,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
// transition back to IDLE.
if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
const Duration time_until_next_attempt =
next_attempt_time_ - ExecCtx::Get()->Now();
next_attempt_time_ - Timestamp::Now();
gpr_log(GPR_INFO,
"subchannel %p %s: connect failed (%s), backing off for %" PRId64
" ms",

@ -32,7 +32,6 @@
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/transport/error_utils.h"
@ -128,7 +127,7 @@ void SubchannelStreamClient::StartRetryTimerLocked() {
if (GPR_UNLIKELY(tracer_ != nullptr)) {
gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient health check call lost...",
tracer_, this);
Duration timeout = next_try - ExecCtx::Get()->Now();
Duration timeout = next_try - Timestamp::Now();
if (timeout > Duration::Zero()) {
gpr_log(GPR_INFO, "%s %p: ... will retry in %" PRId64 "ms.", tracer_,
this, timeout.millis());

@ -42,7 +42,6 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
@ -251,7 +250,7 @@ bool FaultInjectionFilter::InjectionDecision::HaveActiveFaultsQuota() const {
Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil() {
if (delay_time_ != Duration::Zero() && HaveActiveFaultsQuota()) {
active_fault_ = FaultHandle{true};
return ExecCtx::Get()->Now() + delay_time_;
return Timestamp::Now() + delay_time_;
}
return Timestamp::InfPast();
}

@ -358,7 +358,7 @@ void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
//
Timestamp GetConnectionDeadline(const ChannelArgs& args) {
return ExecCtx::Get()->Now() +
return Timestamp::Now() +
std::max(
Duration::Milliseconds(1),
args.GetDurationFromIntMillis(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS)
@ -587,7 +587,7 @@ void Chttp2ServerListener::ActiveConnection::SendGoAway() {
this, nullptr);
grpc_timer_init(
&drain_grace_timer_,
ExecCtx::Get()->Now() +
Timestamp::Now() +
std::max(
Duration::Zero(),
listener_->args_

@ -442,7 +442,7 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
grpc_core::Timestamp::Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
} else {
// Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
@ -1602,8 +1602,7 @@ class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> {
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
Ref().release(); // Ref for the timer
grpc_timer_init(
&timer_,
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(20),
&timer_, grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(20),
GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr));
}
@ -2647,7 +2646,7 @@ static void init_keepalive_ping_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
grpc_core::Timestamp::Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
} else if (error == GRPC_ERROR_CANCELLED) {
@ -2661,7 +2660,7 @@ static void init_keepalive_ping_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
grpc_core::Timestamp::Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
@ -2690,7 +2689,7 @@ static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_watchdog_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
grpc_core::Timestamp::Now() + t->keepalive_timeout,
&t->keepalive_watchdog_fired_locked);
t->keepalive_ping_started = true;
}
@ -2726,7 +2725,7 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) {
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
grpc_core::Timestamp::Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
}

@ -38,7 +38,6 @@
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/memory_quota.h"
grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl");
@ -114,7 +113,7 @@ TransportFlowControl::TransportFlowControl(const char* name,
.set_min_control_value(-1)
.set_max_control_value(25)
.set_integral_range(10)),
last_pid_update_(ExecCtx::Get()->Now()) {}
last_pid_update_(Timestamp::Now()) {}
uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
const uint32_t target_announced_window =
@ -207,7 +206,7 @@ double TransportFlowControl::TargetLogBdp() {
}
double TransportFlowControl::SmoothLogBdp(double value) {
Timestamp now = ExecCtx::Get()->Now();
Timestamp now = Timestamp::Now();
double bdp_error = value - pid_controller_.last_control_value();
const double dt = (now - last_pid_update_).seconds();
last_pid_update_ = now;

@ -32,7 +32,6 @@
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
static bool g_disable_ping_ack = false;
@ -95,7 +94,7 @@ grpc_error_handle grpc_chttp2_ping_parser_parse(void* parser,
grpc_chttp2_ack_ping(t, p->opaque_8bytes);
} else {
if (!t->is_client) {
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
grpc_core::Timestamp now = grpc_core::Timestamp::Now();
grpc_core::Timestamp next_allowed_ping =
t->ping_recv_state.last_ping_recv_time +
t->ping_policy.min_recv_ping_interval_without_data;

@ -33,7 +33,6 @@
#include "src/core/ext/transport/chttp2/transport/hpack_constants.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder_table.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/surface/validate_metadata.h"
#include "src/core/lib/transport/timeout_encoding.h"
@ -519,7 +518,7 @@ void HPackCompressor::Framer::EncodeRepeatingSliceValue(
}
void HPackCompressor::Framer::Encode(GrpcTimeoutMetadata, Timestamp deadline) {
Timeout timeout = Timeout::FromDuration(deadline - ExecCtx::Get()->Now());
Timeout timeout = Timeout::FromDuration(deadline - Timestamp::Now());
for (auto it = compressor_->previous_timeouts_.begin();
it != compressor_->previous_timeouts_.end(); ++it) {
double ratio = timeout.RatioVersus(it->timeout);

@ -111,7 +111,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
// in a loop while draining the currently-held combiner. Also see
// https://github.com/grpc/grpc/issues/26079.
grpc_core::ExecCtx::Get()->InvalidateNow();
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
grpc_core::Timestamp now = grpc_core::Timestamp::Now();
grpc_core::Duration next_allowed_ping_interval = grpc_core::Duration::Zero();
if (t->is_client) {

@ -46,6 +46,7 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/uri/uri_parser.h"
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
@ -162,7 +163,7 @@ class XdsClient::ChannelState::AdsCallState
XdsClient* xds_client() const { return ads_call_state_->xds_client(); }
AdsCallState* ads_call_state_;
const Timestamp update_time_ = ExecCtx::Get()->Now();
const Timestamp update_time_ = Timestamp::Now();
Result result_;
};
@ -626,7 +627,7 @@ void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
if (shutting_down_) return;
const Timestamp next_attempt_time = backoff_.NextAttemptTime();
const Duration timeout =
std::max(next_attempt_time - ExecCtx::Get()->Now(), Duration::Zero());
std::max(next_attempt_time - Timestamp::Now(), Duration::Zero());
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: call attempt failed; "
@ -1958,7 +1959,7 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
}
}
// Compute load report interval.
const Timestamp now = ExecCtx::Get()->Now();
const Timestamp now = Timestamp::Now();
snapshot.load_report_interval = now - load_report.last_report_time;
load_report.last_report_time = now;
// Record snapshot.

@ -45,7 +45,6 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
@ -252,7 +251,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
XdsLocalityName::Less>
locality_stats;
Timestamp last_report_time = ExecCtx::Get()->Now();
Timestamp last_report_time = Timestamp::Now();
};
// Load report data.

@ -22,8 +22,6 @@
#include <algorithm>
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
BackOff::BackOff(const Options& options) : options_(options) { Reset(); }
@ -31,14 +29,14 @@ BackOff::BackOff(const Options& options) : options_(options) { Reset(); }
Timestamp BackOff::NextAttemptTime() {
if (initial_) {
initial_ = false;
return current_backoff_ + ExecCtx::Get()->Now();
return current_backoff_ + Timestamp::Now();
}
current_backoff_ = std::min(current_backoff_ * options_.multiplier(),
options_.max_backoff());
const Duration jitter = Duration::FromSecondsAsDouble(
absl::Uniform(rand_gen_, -options_.jitter() * current_backoff_.seconds(),
options_.jitter() * current_backoff_.seconds()));
return ExecCtx::Get()->Now() + current_backoff_ + jitter;
return Timestamp::Now() + current_backoff_ + jitter;
}
void BackOff::Reset() {

@ -30,7 +30,6 @@
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_refcount.h"
@ -41,7 +40,7 @@ ChannelTrace::TraceEvent::TraceEvent(Severity severity, const grpc_slice& data,
RefCountedPtr<BaseNode> referenced_entity)
: severity_(severity),
data_(data),
timestamp_(ExecCtx::Get()->Now().as_timespec(GPR_CLOCK_REALTIME)),
timestamp_(Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME)),
next_(nullptr),
referenced_entity_(std::move(referenced_entity)),
memory_usage_(sizeof(TraceEvent) + grpc_slice_memory_usage(data)) {}
@ -49,7 +48,7 @@ ChannelTrace::TraceEvent::TraceEvent(Severity severity, const grpc_slice& data,
ChannelTrace::TraceEvent::TraceEvent(Severity severity, const grpc_slice& data)
: severity_(severity),
data_(data),
timestamp_(ExecCtx::Get()->Now().as_timespec(GPR_CLOCK_REALTIME)),
timestamp_(Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME)),
next_(nullptr),
memory_usage_(sizeof(TraceEvent) + grpc_slice_memory_usage(data)) {}
@ -65,7 +64,7 @@ ChannelTrace::ChannelTrace(size_t max_event_memory)
return; // tracing is disabled if max_event_memory_ == 0
}
gpr_mu_init(&tracer_mu_);
time_created_ = ExecCtx::Get()->Now().as_timespec(GPR_CLOCK_REALTIME);
time_created_ = Timestamp::Now().as_timespec(GPR_CLOCK_REALTIME);
}
ChannelTrace::~ChannelTrace() {

@ -28,7 +28,7 @@
// low as a usec. Use other clock sources or gpr_precise_clock_now(),
// where you need high resolution clocks.
//
// Using gpr_get_cycle_counter() is preferred to using ExecCtx::Get()->Now()
// Using gpr_get_cycle_counter() is preferred to using Timestamp::Now()
// whenever possible.
#if GPR_CYCLE_COUNTER_CUSTOM

@ -28,6 +28,8 @@
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/no_destruct.h"
namespace grpc_core {
namespace {
@ -35,6 +37,13 @@ namespace {
std::atomic<int64_t> g_process_epoch_seconds;
std::atomic<gpr_cycle_counter> g_process_epoch_cycles;
class GprNowTimeSource final : public Timestamp::Source {
public:
Timestamp Now() override {
return Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC));
}
};
GPR_ATTRIBUTE_NOINLINE std::pair<int64_t, gpr_cycle_counter> InitTime() {
gpr_cycle_counter cycles_start = 0;
gpr_cycle_counter cycles_end = 0;
@ -133,6 +142,18 @@ int64_t TimespanToMillisRoundDown(gpr_timespec ts) {
} // namespace
GPR_THREAD_LOCAL(Timestamp::Source*)
Timestamp::thread_local_time_source_{
NoDestructSingleton<GprNowTimeSource>::Get()};
Timestamp ScopedTimeCache::Now() {
if (!cached_time_.has_value()) {
previous()->InvalidateCache();
cached_time_ = previous()->Now();
}
return cached_time_.value();
}
Timestamp Timestamp::FromTimespecRoundUp(gpr_timespec ts) {
return FromMillisecondsAfterProcessEpoch(TimespanToMillisRoundUp(gpr_time_sub(
gpr_convert_clock_type(ts, GPR_CLOCK_MONOTONIC), StartTime())));

@ -23,11 +23,14 @@
#include <ostream>
#include <string>
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/support/time.h>
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
namespace grpc_core {
@ -61,6 +64,35 @@ class Duration;
// Timestamp represents a discrete point in time.
class Timestamp {
public:
// Base interface for time providers.
class Source {
public:
// Return the current time.
virtual Timestamp Now() = 0;
virtual void InvalidateCache() {}
protected:
// We don't delete through this interface, so non-virtual dtor is fine.
~Source() = default;
};
class ScopedSource : public Source {
public:
ScopedSource() : previous_(thread_local_time_source_) {
thread_local_time_source_ = this;
}
ScopedSource(const ScopedSource&) = delete;
ScopedSource& operator=(const ScopedSource&) = delete;
void InvalidateCache() override { previous_->InvalidateCache(); }
protected:
~ScopedSource() { thread_local_time_source_ = previous_; }
Source* previous() const { return previous_; }
private:
Source* const previous_;
};
constexpr Timestamp() = default;
// Constructs a Timestamp from a gpr_timespec.
static Timestamp FromTimespecRoundDown(gpr_timespec t);
@ -70,6 +102,8 @@ class Timestamp {
static Timestamp FromCycleCounterRoundUp(gpr_cycle_counter c);
static Timestamp FromCycleCounterRoundDown(gpr_cycle_counter c);
static Timestamp Now() { return thread_local_time_source_->Now(); }
static constexpr Timestamp FromMillisecondsAfterProcessEpoch(int64_t millis) {
return Timestamp(millis);
}
@ -116,6 +150,21 @@ class Timestamp {
explicit constexpr Timestamp(int64_t millis) : millis_(millis) {}
int64_t millis_ = 0;
static GPR_THREAD_LOCAL(Timestamp::Source*) thread_local_time_source_;
};
class ScopedTimeCache final : public Timestamp::ScopedSource {
public:
Timestamp Now() override;
void InvalidateCache() override {
cached_time_ = absl::nullopt;
Timestamp::ScopedSource::InvalidateCache();
}
void TestOnlySetNow(Timestamp now) { cached_time_ = now; }
private:
absl::optional<Timestamp> cached_time_;
};
// Duration represents a span of time.

@ -633,7 +633,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
static int poll_deadline_to_millis_timeout(grpc_core::Timestamp millis) {
if (millis == grpc_core::Timestamp::InfFuture()) return -1;
int64_t delta = (millis - grpc_core::ExecCtx::Get()->Now()).millis();
int64_t delta = (millis - grpc_core::Timestamp::Now()).millis();
if (delta > INT_MAX) {
return INT_MAX;
} else if (delta < 0) {

@ -947,7 +947,7 @@ static grpc_error_handle pollset_work(grpc_pollset* pollset,
while (keep_polling) {
keep_polling = 0;
if (!pollset->kicked_without_pollers ||
deadline <= grpc_core::ExecCtx::Get()->Now()) {
deadline <= grpc_core::Timestamp::Now()) {
if (!added_worker) {
push_front_worker(pollset, &worker);
added_worker = 1;
@ -1145,7 +1145,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
static int poll_deadline_to_millis_timeout(grpc_core::Timestamp deadline) {
if (deadline == grpc_core::Timestamp::InfFuture()) return -1;
if (deadline.is_process_epoch()) return 0;
int64_t n = (deadline - grpc_core::ExecCtx::Get()->Now()).millis();
int64_t n = (deadline - grpc_core::Timestamp::Now()).millis();
if (n < 0) return 0;
if (n > INT_MAX) return -1;
return static_cast<int>(n);

@ -77,14 +77,6 @@ bool ExecCtx::Flush() {
return did_something;
}
Timestamp ExecCtx::Now() {
if (!now_is_valid_) {
now_ = Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC));
now_is_valid_ = true;
}
return now_;
}
void ExecCtx::Run(const DebugLocation& location, grpc_closure* closure,
grpc_error_handle error) {
(void)location;

@ -176,30 +176,14 @@ class ExecCtx {
}
}
/** Returns the stored current time relative to start if valid,
* otherwise refreshes the stored time, sets it valid and returns the new
* value.
*/
Timestamp Now();
/** Invalidates the stored time value. A new time value will be set on calling
* Now().
*/
void InvalidateNow() { now_is_valid_ = false; }
/** To be used only by shutdown code in iomgr */
Timestamp Now() { return Timestamp::Now(); }
void InvalidateNow() { time_cache_.InvalidateCache(); }
void SetNowIomgrShutdown() {
now_ = Timestamp::InfFuture();
now_is_valid_ = true;
}
/** To be used only for testing.
* Sets the now value.
*/
void TestOnlySetNow(Timestamp new_val) {
now_ = new_val;
now_is_valid_ = true;
// We get to do a test only set now on this path just because iomgr
// is getting removed and no point adding more interfaces for it.
time_cache_.TestOnlySetNow(Timestamp::InfFuture());
}
void TestOnlySetNow(Timestamp now) { time_cache_.TestOnlySetNow(now); }
/** Gets pointer to current exec_ctx. */
static ExecCtx* Get() { return exec_ctx_; }
@ -226,9 +210,7 @@ class ExecCtx {
unsigned starting_cpu_ = std::numeric_limits<unsigned>::max();
bool now_is_valid_ = false;
Timestamp now_;
ScopedTimeCache time_cache_;
static GPR_THREAD_LOCAL(ExecCtx*) exec_ctx_;
ExecCtx* last_exec_ctx_ = Get();
};

@ -48,7 +48,7 @@ static DWORD deadline_to_millis_timeout(grpc_core::Timestamp deadline) {
if (deadline == grpc_core::Timestamp::InfFuture()) {
return INFINITE;
}
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
grpc_core::Timestamp now = grpc_core::Timestamp::Now();
if (deadline < now) return 0;
grpc_core::Duration timeout = deadline - now;
if (timeout.millis() > std::numeric_limits<DWORD>::max()) return INFINITE;

@ -582,7 +582,7 @@ static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {
}
gpr_mu_lock(p->pollset_mu);
grpc_core::Timestamp deadline =
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(10);
grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(10);
GRPC_LOG_IF_ERROR(
"backup_poller:pollset_work",
grpc_pollset_work(BACKUP_POLLER_POLLSET(p), nullptr, deadline));

@ -253,7 +253,7 @@ static void timer_list_init() {
g_shared_mutables.initialized = true;
g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
gpr_mu_init(&g_shared_mutables.mu);
g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now();
g_shared_mutables.min_timer = grpc_core::Timestamp::Now();
g_last_seen_min_timer = 0;
@ -343,7 +343,7 @@ static void timer_init(grpc_timer* timer, grpc_core::Timestamp deadline,
if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
gpr_log(GPR_INFO, "TIMER %p: SET %" PRId64 " now %" PRId64 " call %p[%p]",
timer, deadline.milliseconds_after_process_epoch(),
grpc_core::ExecCtx::Get()->Now().milliseconds_after_process_epoch(),
grpc_core::Timestamp::Now().milliseconds_after_process_epoch(),
closure, closure->cb);
}
@ -358,7 +358,7 @@ static void timer_init(grpc_timer* timer, grpc_core::Timestamp deadline,
gpr_mu_lock(&shard->mu);
timer->pending = true;
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
grpc_core::Timestamp now = grpc_core::Timestamp::Now();
if (deadline <= now) {
timer->pending = false;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE);
@ -665,7 +665,7 @@ static grpc_timer_check_result run_some_expired_timers(
static grpc_timer_check_result timer_check(grpc_core::Timestamp* next) {
// prelude
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
grpc_core::Timestamp now = grpc_core::Timestamp::Now();
/* fetch from a thread-local first: this avoids contention on a globally
mutable cacheline in the common case */

@ -184,8 +184,7 @@ static bool wait_until(grpc_core::Timestamp next) {
g_timed_waiter_deadline = next;
if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
grpc_core::Duration wait_time =
next - grpc_core::ExecCtx::Get()->Now();
grpc_core::Duration wait_time = next - grpc_core::Timestamp::Now();
gpr_log(GPR_INFO, "sleep for a %" PRId64 " milliseconds",
wait_time.millis());
}

@ -41,7 +41,7 @@ Poll<absl::Status> Sleep::operator()() {
// TODO(ctiller): the following can be safely removed when we remove ExecCtx.
ExecCtx::Get()->InvalidateNow();
// If the deadline is earlier than now we can just return.
if (deadline_ <= ExecCtx::Get()->Now()) return absl::OkStatus();
if (deadline_ <= Timestamp::Now()) return absl::OkStatus();
if (closure_ == nullptr) {
// TODO(ctiller): it's likely we'll want a pool of closures - probably per
// cpu? - to avoid allocating/deallocating on fast paths.
@ -54,7 +54,7 @@ Poll<absl::Status> Sleep::operator()() {
Sleep::ActiveClosure::ActiveClosure(Timestamp deadline)
: waker_(Activity::current()->MakeOwningWaker()),
timer_handle_(GetDefaultEventEngine()->RunAfter(
deadline - ExecCtx::Get()->Now(), this)) {}
deadline - Timestamp::Now(), this)) {}
void Sleep::ActiveClosure::Run() {
ApplicationCallbackExecCtx callback_exec_ctx;

@ -19,13 +19,12 @@
#include <atomic>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
bool PeriodicUpdate::MaybeEndPeriod(absl::FunctionRef<void(Duration)> f) {
if (period_start_ == Timestamp::ProcessEpoch()) {
period_start_ = ExecCtx::Get()->Now();
period_start_ = Timestamp::Now();
updates_remaining_.store(1, std::memory_order_release);
return false;
}
@ -34,7 +33,7 @@ bool PeriodicUpdate::MaybeEndPeriod(absl::FunctionRef<void(Duration)> f) {
// We can now safely mutate any non-atomic mutable variables (we've got a
// guarantee that no other thread will), and by the time this function returns
// we must store a postive number into updates_remaining_.
auto now = ExecCtx::Get()->Now();
auto now = Timestamp::Now();
Duration time_so_far = now - period_start_;
if (time_so_far < period_) {
// At most double the number of updates remaining until the next period.

@ -218,7 +218,7 @@ static int is_metadata_server_reachable() {
GPR_ASSERT(uri.ok()); // params are hardcoded
auto http_request = grpc_core::HttpRequest::Get(
std::move(*uri), nullptr /* channel args */, &detector.pollent, &request,
grpc_core::ExecCtx::Get()->Now() + max_detection_delay,
grpc_core::Timestamp::Now() + max_detection_delay,
GRPC_CLOSURE_CREATE(on_metadata_server_detection_http_response, &detector,
grpc_schedule_on_exec_ctx),
&detector.response,

@ -733,7 +733,7 @@ static void on_openid_config_retrieved(void* user_data,
}
ctx->http_request = grpc_core::HttpRequest::Get(
std::move(*uri), nullptr /* channel args */, &ctx->pollent, &req,
grpc_core::ExecCtx::Get()->Now() + grpc_jwt_verifier_max_delay,
grpc_core::Timestamp::Now() + grpc_jwt_verifier_max_delay,
GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx),
&ctx->responses[HTTP_RESPONSE_KEYS],
grpc_core::CreateHttpRequestSSLCredentials());
@ -864,7 +864,7 @@ static void retrieve_key_and_verify(verifier_cb_ctx* ctx) {
}
ctx->http_request = grpc_core::HttpRequest::Get(
std::move(*uri), nullptr /* channel args */, &ctx->pollent, &req,
grpc_core::ExecCtx::Get()->Now() + grpc_jwt_verifier_max_delay, http_cb,
grpc_core::Timestamp::Now() + grpc_jwt_verifier_max_delay, http_cb,
&ctx->responses[rsp_idx], grpc_core::CreateHttpRequestSSLCredentials());
ctx->http_request->Start();
gpr_free(host);

@ -49,7 +49,6 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/http/httpcli_ssl_credentials.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
@ -335,7 +334,7 @@ grpc_oauth2_token_fetcher_credentials::GetRequestMetadata(
if (start_fetch) {
fetch_oauth2(new grpc_credentials_metadata_request(Ref()), &pollent_,
on_oauth2_token_fetcher_http_response,
grpc_core::ExecCtx::Get()->Now() + refresh_threshold);
grpc_core::Timestamp::Now() + refresh_threshold);
}
return
[pending_request]()

@ -929,7 +929,7 @@ class ExecCtxNext : public grpc_core::ExecCtx {
return true;
}
}
return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
return !a->first_loop && a->deadline < grpc_core::Timestamp::Now();
}
private:
@ -1033,7 +1033,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
}
if (!is_finished_arg.first_loop &&
grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
grpc_core::Timestamp::Now() >= deadline_millis) {
ret.type = GRPC_QUEUE_TIMEOUT;
ret.success = 0;
dump_pending_tags(cq);
@ -1188,7 +1188,7 @@ class ExecCtxPluck : public grpc_core::ExecCtx {
}
gpr_mu_unlock(cq->mu);
}
return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
return !a->first_loop && a->deadline < grpc_core::Timestamp::Now();
}
private:
@ -1279,7 +1279,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
break;
}
if (!is_finished_arg.first_loop &&
grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
grpc_core::Timestamp::Now() >= deadline_millis) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
ret.type = GRPC_QUEUE_TIMEOUT;

@ -25,8 +25,6 @@
#include <algorithm>
#include "src/core/lib/iomgr/exec_ctx.h"
grpc_core::TraceFlag grpc_bdp_estimator_trace(false, "bdp_estimator");
namespace grpc_core {
@ -82,7 +80,7 @@ Timestamp BdpEstimator::CompletePing() {
}
ping_state_ = PingState::UNSCHEDULED;
accumulator_ = 0;
return ExecCtx::Get()->Now() + inter_ping_delay_;
return Timestamp::Now() + inter_ping_delay_;
}
} // namespace grpc_core

@ -24,7 +24,6 @@
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/timeout_encoding.h"
namespace grpc_core {
@ -120,11 +119,11 @@ GrpcTimeoutMetadata::ValueType GrpcTimeoutMetadata::MementoToValue(
if (timeout == Duration::Infinity()) {
return Timestamp::InfFuture();
}
return ExecCtx::Get()->Now() + timeout;
return Timestamp::Now() + timeout;
}
Slice GrpcTimeoutMetadata::Encode(ValueType x) {
return Timeout::FromDuration(x - ExecCtx::Get()->Now()).Encode();
return Timeout::FromDuration(x - Timestamp::Now()).Encode();
}
TeMetadata::MementoType TeMetadata::ParseMemento(

@ -20,8 +20,6 @@
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/iomgr/exec_ctx.h"
grpc_http2_error_code grpc_status_to_http2_error(grpc_status_code status) {
switch (status) {
case GRPC_STATUS_OK:
@ -50,7 +48,7 @@ grpc_status_code grpc_http2_error_to_grpc_status(
case GRPC_HTTP2_CANCEL:
/* http2 cancel translates to STATUS_CANCELLED iff deadline hasn't been
* exceeded */
return grpc_core::ExecCtx::Get()->Now() > deadline
return grpc_core::Timestamp::Now() > deadline
? GRPC_STATUS_DEADLINE_EXCEEDED
: GRPC_STATUS_CANCELLED;
case GRPC_HTTP2_ENHANCE_YOUR_CALM:

@ -46,11 +46,11 @@ TEST(BackOffTest, ConstantBackOff) {
BackOff backoff(options);
grpc_core::Timestamp next_attempt_start_time = backoff.NextAttemptTime();
EXPECT_EQ(next_attempt_start_time - grpc_core::ExecCtx::Get()->Now(),
EXPECT_EQ(next_attempt_start_time - grpc_core::Timestamp::Now(),
initial_backoff);
for (int i = 0; i < 10000; i++) {
next_attempt_start_time = backoff.NextAttemptTime();
EXPECT_EQ(next_attempt_start_time - grpc_core::ExecCtx::Get()->Now(),
EXPECT_EQ(next_attempt_start_time - grpc_core::Timestamp::Now(),
initial_backoff);
}
}
@ -68,7 +68,7 @@ TEST(BackOffTest, MinConnect) {
.set_max_backoff(max_backoff);
BackOff backoff(options);
grpc_core::Timestamp next = backoff.NextAttemptTime();
EXPECT_EQ(next - grpc_core::ExecCtx::Get()->Now(), initial_backoff);
EXPECT_EQ(next - grpc_core::Timestamp::Now(), initial_backoff);
}
TEST(BackOffTest, NoJitterBackOff) {
@ -145,7 +145,7 @@ TEST(BackOffTest, JitterBackOff) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Timestamp next = backoff.NextAttemptTime();
EXPECT_EQ(next - grpc_core::ExecCtx::Get()->Now(), initial_backoff);
EXPECT_EQ(next - grpc_core::Timestamp::Now(), initial_backoff);
auto expected_next_lower_bound = grpc_core::Duration::Milliseconds(
static_cast<double>(current_backoff.millis()) * (1 - jitter));
@ -157,7 +157,7 @@ TEST(BackOffTest, JitterBackOff) {
// next-now must be within (jitter*100)% of the current backoff (which
// increases by * multiplier up to max_backoff).
const grpc_core::Duration timeout_millis =
next - grpc_core::ExecCtx::Get()->Now();
next - grpc_core::Timestamp::Now();
EXPECT_GE(timeout_millis, expected_next_lower_bound);
EXPECT_LE(timeout_millis, expected_next_upper_bound);
current_backoff = std::min(

@ -94,11 +94,11 @@ class TestDNSResolver : public grpc_core::DNSResolver {
last_resolution_time = now;
}
// For correct time diff comparisons, make sure that any subsequent calls
// to grpc_core::ExecCtx::Get()->Now() on this thread don't return a time
// to grpc_core::Timestamp::Now() on this thread don't return a time
// which is earlier than that returned by the call(s) to
// gpr_now(GPR_CLOCK_MONOTONIC) within this function. This is important
// because the resolver's last_resolution_timestamp_ will be taken from
// grpc_core::ExecCtx::Get()->Now() right after this returns.
// grpc_core::Timestamp::Now() right after this returns.
grpc_core::ExecCtx::Get()->InvalidateNow();
return result;
}
@ -164,11 +164,11 @@ static grpc_ares_request* test_dns_lookup_ares(
}
last_resolution_time = now;
// For correct time diff comparisons, make sure that any subsequent calls
// to grpc_core::ExecCtx::Get()->Now() on this thread don't return a time
// to grpc_core::Timestamp::Now() on this thread don't return a time
// which is earlier than that returned by the call(s) to
// gpr_now(GPR_CLOCK_MONOTONIC) within this function. This is important
// because the resolver's last_resolution_timestamp_ will be taken from
// grpc_core::ExecCtx::Get()->Now() right after this returns.
// grpc_core::Timestamp::Now() right after this returns.
grpc_core::ExecCtx::Get()->InvalidateNow();
return result;
}
@ -217,7 +217,7 @@ static void poll_pollset_until_request_done(iomgr_args* args) {
if (done) {
break;
}
grpc_core::Duration time_left = deadline - grpc_core::ExecCtx::Get()->Now();
grpc_core::Duration time_left = deadline - grpc_core::Timestamp::Now();
gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64, done, time_left.millis());
ASSERT_GE(time_left, grpc_core::Duration::Zero());
grpc_pollset_worker* worker = nullptr;

@ -552,7 +552,7 @@ static void on_read_request_done_locked(void* arg, grpc_error_handle error) {
// Connect to requested address.
// The connection callback inherits our reference to conn.
const grpc_core::Timestamp deadline =
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(10);
grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(10);
GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn,
grpc_schedule_on_exec_ctx);
auto args = grpc_core::CoreConfiguration::Get()
@ -616,7 +616,7 @@ static void thread_main(void* arg) {
gpr_mu_lock(proxy->mu);
GRPC_LOG_IF_ERROR("grpc_pollset_work",
grpc_pollset_work(proxy->pollset[0], &worker,
grpc_core::ExecCtx::Get()->Now() +
grpc_core::Timestamp::Now() +
grpc_core::Duration::Seconds(1)));
gpr_mu_unlock(proxy->mu);
grpc_core::ExecCtx::Get()->Flush();

@ -151,7 +151,7 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver {
: name_(std::string(name)), on_done_(std::move(on_done)) {
grpc_timer_init(
&timer_,
grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now(),
GRPC_CLOSURE_CREATE(FinishResolve, this, grpc_schedule_on_exec_ctx));
}
@ -242,8 +242,7 @@ grpc_ares_request* my_dns_lookup_ares(
r->on_done = on_done;
r->addresses = addresses;
grpc_timer_init(
&r->timer,
grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
&r->timer, grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now(),
GRPC_CLOSURE_CREATE(finish_resolve, r, grpc_schedule_on_exec_ctx));
return nullptr;
}
@ -308,8 +307,7 @@ static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
fc->ep = ep;
fc->deadline = deadline;
grpc_timer_init(
&fc->timer,
grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
&fc->timer, grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now(),
GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx));
}

@ -116,7 +116,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_server_shutdown_and_notify(server, cq, tag(0xdead));
grpc_server_cancel_all_calls(server);
grpc_core::Timestamp deadline =
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(5);
grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(5);
for (int i = 0; i <= requested_calls; i++) {
// A single grpc_completion_queue_next might not be sufficient for getting
// the tag from shutdown, because we might potentially get blocked by
@ -132,7 +132,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
nullptr);
grpc_core::ExecCtx::Get()->InvalidateNow();
} while (ev.type != GRPC_OP_COMPLETE &&
grpc_core::ExecCtx::Get()->Now() < deadline);
grpc_core::Timestamp::Now() < deadline);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
}
grpc_completion_queue_shutdown(cq);
@ -142,7 +142,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
nullptr);
grpc_core::ExecCtx::Get()->InvalidateNow();
} while (ev.type != GRPC_QUEUE_SHUTDOWN &&
grpc_core::ExecCtx::Get()->Now() < deadline);
grpc_core::Timestamp::Now() < deadline);
GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
}
grpc_server_destroy(server);

@ -35,7 +35,7 @@ namespace posix_engine {
TEST(TimerManagerTest, StressTest) {
grpc_core::ExecCtx exec_ctx;
auto now = exec_ctx.Now();
auto now = grpc_core::Timestamp::Now();
auto test_deadline = now + grpc_core::Duration::Seconds(15);
std::vector<Timer> timers;
constexpr int kTimerCount = 500;
@ -58,7 +58,7 @@ TEST(TimerManagerTest, StressTest) {
// Wait for all callbacks to have been called
while (called.load(std::memory_order_relaxed) < kTimerCount) {
exec_ctx.InvalidateNow();
if (exec_ctx.Now() > test_deadline) {
if (grpc_core::Timestamp::Now() > test_deadline) {
FAIL() << "Deadline exceeded. "
<< called.load(std::memory_order_relaxed) << "/" << kTimerCount
<< " callbacks executed";

@ -265,7 +265,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_mu_lock(g_mu);
while (!state.read_done || !state.write_done) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(grpc_core::ExecCtx::Get()->Now() < deadline);
GPR_ASSERT(grpc_core::Timestamp::Now() < deadline);
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
}
@ -291,7 +291,7 @@ static void wait_for_fail_count(int* fail_count, int want_fail_count) {
gpr_mu_lock(g_mu);
grpc_core::Timestamp deadline = grpc_core::Timestamp::FromTimespecRoundUp(
grpc_timeout_seconds_to_deadline(10));
while (grpc_core::ExecCtx::Get()->Now() < deadline &&
while (grpc_core::Timestamp::Now() < deadline &&
*fail_count < want_fail_count) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(

@ -103,8 +103,7 @@ static void actually_poll(void* argsp) {
if (args->done) {
break;
}
grpc_core::Duration time_left =
deadline - grpc_core::ExecCtx::Get()->Now();
grpc_core::Duration time_left = deadline - grpc_core::Timestamp::Now();
gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64, args->done,
time_left.millis());
ASSERT_GE(time_left, grpc_core::Duration::Zero());

@ -99,8 +99,7 @@ class ResolveAddressTest : public ::testing::Test {
if (done_) {
break;
}
grpc_core::Duration time_left =
deadline - grpc_core::ExecCtx::Get()->Now();
grpc_core::Duration time_left = deadline - grpc_core::Timestamp::Now();
gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64, done_,
time_left.millis());
ASSERT_GE(time_left, grpc_core::Duration::Zero());

@ -284,7 +284,7 @@ static grpc_error_handle tcp_connect(const test_addr* remote,
}
gpr_log(GPR_DEBUG, "wait");
while (g_nconnects == nconnects_before &&
deadline > grpc_core::ExecCtx::Get()->Now()) {
deadline > grpc_core::Timestamp::Now()) {
grpc_pollset_worker* worker = nullptr;
grpc_error_handle err;
if ((err = grpc_pollset_work(g_pollset, &worker, deadline)) !=

@ -58,7 +58,7 @@ static void add_test(void) {
grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_check_trace);
memset(cb_called, 0, sizeof(cb_called));
grpc_core::Timestamp start = grpc_core::ExecCtx::Get()->Now();
grpc_core::Timestamp start = grpc_core::Timestamp::Now();
/* 10 ms timers. will expire in the current epoch */
for (i = 0; i < 10; i++) {
@ -178,7 +178,7 @@ void long_running_service_cleanup_test(void) {
gpr_log(GPR_INFO, "long_running_service_cleanup_test");
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
grpc_core::Timestamp now = grpc_core::Timestamp::Now();
GPR_ASSERT(now.milliseconds_after_process_epoch() >= k25Days.millis());
grpc_timer_list_init();
grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_trace);

@ -36,7 +36,7 @@ namespace {
TEST(Sleep, Zzzz) {
ExecCtx exec_ctx;
absl::Notification done;
Timestamp done_time = ExecCtx::Get()->Now() + Duration::Seconds(1);
Timestamp done_time = Timestamp::Now() + Duration::Seconds(1);
// Sleep for one second then set done to true.
auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(),
[&done](absl::Status r) {
@ -45,13 +45,13 @@ TEST(Sleep, Zzzz) {
});
done.WaitForNotification();
exec_ctx.InvalidateNow();
EXPECT_GE(ExecCtx::Get()->Now(), done_time);
EXPECT_GE(Timestamp::Now(), done_time);
}
TEST(Sleep, AlreadyDone) {
ExecCtx exec_ctx;
absl::Notification done;
Timestamp done_time = ExecCtx::Get()->Now() - Duration::Seconds(1);
Timestamp done_time = Timestamp::Now() - Duration::Seconds(1);
// Sleep for no time at all then set done to true.
auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(),
[&done](absl::Status r) {
@ -64,7 +64,7 @@ TEST(Sleep, AlreadyDone) {
TEST(Sleep, Cancel) {
ExecCtx exec_ctx;
absl::Notification done;
Timestamp done_time = ExecCtx::Get()->Now() + Duration::Seconds(1);
Timestamp done_time = Timestamp::Now() + Duration::Seconds(1);
// Sleep for one second but race it to complete immediately
auto activity = MakeActivity(
Race(Sleep(done_time), [] { return absl::CancelledError(); }),
@ -74,14 +74,14 @@ TEST(Sleep, Cancel) {
});
done.WaitForNotification();
exec_ctx.InvalidateNow();
EXPECT_LT(ExecCtx::Get()->Now(), done_time);
EXPECT_LT(Timestamp::Now(), done_time);
}
TEST(Sleep, MoveSemantics) {
// ASAN should help determine if there are any memory leaks here
ExecCtx exec_ctx;
absl::Notification done;
Timestamp done_time = ExecCtx::Get()->Now() + Duration::Milliseconds(111);
Timestamp done_time = Timestamp::Now() + Duration::Milliseconds(111);
Sleep donor(done_time);
Sleep sleeper = std::move(donor);
auto activity = MakeActivity(std::move(sleeper), InlineWakeupScheduler(),
@ -91,7 +91,7 @@ TEST(Sleep, MoveSemantics) {
});
done.WaitForNotification();
exec_ctx.InvalidateNow();
EXPECT_GE(ExecCtx::Get()->Now(), done_time);
EXPECT_GE(Timestamp::Now(), done_time);
}
TEST(Sleep, StressTest) {
@ -104,7 +104,8 @@ TEST(Sleep, StressTest) {
for (int i = 0; i < kNumActivities; i++) {
auto notification = std::make_shared<absl::Notification>();
auto activity = MakeActivity(
Sleep(exec_ctx.Now() + Duration::Seconds(1)), ExecCtxWakeupScheduler(),
Sleep(Timestamp::Now() + Duration::Seconds(1)),
ExecCtxWakeupScheduler(),
[notification](absl::Status /*r*/) { notification->Notify(); });
notifications.push_back(std::move(notification));
activities.push_back(std::move(activity));

@ -40,14 +40,14 @@ TEST(PeriodicUpdateTest, SimpleTest) {
{
ExecCtx exec_ctx;
upd = absl::make_unique<PeriodicUpdate>(Duration::Seconds(1));
start = exec_ctx.Now();
start = Timestamp::Now();
}
// Wait until the first period has elapsed.
bool done = false;
while (!done) {
ExecCtx exec_ctx;
upd->Tick([&](Duration elapsed) {
reset_start = ExecCtx::Get()->Now();
reset_start = Timestamp::Now();
EXPECT_GE(elapsed, Duration::Seconds(1));
done = true;
});
@ -55,7 +55,7 @@ TEST(PeriodicUpdateTest, SimpleTest) {
// Ensure that took at least 1 second.
{
ExecCtx exec_ctx;
EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(1));
EXPECT_GE(Timestamp::Now() - start, Duration::Seconds(1));
start = reset_start;
}
// Do ten more update cycles
@ -64,8 +64,8 @@ TEST(PeriodicUpdateTest, SimpleTest) {
while (!done) {
ExecCtx exec_ctx;
upd->Tick([&](Duration) {
reset_start = ExecCtx::Get()->Now();
EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(1));
reset_start = Timestamp::Now();
EXPECT_GE(Timestamp::Now() - start, Duration::Seconds(1));
done = true;
});
}
@ -73,8 +73,8 @@ TEST(PeriodicUpdateTest, SimpleTest) {
// allowance for the presumed inaccuracy of this type.
{
ExecCtx exec_ctx;
EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(1));
EXPECT_LE(exec_ctx.Now() - start, Duration::Seconds(3));
EXPECT_GE(Timestamp::Now() - start, Duration::Seconds(1));
EXPECT_LE(Timestamp::Now() - start, Duration::Seconds(3));
start = reset_start;
}
}
@ -88,7 +88,7 @@ TEST(PeriodicUpdate, ThreadTest) {
{
ExecCtx exec_ctx;
upd = absl::make_unique<PeriodicUpdate>(Duration::Seconds(1));
start = exec_ctx.Now();
start = Timestamp::Now();
}
// Run ten threads all updating the counter continuously, for a total of ten
// update cycles.
@ -113,8 +113,8 @@ TEST(PeriodicUpdate, ThreadTest) {
// Ensure our ten cycles took at least 10 seconds, and no more than 30.
{
ExecCtx exec_ctx;
EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(10));
EXPECT_LE(exec_ctx.Now() - start, Duration::Seconds(30));
EXPECT_GE(Timestamp::Now() - start, Duration::Seconds(10));
EXPECT_LE(Timestamp::Now() - start, Duration::Seconds(30));
}
}

@ -90,7 +90,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
creds->create_security_connector(grpc_core::ChannelArgs());
GPR_ASSERT(sc != nullptr);
grpc_core::Timestamp deadline =
grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now();
grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now();
struct handshake_state state;
state.done_callback_called = false;

@ -159,8 +159,8 @@ void bad_server_thread(void* vargs) {
gpr_mu_lock(args->mu);
while (!args->stop.load(std::memory_order_acquire)) {
grpc_core::Timestamp deadline = grpc_core::ExecCtx::Get()->Now() +
grpc_core::Duration::Milliseconds(100);
grpc_core::Timestamp deadline =
grpc_core::Timestamp::Now() + grpc_core::Duration::Milliseconds(100);
grpc_pollset_worker* worker = nullptr;
if (!GRPC_LOG_IF_ERROR(

@ -94,7 +94,7 @@ DEFINE_PROTO_FUZZER(const binder_transport_fuzzer::Input& input) {
grpc_server_shutdown_and_notify(server, cq, tag(0xdead));
grpc_server_cancel_all_calls(server);
grpc_core::Timestamp deadline =
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(5);
grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(5);
for (int i = 0; i <= requested_calls; i++) {
// A single grpc_completion_queue_next might not be sufficient for getting
// the tag from shutdown, because we might potentially get blocked by
@ -110,7 +110,7 @@ DEFINE_PROTO_FUZZER(const binder_transport_fuzzer::Input& input) {
nullptr);
grpc_core::ExecCtx::Get()->InvalidateNow();
} while (ev.type != GRPC_OP_COMPLETE &&
grpc_core::ExecCtx::Get()->Now() < deadline);
grpc_core::Timestamp::Now() < deadline);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
}
grpc_completion_queue_shutdown(cq);
@ -120,7 +120,7 @@ DEFINE_PROTO_FUZZER(const binder_transport_fuzzer::Input& input) {
nullptr);
grpc_core::ExecCtx::Get()->InvalidateNow();
} while (ev.type != GRPC_QUEUE_SHUTDOWN &&
grpc_core::ExecCtx::Get()->Now() < deadline);
grpc_core::Timestamp::Now() < deadline);
GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
}
grpc_server_destroy(server);

@ -162,7 +162,7 @@ void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) {
kMaxAdvanceTimeMillis),
GPR_TIMESPAN));
exec_ctx.InvalidateNow();
if (exec_ctx.Now() >= next_bdp_ping_) {
if (Timestamp::Now() >= next_bdp_ping_) {
scheduled_write_ = true;
}
} break;
@ -289,7 +289,7 @@ void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) {
}
if (scheduled_write_) {
SendToRemote send;
if (exec_ctx.Now() >= next_bdp_ping_) {
if (Timestamp::Now() >= next_bdp_ping_) {
if (auto* bdp = tfc_->bdp_estimator()) {
bdp->SchedulePing();
bdp->StartPing();

@ -146,7 +146,7 @@ class Client {
grpc_tcp_client_connect(
state.closure(), &endpoint_, pollset_set,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
addresses_or->data(), ExecCtx::Get()->Now() + Duration::Seconds(1));
addresses_or->data(), Timestamp::Now() + Duration::Seconds(1));
ASSERT_TRUE(PollUntilDone(&state, Timestamp::InfFuture()));
ASSERT_EQ(GRPC_ERROR_NONE, state.error());
grpc_pollset_set_destroy(pollset_set);
@ -162,7 +162,7 @@ class Client {
bool retval = true;
// Use a deadline of 3 seconds, which is a lot more than we should
// need for a 1-second timeout, but this helps avoid flakes.
Timestamp deadline = ExecCtx::Get()->Now() + Duration::Seconds(3);
Timestamp deadline = Timestamp::Now() + Duration::Seconds(3);
while (true) {
EventState state;
grpc_endpoint_read(endpoint_, &read_buffer, state.closure(),
@ -226,15 +226,15 @@ class Client {
while (true) {
grpc_pollset_worker* worker = nullptr;
gpr_mu_lock(mu_);
GRPC_LOG_IF_ERROR("grpc_pollset_work",
grpc_pollset_work(pollset_, &worker,
ExecCtx::Get()->Now() +
Duration::Milliseconds(100)));
GRPC_LOG_IF_ERROR(
"grpc_pollset_work",
grpc_pollset_work(pollset_, &worker,
Timestamp::Now() + Duration::Milliseconds(100)));
// Flushes any work scheduled before or during polling.
ExecCtx::Get()->Flush();
gpr_mu_unlock(mu_);
if (state != nullptr && state->done()) return true;
if (ExecCtx::Get()->Now() >= deadline) return false;
if (Timestamp::Now() >= deadline) return false;
}
}

@ -499,7 +499,7 @@ static void sched_next_channel_action_locked(half* m) {
grpc_timer_init(&m->parent->channel_effects->timer,
grpc_core::Duration::Milliseconds(
m->parent->channel_effects->actions[0].wait_ms) +
grpc_core::ExecCtx::Get()->Now(),
grpc_core::Timestamp::Now(),
GRPC_CLOSURE_CREATE(do_next_sched_channel_action, m,
grpc_schedule_on_exec_ctx));
}

@ -107,7 +107,7 @@ void grpc_free_port_using_server(int port) {
GPR_ASSERT(uri.ok());
auto http_request = grpc_core::HttpRequest::Get(
std::move(*uri), nullptr /* channel args */, &pr.pops, &req,
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(30),
grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30),
GRPC_CLOSURE_CREATE(freed_port_from_server, &pr,
grpc_schedule_on_exec_ctx),
&rsp,
@ -121,7 +121,7 @@ void grpc_free_port_using_server(int port) {
if (!GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(grpc_polling_entity_pollset(&pr.pops), &worker,
grpc_core::ExecCtx::Get()->Now() +
grpc_core::Timestamp::Now() +
grpc_core::Duration::Seconds(1)))) {
pr.done = 1;
}
@ -191,7 +191,7 @@ static void got_port_from_server(void* arg, grpc_error_handle error) {
GPR_ASSERT(uri.ok());
pr->http_request = grpc_core::HttpRequest::Get(
std::move(*uri), nullptr /* channel args */, &pr->pops, &req,
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(30),
grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30),
GRPC_CLOSURE_CREATE(got_port_from_server, pr,
grpc_schedule_on_exec_ctx),
&pr->response,
@ -238,7 +238,7 @@ int grpc_pick_port_using_server(void) {
GPR_ASSERT(uri.ok());
auto http_request = grpc_core::HttpRequest::Get(
std::move(*uri), nullptr /* channel args */, &pr.pops, &req,
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(30),
grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30),
GRPC_CLOSURE_CREATE(got_port_from_server, &pr,
grpc_schedule_on_exec_ctx),
&pr.response,
@ -252,7 +252,7 @@ int grpc_pick_port_using_server(void) {
if (!GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(grpc_polling_entity_pollset(&pr.pops), &worker,
grpc_core::ExecCtx::Get()->Now() +
grpc_core::Timestamp::Now() +
grpc_core::Duration::Seconds(1)))) {
pr.port = 0;
}

@ -90,14 +90,13 @@ INSTANTIATE_TEST_SUITE_P(TimeJump, TimeJumpTest,
TEST_P(TimeJumpTest, TimerRunning) {
grpc_core::ExecCtx exec_ctx;
grpc_timer timer;
grpc_timer_init(
&timer,
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(3),
GRPC_CLOSURE_CREATE(
[](void*, grpc_error_handle error) {
GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
},
nullptr, grpc_schedule_on_exec_ctx));
grpc_timer_init(&timer,
grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(3),
GRPC_CLOSURE_CREATE(
[](void*, grpc_error_handle error) {
GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
},
nullptr, grpc_schedule_on_exec_ctx));
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
std::ostringstream cmd;
cmd << "sudo date `date -v" << GetParam() << " \"+%m%d%H%M%y\"`";

@ -87,7 +87,7 @@ TEST_F(TimerTest, OneTimerExpires) {
int timer_fired = 0;
grpc_timer_init(
&timer,
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Milliseconds(500),
grpc_core::Timestamp::Now() + grpc_core::Duration::Milliseconds(500),
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle) {
int* timer_fired = static_cast<int*>(arg);
@ -113,7 +113,7 @@ TEST_F(TimerTest, MultipleTimersExpire) {
int timer_fired = 0;
for (int i = 0; i < kNumTimers; ++i) {
grpc_timer_init(&timers[i],
grpc_core::ExecCtx::Get()->Now() +
grpc_core::Timestamp::Now() +
grpc_core::Duration::Milliseconds(500) +
grpc_core::Duration::Milliseconds(i),
GRPC_CLOSURE_CREATE(
@ -147,7 +147,7 @@ TEST_F(TimerTest, CancelSomeTimers) {
// and set a small firing time for timers which need to execute.
grpc_timer_init(
&timers[i],
grpc_core::ExecCtx::Get()->Now() +
grpc_core::Timestamp::Now() +
((i < kNumTimers / 2) ? grpc_core ::Duration::Milliseconds(60000)
: grpc_core ::Duration::Milliseconds(100) +
grpc_core::Duration::Milliseconds(i)),
@ -183,8 +183,7 @@ TEST_F(TimerTest, DISABLED_TimerNotCanceled) {
grpc_core::ExecCtx exec_ctx;
grpc_timer timer;
grpc_timer_init(
&timer,
grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(10),
&timer, grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(10),
GRPC_CLOSURE_CREATE([](void*, grpc_error_handle) {}, nullptr,
grpc_schedule_on_exec_ctx));
}
@ -198,17 +197,17 @@ TEST_F(TimerTest, DISABLED_CancelRace) {
grpc_timer timers[kNumTimers];
for (int i = 0; i < kNumTimers; ++i) {
grpc_timer* arg = (i != 0) ? &timers[i - 1] : nullptr;
grpc_timer_init(&timers[i],
grpc_core::ExecCtx::Get()->Now() +
grpc_core::Duration::Milliseconds(100),
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle /*error*/) {
grpc_timer* timer = static_cast<grpc_timer*>(arg);
if (timer) {
grpc_timer_cancel(timer);
}
},
arg, grpc_schedule_on_exec_ctx));
grpc_timer_init(
&timers[i],
grpc_core::Timestamp::Now() + grpc_core::Duration::Milliseconds(100),
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle /*error*/) {
grpc_timer* timer = static_cast<grpc_timer*>(arg);
if (timer) {
grpc_timer_cancel(timer);
}
},
arg, grpc_schedule_on_exec_ctx));
}
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
}
@ -230,17 +229,17 @@ TEST_F(TimerTest, DISABLED_CancelNextTimer) {
if (i < kNumTimers - 1) {
arg = &timers[i + 1];
}
grpc_timer_init(&timers[i],
grpc_core::ExecCtx::Get()->Now() +
grpc_core::Duration::Milliseconds(100),
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle /*error*/) {
grpc_timer* timer = static_cast<grpc_timer*>(arg);
if (timer) {
grpc_timer_cancel(timer);
}
},
arg, grpc_schedule_on_exec_ctx));
grpc_timer_init(
&timers[i],
grpc_core::Timestamp::Now() + grpc_core::Duration::Milliseconds(100),
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle /*error*/) {
grpc_timer* timer = static_cast<grpc_timer*>(arg);
if (timer) {
grpc_timer_cancel(timer);
}
},
arg, grpc_schedule_on_exec_ctx));
}
grpc_timer_cancel(&timers[0]);
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));

@ -192,7 +192,7 @@ ConnectionAttemptInjector::InjectedDelay::InjectedDelay(
const grpc_resolved_address* addr, grpc_core::Timestamp deadline)
: attempt_(closure, ep, interested_parties, config, addr, deadline) {
GRPC_CLOSURE_INIT(&timer_callback_, TimerCallback, this, nullptr);
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
grpc_core::Timestamp now = grpc_core::Timestamp::Now();
duration = std::min(duration, deadline - now);
grpc_timer_init(&timer_, now + duration, &timer_callback_);
}

@ -73,7 +73,7 @@ BENCHMARK(BM_HpackEncoderInitDestroy);
static void BM_HpackEncoderEncodeDeadline(benchmark::State& state) {
TrackCounters track_counters;
grpc_core::ExecCtx exec_ctx;
grpc_core::Timestamp saved_now = grpc_core::ExecCtx::Get()->Now();
grpc_core::Timestamp saved_now = grpc_core::Timestamp::Now();
auto arena = grpc_core::MakeScopedArena(1024, g_memory_allocator);
grpc_metadata_batch b(arena.get());

@ -221,7 +221,7 @@ void MaybePollArbitraryPollsetTwice() {
gpr_mu_lock(mu);
GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(pollset, &worker, grpc_core::ExecCtx::Get()->Now()));
grpc_pollset_work(pollset, &worker, grpc_core::Timestamp::Now()));
gpr_mu_unlock(mu);
grpc_core::ExecCtx::Get()->Flush();
// Make a second zero-timeout poll (in case the first one
@ -229,7 +229,7 @@ void MaybePollArbitraryPollsetTwice() {
gpr_mu_lock(mu);
GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(pollset, &worker, grpc_core::ExecCtx::Get()->Now()));
grpc_pollset_work(pollset, &worker, grpc_core::Timestamp::Now()));
gpr_mu_unlock(mu);
grpc_core::ExecCtx::Get()->Flush();
grpc_pollset_destroy(pollset);

Loading…
Cancel
Save