pull/36477/head
Craig Tiller 12 months ago
parent 729789c161
commit 6c99aaabc4
  1. 12
      src/core/client_channel/client_channel_filter.cc
  2. 1
      src/core/client_channel/client_channel_filter.h
  3. 2
      src/core/lib/surface/call.cc
  4. 34
      test/core/end2end/end2end_tests.h
  5. 8
      test/core/end2end/tests/http2_stats.cc

@ -3054,7 +3054,6 @@ ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
: LoadBalancedCall(chand, args.context, std::move(on_commit),
is_transparent_retry),
deadline_(args.deadline),
arena_(args.arena),
owning_call_(args.call_stack),
call_combiner_(args.call_combiner),
@ -3354,8 +3353,12 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::
// Get status from error.
grpc_status_code code;
std::string message;
grpc_error_get_status(error, self->deadline_, &code, &message,
/*http_error=*/nullptr, /*error_string=*/nullptr);
grpc_error_get_status(
error,
static_cast<Call*>(self->call_context()[GRPC_CONTEXT_CALL].value)
->deadline(),
&code, &message,
/*http_error=*/nullptr, /*error_string=*/nullptr);
status = absl::Status(static_cast<absl::StatusCode>(code), message);
} else {
// Get status from headers.
@ -3493,7 +3496,8 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() {
GPR_ASSERT(path != nullptr);
SubchannelCall::Args call_args = {
connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0,
deadline_, arena_,
static_cast<Call*>(call_context()[GRPC_CONTEXT_CALL].value)->deadline(),
arena_,
// TODO(roth): When we implement hedging support, we will probably
// need to use a separate call context for each subchannel call.
call_context(), call_combiner_};

@ -558,7 +558,6 @@ class ClientChannelFilter::FilterBasedLoadBalancedCall final
// TODO(roth): Instead of duplicating these fields in every filter
// that uses any one of them, we should store them in the call
// context. This will save per-call memory overhead.
Timestamp deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;

@ -925,6 +925,7 @@ void FilterStackCall::CancelWithError(grpc_error_handle error) {
}
ClearPeerString();
InternalRef("termination");
ResetDeadline();
// Inform the call combiner of the cancellation, so that it can cancel
// any in-flight asynchronous actions that may be holding the call
// combiner. This ensures that the cancel_stream batch can be sent
@ -946,6 +947,7 @@ void FilterStackCall::SetFinalStatus(grpc_error_handle error) {
gpr_log(GPR_INFO, "set_final_status %s %s", is_client() ? "CLI" : "SVR",
StatusToString(error).c_str());
}
ResetDeadline();
if (is_client()) {
std::string status_details;
grpc_error_get_status(error, send_deadline(), final_op_.client.status,

@ -181,6 +181,36 @@ class CoreEnd2endTest : public ::testing::Test {
void* p;
};
// Safe notification to use for core e2e tests.
// Since when we're fuzzing we don't run background threads, the normal
// Notification type isn't safe to wait on (for some background timer to fire
// for instance...), consequently we need to use this.
class TestNotification {
public:
explicit TestNotification(CoreEnd2endTest* test) : test_(test) {}
void WaitForNotificationWithTimeout(absl::Duration wait_time) {
if (g_is_fuzzing_core_e2e_tests) {
Timestamp end = Timestamp::Now() + Duration::NanosecondsRoundUp(
ToInt64Nanoseconds(wait_time));
while (true) {
if (base_.HasBeenNotified()) return;
auto now = Timestamp::Now();
if (now >= end) return;
test_->step_fn_(now - end);
}
} else {
base_.WaitForNotificationWithTimeout(wait_time);
}
}
void Notify() { base_.Notify(); }
private:
Notification base_;
CoreEnd2endTest* const test_;
};
// CallBuilder - results in a call to either grpc_channel_create_call or
// grpc_channel_create_registered_call.
// Affords a fluent interface to specify optional arguments.
@ -753,7 +783,9 @@ class CoreEnd2endTest : public ::testing::Test {
cq_,
g_is_fuzzing_core_e2e_tests ? CqVerifier::FailUsingGprCrashWithStdio
: CqVerifier::FailUsingGprCrash,
std::move(step_fn_));
[this](grpc_event_engine::experimental::EventEngine::Duration d) {
step_fn_(d);
});
}
return *cq_verifier_;
}

@ -57,8 +57,8 @@ namespace grpc_core {
namespace {
Mutex* g_mu;
Notification* g_client_call_ended_notify;
Notification* g_server_call_ended_notify;
CoreEnd2endTest::TestNotification* g_client_call_ended_notify;
CoreEnd2endTest::TestNotification* g_server_call_ended_notify;
class FakeCallTracer : public ClientCallTracer {
public:
@ -197,8 +197,8 @@ CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
GTEST_SKIP() << "Test needs http2_stats_fix experiment to be enabled";
}
g_mu = new Mutex();
g_client_call_ended_notify = new Notification();
g_server_call_ended_notify = new Notification();
g_client_call_ended_notify = new CoreEnd2endTest::TestNotification(this);
g_server_call_ended_notify = new CoreEnd2endTest::TestNotification(this);
GlobalStatsPluginRegistry::RegisterStatsPlugin(
std::make_shared<NewFakeStatsPlugin>());
auto send_from_client = RandomSlice(10);

Loading…
Cancel
Save