Implement transparent retries (#28548)

* transport: add error attributes indicating stream network state

* add missing case

* transparent retries

* don't use backoff timer for transparent retries

* fix build

* add retry_transparent_not_sent_on_wire test

* add retry_transparent_goaway test

* remove special case to short-circuit retry code if no retry policy

* clang-format

* buildifier

* simplify logic a bit

* get StreamNetworkState from metadata instead of error

* fix api_fuzzer to always start recv_trailing_metadata_ready

* clang-format

* fix test flakiness in proxy fixture

* add test with MAX_CONCURRENT_STREAMS

* don't transparently retry if we're already committed

* fix test to not reuse byte buffers

* clang-format

* disable retries for streams_not_seen_test
pull/28843/head^2
Mark D. Roth 3 years ago committed by GitHub
parent 1c8073828c
commit 7f8f3dc001
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      CMakeLists.txt
  2. 6
      build_autogenerated.yaml
  3. 3
      gRPC-Core.podspec
  4. 6
      grpc.gyp
  5. 178
      src/core/ext/filters/client_channel/retry_filter.cc
  6. 24
      test/core/end2end/end2end_nosec_tests.cc
  7. 24
      test/core/end2end/end2end_tests.cc
  8. 34
      test/core/end2end/fuzzers/api_fuzzer.cc
  9. 11
      test/core/end2end/generate_tests.bzl
  10. 2
      test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc
  11. 2
      test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
  12. 28
      test/core/end2end/tests/retry_send_op_fails.cc
  13. 379
      test/core/end2end/tests/retry_transparent_goaway.cc
  14. 368
      test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
  15. 378
      test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
  16. 4
      test/core/transport/chttp2/streams_not_seen_test.cc

6
CMakeLists.txt generated

@ -1191,6 +1191,9 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc
test/core/end2end/tests/retry_throttled.cc
test/core/end2end/tests/retry_too_many_attempts.cc
test/core/end2end/tests/retry_transparent_goaway.cc
test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
test/core/end2end/tests/server_finishes_request.cc
test/core/end2end/tests/server_streaming.cc
test/core/end2end/tests/shutdown_finishes_calls.cc
@ -1333,6 +1336,9 @@ add_library(end2end_tests
test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc
test/core/end2end/tests/retry_throttled.cc
test/core/end2end/tests/retry_too_many_attempts.cc
test/core/end2end/tests/retry_transparent_goaway.cc
test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
test/core/end2end/tests/sdk_authz.cc
test/core/end2end/tests/server_finishes_request.cc
test/core/end2end/tests/server_streaming.cc

@ -112,6 +112,9 @@ libs:
- test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc
- test/core/end2end/tests/retry_throttled.cc
- test/core/end2end/tests/retry_too_many_attempts.cc
- test/core/end2end/tests/retry_transparent_goaway.cc
- test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
- test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
- test/core/end2end/tests/server_finishes_request.cc
- test/core/end2end/tests/server_streaming.cc
- test/core/end2end/tests/shutdown_finishes_calls.cc
@ -245,6 +248,9 @@ libs:
- test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc
- test/core/end2end/tests/retry_throttled.cc
- test/core/end2end/tests/retry_too_many_attempts.cc
- test/core/end2end/tests/retry_transparent_goaway.cc
- test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
- test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
- test/core/end2end/tests/sdk_authz.cc
- test/core/end2end/tests/server_finishes_request.cc
- test/core/end2end/tests/server_streaming.cc

3
gRPC-Core.podspec generated

@ -2500,6 +2500,9 @@ Pod::Spec.new do |s|
'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc',
'test/core/end2end/tests/retry_throttled.cc',
'test/core/end2end/tests/retry_too_many_attempts.cc',
'test/core/end2end/tests/retry_transparent_goaway.cc',
'test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc',
'test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc',
'test/core/end2end/tests/sdk_authz.cc',
'test/core/end2end/tests/server_finishes_request.cc',
'test/core/end2end/tests/server_streaming.cc',

6
grpc.gyp generated

@ -265,6 +265,9 @@
'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc',
'test/core/end2end/tests/retry_throttled.cc',
'test/core/end2end/tests/retry_too_many_attempts.cc',
'test/core/end2end/tests/retry_transparent_goaway.cc',
'test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc',
'test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc',
'test/core/end2end/tests/server_finishes_request.cc',
'test/core/end2end/tests/server_streaming.cc',
'test/core/end2end/tests/shutdown_finishes_calls.cc',
@ -375,6 +378,9 @@
'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc',
'test/core/end2end/tests/retry_throttled.cc',
'test/core/end2end/tests/retry_too_many_attempts.cc',
'test/core/end2end/tests/retry_transparent_goaway.cc',
'test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc',
'test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc',
'test/core/end2end/tests/sdk_authz.cc',
'test/core/end2end/tests/server_finishes_request.cc',
'test/core/end2end/tests/server_streaming.cc',

@ -85,7 +85,6 @@
// which batches need to be sent on the LB call for a given attempt.
// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - implement hedging
// By default, we buffer 256 KiB per RPC for retries.
@ -209,7 +208,7 @@ class RetryFilter::CallData {
// State associated with each call attempt.
class CallAttempt : public RefCounted<CallAttempt> {
public:
explicit CallAttempt(CallData* calld);
CallAttempt(CallData* calld, bool is_transparent_retry);
~CallAttempt() override;
bool lb_call_committed() const { return lb_call_committed_; }
@ -395,7 +394,7 @@ class RetryFilter::CallData {
void MaybeSwitchToFastPath();
// Returns true if the call should be retried.
bool ShouldRetry(absl::optional<grpc_status_code> status, bool is_lb_drop,
bool ShouldRetry(absl::optional<grpc_status_code> status,
absl::optional<grpc_millis> server_pushback_ms);
// Abandons the call attempt. Unrefs any deferred batches.
@ -512,10 +511,15 @@ class RetryFilter::CallData {
static void OnRetryTimer(void* arg, grpc_error_handle error);
static void OnRetryTimerLocked(void* arg, grpc_error_handle error);
// Adds a closure to closures to start a transparent retry.
void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
static void StartTransparentRetry(void* arg, grpc_error_handle error);
OrphanablePtr<ClientChannel::LoadBalancedCall> CreateLoadBalancedCall(
ConfigSelector::CallDispatchController* call_dispatch_controller);
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry);
void CreateCallAttempt();
void CreateCallAttempt(bool is_transparent_retry);
RetryFilter* chand_;
grpc_polling_entity* pollent_;
@ -558,6 +562,8 @@ class RetryFilter::CallData {
// Retry state.
bool retry_committed_ : 1;
bool retry_timer_pending_ : 1;
bool retry_codepath_started_ : 1;
bool sent_transparent_retry_not_seen_by_server_ : 1;
int num_attempts_completed_ = 0;
grpc_timer retry_timer_;
grpc_closure retry_closure_;
@ -650,7 +656,8 @@ class RetryFilter::CallData::CallStackDestructionBarrier
// RetryFilter::CallData::CallAttempt
//
RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
bool is_transparent_retry)
: RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "CallAttempt"
: nullptr),
calld_(calld),
@ -667,7 +674,8 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
sent_cancel_stream_(false),
seen_recv_trailing_metadata_from_surface_(false),
abandoned_(false) {
lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_);
lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_,
is_transparent_retry);
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: create lb_call=%p",
calld->chand_, calld, this, lb_call_.get());
@ -1070,11 +1078,8 @@ void RetryFilter::CallData::CallAttempt::CancelFromSurface(
}
bool RetryFilter::CallData::CallAttempt::ShouldRetry(
absl::optional<grpc_status_code> status, bool is_lb_drop,
absl::optional<grpc_status_code> status,
absl::optional<grpc_millis> server_pushback_ms) {
// LB drops always inhibit retries.
if (is_lb_drop) return false;
// TODO(roth): Handle transparent retries here.
// If no retry policy, don't retry.
if (calld_->retry_policy_ == nullptr) return false;
// Check status.
@ -1238,9 +1243,8 @@ void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED),
&closures);
// Check whether we should retry.
if (call_attempt->ShouldRetry(
/*status=*/absl::nullopt, /*is_lb_drop=*/false,
/*server_pushback_ms=*/absl::nullopt)) {
if (call_attempt->ShouldRetry(/*status=*/absl::nullopt,
/*server_pushback_ms=*/absl::nullopt)) {
// Mark current attempt as abandoned.
call_attempt->Abandon();
// We are retrying. Start backoff timer.
@ -1542,10 +1546,11 @@ namespace {
// Sets *status, *server_pushback_ms, and *is_lb_drop based on md_batch
// and error.
void GetCallStatus(grpc_millis deadline, grpc_metadata_batch* md_batch,
grpc_error_handle error, grpc_status_code* status,
absl::optional<grpc_millis>* server_pushback_ms,
bool* is_lb_drop) {
void GetCallStatus(
grpc_millis deadline, grpc_metadata_batch* md_batch,
grpc_error_handle error, grpc_status_code* status,
absl::optional<grpc_millis>* server_pushback_ms, bool* is_lb_drop,
absl::optional<GrpcStreamNetworkState::ValueType>* stream_network_state) {
if (error != GRPC_ERROR_NONE) {
grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
intptr_t value = 0;
@ -1555,8 +1560,9 @@ void GetCallStatus(grpc_millis deadline, grpc_metadata_batch* md_batch,
}
} else {
*status = *md_batch->get(GrpcStatusMetadata());
*server_pushback_ms = md_batch->get(GrpcRetryPushbackMsMetadata());
}
*server_pushback_ms = md_batch->get(GrpcRetryPushbackMsMetadata());
*stream_network_state = md_batch->get(GrpcStreamNetworkState());
GRPC_ERROR_UNREF(error);
}
@ -1688,36 +1694,72 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
// Get the call's status and check for server pushback metadata.
grpc_status_code status = GRPC_STATUS_OK;
absl::optional<grpc_millis> server_pushback_ms;
bool is_lb_drop = false;
absl::optional<GrpcStreamNetworkState::ValueType> stream_network_state;
grpc_metadata_batch* md_batch =
batch_data->batch_.payload->recv_trailing_metadata.recv_trailing_metadata;
bool is_lb_drop = false;
GetCallStatus(calld->deadline_, md_batch, GRPC_ERROR_REF(error), &status,
&server_pushback_ms, &is_lb_drop);
&server_pushback_ms, &is_lb_drop, &stream_network_state);
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(
GPR_INFO,
"chand=%p calld=%p attempt=%p: call finished, status=%s is_lb_drop=%d",
calld->chand_, calld, call_attempt, grpc_status_code_to_string(status),
is_lb_drop);
gpr_log(GPR_INFO,
"chand=%p calld=%p attempt=%p: call finished, status=%s "
"server_pushback_ms=%s is_lb_drop=%d stream_network_state=%s",
calld->chand_, calld, call_attempt,
grpc_status_code_to_string(status),
server_pushback_ms.has_value()
? absl::StrCat(*server_pushback_ms).c_str()
: "N/A",
is_lb_drop,
stream_network_state.has_value()
? absl::StrCat(*stream_network_state).c_str()
: "N/A");
}
// Check if we should retry.
if (call_attempt->ShouldRetry(status, is_lb_drop, server_pushback_ms)) {
// Start retry timer.
calld->StartRetryTimer(server_pushback_ms);
// Cancel call attempt.
CallCombinerClosureList closures;
call_attempt->MaybeAddBatchForCancelOp(
error == GRPC_ERROR_NONE
? grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("call attempt failed"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED)
: GRPC_ERROR_REF(error),
&closures);
// Record that this attempt has been abandoned.
call_attempt->Abandon();
// Yields call combiner.
closures.RunClosures(calld->call_combiner_);
return;
if (!is_lb_drop) { // Never retry on LB drops.
enum { kNoRetry, kTransparentRetry, kConfigurableRetry } retry = kNoRetry;
// Handle transparent retries.
if (stream_network_state.has_value() && !calld->retry_committed_) {
// If not sent on wire, then always retry.
// If sent on wire but not seen by server, retry exactly once.
if (*stream_network_state == GrpcStreamNetworkState::kNotSentOnWire) {
retry = kTransparentRetry;
} else if (*stream_network_state ==
GrpcStreamNetworkState::kNotSeenByServer &&
!calld->sent_transparent_retry_not_seen_by_server_) {
calld->sent_transparent_retry_not_seen_by_server_ = true;
retry = kTransparentRetry;
}
}
// If not transparently retrying, check for configurable retry.
if (retry == kNoRetry &&
call_attempt->ShouldRetry(status, server_pushback_ms)) {
retry = kConfigurableRetry;
}
// If we're retrying, do so.
if (retry != kNoRetry) {
CallCombinerClosureList closures;
// Cancel call attempt.
call_attempt->MaybeAddBatchForCancelOp(
error == GRPC_ERROR_NONE
? grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("call attempt failed"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED)
: GRPC_ERROR_REF(error),
&closures);
// For transparent retries, add a closure to immediately start a new
// call attempt.
// For configurable retries, start retry timer.
if (retry == kTransparentRetry) {
calld->AddClosureToStartTransparentRetry(&closures);
} else {
calld->StartRetryTimer(server_pushback_ms);
}
// Record that this attempt has been abandoned.
call_attempt->Abandon();
// Yields call combiner.
closures.RunClosures(calld->call_combiner_);
return;
}
}
// Not retrying, so commit the call.
calld->RetryCommit(call_attempt);
@ -2087,7 +2129,9 @@ RetryFilter::CallData::CallData(RetryFilter* chand,
pending_send_message_(false),
pending_send_trailing_metadata_(false),
retry_committed_(false),
retry_timer_pending_(false) {}
retry_timer_pending_(false),
retry_codepath_started_(false),
sent_transparent_retry_not_seen_by_server_(false) {}
RetryFilter::CallData::~CallData() {
grpc_slice_unref_internal(path_);
@ -2100,6 +2144,10 @@ RetryFilter::CallData::~CallData() {
void RetryFilter::CallData::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from surface: %s",
chand_, this, grpc_transport_stream_op_batch_string(batch).c_str());
}
// If we have an LB call, delegate to the LB call.
if (committed_call_ != nullptr) {
// Note: This will release the call combiner.
@ -2168,11 +2216,6 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
batch, GRPC_ERROR_REF(cancelled_from_surface_), call_combiner_);
return;
}
// If there is no retry policy, then commit retries immediately.
// This ensures that the code below will always jump to the fast path.
// TODO(roth): Remove this special case when we implement
// transparent retries.
if (retry_policy_ == nullptr) retry_committed_ = true;
// If this is the first batch and retries are already committed
// (e.g., if this batch put the call above the buffer size limit), then
// immediately create an LB call and delegate the batch to it. This
@ -2188,7 +2231,7 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
// We also skip this optimization if perAttemptRecvTimeout is set in the
// retry policy, because we need the code in CallAttempt to handle
// the associated timer.
if (num_attempts_completed_ == 0 && retry_committed_ &&
if (!retry_codepath_started_ && retry_committed_ &&
(retry_policy_ == nullptr ||
!retry_policy_->per_attempt_recv_timeout().has_value())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
@ -2202,7 +2245,8 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
static_cast<ClientChannelServiceConfigCallData*>(
call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
committed_call_ = CreateLoadBalancedCall(
service_config_call_data->call_dispatch_controller());
service_config_call_data->call_dispatch_controller(),
/*is_transparent_retry=*/false);
committed_call_->StartTransportStreamOpBatch(batch);
return;
}
@ -2213,7 +2257,8 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_,
this);
}
CreateCallAttempt();
retry_codepath_started_ = true;
CreateCallAttempt(/*is_transparent_retry=*/false);
return;
}
// Send batches to call attempt.
@ -2226,7 +2271,8 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
OrphanablePtr<ClientChannel::LoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall(
ConfigSelector::CallDispatchController* call_dispatch_controller) {
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry) {
grpc_call_element_args args = {owning_call_, nullptr, call_context_,
path_, /*start_time=*/0, deadline_,
arena_, call_combiner_};
@ -2235,13 +2281,11 @@ RetryFilter::CallData::CreateLoadBalancedCall(
// This callback holds a ref to the CallStackDestructionBarrier
// object until the LB call is destroyed.
call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this),
call_dispatch_controller,
// TODO(roth): Change this when we support transparent retries.
/*is_transparent_retry=*/false);
call_dispatch_controller, is_transparent_retry);
}
void RetryFilter::CallData::CreateCallAttempt() {
call_attempt_ = MakeRefCounted<CallAttempt>(this);
void RetryFilter::CallData::CreateCallAttempt(bool is_transparent_retry) {
call_attempt_ = MakeRefCounted<CallAttempt>(this, is_transparent_retry);
call_attempt_->StartRetriableBatches();
}
@ -2533,13 +2577,29 @@ void RetryFilter::CallData::OnRetryTimerLocked(void* arg,
auto* calld = static_cast<CallData*>(arg);
if (error == GRPC_ERROR_NONE && calld->retry_timer_pending_) {
calld->retry_timer_pending_ = false;
calld->CreateCallAttempt();
calld->CreateCallAttempt(/*is_transparent_retry=*/false);
} else {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "retry timer cancelled");
}
GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
}
void RetryFilter::CallData::AddClosureToStartTransparentRetry(
CallCombinerClosureList* closures) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: scheduling transparent retry", chand_,
this);
}
GRPC_CLOSURE_INIT(&retry_closure_, StartTransparentRetry, this, nullptr);
closures->Add(&retry_closure_, GRPC_ERROR_NONE, "start transparent retry");
}
void RetryFilter::CallData::StartTransparentRetry(void* arg,
grpc_error_handle /*error*/) {
auto* calld = static_cast<CallData*>(arg);
calld->CreateCallAttempt(/*is_transparent_retry=*/true);
}
} // namespace
const grpc_channel_filter kRetryFilterVtable = {

@ -177,6 +177,12 @@ extern void retry_throttled(grpc_end2end_test_config config);
extern void retry_throttled_pre_init(void);
extern void retry_too_many_attempts(grpc_end2end_test_config config);
extern void retry_too_many_attempts_pre_init(void);
extern void retry_transparent_goaway(grpc_end2end_test_config config);
extern void retry_transparent_goaway_pre_init(void);
extern void retry_transparent_max_concurrent_streams(grpc_end2end_test_config config);
extern void retry_transparent_max_concurrent_streams_pre_init(void);
extern void retry_transparent_not_sent_on_wire(grpc_end2end_test_config config);
extern void retry_transparent_not_sent_on_wire_pre_init(void);
extern void server_finishes_request(grpc_end2end_test_config config);
extern void server_finishes_request_pre_init(void);
extern void server_streaming(grpc_end2end_test_config config);
@ -279,6 +285,9 @@ void grpc_end2end_tests_pre_init(void) {
retry_streaming_succeeds_before_replay_finished_pre_init();
retry_throttled_pre_init();
retry_too_many_attempts_pre_init();
retry_transparent_goaway_pre_init();
retry_transparent_max_concurrent_streams_pre_init();
retry_transparent_not_sent_on_wire_pre_init();
server_finishes_request_pre_init();
server_streaming_pre_init();
shutdown_finishes_calls_pre_init();
@ -375,6 +384,9 @@ void grpc_end2end_tests(int argc, char **argv,
retry_streaming_succeeds_before_replay_finished(config);
retry_throttled(config);
retry_too_many_attempts(config);
retry_transparent_goaway(config);
retry_transparent_max_concurrent_streams(config);
retry_transparent_not_sent_on_wire(config);
server_finishes_request(config);
server_streaming(config);
shutdown_finishes_calls(config);
@ -687,6 +699,18 @@ void grpc_end2end_tests(int argc, char **argv,
retry_too_many_attempts(config);
continue;
}
if (0 == strcmp("retry_transparent_goaway", argv[i])) {
retry_transparent_goaway(config);
continue;
}
if (0 == strcmp("retry_transparent_max_concurrent_streams", argv[i])) {
retry_transparent_max_concurrent_streams(config);
continue;
}
if (0 == strcmp("retry_transparent_not_sent_on_wire", argv[i])) {
retry_transparent_not_sent_on_wire(config);
continue;
}
if (0 == strcmp("server_finishes_request", argv[i])) {
server_finishes_request(config);
continue;

@ -179,6 +179,12 @@ extern void retry_throttled(grpc_end2end_test_config config);
extern void retry_throttled_pre_init(void);
extern void retry_too_many_attempts(grpc_end2end_test_config config);
extern void retry_too_many_attempts_pre_init(void);
extern void retry_transparent_goaway(grpc_end2end_test_config config);
extern void retry_transparent_goaway_pre_init(void);
extern void retry_transparent_max_concurrent_streams(grpc_end2end_test_config config);
extern void retry_transparent_max_concurrent_streams_pre_init(void);
extern void retry_transparent_not_sent_on_wire(grpc_end2end_test_config config);
extern void retry_transparent_not_sent_on_wire_pre_init(void);
extern void sdk_authz(grpc_end2end_test_config config);
extern void sdk_authz_pre_init(void);
extern void server_finishes_request(grpc_end2end_test_config config);
@ -284,6 +290,9 @@ void grpc_end2end_tests_pre_init(void) {
retry_streaming_succeeds_before_replay_finished_pre_init();
retry_throttled_pre_init();
retry_too_many_attempts_pre_init();
retry_transparent_goaway_pre_init();
retry_transparent_max_concurrent_streams_pre_init();
retry_transparent_not_sent_on_wire_pre_init();
sdk_authz_pre_init();
server_finishes_request_pre_init();
server_streaming_pre_init();
@ -382,6 +391,9 @@ void grpc_end2end_tests(int argc, char **argv,
retry_streaming_succeeds_before_replay_finished(config);
retry_throttled(config);
retry_too_many_attempts(config);
retry_transparent_goaway(config);
retry_transparent_max_concurrent_streams(config);
retry_transparent_not_sent_on_wire(config);
sdk_authz(config);
server_finishes_request(config);
server_streaming(config);
@ -699,6 +711,18 @@ void grpc_end2end_tests(int argc, char **argv,
retry_too_many_attempts(config);
continue;
}
if (0 == strcmp("retry_transparent_goaway", argv[i])) {
retry_transparent_goaway(config);
continue;
}
if (0 == strcmp("retry_transparent_max_concurrent_streams", argv[i])) {
retry_transparent_max_concurrent_streams(config);
continue;
}
if (0 == strcmp("retry_transparent_not_sent_on_wire", argv[i])) {
retry_transparent_not_sent_on_wire(config);
continue;
}
if (0 == strcmp("sdk_authz", argv[i])) {
sdk_authz(config);
continue;

@ -340,6 +340,16 @@ class Call : public std::enable_shared_from_this<Call> {
void Shutdown() {
if (call_ != nullptr) {
grpc_call_cancel(call_, nullptr);
if (type_ == CallType::CLIENT && !started_recv_status_on_client_) {
uint8_t has_ops = 0;
grpc_op op = MakeRecvStatusOnClientOp(&has_ops);
auto* v = FinishedBatchValidator(has_ops);
grpc_call_error error =
grpc_call_start_batch(call_, &op, 1, v, nullptr);
if (error != GRPC_CALL_OK) {
v->Run(false);
}
}
type_ = CallType::TOMBSTONED;
}
}
@ -481,12 +491,9 @@ class Call : public std::enable_shared_from_this<Call> {
}
break;
case api_fuzzer::BatchOp::kReceiveStatusOnClient:
op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op.data.recv_status_on_client.status = &status_;
op.data.recv_status_on_client.trailing_metadata =
&recv_trailing_metadata_;
op.data.recv_status_on_client.status_details = &recv_status_details_;
*batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT;
op = MakeRecvStatusOnClientOp(batch_ops);
unwinders->push_back(
[this]() { started_recv_status_on_client_ = false; });
break;
case api_fuzzer::BatchOp::kReceiveCloseOnServer:
op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
@ -538,6 +545,18 @@ class Call : public std::enable_shared_from_this<Call> {
}
private:
grpc_op MakeRecvStatusOnClientOp(uint8_t* batch_ops) {
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op.data.recv_status_on_client.status = &status_;
op.data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_;
op.data.recv_status_on_client.status_details = &recv_status_details_;
*batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT;
started_recv_status_on_client_ = true;
return op;
}
CallType type_;
grpc_call* call_ = nullptr;
grpc_byte_buffer* recv_message_ = nullptr;
@ -553,8 +572,9 @@ class Call : public std::enable_shared_from_this<Call> {
bool enqueued_recv_initial_metadata_ = false;
grpc_call_details call_details_{};
grpc_byte_buffer* send_message_ = nullptr;
bool call_closed_ = false;
bool pending_recv_message_op_ = false;
bool started_recv_status_on_client_ = false;
bool call_closed_ = false;
std::vector<void*> free_pointers_;
std::vector<grpc_slice> unref_slices_;

@ -355,6 +355,17 @@ END2END_TESTS = {
),
"retry_throttled": _test_options(needs_client_channel = True),
"retry_too_many_attempts": _test_options(needs_client_channel = True),
"retry_transparent_goaway": _test_options(needs_client_channel = True),
"retry_transparent_not_sent_on_wire": _test_options(
needs_client_channel = True,
),
"retry_transparent_max_concurrent_streams": _test_options(
needs_client_channel = True,
proxyable = False,
# TODO(jtattermusch): too long bazel test name makes the test flaky on Windows RBE
# See b/151617965
short_name = "retry_transparent_mcs",
),
"sdk_authz": _test_options(secure = True),
"server_finishes_request": _test_options(),
"server_streaming": _test_options(needs_http2 = True),

@ -240,7 +240,7 @@ class FailSendOpsFilter {
public:
static grpc_channel_filter kFilterVtable;
public:
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,

@ -267,7 +267,7 @@ class InjectStatusFilter {
public:
static grpc_channel_filter kFilterVtable;
public:
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,

@ -277,14 +277,14 @@ static void test_retry_send_op_fails(grpc_end2end_test_config config) {
namespace {
// A filter that, for the first call it sees, will fail the batch
// containing send_initial_metadata and then fail the call with status
// ABORTED. All subsequent calls are allowed through without failures.
class FailFirstSendOpFilter {
// A filter that, for the first call it sees, will fail all batches except
// for cancellations, so that the call fails with status ABORTED.
// All subsequent calls are allowed through without failures.
class FailFirstCallFilter {
public:
static grpc_channel_filter kFilterVtable;
public:
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
@ -302,7 +302,7 @@ class FailFirstSendOpFilter {
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
auto* chand = static_cast<FailFirstSendOpFilter*>(elem->channel_data);
auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
auto* calld = static_cast<CallData*>(elem->call_data);
if (!chand->seen_first_) {
chand->seen_first_ = true;
@ -312,7 +312,7 @@ class FailFirstSendOpFilter {
grpc_transport_stream_op_batch_finish_with_failure(
batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"FailFirstSendOpFilter failing batch"),
"FailFirstCallFilter failing batch"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_ABORTED),
calld->call_combiner_);
return;
@ -330,19 +330,19 @@ class FailFirstSendOpFilter {
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* /*args*/) {
new (elem->channel_data) FailFirstSendOpFilter();
new (elem->channel_data) FailFirstCallFilter();
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_channel_element* elem) {
auto* chand = static_cast<FailFirstSendOpFilter*>(elem->channel_data);
chand->~FailFirstSendOpFilter();
auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
chand->~FailFirstCallFilter();
}
bool seen_first_ = false;
};
grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = {
grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
@ -350,11 +350,11 @@ grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = {
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(FailFirstSendOpFilter),
sizeof(FailFirstCallFilter),
Init,
Destroy,
grpc_channel_next_get_info,
"FailFirstSendOpFilter",
"FailFirstCallFilter",
};
} // namespace
@ -374,7 +374,7 @@ void retry_send_op_fails(grpc_end2end_test_config config) {
return true;
}
// Install filter.
builder->PrependFilter(&FailFirstSendOpFilter::kFilterVtable,
builder->PrependFilter(&FailFirstCallFilter::kFilterVtable,
nullptr);
return true;
});

@ -0,0 +1,379 @@
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/error_utils.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/end2end/tests/cancel_test_helpers.h"
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Tests transparent retries when the call was never sent out on the wire.
static void test_retry_transparent_goaway(grpc_end2end_test_config config) {
grpc_call* c;
grpc_call* s;
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer* response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
grpc_byte_buffer* request_payload_recv = nullptr;
grpc_byte_buffer* response_payload_recv = nullptr;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
int was_cancelled = 2;
char* peer;
grpc_end2end_test_fixture f =
begin_test(config, "retry_transparent_goaway", nullptr, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c);
peer = grpc_call_get_peer(c);
GPR_ASSERT(peer != nullptr);
gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
gpr_free(peer);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
grpc_slice status_details = grpc_slice_from_static_string("xyz");
// Start a batch containing send ops.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Start a batch containing recv ops.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(2),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Client send ops should now complete.
CQ_EXPECT_COMPLETION(cqv, tag(1), true);
cq_verify(cqv);
// Server should get a call.
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), true);
cq_verify(cqv);
// Server receives the request.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), true);
cq_verify(cqv);
// Server sends a response with status OK.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// In principle, the server batch should complete before the client
// recv ops batch, but in the proxy fixtures, there are multiple threads
// involved, so the completion order tends to be a little racy.
CQ_EXPECT_COMPLETION(cqv, tag(103), true);
CQ_EXPECT_COMPLETION(cqv, tag(2), true);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
GPR_ASSERT(
byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
// Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since
// we don't do that for transparent retries.
for (size_t i = 0; i < request_metadata_recv.count; ++i) {
GPR_ASSERT(!grpc_slice_eq(
request_metadata_recv.metadata[i].key,
grpc_slice_from_static_string("grpc-previous-rpc-attempts")));
}
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_byte_buffer_destroy(response_payload_recv);
grpc_call_unref(c);
grpc_call_unref(s);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
namespace {
// A filter that, for the first call it sees, will fail all batches except
// for cancellations, so that the call fails with an error whose
// StreamNetworkState is kNotSeenByServer.
// All subsequent calls are allowed through without failures.
class FailFirstCallFilter {
public:
static grpc_channel_filter kFilterVtable;
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) CallData(args);
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
auto* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
auto* calld = static_cast<CallData*>(elem->call_data);
if (!chand->seen_call_) {
calld->fail_ = true;
chand->seen_call_ = true;
}
if (calld->fail_) {
if (batch->recv_trailing_metadata) {
batch->payload->recv_trailing_metadata.recv_trailing_metadata->Set(
grpc_core::GrpcStreamNetworkState(),
grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
}
if (!batch->cancel_stream) {
grpc_transport_stream_op_batch_finish_with_failure(
batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"FailFirstCallFilter failing batch"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE),
calld->call_combiner_);
return;
}
}
grpc_call_next_op(elem, batch);
}
private:
explicit CallData(const grpc_call_element_args* args)
: call_combiner_(args->call_combiner) {}
grpc_core::CallCombiner* call_combiner_;
bool fail_ = false;
};
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* /*args*/) {
new (elem->channel_data) FailFirstCallFilter();
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_channel_element* elem) {
auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
chand->~FailFirstCallFilter();
}
bool seen_call_ = false;
};
grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(FailFirstCallFilter),
Init,
Destroy,
grpc_channel_next_get_info,
"FailFirstCallFilter",
};
} // namespace
void retry_transparent_goaway(grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
[](grpc_core::CoreConfiguration::Builder* builder) {
grpc_core::BuildCoreConfiguration(builder);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1,
[](grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = builder->channel_args();
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
true)) {
return true;
}
// Install filter.
builder->PrependFilter(&FailFirstCallFilter::kFilterVtable,
nullptr);
return true;
});
},
[config] { test_retry_transparent_goaway(config); });
}
void retry_transparent_goaway_pre_init(void) {}

@ -0,0 +1,368 @@
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/error_utils.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/end2end/tests/cancel_test_helpers.h"
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Tests transparent retries when the call was never sent out on the wire.
// This is similar to retry_transparent_not_sent_on_wire, except that
// instead of simulating the response with a filter, we actually have
// the transport behave the right way. We create a server with
// MAX_CONCURRENT_STREAMS set to 1. We start a call on the server, and
// then start a second call, which will get queued in the transport.
// Then, before the first call finishes, the server is shut down and
// restarted. The second call will fail in that transport instance and
// will be transparently retried after the server starts up again.
static void test_retry_transparent_max_concurrent_streams(
grpc_end2end_test_config config) {
grpc_op ops[6];
grpc_op* op;
grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
grpc_slice status_details = grpc_slice_from_static_string("xyz");
grpc_call_error error;
grpc_arg arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MAX_CONCURRENT_STREAMS), 1);
grpc_channel_args server_args = {1, &arg};
grpc_end2end_test_fixture f =
begin_test(config, "retry_transparent_max_concurrent_streams", nullptr,
&server_args);
cq_verifier* cqv = cq_verifier_create(f.cq);
gpr_timespec deadline = five_seconds_from_now();
// Client starts a call.
grpc_call* c =
grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c);
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array_init(&initial_metadata_recv);
grpc_byte_buffer* response_payload_recv = nullptr;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_status_code status;
grpc_slice details;
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Server should get a call.
grpc_call* s;
grpc_metadata_array request_metadata_recv;
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details call_details;
grpc_call_details_init(&call_details);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), true);
cq_verify(cqv);
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
grpc_call_details_destroy(&call_details);
grpc_metadata_array_destroy(&request_metadata_recv);
// Client starts a second call.
// We set wait_for_ready for this call, so that if it retries before
// the server comes back up, it stays pending.
grpc_call* c2 =
grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c2);
grpc_byte_buffer* request_payload2 =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_metadata_array initial_metadata_recv2;
grpc_metadata_array_init(&initial_metadata_recv2);
grpc_byte_buffer* response_payload_recv2 = nullptr;
grpc_metadata_array trailing_metadata_recv2;
grpc_metadata_array_init(&trailing_metadata_recv2);
grpc_status_code status2;
grpc_slice details2;
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload2;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&initial_metadata_recv2;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv2;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
op->data.recv_status_on_client.status = &status2;
op->data.recv_status_on_client.status_details = &details2;
op++;
error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops), tag(2),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Start server shutdown.
grpc_server_shutdown_and_notify(f.server, f.cq, tag(102));
// Server handles the first call.
grpc_byte_buffer* request_payload_recv = nullptr;
int was_cancelled = 2;
grpc_byte_buffer* response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Server completes first call and shutdown.
// Client completes first call.
CQ_EXPECT_COMPLETION(cqv, tag(103), true);
CQ_EXPECT_COMPLETION(cqv, tag(102), true);
CQ_EXPECT_COMPLETION(cqv, tag(1), true);
cq_verify(cqv);
// Clean up from first call.
GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
grpc_byte_buffer_destroy(request_payload_recv);
GPR_ASSERT(was_cancelled == 0);
grpc_byte_buffer_destroy(response_payload);
grpc_call_unref(s);
grpc_byte_buffer_destroy(request_payload);
grpc_metadata_array_destroy(&initial_metadata_recv);
GPR_ASSERT(
byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
grpc_byte_buffer_destroy(response_payload_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
grpc_slice_unref(details);
grpc_call_unref(c);
// Destroy server and then restart it.
grpc_server_destroy(f.server);
f.server = nullptr;
config.init_server(&f, &server_args);
// Server should get the second call.
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(201));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(201), true);
cq_verify(cqv);
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
grpc_call_details_destroy(&call_details);
// Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since
// we don't do that for transparent retries.
for (size_t i = 0; i < request_metadata_recv.count; ++i) {
GPR_ASSERT(!grpc_slice_eq(
request_metadata_recv.metadata[i].key,
grpc_slice_from_static_string("grpc-previous-rpc-attempts")));
}
grpc_metadata_array_destroy(&request_metadata_recv);
// Server handles the second call.
request_payload_recv = nullptr;
was_cancelled = 2;
grpc_byte_buffer* response_payload2 =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload2;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(202),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Second call completes.
CQ_EXPECT_COMPLETION(cqv, tag(202), true);
CQ_EXPECT_COMPLETION(cqv, tag(2), true);
cq_verify(cqv);
// Clean up from second call.
GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
grpc_byte_buffer_destroy(request_payload_recv);
GPR_ASSERT(was_cancelled == 0);
grpc_byte_buffer_destroy(response_payload2);
grpc_call_unref(s);
grpc_byte_buffer_destroy(request_payload2);
grpc_metadata_array_destroy(&initial_metadata_recv2);
GPR_ASSERT(
byte_buffer_eq_slice(response_payload_recv2, response_payload_slice));
grpc_byte_buffer_destroy(response_payload_recv2);
grpc_metadata_array_destroy(&trailing_metadata_recv2);
GPR_ASSERT(status2 == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details2, "xyz"));
grpc_slice_unref(details2);
grpc_call_unref(c2);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
void retry_transparent_max_concurrent_streams(grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
test_retry_transparent_max_concurrent_streams(config);
}
void retry_transparent_max_concurrent_streams_pre_init(void) {}

@ -0,0 +1,378 @@
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/error_utils.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/end2end/tests/cancel_test_helpers.h"
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Tests transparent retries when the call was never sent out on the wire.
static void test_retry_transparent_not_sent_on_wire(
grpc_end2end_test_config config) {
grpc_call* c;
grpc_call* s;
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer* response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
grpc_byte_buffer* request_payload_recv = nullptr;
grpc_byte_buffer* response_payload_recv = nullptr;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
int was_cancelled = 2;
char* peer;
grpc_end2end_test_fixture f = begin_test(
config, "retry_transparent_not_sent_on_wire", nullptr, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c);
peer = grpc_call_get_peer(c);
GPR_ASSERT(peer != nullptr);
gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
gpr_free(peer);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
grpc_slice status_details = grpc_slice_from_static_string("xyz");
// Start a batch containing send ops.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Start a batch containing recv ops.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(2),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Client send ops should now complete.
CQ_EXPECT_COMPLETION(cqv, tag(1), true);
cq_verify(cqv);
// Server should get a call.
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), true);
cq_verify(cqv);
// Server receives the request.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), true);
cq_verify(cqv);
// Server sends a response with status OK.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// In principle, the server batch should complete before the client
// recv ops batch, but in the proxy fixtures, there are multiple threads
// involved, so the completion order tends to be a little racy.
CQ_EXPECT_COMPLETION(cqv, tag(103), true);
CQ_EXPECT_COMPLETION(cqv, tag(2), true);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
GPR_ASSERT(
byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
// Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since
// we don't do that for transparent retries.
for (size_t i = 0; i < request_metadata_recv.count; ++i) {
GPR_ASSERT(!grpc_slice_eq(
request_metadata_recv.metadata[i].key,
grpc_slice_from_static_string("grpc-previous-rpc-attempts")));
}
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_byte_buffer_destroy(response_payload_recv);
grpc_call_unref(c);
grpc_call_unref(s);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
namespace {
// A filter that, for the first 10 calls it sees, will fail all batches except
// for cancellations, so that the call fails with an error whose
// StreamNetworkState is kNotSentOnWire.
// All subsequent calls are allowed through without failures.
class FailFirstTenCallsFilter {
public:
static grpc_channel_filter kFilterVtable;
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) CallData(args);
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
auto* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
auto* chand = static_cast<FailFirstTenCallsFilter*>(elem->channel_data);
auto* calld = static_cast<CallData*>(elem->call_data);
if (chand->num_calls_ < 10) calld->fail_ = true;
if (batch->send_initial_metadata) ++chand->num_calls_;
if (calld->fail_) {
if (batch->recv_trailing_metadata) {
batch->payload->recv_trailing_metadata.recv_trailing_metadata->Set(
grpc_core::GrpcStreamNetworkState(),
grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
}
if (!batch->cancel_stream) {
grpc_transport_stream_op_batch_finish_with_failure(
batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"FailFirstTenCallsFilter failing batch"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE),
calld->call_combiner_);
return;
}
}
grpc_call_next_op(elem, batch);
}
private:
explicit CallData(const grpc_call_element_args* args)
: call_combiner_(args->call_combiner) {}
grpc_core::CallCombiner* call_combiner_;
bool fail_ = false;
};
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* /*args*/) {
new (elem->channel_data) FailFirstTenCallsFilter();
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_channel_element* elem) {
auto* chand = static_cast<FailFirstTenCallsFilter*>(elem->channel_data);
chand->~FailFirstTenCallsFilter();
}
size_t num_calls_ = 0;
};
grpc_channel_filter FailFirstTenCallsFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(FailFirstTenCallsFilter),
Init,
Destroy,
grpc_channel_next_get_info,
"FailFirstTenCallsFilter",
};
} // namespace
void retry_transparent_not_sent_on_wire(grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
[](grpc_core::CoreConfiguration::Builder* builder) {
grpc_core::BuildCoreConfiguration(builder);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1,
[](grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = builder->channel_args();
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
true)) {
return true;
}
// Install filter.
builder->PrependFilter(&FailFirstTenCallsFilter::kFilterVtable,
nullptr);
return true;
});
},
[config] { test_retry_transparent_not_sent_on_wire(config); });
}
void retry_transparent_not_sent_on_wire_pre_init(void) {}

@ -207,7 +207,9 @@ class StreamsNotSeenTest : public ::testing::Test {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0)};
const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0)};
grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
client_args};
grpc_channel_credentials* creds = grpc_insecure_credentials_create();

Loading…
Cancel
Save