Fourth attempt: Implement transparent retries (#28943)

* Revert "Revert "Third attempt: Implement transparent retries (#28925)" (#28942)"

This reverts commit be70a8676c.

* fix edge case where call is cancelled while transparent retry callback is pending
Author: Mark D. Roth (committed via GitHub)
Commit: c27f8f1505 (parent faea518de0)
17 files changed:

  CMakeLists.txt | 6
  build_autogenerated.yaml | 6
  gRPC-Core.podspec | 3
  grpc.gyp | 6
  src/core/ext/filters/client_channel/retry_filter.cc | 185
  src/core/tsi/alts/handshaker/alts_shared_resource.cc | 8
  src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc | 7
  test/core/end2end/end2end_nosec_tests.cc | 24
  test/core/end2end/end2end_tests.cc | 24
  test/core/end2end/generate_tests.bzl | 11
  test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc | 2
  test/core/end2end/tests/retry_recv_trailing_metadata_error.cc | 2
  test/core/end2end/tests/retry_send_op_fails.cc | 28
  test/core/end2end/tests/retry_transparent_goaway.cc | 379
  test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc | 368
  test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc | 378
  test/core/transport/chttp2/streams_not_seen_test.cc | 4

CMakeLists.txt (generated) | 6

@@ -1191,6 +1191,9 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc
test/core/end2end/tests/retry_throttled.cc
test/core/end2end/tests/retry_too_many_attempts.cc
test/core/end2end/tests/retry_transparent_goaway.cc
test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
test/core/end2end/tests/retry_unref_before_finish.cc
test/core/end2end/tests/retry_unref_before_recv.cc
test/core/end2end/tests/server_finishes_request.cc
@@ -1337,6 +1340,9 @@ add_library(end2end_tests
test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc
test/core/end2end/tests/retry_throttled.cc
test/core/end2end/tests/retry_too_many_attempts.cc
test/core/end2end/tests/retry_transparent_goaway.cc
test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
test/core/end2end/tests/retry_unref_before_finish.cc
test/core/end2end/tests/retry_unref_before_recv.cc
test/core/end2end/tests/server_finishes_request.cc

build_autogenerated.yaml | 6

@@ -113,6 +113,9 @@ libs:
- test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc
- test/core/end2end/tests/retry_throttled.cc
- test/core/end2end/tests/retry_too_many_attempts.cc
- test/core/end2end/tests/retry_transparent_goaway.cc
- test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
- test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
- test/core/end2end/tests/retry_unref_before_finish.cc
- test/core/end2end/tests/retry_unref_before_recv.cc
- test/core/end2end/tests/server_finishes_request.cc
@@ -250,6 +253,9 @@ libs:
- test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc
- test/core/end2end/tests/retry_throttled.cc
- test/core/end2end/tests/retry_too_many_attempts.cc
- test/core/end2end/tests/retry_transparent_goaway.cc
- test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
- test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
- test/core/end2end/tests/retry_unref_before_finish.cc
- test/core/end2end/tests/retry_unref_before_recv.cc
- test/core/end2end/tests/server_finishes_request.cc

gRPC-Core.podspec (generated) | 3

@@ -2496,6 +2496,9 @@ Pod::Spec.new do |s|
'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc',
'test/core/end2end/tests/retry_throttled.cc',
'test/core/end2end/tests/retry_too_many_attempts.cc',
'test/core/end2end/tests/retry_transparent_goaway.cc',
'test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc',
'test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc',
'test/core/end2end/tests/retry_unref_before_finish.cc',
'test/core/end2end/tests/retry_unref_before_recv.cc',
'test/core/end2end/tests/server_finishes_request.cc',

grpc.gyp (generated) | 6

@@ -266,6 +266,9 @@
'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc',
'test/core/end2end/tests/retry_throttled.cc',
'test/core/end2end/tests/retry_too_many_attempts.cc',
'test/core/end2end/tests/retry_transparent_goaway.cc',
'test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc',
'test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc',
'test/core/end2end/tests/retry_unref_before_finish.cc',
'test/core/end2end/tests/retry_unref_before_recv.cc',
'test/core/end2end/tests/server_finishes_request.cc',
@@ -380,6 +383,9 @@
'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc',
'test/core/end2end/tests/retry_throttled.cc',
'test/core/end2end/tests/retry_too_many_attempts.cc',
'test/core/end2end/tests/retry_transparent_goaway.cc',
'test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc',
'test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc',
'test/core/end2end/tests/retry_unref_before_finish.cc',
'test/core/end2end/tests/retry_unref_before_recv.cc',
'test/core/end2end/tests/server_finishes_request.cc',

src/core/ext/filters/client_channel/retry_filter.cc | 185

@@ -85,7 +85,6 @@
// which batches need to be sent on the LB call for a given attempt.
// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - implement hedging
// By default, we buffer 256 KiB per RPC for retries.
@@ -215,7 +214,7 @@ class RetryFilter::CallData {
// State associated with each call attempt.
class CallAttempt : public RefCounted<CallAttempt> {
public:
explicit CallAttempt(CallData* calld);
CallAttempt(CallData* calld, bool is_transparent_retry);
~CallAttempt() override;
bool lb_call_committed() const { return lb_call_committed_; }
@@ -401,7 +400,7 @@ class RetryFilter::CallData {
void MaybeSwitchToFastPath();
// Returns true if the call should be retried.
bool ShouldRetry(absl::optional<grpc_status_code> status, bool is_lb_drop,
bool ShouldRetry(absl::optional<grpc_status_code> status,
absl::optional<grpc_millis> server_pushback_ms);
// Abandons the call attempt. Unrefs any deferred batches.
@@ -518,10 +517,15 @@ class RetryFilter::CallData {
static void OnRetryTimer(void* arg, grpc_error_handle error);
static void OnRetryTimerLocked(void* arg, grpc_error_handle error);
// Adds a closure to closures to start a transparent retry.
void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
static void StartTransparentRetry(void* arg, grpc_error_handle error);
OrphanablePtr<ClientChannel::LoadBalancedCall> CreateLoadBalancedCall(
ConfigSelector::CallDispatchController* call_dispatch_controller);
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry);
void CreateCallAttempt();
void CreateCallAttempt(bool is_transparent_retry);
RetryFilter* chand_;
grpc_polling_entity* pollent_;
@@ -564,6 +568,8 @@ class RetryFilter::CallData {
// Retry state.
bool retry_committed_ : 1;
bool retry_timer_pending_ : 1;
bool retry_codepath_started_ : 1;
bool sent_transparent_retry_not_seen_by_server_ : 1;
int num_attempts_completed_ = 0;
grpc_timer retry_timer_;
grpc_closure retry_closure_;
@@ -656,7 +662,8 @@ class RetryFilter::CallData::CallStackDestructionBarrier
// RetryFilter::CallData::CallAttempt
//
RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
bool is_transparent_retry)
: RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "CallAttempt"
: nullptr),
calld_(calld),
@@ -673,7 +680,8 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
sent_cancel_stream_(false),
seen_recv_trailing_metadata_from_surface_(false),
abandoned_(false) {
lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_);
lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_,
is_transparent_retry);
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: create lb_call=%p",
calld->chand_, calld, this, lb_call_.get());
@@ -1077,11 +1085,8 @@ void RetryFilter::CallData::CallAttempt::CancelFromSurface(
}
bool RetryFilter::CallData::CallAttempt::ShouldRetry(
absl::optional<grpc_status_code> status, bool is_lb_drop,
absl::optional<grpc_status_code> status,
absl::optional<grpc_millis> server_pushback_ms) {
// LB drops always inhibit retries.
if (is_lb_drop) return false;
// TODO(roth): Handle transparent retries here.
// If no retry policy, don't retry.
if (calld_->retry_policy_ == nullptr) return false;
// Check status.
@@ -1245,9 +1250,8 @@ void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED),
&closures);
// Check whether we should retry.
if (call_attempt->ShouldRetry(
/*status=*/absl::nullopt, /*is_lb_drop=*/false,
/*server_pushback_ms=*/absl::nullopt)) {
if (call_attempt->ShouldRetry(/*status=*/absl::nullopt,
/*server_pushback_ms=*/absl::nullopt)) {
// Mark current attempt as abandoned.
call_attempt->Abandon();
// We are retrying. Start backoff timer.
@@ -1549,10 +1553,11 @@ namespace {
// Sets *status, *server_pushback_ms, and *is_lb_drop based on md_batch
// and error.
void GetCallStatus(grpc_millis deadline, grpc_metadata_batch* md_batch,
grpc_error_handle error, grpc_status_code* status,
absl::optional<grpc_millis>* server_pushback_ms,
bool* is_lb_drop) {
void GetCallStatus(
grpc_millis deadline, grpc_metadata_batch* md_batch,
grpc_error_handle error, grpc_status_code* status,
absl::optional<grpc_millis>* server_pushback_ms, bool* is_lb_drop,
absl::optional<GrpcStreamNetworkState::ValueType>* stream_network_state) {
if (error != GRPC_ERROR_NONE) {
grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
intptr_t value = 0;
@@ -1562,8 +1567,9 @@ void GetCallStatus(grpc_millis deadline, grpc_metadata_batch* md_batch,
}
} else {
*status = *md_batch->get(GrpcStatusMetadata());
*server_pushback_ms = md_batch->get(GrpcRetryPushbackMsMetadata());
}
*server_pushback_ms = md_batch->get(GrpcRetryPushbackMsMetadata());
*stream_network_state = md_batch->get(GrpcStreamNetworkState());
GRPC_ERROR_UNREF(error);
}
@@ -1695,36 +1701,72 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
// Get the call's status and check for server pushback metadata.
grpc_status_code status = GRPC_STATUS_OK;
absl::optional<grpc_millis> server_pushback_ms;
bool is_lb_drop = false;
absl::optional<GrpcStreamNetworkState::ValueType> stream_network_state;
grpc_metadata_batch* md_batch =
batch_data->batch_.payload->recv_trailing_metadata.recv_trailing_metadata;
bool is_lb_drop = false;
GetCallStatus(calld->deadline_, md_batch, GRPC_ERROR_REF(error), &status,
&server_pushback_ms, &is_lb_drop);
&server_pushback_ms, &is_lb_drop, &stream_network_state);
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(
GPR_INFO,
"chand=%p calld=%p attempt=%p: call finished, status=%s is_lb_drop=%d",
calld->chand_, calld, call_attempt, grpc_status_code_to_string(status),
is_lb_drop);
gpr_log(GPR_INFO,
"chand=%p calld=%p attempt=%p: call finished, status=%s "
"server_pushback_ms=%s is_lb_drop=%d stream_network_state=%s",
calld->chand_, calld, call_attempt,
grpc_status_code_to_string(status),
server_pushback_ms.has_value()
? absl::StrCat(*server_pushback_ms).c_str()
: "N/A",
is_lb_drop,
stream_network_state.has_value()
? absl::StrCat(*stream_network_state).c_str()
: "N/A");
}
// Check if we should retry.
if (call_attempt->ShouldRetry(status, is_lb_drop, server_pushback_ms)) {
// Start retry timer.
calld->StartRetryTimer(server_pushback_ms);
// Cancel call attempt.
CallCombinerClosureList closures;
call_attempt->MaybeAddBatchForCancelOp(
error == GRPC_ERROR_NONE
? grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("call attempt failed"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED)
: GRPC_ERROR_REF(error),
&closures);
// Record that this attempt has been abandoned.
call_attempt->Abandon();
// Yields call combiner.
closures.RunClosures(calld->call_combiner_);
return;
if (!is_lb_drop) { // Never retry on LB drops.
enum { kNoRetry, kTransparentRetry, kConfigurableRetry } retry = kNoRetry;
// Handle transparent retries.
if (stream_network_state.has_value() && !calld->retry_committed_) {
// If not sent on wire, then always retry.
// If sent on wire but not seen by server, retry exactly once.
if (*stream_network_state == GrpcStreamNetworkState::kNotSentOnWire) {
retry = kTransparentRetry;
} else if (*stream_network_state ==
GrpcStreamNetworkState::kNotSeenByServer &&
!calld->sent_transparent_retry_not_seen_by_server_) {
calld->sent_transparent_retry_not_seen_by_server_ = true;
retry = kTransparentRetry;
}
}
// If not transparently retrying, check for configurable retry.
if (retry == kNoRetry &&
call_attempt->ShouldRetry(status, server_pushback_ms)) {
retry = kConfigurableRetry;
}
// If we're retrying, do so.
if (retry != kNoRetry) {
CallCombinerClosureList closures;
// Cancel call attempt.
call_attempt->MaybeAddBatchForCancelOp(
error == GRPC_ERROR_NONE
? grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("call attempt failed"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED)
: GRPC_ERROR_REF(error),
&closures);
// For transparent retries, add a closure to immediately start a new
// call attempt.
// For configurable retries, start retry timer.
if (retry == kTransparentRetry) {
calld->AddClosureToStartTransparentRetry(&closures);
} else {
calld->StartRetryTimer(server_pushback_ms);
}
// Record that this attempt has been abandoned.
call_attempt->Abandon();
// Yields call combiner.
closures.RunClosures(calld->call_combiner_);
return;
}
}
// Not retrying, so commit the call.
calld->RetryCommit(call_attempt);
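For reference, here is a standalone sketch of the retry decision the new code above implements. The function and parameter names are illustrative stand-ins, not names from the tree; the logic mirrors the kNoRetry/kTransparentRetry/kConfigurableRetry branches in the diff:

#include <optional>

enum class StreamNetworkState { kNotSentOnWire, kNotSeenByServer };
enum class RetryKind { kNoRetry, kTransparentRetry, kConfigurableRetry };

// Mirrors RecvTrailingMetadataReady() above: LB drops never retry, and
// transparent retries are considered before the configurable policy.
RetryKind DecideRetry(bool is_lb_drop, bool retry_committed,
                      bool already_retried_not_seen_by_server,
                      std::optional<StreamNetworkState> network_state,
                      bool policy_says_retry) {
  if (is_lb_drop) return RetryKind::kNoRetry;
  if (network_state.has_value() && !retry_committed) {
    // Never put on the wire: always safe to retry transparently.
    if (*network_state == StreamNetworkState::kNotSentOnWire) {
      return RetryKind::kTransparentRetry;
    }
    // Sent, but provably not seen by the server: retry transparently,
    // but only once per call.
    if (*network_state == StreamNetworkState::kNotSeenByServer &&
        !already_retried_not_seen_by_server) {
      return RetryKind::kTransparentRetry;
    }
  }
  // Otherwise fall back to the configurable retry policy (ShouldRetry()).
  return policy_says_retry ? RetryKind::kConfigurableRetry
                           : RetryKind::kNoRetry;
}

Note the follow-through in the diff: a transparent retry starts a new attempt immediately via AddClosureToStartTransparentRetry(), while a configurable retry goes through StartRetryTimer() and the backoff path.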
@@ -2093,7 +2135,9 @@ RetryFilter::CallData::CallData(RetryFilter* chand,
pending_send_message_(false),
pending_send_trailing_metadata_(false),
retry_committed_(false),
retry_timer_pending_(false) {}
retry_timer_pending_(false),
retry_codepath_started_(false),
sent_transparent_retry_not_seen_by_server_(false) {}
RetryFilter::CallData::~CallData() {
FreeAllCachedSendOpData();
@@ -2107,6 +2151,10 @@ RetryFilter::CallData::~CallData() {
void RetryFilter::CallData::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from surface: %s",
chand_, this, grpc_transport_stream_op_batch_string(batch).c_str());
}
// If we have an LB call, delegate to the LB call.
if (committed_call_ != nullptr) {
// Note: This will release the call combiner.
@@ -2176,11 +2224,6 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
}
// If we do not yet have a call attempt, create one.
if (call_attempt_ == nullptr) {
// If there is no retry policy, then commit retries immediately.
// This ensures that the code below will always jump to the fast path.
// TODO(roth): Remove this special case when we implement
// transparent retries.
if (retry_policy_ == nullptr) retry_committed_ = true;
// If this is the first batch and retries are already committed
// (e.g., if this batch put the call above the buffer size limit), then
// immediately create an LB call and delegate the batch to it. This
@@ -2196,7 +2239,7 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
// We also skip this optimization if perAttemptRecvTimeout is set in the
// retry policy, because we need the code in CallAttempt to handle
// the associated timer.
if (num_attempts_completed_ == 0 && retry_committed_ &&
if (!retry_codepath_started_ && retry_committed_ &&
(retry_policy_ == nullptr ||
!retry_policy_->per_attempt_recv_timeout().has_value())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
@@ -2210,7 +2253,8 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
static_cast<ClientChannelServiceConfigCallData*>(
call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
committed_call_ = CreateLoadBalancedCall(
service_config_call_data->call_dispatch_controller());
service_config_call_data->call_dispatch_controller(),
/*is_transparent_retry=*/false);
committed_call_->StartTransportStreamOpBatch(batch);
return;
}
@@ -2221,7 +2265,8 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_,
this);
}
CreateCallAttempt();
retry_codepath_started_ = true;
CreateCallAttempt(/*is_transparent_retry=*/false);
return;
}
// Send batches to call attempt.
@@ -2234,7 +2279,8 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
OrphanablePtr<ClientChannel::LoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall(
ConfigSelector::CallDispatchController* call_dispatch_controller) {
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry) {
grpc_call_element_args args = {owning_call_, nullptr, call_context_,
path_, /*start_time=*/0, deadline_,
arena_, call_combiner_};
@@ -2243,13 +2289,11 @@ RetryFilter::CallData::CreateLoadBalancedCall(
// This callback holds a ref to the CallStackDestructionBarrier
// object until the LB call is destroyed.
call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this),
call_dispatch_controller,
// TODO(roth): Change this when we support transparent retries.
/*is_transparent_retry=*/false);
call_dispatch_controller, is_transparent_retry);
}
void RetryFilter::CallData::CreateCallAttempt() {
call_attempt_ = MakeRefCounted<CallAttempt>(this);
void RetryFilter::CallData::CreateCallAttempt(bool is_transparent_retry) {
call_attempt_ = MakeRefCounted<CallAttempt>(this, is_transparent_retry);
call_attempt_->StartRetriableBatches();
}
@@ -2544,13 +2588,36 @@ void RetryFilter::CallData::OnRetryTimerLocked(void* arg,
auto* calld = static_cast<CallData*>(arg);
if (error == GRPC_ERROR_NONE && calld->retry_timer_pending_) {
calld->retry_timer_pending_ = false;
calld->CreateCallAttempt();
calld->CreateCallAttempt(/*is_transparent_retry=*/false);
} else {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "retry timer cancelled");
}
GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
}
void RetryFilter::CallData::AddClosureToStartTransparentRetry(
CallCombinerClosureList* closures) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: scheduling transparent retry", chand_,
this);
}
GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
GRPC_CLOSURE_INIT(&retry_closure_, StartTransparentRetry, this, nullptr);
closures->Add(&retry_closure_, GRPC_ERROR_NONE, "start transparent retry");
}
void RetryFilter::CallData::StartTransparentRetry(void* arg,
grpc_error_handle /*error*/) {
auto* calld = static_cast<CallData*>(arg);
if (calld->cancelled_from_surface_ == GRPC_ERROR_NONE) {
calld->CreateCallAttempt(/*is_transparent_retry=*/true);
} else {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"call cancelled before transparent retry");
}
GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
}
} // namespace
const grpc_channel_filter kRetryFilterVtable = {
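The edge case named in the commit message is the window between scheduling and running the transparent-retry closure. A sketch of the sequence as handled by the two new functions above (the step descriptions are a reading of the diff, not code from the tree):

// 1. An attempt fails with kNotSentOnWire or kNotSeenByServer;
//    AddClosureToStartTransparentRetry() takes a call-stack ref and
//    schedules StartTransparentRetry() on the call combiner.
// 2. Before that closure runs, the application cancels the call from
//    the surface, which sets calld->cancelled_from_surface_.
// 3. StartTransparentRetry() re-checks cancelled_from_surface_: if set,
//    it stops the call combiner instead of creating a new attempt, and
//    the call-stack ref from step 1 is released either way.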

src/core/tsi/alts/handshaker/alts_shared_resource.cc | 8

@@ -22,6 +22,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/tsi/alts/handshaker/alts_handshaker_client.h"
static alts_shared_resource_dedicated g_alts_resource_dedicated;
@@ -56,8 +57,13 @@ void grpc_alts_shared_resource_dedicated_start(
gpr_mu_lock(&g_alts_resource_dedicated.mu);
if (g_alts_resource_dedicated.cq == nullptr) {
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
// Disable retries so that we quickly get a signal when the
// handshake server is not reachable.
grpc_arg disable_retries_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0);
grpc_channel_args args = {1, &disable_retries_arg};
g_alts_resource_dedicated.channel =
grpc_channel_create(handshaker_service_url, creds, nullptr);
grpc_channel_create(handshaker_service_url, creds, &args);
grpc_channel_credentials_release(creds);
g_alts_resource_dedicated.cq =
grpc_completion_queue_create_for_next(nullptr);
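The same pattern is applied to the per-handshake channel in alts_tsi_handshaker.cc below. As a self-contained sketch of the channel-arg usage (the helper name and target parameter are illustrative; the includes follow the ones used in this diff):

#include <grpc/grpc.h>
#include <grpc/grpc_security.h>

#include "src/core/lib/channel/channel_args.h"

// Illustrative helper: create an insecure channel with retries disabled,
// as done for the ALTS handshaker channel above.
grpc_channel* create_channel_without_retries(const char* target) {
  grpc_channel_credentials* creds = grpc_insecure_credentials_create();
  grpc_arg disable_retries_arg = grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0);
  grpc_channel_args args = {1, &disable_retries_arg};
  grpc_channel* channel = grpc_channel_create(target, creds, &args);
  grpc_channel_credentials_release(creds);
  return channel;
}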

src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc | 7

@@ -499,8 +499,13 @@ static void alts_tsi_handshaker_create_channel(
alts_tsi_handshaker* handshaker = next_args->handshaker;
GPR_ASSERT(handshaker->channel == nullptr);
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
// Disable retries so that we quickly get a signal when the
// handshake server is not reachable.
grpc_arg disable_retries_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0);
grpc_channel_args args = {1, &disable_retries_arg};
handshaker->channel = grpc_channel_create(
next_args->handshaker->handshaker_service_url, creds, nullptr);
next_args->handshaker->handshaker_service_url, creds, &args);
grpc_channel_credentials_release(creds);
tsi_result continue_next_result =
alts_tsi_handshaker_continue_handshaker_next(

test/core/end2end/end2end_nosec_tests.cc | 24

@@ -179,6 +179,12 @@ extern void retry_throttled(grpc_end2end_test_config config);
extern void retry_throttled_pre_init(void);
extern void retry_too_many_attempts(grpc_end2end_test_config config);
extern void retry_too_many_attempts_pre_init(void);
extern void retry_transparent_goaway(grpc_end2end_test_config config);
extern void retry_transparent_goaway_pre_init(void);
extern void retry_transparent_max_concurrent_streams(grpc_end2end_test_config config);
extern void retry_transparent_max_concurrent_streams_pre_init(void);
extern void retry_transparent_not_sent_on_wire(grpc_end2end_test_config config);
extern void retry_transparent_not_sent_on_wire_pre_init(void);
extern void retry_unref_before_finish(grpc_end2end_test_config config);
extern void retry_unref_before_finish_pre_init(void);
extern void retry_unref_before_recv(grpc_end2end_test_config config);
@@ -286,6 +292,9 @@ void grpc_end2end_tests_pre_init(void) {
retry_streaming_succeeds_before_replay_finished_pre_init();
retry_throttled_pre_init();
retry_too_many_attempts_pre_init();
retry_transparent_goaway_pre_init();
retry_transparent_max_concurrent_streams_pre_init();
retry_transparent_not_sent_on_wire_pre_init();
retry_unref_before_finish_pre_init();
retry_unref_before_recv_pre_init();
server_finishes_request_pre_init();
@@ -385,6 +394,9 @@ void grpc_end2end_tests(int argc, char **argv,
retry_streaming_succeeds_before_replay_finished(config);
retry_throttled(config);
retry_too_many_attempts(config);
retry_transparent_goaway(config);
retry_transparent_max_concurrent_streams(config);
retry_transparent_not_sent_on_wire(config);
retry_unref_before_finish(config);
retry_unref_before_recv(config);
server_finishes_request(config);
@@ -703,6 +715,18 @@ void grpc_end2end_tests(int argc, char **argv,
retry_too_many_attempts(config);
continue;
}
if (0 == strcmp("retry_transparent_goaway", argv[i])) {
retry_transparent_goaway(config);
continue;
}
if (0 == strcmp("retry_transparent_max_concurrent_streams", argv[i])) {
retry_transparent_max_concurrent_streams(config);
continue;
}
if (0 == strcmp("retry_transparent_not_sent_on_wire", argv[i])) {
retry_transparent_not_sent_on_wire(config);
continue;
}
if (0 == strcmp("retry_unref_before_finish", argv[i])) {
retry_unref_before_finish(config);
continue;

test/core/end2end/end2end_tests.cc | 24

@@ -183,6 +183,12 @@ extern void retry_throttled(grpc_end2end_test_config config);
extern void retry_throttled_pre_init(void);
extern void retry_too_many_attempts(grpc_end2end_test_config config);
extern void retry_too_many_attempts_pre_init(void);
extern void retry_transparent_goaway(grpc_end2end_test_config config);
extern void retry_transparent_goaway_pre_init(void);
extern void retry_transparent_max_concurrent_streams(grpc_end2end_test_config config);
extern void retry_transparent_max_concurrent_streams_pre_init(void);
extern void retry_transparent_not_sent_on_wire(grpc_end2end_test_config config);
extern void retry_transparent_not_sent_on_wire_pre_init(void);
extern void retry_unref_before_finish(grpc_end2end_test_config config);
extern void retry_unref_before_finish_pre_init(void);
extern void retry_unref_before_recv(grpc_end2end_test_config config);
@@ -292,6 +298,9 @@ void grpc_end2end_tests_pre_init(void) {
retry_streaming_succeeds_before_replay_finished_pre_init();
retry_throttled_pre_init();
retry_too_many_attempts_pre_init();
retry_transparent_goaway_pre_init();
retry_transparent_max_concurrent_streams_pre_init();
retry_transparent_not_sent_on_wire_pre_init();
retry_unref_before_finish_pre_init();
retry_unref_before_recv_pre_init();
server_finishes_request_pre_init();
@@ -393,6 +402,9 @@ void grpc_end2end_tests(int argc, char **argv,
retry_streaming_succeeds_before_replay_finished(config);
retry_throttled(config);
retry_too_many_attempts(config);
retry_transparent_goaway(config);
retry_transparent_max_concurrent_streams(config);
retry_transparent_not_sent_on_wire(config);
retry_unref_before_finish(config);
retry_unref_before_recv(config);
server_finishes_request(config);
@@ -719,6 +731,18 @@ void grpc_end2end_tests(int argc, char **argv,
retry_too_many_attempts(config);
continue;
}
if (0 == strcmp("retry_transparent_goaway", argv[i])) {
retry_transparent_goaway(config);
continue;
}
if (0 == strcmp("retry_transparent_max_concurrent_streams", argv[i])) {
retry_transparent_max_concurrent_streams(config);
continue;
}
if (0 == strcmp("retry_transparent_not_sent_on_wire", argv[i])) {
retry_transparent_not_sent_on_wire(config);
continue;
}
if (0 == strcmp("retry_unref_before_finish", argv[i])) {
retry_unref_before_finish(config);
continue;

test/core/end2end/generate_tests.bzl | 11

@@ -362,6 +362,17 @@ END2END_TESTS = {
),
"retry_throttled": _test_options(needs_client_channel = True),
"retry_too_many_attempts": _test_options(needs_client_channel = True),
"retry_transparent_goaway": _test_options(needs_client_channel = True),
"retry_transparent_not_sent_on_wire": _test_options(
needs_client_channel = True,
),
"retry_transparent_max_concurrent_streams": _test_options(
needs_client_channel = True,
proxyable = False,
# TODO(jtattermusch): too long bazel test name makes the test flaky on Windows RBE
# See b/151617965
short_name = "retry_transparent_mcs",
),
"retry_unref_before_finish": _test_options(needs_client_channel = True),
"retry_unref_before_recv": _test_options(needs_client_channel = True),
"server_finishes_request": _test_options(),

test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc | 2

@@ -240,7 +240,7 @@ class FailSendOpsFilter {
public:
static grpc_channel_filter kFilterVtable;
public:
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,

test/core/end2end/tests/retry_recv_trailing_metadata_error.cc | 2

@@ -267,7 +267,7 @@ class InjectStatusFilter {
public:
static grpc_channel_filter kFilterVtable;
public:
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,

test/core/end2end/tests/retry_send_op_fails.cc | 28

@@ -277,14 +277,14 @@ static void test_retry_send_op_fails(grpc_end2end_test_config config) {
namespace {
// A filter that, for the first call it sees, will fail the batch
// containing send_initial_metadata and then fail the call with status
// ABORTED. All subsequent calls are allowed through without failures.
class FailFirstSendOpFilter {
// A filter that, for the first call it sees, will fail all batches except
// for cancellations, so that the call fails with status ABORTED.
// All subsequent calls are allowed through without failures.
class FailFirstCallFilter {
public:
static grpc_channel_filter kFilterVtable;
public:
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
@@ -302,7 +302,7 @@ class FailFirstSendOpFilter {
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
auto* chand = static_cast<FailFirstSendOpFilter*>(elem->channel_data);
auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
auto* calld = static_cast<CallData*>(elem->call_data);
if (!chand->seen_first_) {
chand->seen_first_ = true;
@@ -312,7 +312,7 @@ class FailFirstSendOpFilter {
grpc_transport_stream_op_batch_finish_with_failure(
batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"FailFirstSendOpFilter failing batch"),
"FailFirstCallFilter failing batch"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_ABORTED),
calld->call_combiner_);
return;
@@ -330,19 +330,19 @@ class FailFirstSendOpFilter {
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* /*args*/) {
new (elem->channel_data) FailFirstSendOpFilter();
new (elem->channel_data) FailFirstCallFilter();
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_channel_element* elem) {
auto* chand = static_cast<FailFirstSendOpFilter*>(elem->channel_data);
chand->~FailFirstSendOpFilter();
auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
chand->~FailFirstCallFilter();
}
bool seen_first_ = false;
};
grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = {
grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
@@ -350,11 +350,11 @@ grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = {
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(FailFirstSendOpFilter),
sizeof(FailFirstCallFilter),
Init,
Destroy,
grpc_channel_next_get_info,
"FailFirstSendOpFilter",
"FailFirstCallFilter",
};
} // namespace
@@ -374,7 +374,7 @@ void retry_send_op_fails(grpc_end2end_test_config config) {
return true;
}
// Install filter.
builder->PrependFilter(&FailFirstSendOpFilter::kFilterVtable,
builder->PrependFilter(&FailFirstCallFilter::kFilterVtable,
nullptr);
return true;
});

test/core/end2end/tests/retry_transparent_goaway.cc (new) | 379

@@ -0,0 +1,379 @@
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/error_utils.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/end2end/tests/cancel_test_helpers.h"
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Tests transparent retries when the call was sent on the wire but not
// seen by the server (the kNotSeenByServer case, as with a GOAWAY).
static void test_retry_transparent_goaway(grpc_end2end_test_config config) {
grpc_call* c;
grpc_call* s;
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer* response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
grpc_byte_buffer* request_payload_recv = nullptr;
grpc_byte_buffer* response_payload_recv = nullptr;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
int was_cancelled = 2;
char* peer;
grpc_end2end_test_fixture f =
begin_test(config, "retry_transparent_goaway", nullptr, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c);
peer = grpc_call_get_peer(c);
GPR_ASSERT(peer != nullptr);
gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
gpr_free(peer);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
grpc_slice status_details = grpc_slice_from_static_string("xyz");
// Start a batch containing send ops.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Start a batch containing recv ops.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(2),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Client send ops should now complete.
CQ_EXPECT_COMPLETION(cqv, tag(1), true);
cq_verify(cqv);
// Server should get a call.
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), true);
cq_verify(cqv);
// Server receives the request.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), true);
cq_verify(cqv);
// Server sends a response with status OK.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// In principle, the server batch should complete before the client
// recv ops batch, but in the proxy fixtures, there are multiple threads
// involved, so the completion order tends to be a little racy.
CQ_EXPECT_COMPLETION(cqv, tag(103), true);
CQ_EXPECT_COMPLETION(cqv, tag(2), true);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
GPR_ASSERT(
byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
// Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since
// we don't do that for transparent retries.
for (size_t i = 0; i < request_metadata_recv.count; ++i) {
GPR_ASSERT(!grpc_slice_eq(
request_metadata_recv.metadata[i].key,
grpc_slice_from_static_string("grpc-previous-rpc-attempts")));
}
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_byte_buffer_destroy(response_payload_recv);
grpc_call_unref(c);
grpc_call_unref(s);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
namespace {
// A filter that, for the first call it sees, will fail all batches except
// for cancellations, so that the call fails with an error whose
// StreamNetworkState is kNotSeenByServer.
// All subsequent calls are allowed through without failures.
class FailFirstCallFilter {
public:
static grpc_channel_filter kFilterVtable;
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) CallData(args);
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
auto* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
auto* calld = static_cast<CallData*>(elem->call_data);
if (!chand->seen_call_) {
calld->fail_ = true;
chand->seen_call_ = true;
}
if (calld->fail_) {
if (batch->recv_trailing_metadata) {
batch->payload->recv_trailing_metadata.recv_trailing_metadata->Set(
grpc_core::GrpcStreamNetworkState(),
grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
}
if (!batch->cancel_stream) {
grpc_transport_stream_op_batch_finish_with_failure(
batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"FailFirstCallFilter failing batch"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE),
calld->call_combiner_);
return;
}
}
grpc_call_next_op(elem, batch);
}
private:
explicit CallData(const grpc_call_element_args* args)
: call_combiner_(args->call_combiner) {}
grpc_core::CallCombiner* call_combiner_;
bool fail_ = false;
};
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* /*args*/) {
new (elem->channel_data) FailFirstCallFilter();
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_channel_element* elem) {
auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
chand->~FailFirstCallFilter();
}
bool seen_call_ = false;
};
grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(FailFirstCallFilter),
Init,
Destroy,
grpc_channel_next_get_info,
"FailFirstCallFilter",
};
} // namespace
void retry_transparent_goaway(grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
[](grpc_core::CoreConfiguration::Builder* builder) {
grpc_core::BuildCoreConfiguration(builder);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1,
[](grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = builder->channel_args();
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
true)) {
return true;
}
// Install filter.
builder->PrependFilter(&FailFirstCallFilter::kFilterVtable,
nullptr);
return true;
});
},
[config] { test_retry_transparent_goaway(config); });
}
void retry_transparent_goaway_pre_init(void) {}

test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc (new) | 368

@@ -0,0 +1,368 @@
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/error_utils.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/end2end/tests/cancel_test_helpers.h"
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Tests transparent retries when the call was never sent out on the wire.
// This is similar to retry_transparent_not_sent_on_wire, except that
// instead of simulating the response with a filter, we actually have
// the transport behave the right way. We create a server with
// MAX_CONCURRENT_STREAMS set to 1. We start a call on the server, and
// then start a second call, which will get queued in the transport.
// Then, before the first call finishes, the server is shut down and
// restarted. The second call will fail in that transport instance and
// will be transparently retried after the server starts up again.
static void test_retry_transparent_max_concurrent_streams(
grpc_end2end_test_config config) {
grpc_op ops[6];
grpc_op* op;
grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
grpc_slice status_details = grpc_slice_from_static_string("xyz");
grpc_call_error error;
grpc_arg arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MAX_CONCURRENT_STREAMS), 1);
grpc_channel_args server_args = {1, &arg};
grpc_end2end_test_fixture f =
begin_test(config, "retry_transparent_max_concurrent_streams", nullptr,
&server_args);
cq_verifier* cqv = cq_verifier_create(f.cq);
gpr_timespec deadline = five_seconds_from_now();
// Client starts a call.
grpc_call* c =
grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c);
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array_init(&initial_metadata_recv);
grpc_byte_buffer* response_payload_recv = nullptr;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_status_code status;
grpc_slice details;
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Server should get a call.
grpc_call* s;
grpc_metadata_array request_metadata_recv;
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details call_details;
grpc_call_details_init(&call_details);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), true);
cq_verify(cqv);
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
grpc_call_details_destroy(&call_details);
grpc_metadata_array_destroy(&request_metadata_recv);
// Client starts a second call.
// We set wait_for_ready for this call, so that if it retries before
// the server comes back up, it stays pending.
grpc_call* c2 =
grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c2);
grpc_byte_buffer* request_payload2 =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_metadata_array initial_metadata_recv2;
grpc_metadata_array_init(&initial_metadata_recv2);
grpc_byte_buffer* response_payload_recv2 = nullptr;
grpc_metadata_array trailing_metadata_recv2;
grpc_metadata_array_init(&trailing_metadata_recv2);
grpc_status_code status2;
grpc_slice details2;
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload2;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&initial_metadata_recv2;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv2;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
op->data.recv_status_on_client.status = &status2;
op->data.recv_status_on_client.status_details = &details2;
op++;
error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops), tag(2),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Start server shutdown.
grpc_server_shutdown_and_notify(f.server, f.cq, tag(102));
// Server handles the first call.
grpc_byte_buffer* request_payload_recv = nullptr;
int was_cancelled = 2;
grpc_byte_buffer* response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Server completes first call and shutdown.
// Client completes first call.
CQ_EXPECT_COMPLETION(cqv, tag(103), true);
CQ_EXPECT_COMPLETION(cqv, tag(102), true);
CQ_EXPECT_COMPLETION(cqv, tag(1), true);
cq_verify(cqv);
// Clean up from first call.
GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
grpc_byte_buffer_destroy(request_payload_recv);
GPR_ASSERT(was_cancelled == 0);
grpc_byte_buffer_destroy(response_payload);
grpc_call_unref(s);
grpc_byte_buffer_destroy(request_payload);
grpc_metadata_array_destroy(&initial_metadata_recv);
GPR_ASSERT(
byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
grpc_byte_buffer_destroy(response_payload_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
grpc_slice_unref(details);
grpc_call_unref(c);
// Destroy server and then restart it.
grpc_server_destroy(f.server);
f.server = nullptr;
config.init_server(&f, &server_args);
// Server should get the second call.
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(201));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(201), true);
cq_verify(cqv);
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
grpc_call_details_destroy(&call_details);
// Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since
// we don't do that for transparent retries.
for (size_t i = 0; i < request_metadata_recv.count; ++i) {
GPR_ASSERT(!grpc_slice_eq(
request_metadata_recv.metadata[i].key,
grpc_slice_from_static_string("grpc-previous-rpc-attempts")));
}
grpc_metadata_array_destroy(&request_metadata_recv);
// Server handles the second call.
request_payload_recv = nullptr;
was_cancelled = 2;
grpc_byte_buffer* response_payload2 =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload2;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(202),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Second call completes.
CQ_EXPECT_COMPLETION(cqv, tag(202), true);
CQ_EXPECT_COMPLETION(cqv, tag(2), true);
cq_verify(cqv);
// Clean up from second call.
GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
grpc_byte_buffer_destroy(request_payload_recv);
GPR_ASSERT(was_cancelled == 0);
grpc_byte_buffer_destroy(response_payload2);
grpc_call_unref(s);
grpc_byte_buffer_destroy(request_payload2);
grpc_metadata_array_destroy(&initial_metadata_recv2);
GPR_ASSERT(
byte_buffer_eq_slice(response_payload_recv2, response_payload_slice));
grpc_byte_buffer_destroy(response_payload_recv2);
grpc_metadata_array_destroy(&trailing_metadata_recv2);
GPR_ASSERT(status2 == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details2, "xyz"));
grpc_slice_unref(details2);
grpc_call_unref(c2);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
void retry_transparent_max_concurrent_streams(grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
test_retry_transparent_max_concurrent_streams(config);
}
void retry_transparent_max_concurrent_streams_pre_init(void) {}

test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc (new) | 378

@@ -0,0 +1,378 @@
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/error_utils.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/end2end/tests/cancel_test_helpers.h"
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
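// Background: transparent retries are performed by the client channel
// itself, independent of any service-config retryPolicy, when it knows the
// server application never saw the RPC (see gRFC A6). The transport reports
// this through the GrpcStreamNetworkState trailing-metadata annotation:
// kNotSentOnWire (the RPC never left the client) may be transparently
// retried any number of times, while kNotSeenByServer (the RPC reached the
// server's transport but not the application) permits at most one
// transparent retry.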
// Tests transparent retries when the call was never sent out on the wire.
static void test_retry_transparent_not_sent_on_wire(
grpc_end2end_test_config config) {
grpc_call* c;
grpc_call* s;
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer* response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
grpc_byte_buffer* request_payload_recv = nullptr;
grpc_byte_buffer* response_payload_recv = nullptr;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
int was_cancelled = 2;
char* peer;
grpc_end2end_test_fixture f = begin_test(
config, "retry_transparent_not_sent_on_wire", nullptr, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c);
peer = grpc_call_get_peer(c);
GPR_ASSERT(peer != nullptr);
gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
gpr_free(peer);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
grpc_slice status_details = grpc_slice_from_static_string("xyz");
// Start a batch containing send ops.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Start a batch containing recv ops.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(2),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Client send ops should now complete.
CQ_EXPECT_COMPLETION(cqv, tag(1), true);
cq_verify(cqv);
// Server should get a call.
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), true);
cq_verify(cqv);
// Server receives the request.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), true);
cq_verify(cqv);
// Server sends a response with status OK.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// In principle, the server batch should complete before the client
// recv ops batch, but in the proxy fixtures, there are multiple threads
// involved, so the completion order tends to be a little racy.
CQ_EXPECT_COMPLETION(cqv, tag(103), true);
CQ_EXPECT_COMPLETION(cqv, tag(2), true);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
GPR_ASSERT(
byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
// Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since
// we don't do that for transparent retries.
for (size_t i = 0; i < request_metadata_recv.count; ++i) {
GPR_ASSERT(!grpc_slice_eq(
request_metadata_recv.metadata[i].key,
grpc_slice_from_static_string("grpc-previous-rpc-attempts")));
}
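// (Configurable retries, by contrast, do send "grpc-previous-rpc-attempts"
// with the number of preceding attempts, so its absence confirms that the
// retries taken here were the transparent kind.)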
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_byte_buffer_destroy(response_payload_recv);
grpc_call_unref(c);
grpc_call_unref(s);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
namespace {
// A filter that, for the first 10 calls it sees, will fail all batches except
// for cancellations, so that the call fails with an error whose
// StreamNetworkState is kNotSentOnWire.
// All subsequent calls are allowed through without failures.
class FailFirstTenCallsFilter {
public:
static grpc_channel_filter kFilterVtable;
private:
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) CallData(args);
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
auto* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
auto* chand = static_cast<FailFirstTenCallsFilter*>(elem->channel_data);
auto* calld = static_cast<CallData*>(elem->call_data);
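// Calls are counted by their send_initial_metadata batch; any call that
// starts while fewer than ten calls have been seen fails all of its
// batches except cancellations.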
if (chand->num_calls_ < 10) calld->fail_ = true;
if (batch->send_initial_metadata) ++chand->num_calls_;
if (calld->fail_) {
if (batch->recv_trailing_metadata) {
batch->payload->recv_trailing_metadata.recv_trailing_metadata->Set(
grpc_core::GrpcStreamNetworkState(),
grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
}
if (!batch->cancel_stream) {
grpc_transport_stream_op_batch_finish_with_failure(
batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"FailFirstTenCallsFilter failing batch"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE),
calld->call_combiner_);
return;
}
}
grpc_call_next_op(elem, batch);
}
private:
explicit CallData(const grpc_call_element_args* args)
: call_combiner_(args->call_combiner) {}
grpc_core::CallCombiner* call_combiner_;
bool fail_ = false;
};
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* /*args*/) {
new (elem->channel_data) FailFirstTenCallsFilter();
return GRPC_ERROR_NONE;
}
static void Destroy(grpc_channel_element* elem) {
auto* chand = static_cast<FailFirstTenCallsFilter*>(elem->channel_data);
chand->~FailFirstTenCallsFilter();
}
size_t num_calls_ = 0;
};
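// The entries below follow the grpc_channel_filter field order declared in
// channel_stack.h; the nullptr slot is the promise-based entry point
// (make_call_promise), which this test filter does not implement.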
grpc_channel_filter FailFirstTenCallsFilter::kFilterVtable = {
CallData::StartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(FailFirstTenCallsFilter),
Init,
Destroy,
grpc_channel_next_get_info,
"FailFirstTenCallsFilter",
};
} // namespace
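// How this drives the test: each failed batch carries
// GrpcStreamNetworkState::kNotSentOnWire in its trailing metadata, so the
// retry code in the client channel replays the attempt transparently
// instead of surfacing the injected UNAVAILABLE error. The registration
// below places the filter on the subchannel stack, underneath the retry
// code, so the failures look like they came from the transport; after ten
// invisible retries the call goes through and the application sees OK.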
void retry_transparent_not_sent_on_wire(grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
[](grpc_core::CoreConfiguration::Builder* builder) {
grpc_core::BuildCoreConfiguration(builder);
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1,
[](grpc_core::ChannelStackBuilder* builder) {
// Skip installing the filter when retries are disabled (the proxy
// fixtures explicitly disable them).
const grpc_channel_args* args = builder->channel_args();
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
true)) {
return true;
}
// Install filter.
builder->PrependFilter(&FailFirstTenCallsFilter::kFilterVtable,
nullptr);
return true;
});
},
[config] { test_retry_transparent_not_sent_on_wire(config); });
}
void retry_transparent_not_sent_on_wire_pre_init(void) {}

@@ -206,7 +206,9 @@ class StreamsNotSeenTest : public ::testing::Test {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0)};
const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
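// Transparent retries would now silently retry streams the server never
// saw, so disable retries to keep observing the raw failures this test
// asserts on.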
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0)};
grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
client_args};
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
