[call-v3] Migrate time caching to the right spot (#37637)

This change adds an experiment to move time caching from `ExecCtx` (which is the wrong place for this mechanism) and moves it to the party update path (the expectation being that a single poll of a call is the granularity at which we expect time caching to be a useful optimization, whilst avoiding the unbounded hold times associated with the current mechanism).

This requires fixing up a few tests that grew to depend on time caching (would appreciate close eyes on the credentials test, as it's unclear to me why this is required or what the effect is).

This should also fix b/232544809.

Closes #37637

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37637 from ctiller:closer-to-the-sun 8bbde2d0bd
PiperOrigin-RevId: 672574762
pull/37638/head
Craig Tiller 3 months ago committed by Copybara-Service
parent 1ed28f882f
commit 38de0e22d4
  1. 1
      bazel/experiments.bzl
  2. 18
      src/core/lib/experiments/experiments.cc
  3. 11
      src/core/lib/experiments/experiments.h
  4. 5
      src/core/lib/experiments/experiments.yaml
  5. 2
      src/core/lib/experiments/rollouts.yaml
  6. 28
      src/core/lib/iomgr/exec_ctx.h
  7. 6
      src/core/lib/promise/party.cc
  8. 6
      test/core/iomgr/timer_list_test.cc
  9. 5
      test/core/security/credentials_test.cc

@ -34,6 +34,7 @@ EXPERIMENT_ENABLES = {
"server_privacy": "server_privacy", "server_privacy": "server_privacy",
"tcp_frame_size_tuning": "tcp_frame_size_tuning", "tcp_frame_size_tuning": "tcp_frame_size_tuning",
"tcp_rcv_lowat": "tcp_rcv_lowat", "tcp_rcv_lowat": "tcp_rcv_lowat",
"time_caching_in_party": "time_caching_in_party",
"trace_record_callops": "trace_record_callops", "trace_record_callops": "trace_record_callops",
"unconstrained_max_quota_buffer_size": "unconstrained_max_quota_buffer_size", "unconstrained_max_quota_buffer_size": "unconstrained_max_quota_buffer_size",
"work_serializer_clears_time_cache": "work_serializer_clears_time_cache", "work_serializer_clears_time_cache": "work_serializer_clears_time_cache",

@ -79,6 +79,10 @@ const char* const additional_constraints_tcp_frame_size_tuning = "{}";
const char* const description_tcp_rcv_lowat = const char* const description_tcp_rcv_lowat =
"Use SO_RCVLOWAT to avoid wakeups on the read path."; "Use SO_RCVLOWAT to avoid wakeups on the read path.";
const char* const additional_constraints_tcp_rcv_lowat = "{}"; const char* const additional_constraints_tcp_rcv_lowat = "{}";
const char* const description_time_caching_in_party =
"Disable time caching in exec_ctx, and enable it only in a single party "
"execution.";
const char* const additional_constraints_time_caching_in_party = "{}";
const char* const description_trace_record_callops = const char* const description_trace_record_callops =
"Enables tracing of call batch initiation and completion."; "Enables tracing of call batch initiation and completion.";
const char* const additional_constraints_trace_record_callops = "{}"; const char* const additional_constraints_trace_record_callops = "{}";
@ -141,6 +145,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true}, additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true},
{"tcp_rcv_lowat", description_tcp_rcv_lowat, {"tcp_rcv_lowat", description_tcp_rcv_lowat,
additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true}, additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true},
{"time_caching_in_party", description_time_caching_in_party,
additional_constraints_time_caching_in_party, nullptr, 0, true, true},
{"trace_record_callops", description_trace_record_callops, {"trace_record_callops", description_trace_record_callops,
additional_constraints_trace_record_callops, nullptr, 0, true, true}, additional_constraints_trace_record_callops, nullptr, 0, true, true},
{"unconstrained_max_quota_buffer_size", {"unconstrained_max_quota_buffer_size",
@ -216,6 +222,10 @@ const char* const additional_constraints_tcp_frame_size_tuning = "{}";
const char* const description_tcp_rcv_lowat = const char* const description_tcp_rcv_lowat =
"Use SO_RCVLOWAT to avoid wakeups on the read path."; "Use SO_RCVLOWAT to avoid wakeups on the read path.";
const char* const additional_constraints_tcp_rcv_lowat = "{}"; const char* const additional_constraints_tcp_rcv_lowat = "{}";
const char* const description_time_caching_in_party =
"Disable time caching in exec_ctx, and enable it only in a single party "
"execution.";
const char* const additional_constraints_time_caching_in_party = "{}";
const char* const description_trace_record_callops = const char* const description_trace_record_callops =
"Enables tracing of call batch initiation and completion."; "Enables tracing of call batch initiation and completion.";
const char* const additional_constraints_trace_record_callops = "{}"; const char* const additional_constraints_trace_record_callops = "{}";
@ -278,6 +288,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true}, additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true},
{"tcp_rcv_lowat", description_tcp_rcv_lowat, {"tcp_rcv_lowat", description_tcp_rcv_lowat,
additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true}, additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true},
{"time_caching_in_party", description_time_caching_in_party,
additional_constraints_time_caching_in_party, nullptr, 0, true, true},
{"trace_record_callops", description_trace_record_callops, {"trace_record_callops", description_trace_record_callops,
additional_constraints_trace_record_callops, nullptr, 0, true, true}, additional_constraints_trace_record_callops, nullptr, 0, true, true},
{"unconstrained_max_quota_buffer_size", {"unconstrained_max_quota_buffer_size",
@ -353,6 +365,10 @@ const char* const additional_constraints_tcp_frame_size_tuning = "{}";
const char* const description_tcp_rcv_lowat = const char* const description_tcp_rcv_lowat =
"Use SO_RCVLOWAT to avoid wakeups on the read path."; "Use SO_RCVLOWAT to avoid wakeups on the read path.";
const char* const additional_constraints_tcp_rcv_lowat = "{}"; const char* const additional_constraints_tcp_rcv_lowat = "{}";
const char* const description_time_caching_in_party =
"Disable time caching in exec_ctx, and enable it only in a single party "
"execution.";
const char* const additional_constraints_time_caching_in_party = "{}";
const char* const description_trace_record_callops = const char* const description_trace_record_callops =
"Enables tracing of call batch initiation and completion."; "Enables tracing of call batch initiation and completion.";
const char* const additional_constraints_trace_record_callops = "{}"; const char* const additional_constraints_trace_record_callops = "{}";
@ -415,6 +431,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true}, additional_constraints_tcp_frame_size_tuning, nullptr, 0, false, true},
{"tcp_rcv_lowat", description_tcp_rcv_lowat, {"tcp_rcv_lowat", description_tcp_rcv_lowat,
additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true}, additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true},
{"time_caching_in_party", description_time_caching_in_party,
additional_constraints_time_caching_in_party, nullptr, 0, true, true},
{"trace_record_callops", description_trace_record_callops, {"trace_record_callops", description_trace_record_callops,
additional_constraints_trace_record_callops, nullptr, 0, true, true}, additional_constraints_trace_record_callops, nullptr, 0, true, true},
{"unconstrained_max_quota_buffer_size", {"unconstrained_max_quota_buffer_size",

@ -78,6 +78,8 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; }
inline bool IsTcpRcvLowatEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_TIME_CACHING_IN_PARTY
inline bool IsTimeCachingInPartyEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS #define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS
inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsTraceRecordCallopsEnabled() { return true; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
@ -110,6 +112,8 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; }
inline bool IsTcpRcvLowatEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_TIME_CACHING_IN_PARTY
inline bool IsTimeCachingInPartyEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS #define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS
inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsTraceRecordCallopsEnabled() { return true; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
@ -141,6 +145,8 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; }
inline bool IsTcpRcvLowatEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_TIME_CACHING_IN_PARTY
inline bool IsTimeCachingInPartyEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS #define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS
inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsTraceRecordCallopsEnabled() { return true; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
@ -168,6 +174,7 @@ enum ExperimentIds {
kExperimentIdServerPrivacy, kExperimentIdServerPrivacy,
kExperimentIdTcpFrameSizeTuning, kExperimentIdTcpFrameSizeTuning,
kExperimentIdTcpRcvLowat, kExperimentIdTcpRcvLowat,
kExperimentIdTimeCachingInParty,
kExperimentIdTraceRecordCallops, kExperimentIdTraceRecordCallops,
kExperimentIdUnconstrainedMaxQuotaBufferSize, kExperimentIdUnconstrainedMaxQuotaBufferSize,
kExperimentIdWorkSerializerClearsTimeCache, kExperimentIdWorkSerializerClearsTimeCache,
@ -242,6 +249,10 @@ inline bool IsTcpFrameSizeTuningEnabled() {
inline bool IsTcpRcvLowatEnabled() { inline bool IsTcpRcvLowatEnabled() {
return IsExperimentEnabled<kExperimentIdTcpRcvLowat>(); return IsExperimentEnabled<kExperimentIdTcpRcvLowat>();
} }
#define GRPC_EXPERIMENT_IS_INCLUDED_TIME_CACHING_IN_PARTY
inline bool IsTimeCachingInPartyEnabled() {
return IsExperimentEnabled<kExperimentIdTimeCachingInParty>();
}
#define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS #define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS
inline bool IsTraceRecordCallopsEnabled() { inline bool IsTraceRecordCallopsEnabled() {
return IsExperimentEnabled<kExperimentIdTraceRecordCallops>(); return IsExperimentEnabled<kExperimentIdTraceRecordCallops>();

@ -145,6 +145,11 @@
expiry: 2024/12/01 expiry: 2024/12/01
owner: vigneshbabu@google.com owner: vigneshbabu@google.com
test_tags: ["endpoint_test", "flow_control_test"] test_tags: ["endpoint_test", "flow_control_test"]
- name: time_caching_in_party
description: Disable time caching in exec_ctx, and enable it only in a single party execution.
owner: ctiller@google.com
expiry: 2024/12/12
test_tags: []
- name: trace_record_callops - name: trace_record_callops
description: Enables tracing of call batch initiation and completion. description: Enables tracing of call batch initiation and completion.
expiry: 2024/12/01 expiry: 2024/12/01

@ -98,6 +98,8 @@
default: false default: false
- name: tcp_rcv_lowat - name: tcp_rcv_lowat
default: false default: false
- name: time_caching_in_party
default: true
- name: trace_record_callops - name: trace_record_callops
default: true default: true
- name: unconstrained_max_quota_buffer_size - name: unconstrained_max_quota_buffer_size

@ -35,6 +35,7 @@
#include <grpc/support/cpu.h> #include <grpc/support/cpu.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/fork.h" #include "src/core/lib/gprpp/fork.h"
@ -116,6 +117,11 @@ class GRPC_DLL ExecCtx : public latent_see::ParentScope {
ExecCtx() ExecCtx()
: latent_see::ParentScope(GRPC_LATENT_SEE_METADATA("ExecCtx")), : latent_see::ParentScope(GRPC_LATENT_SEE_METADATA("ExecCtx")),
flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
#if !TARGET_OS_IPHONE
if (!IsTimeCachingInPartyEnabled()) {
time_cache_.emplace();
}
#endif
Fork::IncExecCtxCount(); Fork::IncExecCtxCount();
Set(this); Set(this);
} }
@ -126,6 +132,11 @@ class GRPC_DLL ExecCtx : public latent_see::ParentScope {
explicit ExecCtx(uintptr_t fl, latent_see::Metadata* latent_see_metadata) explicit ExecCtx(uintptr_t fl, latent_see::Metadata* latent_see_metadata)
: latent_see::ParentScope(latent_see_metadata), flags_(fl) { : latent_see::ParentScope(latent_see_metadata), flags_(fl) {
#if !TARGET_OS_IPHONE
if (!IsTimeCachingInPartyEnabled()) {
time_cache_.emplace();
}
#endif
if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) { if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
Fork::IncExecCtxCount(); Fork::IncExecCtxCount();
} }
@ -195,23 +206,18 @@ class GRPC_DLL ExecCtx : public latent_see::ParentScope {
Timestamp Now() { return Timestamp::Now(); } Timestamp Now() { return Timestamp::Now(); }
void InvalidateNow() { void InvalidateNow() {
#if !TARGET_OS_IPHONE if (time_cache_.has_value()) time_cache_->InvalidateCache();
time_cache_.InvalidateCache();
#endif
} }
void SetNowIomgrShutdown() { void SetNowIomgrShutdown() {
#if !TARGET_OS_IPHONE
// We get to do a test only set now on this path just because iomgr // We get to do a test only set now on this path just because iomgr
// is getting removed and no point adding more interfaces for it. // is getting removed and no point adding more interfaces for it.
time_cache_.TestOnlySetNow(Timestamp::InfFuture()); TestOnlySetNow(Timestamp::InfFuture());
#endif
} }
void TestOnlySetNow(Timestamp now) { void TestOnlySetNow(Timestamp now) {
#if !TARGET_OS_IPHONE if (!time_cache_.has_value()) time_cache_.emplace();
time_cache_.TestOnlySetNow(now); time_cache_->TestOnlySetNow(now);
#endif
} }
/// Gets pointer to current exec_ctx. /// Gets pointer to current exec_ctx.
@ -237,9 +243,7 @@ class GRPC_DLL ExecCtx : public latent_see::ParentScope {
CombinerData combiner_data_ = {nullptr, nullptr}; CombinerData combiner_data_ = {nullptr, nullptr};
uintptr_t flags_; uintptr_t flags_;
#if !TARGET_OS_IPHONE absl::optional<ScopedTimeCache> time_cache_;
ScopedTimeCache time_cache_;
#endif
#if !defined(_WIN32) || !defined(_DLL) #if !defined(_WIN32) || !defined(_DLL)
static thread_local ExecCtx* exec_ctx_; static thread_local ExecCtx* exec_ctx_;

@ -269,6 +269,12 @@ void Party::RunPartyAndUnref(uint64_t prev_state) {
DCHECK_EQ(prev_state & ~(kRefMask | kAllocatedMask), 0u) DCHECK_EQ(prev_state & ~(kRefMask | kAllocatedMask), 0u)
<< "Party should have contained no wakeups on lock"; << "Party should have contained no wakeups on lock";
prev_state |= kLocked; prev_state |= kLocked;
absl::optional<ScopedTimeCache> time_cache;
#if !TARGET_OS_IPHONE
if (IsTimeCachingInPartyEnabled()) {
time_cache.emplace();
}
#endif
for (;;) { for (;;) {
uint64_t keep_allocated_mask = kAllocatedMask; uint64_t keep_allocated_mask = kAllocatedMask;
// For each wakeup bit... // For each wakeup bit...

@ -47,6 +47,8 @@ static void cb(void* arg, grpc_error_handle error) {
} }
static void add_test(void) { static void add_test(void) {
if (grpc_core::IsTimeCachingInPartyEnabled()) return;
int i; int i;
grpc_timer timers[20]; grpc_timer timers[20];
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
@ -116,6 +118,8 @@ static void add_test(void) {
// Cleaning up a list with pending timers. // Cleaning up a list with pending timers.
void destruction_test(void) { void destruction_test(void) {
if (grpc_core::IsTimeCachingInPartyEnabled()) return;
grpc_timer timers[5]; grpc_timer timers[5];
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
@ -173,6 +177,8 @@ void destruction_test(void) {
// 4) Shuts down the timer list // 4) Shuts down the timer list
// https://github.com/grpc/grpc/issues/15904 // https://github.com/grpc/grpc/issues/15904
void long_running_service_cleanup_test(void) { void long_running_service_cleanup_test(void) {
if (grpc_core::IsTimeCachingInPartyEnabled()) return;
grpc_timer timers[4]; grpc_timer timers[4];
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;

@ -2492,6 +2492,7 @@ TEST_F(TokenFetcherCredentialsTest, Basic) {
ExecCtx exec_ctx; ExecCtx exec_ctx;
creds_->AddResult(MakeToken("foo", kExpirationTime)); creds_->AddResult(MakeToken("foo", kExpirationTime));
// First request will trigger a fetch. // First request will trigger a fetch.
LOG(INFO) << "First request";
auto state = RequestMetadataState::NewInstance( auto state = RequestMetadataState::NewInstance(
absl::OkStatus(), "authorization: foo", /*expect_delay=*/true); absl::OkStatus(), "authorization: foo", /*expect_delay=*/true);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority, state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
@ -2499,6 +2500,7 @@ TEST_F(TokenFetcherCredentialsTest, Basic) {
EXPECT_EQ(creds_->num_fetches(), 1); EXPECT_EQ(creds_->num_fetches(), 1);
// Second request while fetch is still outstanding will be delayed but // Second request while fetch is still outstanding will be delayed but
// will not trigger a new fetch. // will not trigger a new fetch.
LOG(INFO) << "Second request";
state = RequestMetadataState::NewInstance( state = RequestMetadataState::NewInstance(
absl::OkStatus(), "authorization: foo", /*expect_delay=*/true); absl::OkStatus(), "authorization: foo", /*expect_delay=*/true);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority, state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
@ -2507,6 +2509,7 @@ TEST_F(TokenFetcherCredentialsTest, Basic) {
// Now tick to finish the fetch. // Now tick to finish the fetch.
event_engine_->TickUntilIdle(); event_engine_->TickUntilIdle();
// Next request will be served from cache with no delay. // Next request will be served from cache with no delay.
LOG(INFO) << "Third request";
state = RequestMetadataState::NewInstance( state = RequestMetadataState::NewInstance(
absl::OkStatus(), "authorization: foo", /*expect_delay=*/false); absl::OkStatus(), "authorization: foo", /*expect_delay=*/false);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority, state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
@ -2519,6 +2522,7 @@ TEST_F(TokenFetcherCredentialsTest, Basic) {
// Next request will trigger a new fetch but will still use the // Next request will trigger a new fetch but will still use the
// cached token. // cached token.
creds_->AddResult(MakeToken("bar")); creds_->AddResult(MakeToken("bar"));
LOG(INFO) << "Fourth request";
state = RequestMetadataState::NewInstance( state = RequestMetadataState::NewInstance(
absl::OkStatus(), "authorization: foo", /*expect_delay=*/false); absl::OkStatus(), "authorization: foo", /*expect_delay=*/false);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority, state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,
@ -2526,6 +2530,7 @@ TEST_F(TokenFetcherCredentialsTest, Basic) {
EXPECT_EQ(creds_->num_fetches(), 2); EXPECT_EQ(creds_->num_fetches(), 2);
event_engine_->TickUntilIdle(); event_engine_->TickUntilIdle();
// Next request will use the new data. // Next request will use the new data.
LOG(INFO) << "Fifth request";
state = RequestMetadataState::NewInstance( state = RequestMetadataState::NewInstance(
absl::OkStatus(), "authorization: bar", /*expect_delay=*/false); absl::OkStatus(), "authorization: bar", /*expect_delay=*/false);
state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority, state->RunRequestMetadataTest(creds_.get(), kTestUrlScheme, kTestAuthority,

Loading…
Cancel
Save