From 6c99aaabc4fbc9e741fa010df368ecf974eb9fb8 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 30 Apr 2024 19:41:18 +0000 Subject: [PATCH] fix --- .../client_channel/client_channel_filter.cc | 12 ++++--- .../client_channel/client_channel_filter.h | 1 - src/core/lib/surface/call.cc | 2 ++ test/core/end2end/end2end_tests.h | 34 ++++++++++++++++++- test/core/end2end/tests/http2_stats.cc | 8 ++--- 5 files changed, 47 insertions(+), 10 deletions(-) diff --git a/src/core/client_channel/client_channel_filter.cc b/src/core/client_channel/client_channel_filter.cc index ffce158d1bd..215fb55f7a6 100644 --- a/src/core/client_channel/client_channel_filter.cc +++ b/src/core/client_channel/client_channel_filter.cc @@ -3054,7 +3054,6 @@ ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall( absl::AnyInvocable on_commit, bool is_transparent_retry) : LoadBalancedCall(chand, args.context, std::move(on_commit), is_transparent_retry), - deadline_(args.deadline), arena_(args.arena), owning_call_(args.call_stack), call_combiner_(args.call_combiner), @@ -3354,8 +3353,12 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall:: // Get status from error. grpc_status_code code; std::string message; - grpc_error_get_status(error, self->deadline_, &code, &message, - /*http_error=*/nullptr, /*error_string=*/nullptr); + grpc_error_get_status( + error, + static_cast(self->call_context()[GRPC_CONTEXT_CALL].value) + ->deadline(), + &code, &message, + /*http_error=*/nullptr, /*error_string=*/nullptr); status = absl::Status(static_cast(code), message); } else { // Get status from headers. @@ -3493,7 +3496,8 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() { GPR_ASSERT(path != nullptr); SubchannelCall::Args call_args = { connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0, - deadline_, arena_, + static_cast(call_context()[GRPC_CONTEXT_CALL].value)->deadline(), + arena_, // TODO(roth): When we implement hedging support, we will probably // need to use a separate call context for each subchannel call. call_context(), call_combiner_}; diff --git a/src/core/client_channel/client_channel_filter.h b/src/core/client_channel/client_channel_filter.h index f9ae44b0d26..d9e9be9ae44 100644 --- a/src/core/client_channel/client_channel_filter.h +++ b/src/core/client_channel/client_channel_filter.h @@ -558,7 +558,6 @@ class ClientChannelFilter::FilterBasedLoadBalancedCall final // TODO(roth): Instead of duplicating these fields in every filter // that uses any one of them, we should store them in the call // context. This will save per-call memory overhead. - Timestamp deadline_; Arena* arena_; grpc_call_stack* owning_call_; CallCombiner* call_combiner_; diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index c969296e621..ea320b99e63 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -925,6 +925,7 @@ void FilterStackCall::CancelWithError(grpc_error_handle error) { } ClearPeerString(); InternalRef("termination"); + ResetDeadline(); // Inform the call combiner of the cancellation, so that it can cancel // any in-flight asynchronous actions that may be holding the call // combiner. This ensures that the cancel_stream batch can be sent @@ -946,6 +947,7 @@ void FilterStackCall::SetFinalStatus(grpc_error_handle error) { gpr_log(GPR_INFO, "set_final_status %s %s", is_client() ? "CLI" : "SVR", StatusToString(error).c_str()); } + ResetDeadline(); if (is_client()) { std::string status_details; grpc_error_get_status(error, send_deadline(), final_op_.client.status, diff --git a/test/core/end2end/end2end_tests.h b/test/core/end2end/end2end_tests.h index 737badb909b..7ef307377b7 100644 --- a/test/core/end2end/end2end_tests.h +++ b/test/core/end2end/end2end_tests.h @@ -181,6 +181,36 @@ class CoreEnd2endTest : public ::testing::Test { void* p; }; + // Safe notification to use for core e2e tests. + // Since when we're fuzzing we don't run background threads, the normal + // Notification type isn't safe to wait on (for some background timer to fire + // for instance...), consequently we need to use this. + class TestNotification { + public: + explicit TestNotification(CoreEnd2endTest* test) : test_(test) {} + + void WaitForNotificationWithTimeout(absl::Duration wait_time) { + if (g_is_fuzzing_core_e2e_tests) { + Timestamp end = Timestamp::Now() + Duration::NanosecondsRoundUp( + ToInt64Nanoseconds(wait_time)); + while (true) { + if (base_.HasBeenNotified()) return; + auto now = Timestamp::Now(); + if (now >= end) return; + test_->step_fn_(now - end); + } + } else { + base_.WaitForNotificationWithTimeout(wait_time); + } + } + + void Notify() { base_.Notify(); } + + private: + Notification base_; + CoreEnd2endTest* const test_; + }; + // CallBuilder - results in a call to either grpc_channel_create_call or // grpc_channel_create_registered_call. // Affords a fluent interface to specify optional arguments. @@ -753,7 +783,9 @@ class CoreEnd2endTest : public ::testing::Test { cq_, g_is_fuzzing_core_e2e_tests ? CqVerifier::FailUsingGprCrashWithStdio : CqVerifier::FailUsingGprCrash, - std::move(step_fn_)); + [this](grpc_event_engine::experimental::EventEngine::Duration d) { + step_fn_(d); + }); } return *cq_verifier_; } diff --git a/test/core/end2end/tests/http2_stats.cc b/test/core/end2end/tests/http2_stats.cc index 482024f46e8..f0421d67dd0 100644 --- a/test/core/end2end/tests/http2_stats.cc +++ b/test/core/end2end/tests/http2_stats.cc @@ -57,8 +57,8 @@ namespace grpc_core { namespace { Mutex* g_mu; -Notification* g_client_call_ended_notify; -Notification* g_server_call_ended_notify; +CoreEnd2endTest::TestNotification* g_client_call_ended_notify; +CoreEnd2endTest::TestNotification* g_server_call_ended_notify; class FakeCallTracer : public ClientCallTracer { public: @@ -197,8 +197,8 @@ CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) { GTEST_SKIP() << "Test needs http2_stats_fix experiment to be enabled"; } g_mu = new Mutex(); - g_client_call_ended_notify = new Notification(); - g_server_call_ended_notify = new Notification(); + g_client_call_ended_notify = new CoreEnd2endTest::TestNotification(this); + g_server_call_ended_notify = new CoreEnd2endTest::TestNotification(this); GlobalStatsPluginRegistry::RegisterStatsPlugin( std::make_shared()); auto send_from_client = RandomSlice(10);