fix handling of retry perAttemptRecvTimeout exceeded on last attempt (#26737)

* fix handling of retry perAttemptRecvTimeout exceeded on last attempt

* fix clang-tidy
reviewable/pr26751/r1
Mark D. Roth 4 years ago committed by GitHub
parent a2d0e37c32
commit 1f8aaf0e6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CMakeLists.txt
  2. 2
      build_autogenerated.yaml
  3. 1
      gRPC-Core.podspec
  4. 2
      grpc.gyp
  5. 64
      src/core/ext/filters/client_channel/retry_filter.cc
  6. 1
      templates/test/core/end2end/end2end_defs.include
  7. 9
      test/core/end2end/end2end_nosec_tests.cc
  8. 9
      test/core/end2end/end2end_tests.cc
  9. 6
      test/core/end2end/generate_tests.bzl
  10. 263
      test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc

@ -1132,6 +1132,7 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/retry_non_retriable_status.cc
test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc
test/core/end2end/tests/retry_recv_initial_metadata.cc
test/core/end2end/tests/retry_recv_message.cc
test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
@ -1270,6 +1271,7 @@ add_library(end2end_tests
test/core/end2end/tests/retry_non_retriable_status.cc
test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc
test/core/end2end/tests/retry_recv_initial_metadata.cc
test/core/end2end/tests/retry_recv_message.cc
test/core/end2end/tests/retry_recv_trailing_metadata_error.cc

@ -94,6 +94,7 @@ libs:
- test/core/end2end/tests/retry_non_retriable_status.cc
- test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
- test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
- test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc
- test/core/end2end/tests/retry_recv_initial_metadata.cc
- test/core/end2end/tests/retry_recv_message.cc
- test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
@ -207,6 +208,7 @@ libs:
- test/core/end2end/tests/retry_non_retriable_status.cc
- test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
- test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
- test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc
- test/core/end2end/tests/retry_recv_initial_metadata.cc
- test/core/end2end/tests/retry_recv_message.cc
- test/core/end2end/tests/retry_recv_trailing_metadata_error.cc

@ -2139,6 +2139,7 @@ Pod::Spec.new do |s|
'test/core/end2end/tests/retry_non_retriable_status.cc',
'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc',
'test/core/end2end/tests/retry_recv_initial_metadata.cc',
'test/core/end2end/tests/retry_recv_message.cc',
'test/core/end2end/tests/retry_recv_trailing_metadata_error.cc',

@ -248,6 +248,7 @@
'test/core/end2end/tests/retry_non_retriable_status.cc',
'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc',
'test/core/end2end/tests/retry_recv_initial_metadata.cc',
'test/core/end2end/tests/retry_recv_message.cc',
'test/core/end2end/tests/retry_recv_trailing_metadata_error.cc',
@ -354,6 +355,7 @@
'test/core/end2end/tests/retry_non_retriable_status.cc',
'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc',
'test/core/end2end/tests/retry_recv_initial_metadata.cc',
'test/core/end2end/tests/retry_recv_message.cc',
'test/core/end2end/tests/retry_recv_trailing_metadata_error.cc',

@ -257,7 +257,7 @@ class RetryFilter::CallData {
// Adds retriable recv_trailing_metadata op.
void AddRetriableRecvTrailingMetadataOp();
// Adds cancel_stream op.
void AddCancelStreamOp();
void AddCancelStreamOp(const char* reason);
private:
// Frees cached send ops that were completed by the completed batch in
@ -390,7 +390,7 @@ class RetryFilter::CallData {
// Cancels the call attempt. Unrefs any deferred batches.
// Adds a batch to closures to cancel this call attempt.
void Cancel(CallCombinerClosureList* closures);
void Cancel(const char* reason, CallCombinerClosureList* closures);
static void OnPerAttemptRecvTimer(void* arg, grpc_error_handle error);
static void OnPerAttemptRecvTimerLocked(void* arg, grpc_error_handle error);
@ -459,7 +459,7 @@ class RetryFilter::CallData {
// save space but will also result in a data race because compiler
// will generate a 2 byte store which overwrites the meta-data
// fields upon setting this field.
bool cancelled_ : 1;
bool abandoned_ : 1;
};
CallData(RetryFilter* chand, const grpc_call_element_args& args);
@ -654,7 +654,7 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
started_recv_trailing_metadata_(false),
completed_recv_trailing_metadata_(false),
seen_recv_trailing_metadata_from_surface_(false),
cancelled_(false) {
abandoned_(false) {
lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: create lb_call=%p",
@ -742,8 +742,8 @@ void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {
// If we're not yet committed, we can't switch yet.
// TODO(roth): As part of implementing hedging, this logic needs to
// check that *this* call attempt is the one that we've committed to.
// Might need to replace cancelled_ with an enum indicating whether we're
// in flight, cancelled, or the winning call attempt.
// Might need to replace abandoned_ with an enum indicating whether we're
// in flight, abandoned, or the winning call attempt.
if (!calld_->retry_committed_) return;
// If we've already switched to fast path, there's nothing to do here.
if (calld_->committed_call_ != nullptr) return;
@ -1131,9 +1131,7 @@ bool RetryFilter::CallData::CallAttempt::ShouldRetry(
}
void RetryFilter::CallData::CallAttempt::Cancel(
CallCombinerClosureList* closures) {
// Record that this attempt has been cancelled.
cancelled_ = true;
const char* reason, CallCombinerClosureList* closures) {
// Unref batches for deferred completion callbacks that will now never
// be invoked.
if (started_recv_trailing_metadata_ &&
@ -1162,7 +1160,7 @@ void RetryFilter::CallData::CallAttempt::Cancel(
// transport knows that this call should be cleaned up, even if it
// hasn't received any ops.
BatchData* cancel_batch_data = CreateBatch(1, /*set_on_complete=*/true);
cancel_batch_data->AddCancelStreamOp();
cancel_batch_data->AddCancelStreamOp(reason);
AddClosureForBatch(cancel_batch_data->batch(),
"start cancellation batch on call attempt", closures);
}
@ -1196,11 +1194,13 @@ void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
// Cancel this attempt.
// TODO(roth): When implementing hedging, we should not cancel the
// current attempt.
call_attempt->Cancel(&closures);
call_attempt->Cancel("retry perAttemptRecvTimeout exceeded", &closures);
// Check whether we should retry.
if (call_attempt->ShouldRetry(
/*status=*/absl::nullopt, /*is_lb_drop=*/false,
/*server_pushback_md=*/nullptr, /*server_pushback_ms=*/nullptr)) {
// Mark current attempt as abandoned.
call_attempt->abandoned_ = true;
// We are retrying. Start backoff timer.
calld->StartRetryTimer(/*server_pushback_ms=*/-1);
} else {
@ -1350,11 +1350,12 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
grpc_error_std_string(error).c_str());
}
call_attempt->completed_recv_initial_metadata_ = true;
// If this attempt has been cancelled, then we're not going to use the
// If this attempt has been abandoned, then we're not going to use the
// result of this recv_initial_metadata op, so do nothing.
if (call_attempt->cancelled_) {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"recv_initial_metadata_ready after cancellation");
if (call_attempt->abandoned_) {
GRPC_CALL_COMBINER_STOP(
calld->call_combiner_,
"recv_initial_metadata_ready for abandoned attempt");
return;
}
// Cancel per-attempt recv timer, if any.
@ -1443,11 +1444,11 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
grpc_error_std_string(error).c_str());
}
++call_attempt->completed_recv_message_count_;
// If this attempt has been cancelled, then we're not going to use the
// If this attempt has been abandoned, then we're not going to use the
// result of this recv_message op, so do nothing.
if (call_attempt->cancelled_) {
if (call_attempt->abandoned_) {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"recv_message_ready after cancellation");
"recv_message_ready for abandoned attempt");
return;
}
// Cancel per-attempt recv timer, if any.
@ -1641,11 +1642,12 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
grpc_error_std_string(error).c_str());
}
call_attempt->completed_recv_trailing_metadata_ = true;
// If this attempt has been cancelled, then we're not going to use the
// If this attempt has been abandoned, then we're not going to use the
// result of this recv_trailing_metadata op, so do nothing.
if (call_attempt->cancelled_) {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"recv_trailing_metadata_ready after cancellation");
if (call_attempt->abandoned_) {
GRPC_CALL_COMBINER_STOP(
calld->call_combiner_,
"recv_trailing_metadata_ready for abandoned attempt");
return;
}
// Cancel per-attempt recv timer, if any.
@ -1673,7 +1675,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
calld->StartRetryTimer(server_pushback_ms);
// Cancel call attempt.
CallCombinerClosureList closures;
call_attempt->Cancel(&closures);
call_attempt->Cancel("call attempt failed", &closures);
// Record that this attempt has been abandoned.
call_attempt->abandoned_ = true;
// Yields call combiner.
closures.RunClosures(calld->call_combiner_);
return;
@ -1758,11 +1762,11 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
grpc_error_std_string(error).c_str(),
grpc_transport_stream_op_batch_string(&batch_data->batch_).c_str());
}
// If this attempt has been cancelled, then we're not going to propagate
// If this attempt has been abandoned, then we're not going to propagate
// the completion of this batch, so do nothing.
if (call_attempt->cancelled_) {
if (call_attempt->abandoned_) {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"on_complete after cancellation");
"on_complete for abandoned attempt");
return;
}
// If we got an error and have not yet gotten the
@ -1956,10 +1960,12 @@ void RetryFilter::CallData::CallAttempt::BatchData::
&call_attempt_->recv_trailing_metadata_ready_;
}
void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp() {
void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp(
const char* reason) {
batch_.cancel_stream = true;
batch_.payload->cancel_stream.cancel_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("retry attempt abandoned");
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED);
}
//
@ -2437,7 +2443,7 @@ void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) {
// retry attempt is started, in which case we'll just pass the real
// call dispatch controller down into the LB call, and it won't be
// our problem anymore.
if (call_attempt_->lb_call_committed()) {
if (call_attempt->lb_call_committed()) {
auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
service_config_call_data->call_dispatch_controller()->Commit();

@ -43,6 +43,7 @@ void grpc_end2end_tests_pre_init(void) {
% endfor
}
// NOLINTNEXTLINE(readability-function-size)
void grpc_end2end_tests(int argc, char **argv,
grpc_end2end_test_config config) {
int i;

@ -145,6 +145,8 @@ extern void retry_non_retriable_status_before_recv_trailing_metadata_started(grp
extern void retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init(void);
extern void retry_per_attempt_recv_timeout(grpc_end2end_test_config config);
extern void retry_per_attempt_recv_timeout_pre_init(void);
extern void retry_per_attempt_recv_timeout_on_last_attempt(grpc_end2end_test_config config);
extern void retry_per_attempt_recv_timeout_on_last_attempt_pre_init(void);
extern void retry_recv_initial_metadata(grpc_end2end_test_config config);
extern void retry_recv_initial_metadata_pre_init(void);
extern void retry_recv_message(grpc_end2end_test_config config);
@ -261,6 +263,7 @@ void grpc_end2end_tests_pre_init(void) {
retry_non_retriable_status_pre_init();
retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init();
retry_per_attempt_recv_timeout_pre_init();
retry_per_attempt_recv_timeout_on_last_attempt_pre_init();
retry_recv_initial_metadata_pre_init();
retry_recv_message_pre_init();
retry_recv_trailing_metadata_error_pre_init();
@ -290,6 +293,7 @@ void grpc_end2end_tests_pre_init(void) {
write_buffering_at_end_pre_init();
}
// NOLINTNEXTLINE(readability-function-size)
void grpc_end2end_tests(int argc, char **argv,
grpc_end2end_test_config config) {
int i;
@ -355,6 +359,7 @@ void grpc_end2end_tests(int argc, char **argv,
retry_non_retriable_status(config);
retry_non_retriable_status_before_recv_trailing_metadata_started(config);
retry_per_attempt_recv_timeout(config);
retry_per_attempt_recv_timeout_on_last_attempt(config);
retry_recv_initial_metadata(config);
retry_recv_message(config);
retry_recv_trailing_metadata_error(config);
@ -618,6 +623,10 @@ void grpc_end2end_tests(int argc, char **argv,
retry_per_attempt_recv_timeout(config);
continue;
}
if (0 == strcmp("retry_per_attempt_recv_timeout_on_last_attempt", argv[i])) {
retry_per_attempt_recv_timeout_on_last_attempt(config);
continue;
}
if (0 == strcmp("retry_recv_initial_metadata", argv[i])) {
retry_recv_initial_metadata(config);
continue;

@ -147,6 +147,8 @@ extern void retry_non_retriable_status_before_recv_trailing_metadata_started(grp
extern void retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init(void);
extern void retry_per_attempt_recv_timeout(grpc_end2end_test_config config);
extern void retry_per_attempt_recv_timeout_pre_init(void);
extern void retry_per_attempt_recv_timeout_on_last_attempt(grpc_end2end_test_config config);
extern void retry_per_attempt_recv_timeout_on_last_attempt_pre_init(void);
extern void retry_recv_initial_metadata(grpc_end2end_test_config config);
extern void retry_recv_initial_metadata_pre_init(void);
extern void retry_recv_message(grpc_end2end_test_config config);
@ -264,6 +266,7 @@ void grpc_end2end_tests_pre_init(void) {
retry_non_retriable_status_pre_init();
retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init();
retry_per_attempt_recv_timeout_pre_init();
retry_per_attempt_recv_timeout_on_last_attempt_pre_init();
retry_recv_initial_metadata_pre_init();
retry_recv_message_pre_init();
retry_recv_trailing_metadata_error_pre_init();
@ -293,6 +296,7 @@ void grpc_end2end_tests_pre_init(void) {
write_buffering_at_end_pre_init();
}
// NOLINTNEXTLINE(readability-function-size)
void grpc_end2end_tests(int argc, char **argv,
grpc_end2end_test_config config) {
int i;
@ -359,6 +363,7 @@ void grpc_end2end_tests(int argc, char **argv,
retry_non_retriable_status(config);
retry_non_retriable_status_before_recv_trailing_metadata_started(config);
retry_per_attempt_recv_timeout(config);
retry_per_attempt_recv_timeout_on_last_attempt(config);
retry_recv_initial_metadata(config);
retry_recv_message(config);
retry_recv_trailing_metadata_error(config);
@ -626,6 +631,10 @@ void grpc_end2end_tests(int argc, char **argv,
retry_per_attempt_recv_timeout(config);
continue;
}
if (0 == strcmp("retry_per_attempt_recv_timeout_on_last_attempt", argv[i])) {
retry_per_attempt_recv_timeout_on_last_attempt(config);
continue;
}
if (0 == strcmp("retry_recv_initial_metadata", argv[i])) {
retry_recv_initial_metadata(config);
continue;

@ -302,6 +302,12 @@ END2END_TESTS = {
"retry_per_attempt_recv_timeout": _test_options(
needs_client_channel = True,
),
"retry_per_attempt_recv_timeout_on_last_attempt": _test_options(
needs_client_channel = True,
# TODO(jtattermusch): too long bazel test name makes the test flaky on Windows RBE
# See b/151617965
short_name = "retry_per_attempt_recv_timeout2",
),
"retry_recv_initial_metadata": _test_options(needs_client_channel = True),
"retry_recv_message": _test_options(needs_client_channel = True),
"retry_recv_trailing_metadata_error": _test_options(

@ -0,0 +1,263 @@
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "test/core/end2end/end2end_tests.h"
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/static_metadata.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/tests/cancel_test_helpers.h"
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Tests perAttemptRecvTimeout:
// - 1 retry allowed for ABORTED status
// - both attempts do not receive a response until after perAttemptRecvTimeout
static void test_retry_per_attempt_recv_timeout_on_last_attempt(
grpc_end2end_test_config config) {
grpc_call* c;
grpc_call* s;
grpc_call* s0;
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer* response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
grpc_byte_buffer* request_payload_recv = nullptr;
grpc_byte_buffer* response_payload_recv = nullptr;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
grpc_arg args[] = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 1),
grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
const_cast<char*>(
"{\n"
" \"methodConfig\": [ {\n"
" \"name\": [\n"
" { \"service\": \"service\", \"method\": \"method\" }\n"
" ],\n"
" \"retryPolicy\": {\n"
" \"maxAttempts\": 2,\n"
" \"initialBackoff\": \"1s\",\n"
" \"maxBackoff\": \"120s\",\n"
" \"backoffMultiplier\": 1.6,\n"
" \"perAttemptRecvTimeout\": \"2s\",\n"
" \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
" }\n"
" } ]\n"
"}")),
};
grpc_channel_args client_args = {GPR_ARRAY_SIZE(args), args};
grpc_end2end_test_fixture f =
begin_test(config, "retry", &client_args, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
gpr_timespec deadline = n_seconds_from_now(10);
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Server gets a call but does not respond to the call.
error =
grpc_server_request_call(f.server, &s0, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), true);
cq_verify(cqv);
// Make sure the "grpc-previous-rpc-attempts" header was not sent in the
// initial attempt.
for (size_t i = 0; i < request_metadata_recv.count; ++i) {
GPR_ASSERT(!grpc_slice_eq(request_metadata_recv.metadata[i].key,
GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS));
}
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_details_init(&call_details);
// Server gets a second call, which it also does not respond to.
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(201));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(201), true);
cq_verify(cqv);
// Now we can unref the first call.
grpc_call_unref(s0);
// Make sure the "grpc-previous-rpc-attempts" header was sent in the retry.
bool found_retry_header = false;
for (size_t i = 0; i < request_metadata_recv.count; ++i) {
if (grpc_slice_eq(request_metadata_recv.metadata[i].key,
GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS)) {
GPR_ASSERT(
grpc_slice_eq(request_metadata_recv.metadata[i].value, GRPC_MDSTR_1));
found_retry_header = true;
break;
}
}
GPR_ASSERT(found_retry_header);
// Client sees call completion.
CQ_EXPECT_COMPLETION(cqv, tag(1), true);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_CANCELLED);
GPR_ASSERT(
0 == grpc_slice_str_cmp(details, "retry perAttemptRecvTimeout exceeded"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_byte_buffer_destroy(response_payload_recv);
grpc_call_unref(c);
grpc_call_unref(s);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
void retry_per_attempt_recv_timeout_on_last_attempt(
grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
test_retry_per_attempt_recv_timeout_on_last_attempt(config);
}
void retry_per_attempt_recv_timeout_on_last_attempt_pre_init(void) {}
Loading…
Cancel
Save