fix retry code to fail batches instead of creating attempt if previously cancelled from surface (#27217)

* fix retry code to fail batches instead of creating attempt if previously cancelled from surface

* add xDS end2end tests covering the FI use-case that triggered the bug

* fix memory leak
pull/27224/head
Mark D. Roth 4 years ago committed by GitHub
parent ef182816f8
commit 2cb3831c83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      src/core/ext/filters/client_channel/retry_filter.cc
  2. 68
      test/cpp/end2end/xds_end2end_test.cc

@ -88,9 +88,7 @@
// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
// (census filter is on top of this one)
// - add census stats for retries
// - implement hedging
// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
@ -539,6 +537,8 @@ class RetryFilter::CallData {
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
grpc_error_handle cancelled_from_surface_ = GRPC_ERROR_NONE;
RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_;
// TODO(roth): As part of implementing hedging, we will need to maintain a
@ -2141,6 +2141,7 @@ RetryFilter::CallData::~CallData() {
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
GPR_ASSERT(pending_batches_[i].batch == nullptr);
}
GRPC_ERROR_UNREF(cancelled_from_surface_);
}
void RetryFilter::CallData::StartTransportStreamOpBatch(
@ -2173,6 +2174,9 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
call_attempt_->CancelFromSurface(batch);
return;
}
// Save cancel_error in case subsequent batches are started.
GRPC_ERROR_UNREF(cancelled_from_surface_);
cancelled_from_surface_ = GRPC_ERROR_REF(cancel_error);
// Cancel retry timer.
if (retry_timer_pending_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
@ -2201,6 +2205,15 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
}
// If we do not yet have a call attempt, create one.
if (call_attempt_ == nullptr) {
// If we were previously cancelled from the surface, cancel this
// batch instead of creating a call attempt.
if (cancelled_from_surface_ != GRPC_ERROR_NONE) {
PendingBatchClear(pending);
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure(
batch, GRPC_ERROR_REF(cancelled_from_surface_), call_combiner_);
return;
}
// If there is no retry policy, then commit retries immediately.
// This ensures that the code below will always jump to the fast path.
// TODO(roth): Remove this special case when we implement

@ -12216,6 +12216,74 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionMaxFault) {
EXPECT_EQ(kMaxFault, num_delayed);
}
TEST_P(FaultInjectionTest, XdsFaultInjectionBidiStreamDelayOk) {
// kRpcTimeoutMilliseconds is 10s should never be reached.
const uint32_t kRpcTimeoutMilliseconds = grpc_test_slowdown_factor() * 10000;
const uint32_t kFixedDelaySeconds = 1;
const uint32_t kDelayPercentagePerHundred = 100;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Create an EDS resource
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancers_[0]->ads_service()->SetEdsResource(
BuildEdsResource(args, DefaultEdsServiceName()));
// Construct the fault injection filter config
HTTPFault http_fault;
auto* delay_percentage = http_fault.mutable_delay()->mutable_percentage();
delay_percentage->set_numerator(kDelayPercentagePerHundred);
delay_percentage->set_denominator(FractionalPercent::HUNDRED);
auto* fixed_delay = http_fault.mutable_delay()->mutable_fixed_delay();
fixed_delay->set_seconds(kFixedDelaySeconds);
// Config fault injection via different setup
SetFilterConfig(http_fault);
ClientContext context;
context.set_deadline(
grpc_timeout_milliseconds_to_deadline(kRpcTimeoutMilliseconds));
auto stream = stub_->BidiStream(&context);
stream->WritesDone();
auto status = stream->Finish();
EXPECT_TRUE(status.ok()) << status.error_message() << ", "
<< status.error_details() << ", "
<< context.debug_error_string();
}
// This case catches a bug in the retry code that was triggered by a bad
// interaction with the FI code. See https://github.com/grpc/grpc/pull/27217
// for description.
TEST_P(FaultInjectionTest, XdsFaultInjectionBidiStreamDelayError) {
const uint32_t kRpcTimeoutMilliseconds = grpc_test_slowdown_factor() * 500;
const uint32_t kFixedDelaySeconds = 100;
const uint32_t kDelayPercentagePerHundred = 100;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Create an EDS resource
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancers_[0]->ads_service()->SetEdsResource(
BuildEdsResource(args, DefaultEdsServiceName()));
// Construct the fault injection filter config
HTTPFault http_fault;
auto* delay_percentage = http_fault.mutable_delay()->mutable_percentage();
delay_percentage->set_numerator(kDelayPercentagePerHundred);
delay_percentage->set_denominator(FractionalPercent::HUNDRED);
auto* fixed_delay = http_fault.mutable_delay()->mutable_fixed_delay();
fixed_delay->set_seconds(kFixedDelaySeconds);
// Config fault injection via different setup
SetFilterConfig(http_fault);
ClientContext context;
context.set_deadline(
grpc_timeout_milliseconds_to_deadline(kRpcTimeoutMilliseconds));
auto stream = stub_->BidiStream(&context);
stream->WritesDone();
auto status = stream->Finish();
EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, status.error_code())
<< status.error_message() << ", " << status.error_details() << ", "
<< context.debug_error_string();
}
class BootstrapSourceTest : public XdsEnd2endTest {
public:
BootstrapSourceTest() : XdsEnd2endTest(4, 1) {}

Loading…
Cancel
Save