From 38de0e22d48fbe420e11628637a513958d8bc297 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 9 Sep 2024 09:41:54 -0700 Subject: [PATCH] [call-v3] Migrate time caching to the right spot (#37637) This change adds an experiment to move time caching from `ExecCtx` (which is the wrong place for this mechanism) and moves it to the party update path (the expectation being that a single poll of a call is the granularity at which we expect time caching to be a useful optimization, whilst avoiding the unbounded hold times associated with the current mechanism). This requires fixing up a few tests that grew to depend on time caching (would appreciate close eyes on the credentials test, as it's unclear to me why this is required or what the effect is). This should also fix b/232544809. Closes #37637 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37637 from ctiller:closer-to-the-sun 8bbde2d0bdc352fa72af20698cd8d550d96e8c0a PiperOrigin-RevId: 672574762 --- bazel/experiments.bzl | 1 + src/core/lib/experiments/experiments.cc | 18 +++++++++++++++ src/core/lib/experiments/experiments.h | 11 +++++++++ src/core/lib/experiments/experiments.yaml | 5 ++++ src/core/lib/experiments/rollouts.yaml | 2 ++ src/core/lib/iomgr/exec_ctx.h | 28 +++++++++++++---------- src/core/lib/promise/party.cc | 6 +++++ test/core/iomgr/timer_list_test.cc | 6 +++++ test/core/security/credentials_test.cc | 5 ++++ 9 files changed, 70 insertions(+), 12 deletions(-) diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 5ae6526b4ad..1fc7f607ea8 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -34,6 +34,7 @@ EXPERIMENT_ENABLES = { "server_privacy": "server_privacy", "tcp_frame_size_tuning": "tcp_frame_size_tuning", "tcp_rcv_lowat": "tcp_rcv_lowat", + "time_caching_in_party": "time_caching_in_party", "trace_record_callops": "trace_record_callops", "unconstrained_max_quota_buffer_size": "unconstrained_max_quota_buffer_size", "work_serializer_clears_time_cache": "work_serializer_clears_time_cache", diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 8a8b5a445e1..96a2c734308 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -79,6 +79,10 @@ const char* const additional_constraints_tcp_frame_size_tuning = "{}"; const char* const description_tcp_rcv_lowat = "Use SO_RCVLOWAT to avoid wakeups on the read path."; const char* const additional_constraints_tcp_rcv_lowat = "{}"; +const char* const description_time_caching_in_party = + "Disable time caching in exec_ctx, and enable it only in a single party " + "execution."; +const char* const additional_constraints_time_caching_in_party = "{}"; const char* const description_trace_record_callops = "Enables tracing of call batch initiation and completion."; const char* const additional_constraints_trace_record_callops = "{}"; @@ -141,6 +145,8 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true}, {"tcp_rcv_lowat", description_tcp_rcv_lowat, additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true}, + {"time_caching_in_party", description_time_caching_in_party, + additional_constraints_time_caching_in_party, nullptr, 0, true, true}, {"trace_record_callops", description_trace_record_callops, additional_constraints_trace_record_callops, nullptr, 0, true, true}, {"unconstrained_max_quota_buffer_size", @@ -216,6 +222,10 @@ const char* const additional_constraints_tcp_frame_size_tuning = "{}"; const char* const description_tcp_rcv_lowat = "Use SO_RCVLOWAT to avoid wakeups on the read path."; const char* const additional_constraints_tcp_rcv_lowat = "{}"; +const char* const description_time_caching_in_party = + "Disable time caching in exec_ctx, and enable it only in a single party " + "execution."; +const char* const additional_constraints_time_caching_in_party = "{}"; const char* const description_trace_record_callops = "Enables tracing of call batch initiation and completion."; const char* const additional_constraints_trace_record_callops = "{}"; @@ -278,6 +288,8 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true}, {"tcp_rcv_lowat", description_tcp_rcv_lowat, additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true}, + {"time_caching_in_party", description_time_caching_in_party, + additional_constraints_time_caching_in_party, nullptr, 0, true, true}, {"trace_record_callops", description_trace_record_callops, additional_constraints_trace_record_callops, nullptr, 0, true, true}, {"unconstrained_max_quota_buffer_size", @@ -353,6 +365,10 @@ const char* const additional_constraints_tcp_frame_size_tuning = "{}"; const char* const description_tcp_rcv_lowat = "Use SO_RCVLOWAT to avoid wakeups on the read path."; const char* const additional_constraints_tcp_rcv_lowat = "{}"; +const char* const description_time_caching_in_party = + "Disable time caching in exec_ctx, and enable it only in a single party " + "execution."; +const char* const additional_constraints_time_caching_in_party = "{}"; const char* const description_trace_record_callops = "Enables tracing of call batch initiation and completion."; const char* const additional_constraints_trace_record_callops = "{}"; @@ -415,6 +431,8 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true}, {"tcp_rcv_lowat", description_tcp_rcv_lowat, additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true}, + {"time_caching_in_party", description_time_caching_in_party, + additional_constraints_time_caching_in_party, nullptr, 0, true, true}, {"trace_record_callops", description_trace_record_callops, additional_constraints_trace_record_callops, nullptr, 0, true, true}, {"unconstrained_max_quota_buffer_size", diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 0b44aca703f..b6fb4257c58 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -78,6 +78,8 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_TIME_CACHING_IN_PARTY +inline bool IsTimeCachingInPartyEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } @@ -110,6 +112,8 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_TIME_CACHING_IN_PARTY +inline bool IsTimeCachingInPartyEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } @@ -141,6 +145,8 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_TIME_CACHING_IN_PARTY +inline bool IsTimeCachingInPartyEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } @@ -168,6 +174,7 @@ enum ExperimentIds { kExperimentIdServerPrivacy, kExperimentIdTcpFrameSizeTuning, kExperimentIdTcpRcvLowat, + kExperimentIdTimeCachingInParty, kExperimentIdTraceRecordCallops, kExperimentIdUnconstrainedMaxQuotaBufferSize, kExperimentIdWorkSerializerClearsTimeCache, @@ -242,6 +249,10 @@ inline bool IsTcpFrameSizeTuningEnabled() { inline bool IsTcpRcvLowatEnabled() { return IsExperimentEnabled(); } +#define GRPC_EXPERIMENT_IS_INCLUDED_TIME_CACHING_IN_PARTY +inline bool IsTimeCachingInPartyEnabled() { + return IsExperimentEnabled(); +} #define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS inline bool IsTraceRecordCallopsEnabled() { return IsExperimentEnabled(); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index a74b72dc62f..6cc3781f266 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -145,6 +145,11 @@ expiry: 2024/12/01 owner: vigneshbabu@google.com test_tags: ["endpoint_test", "flow_control_test"] +- name: time_caching_in_party + description: Disable time caching in exec_ctx, and enable it only in a single party execution. + owner: ctiller@google.com + expiry: 2024/12/12 + test_tags: [] - name: trace_record_callops description: Enables tracing of call batch initiation and completion. expiry: 2024/12/01 diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index e3a87d1f62b..661539b1029 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -98,6 +98,8 @@ default: false - name: tcp_rcv_lowat default: false +- name: time_caching_in_party + default: true - name: trace_record_callops default: true - name: unconstrained_max_quota_buffer_size diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index ce41e5b92b3..bcdb1a743d7 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -35,6 +35,7 @@ #include #include +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/fork.h" @@ -116,6 +117,11 @@ class GRPC_DLL ExecCtx : public latent_see::ParentScope { ExecCtx() : latent_see::ParentScope(GRPC_LATENT_SEE_METADATA("ExecCtx")), flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { +#if !TARGET_OS_IPHONE + if (!IsTimeCachingInPartyEnabled()) { + time_cache_.emplace(); + } +#endif Fork::IncExecCtxCount(); Set(this); } @@ -126,6 +132,11 @@ class GRPC_DLL ExecCtx : public latent_see::ParentScope { explicit ExecCtx(uintptr_t fl, latent_see::Metadata* latent_see_metadata) : latent_see::ParentScope(latent_see_metadata), flags_(fl) { +#if !TARGET_OS_IPHONE + if (!IsTimeCachingInPartyEnabled()) { + time_cache_.emplace(); + } +#endif if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) { Fork::IncExecCtxCount(); } @@ -195,23 +206,18 @@ class GRPC_DLL ExecCtx : public latent_see::ParentScope { Timestamp Now() { return Timestamp::Now(); } void InvalidateNow() { -#if !TARGET_OS_IPHONE - time_cache_.InvalidateCache(); -#endif + if (time_cache_.has_value()) time_cache_->InvalidateCache(); } void SetNowIomgrShutdown() { -#if !TARGET_OS_IPHONE // We get to do a test only set now on this path just because iomgr // is getting removed and no point adding more interfaces for it. - time_cache_.TestOnlySetNow(Timestamp::InfFuture()); -#endif + TestOnlySetNow(Timestamp::InfFuture()); } void TestOnlySetNow(Timestamp now) { -#if !TARGET_OS_IPHONE - time_cache_.TestOnlySetNow(now); -#endif + if (!time_cache_.has_value()) time_cache_.emplace(); + time_cache_->TestOnlySetNow(now); } /// Gets pointer to current exec_ctx. @@ -237,9 +243,7 @@ class GRPC_DLL ExecCtx : public latent_see::ParentScope { CombinerData combiner_data_ = {nullptr, nullptr}; uintptr_t flags_; -#if !TARGET_OS_IPHONE - ScopedTimeCache time_cache_; -#endif + absl::optional time_cache_; #if !defined(_WIN32) || !defined(_DLL) static thread_local ExecCtx* exec_ctx_; diff --git a/src/core/lib/promise/party.cc b/src/core/lib/promise/party.cc index c8bde2d198f..b8b6a899a48 100644 --- a/src/core/lib/promise/party.cc +++ b/src/core/lib/promise/party.cc @@ -269,6 +269,12 @@ void Party::RunPartyAndUnref(uint64_t prev_state) { DCHECK_EQ(prev_state & ~(kRefMask | kAllocatedMask), 0u) << "Party should have contained no wakeups on lock"; prev_state |= kLocked; + absl::optional time_cache; +#if !TARGET_OS_IPHONE + if (IsTimeCachingInPartyEnabled()) { + time_cache.emplace(); + } +#endif for (;;) { uint64_t keep_allocated_mask = kAllocatedMask; // For each wakeup bit... diff --git a/test/core/iomgr/timer_list_test.cc b/test/core/iomgr/timer_list_test.cc index e3064c299ff..e3c98e83780 100644 --- a/test/core/iomgr/timer_list_test.cc +++ b/test/core/iomgr/timer_list_test.cc @@ -47,6 +47,8 @@ static void cb(void* arg, grpc_error_handle error) { } static void add_test(void) { + if (grpc_core::IsTimeCachingInPartyEnabled()) return; + int i; grpc_timer timers[20]; grpc_core::ExecCtx exec_ctx; @@ -116,6 +118,8 @@ static void add_test(void) { // Cleaning up a list with pending timers. void destruction_test(void) { + if (grpc_core::IsTimeCachingInPartyEnabled()) return; + grpc_timer timers[5]; grpc_core::ExecCtx exec_ctx; @@ -173,6 +177,8 @@ void destruction_test(void) { // 4) Shuts down the timer list // https://github.com/grpc/grpc/issues/15904 void long_running_service_cleanup_test(void) { + if (grpc_core::IsTimeCachingInPartyEnabled()) return; + grpc_timer timers[4]; grpc_core::ExecCtx exec_ctx; diff --git a/test/core/security/credentials_test.cc b/test/core/security/credentials_test.cc index 721a02f6e33..3731f5ca189 100644 --- a/test/core/security/credentials_test.cc +++ b/test/core/security/credentials_test.cc @@ -2492,6 +2492,7 @@ TEST_F(TokenFetcherCredentialsTest, Basic) { ExecCtx exec_ctx; creds_->AddResult(MakeToken("foo", kExpirationTime)); // First request will trigger a fetch. + LOG(INFO) << "First request"; auto state = RequestMetadataState::NewInstance( absl::OkStatus(), "authorization: foo", /*expect_delay=*/true); state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority, @@ -2499,6 +2500,7 @@ TEST_F(TokenFetcherCredentialsTest, Basic) { EXPECT_EQ(creds_->num_fetches(), 1); // Second request while fetch is still outstanding will be delayed but // will not trigger a new fetch. + LOG(INFO) << "Second request"; state = RequestMetadataState::NewInstance( absl::OkStatus(), "authorization: foo", /*expect_delay=*/true); state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority, @@ -2507,6 +2509,7 @@ TEST_F(TokenFetcherCredentialsTest, Basic) { // Now tick to finish the fetch. event_engine_->TickUntilIdle(); // Next request will be served from cache with no delay. + LOG(INFO) << "Third request"; state = RequestMetadataState::NewInstance( absl::OkStatus(), "authorization: foo", /*expect_delay=*/false); state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority, @@ -2519,6 +2522,7 @@ TEST_F(TokenFetcherCredentialsTest, Basic) { // Next request will trigger a new fetch but will still use the // cached token. creds_->AddResult(MakeToken("bar")); + LOG(INFO) << "Fourth request"; state = RequestMetadataState::NewInstance( absl::OkStatus(), "authorization: foo", /*expect_delay=*/false); state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority, @@ -2526,6 +2530,7 @@ TEST_F(TokenFetcherCredentialsTest, Basic) { EXPECT_EQ(creds_->num_fetches(), 2); event_engine_->TickUntilIdle(); // Next request will use the new data. + LOG(INFO) << "Fifth request"; state = RequestMetadataState::NewInstance( absl::OkStatus(), "authorization: bar", /*expect_delay=*/false); state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,