add separate channel arg to enable hedging (#26838)

* add separate channel arg to enable hedging

* revert change to retry_disabled test
pull/26766/head^2
Mark D. Roth 4 years ago committed by GitHub
parent ea87fcf271
commit fbd5e9f2ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      include/grpc/impl/codegen/grpc_types.h
  2. 62
      src/core/ext/filters/client_channel/retry_service_config.cc
  3. 2
      src/core/ext/filters/client_channel/retry_service_config.h
  4. 65
      test/core/client_channel/service_config_test.cc
  5. 9
      test/core/end2end/tests/retry_lb_drop.cc
  6. 2
      test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
  7. 2
      test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc

@ -384,12 +384,26 @@ typedef struct {
Defaults to "blend". In the current implementation "blend" is equivalent to
"latency". */
#define GRPC_ARG_OPTIMIZATION_TARGET "grpc.optimization_target"
/** If set to zero, disables retry behavior. Otherwise, transparent retries
are enabled for all RPCs, and configurable retries are enabled when they
are configured via the service config. For details, see:
/** Enables retry functionality. Defaults to false. When enabled,
configurable retries are enabled when they are configured via the
service config. For details, see:
https://github.com/grpc/proposal/blob/master/A6-client-retries.md
NOTE: Transparent retries are not yet implemented. When they are
implemented, they will also be enabled by this arg.
NOTE: Hedging functionality is not yet implemented, so those
fields in the service config will currently be ignored. See
also the GRPC_ARG_EXPERIMENTAL_ENABLE_HEDGING arg below.
*/
#define GRPC_ARG_ENABLE_RETRIES "grpc.enable_retries"
/** Enables hedging functionality, as described in:
https://github.com/grpc/proposal/blob/master/A6-client-retries.md
Default is currently false, since this functionality is not yet
fully implemented.
NOTE: This channel arg is experimental and will eventually be removed.
Once hedging functionality has been implemented and proves stable,
this arg will be removed, and the hedging functionality will
be enabled via the GRPC_ARG_ENABLE_RETRIES arg above. */
#define GRPC_ARG_EXPERIMENTAL_ENABLE_HEDGING "grpc.experimental.enable_hedging"
/** Per-RPC retry buffer size, in bytes. Default is 256 KiB. */
#define GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE "grpc.per_rpc_retry_buffer_size"
/** Channel arg that carries the bridged objective c object for custom metrics

@ -159,9 +159,9 @@ RetryServiceConfigParser::ParseGlobalParams(const grpc_channel_args* /*args*/,
namespace {
grpc_error_handle ParseRetryPolicy(
const Json& json, int* max_attempts, grpc_millis* initial_backoff,
grpc_millis* max_backoff, float* backoff_multiplier,
StatusCodeSet* retryable_status_codes,
const grpc_channel_args* args, const Json& json, int* max_attempts,
grpc_millis* initial_backoff, grpc_millis* max_backoff,
float* backoff_multiplier, StatusCodeSet* retryable_status_codes,
absl::optional<grpc_millis>* per_attempt_recv_timeout) {
if (json.type() != Json::Type::OBJECT) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@ -251,28 +251,38 @@ grpc_error_handle ParseRetryPolicy(
}
}
// Parse perAttemptRecvTimeout.
it = json.object_value().find("perAttemptRecvTimeout");
if (it != json.object_value().end()) {
grpc_millis per_attempt_recv_timeout_value;
if (!ParseDurationFromJson(it->second, &per_attempt_recv_timeout_value)) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:perAttemptRecvTimeout error:type must be STRING of the "
"form given by google.proto.Duration."));
} else {
*per_attempt_recv_timeout = per_attempt_recv_timeout_value;
// TODO(roth): As part of implementing hedging, relax this check such
// that we allow a value of 0 if a hedging policy is specified.
if (per_attempt_recv_timeout_value == 0) {
if (grpc_channel_args_find_bool(args, GRPC_ARG_EXPERIMENTAL_ENABLE_HEDGING,
false)) {
it = json.object_value().find("perAttemptRecvTimeout");
if (it != json.object_value().end()) {
grpc_millis per_attempt_recv_timeout_value;
if (!ParseDurationFromJson(it->second, &per_attempt_recv_timeout_value)) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:perAttemptRecvTimeout error:must be greater than 0"));
"field:perAttemptRecvTimeout error:type must be STRING of the "
"form given by google.proto.Duration."));
} else {
*per_attempt_recv_timeout = per_attempt_recv_timeout_value;
// TODO(roth): As part of implementing hedging, relax this check such
// that we allow a value of 0 if a hedging policy is specified.
if (per_attempt_recv_timeout_value == 0) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:perAttemptRecvTimeout error:must be greater than 0"));
}
}
} else if (retryable_status_codes->Empty()) {
// If perAttemptRecvTimeout not present, retryableStatusCodes must be
// non-empty.
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:retryableStatusCodes error:must be non-empty if "
"perAttemptRecvTimeout not present"));
}
} else {
// Hedging not enabled, so the error message for
// retryableStatusCodes unset should be different.
if (retryable_status_codes->Empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:retryableStatusCodes error:must be non-empty"));
}
} else if (retryable_status_codes->Empty()) {
// If perAttemptRecvTimeout not present, retryableStatusCodes must be
// non-empty.
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:retryableStatusCodes error:must be non-empty if "
"perAttemptRecvTimeout not present"));
}
return GRPC_ERROR_CREATE_FROM_VECTOR("retryPolicy", &error_list);
}
@ -280,9 +290,9 @@ grpc_error_handle ParseRetryPolicy(
} // namespace
std::unique_ptr<ServiceConfigParser::ParsedConfig>
RetryServiceConfigParser::ParsePerMethodParams(
const grpc_channel_args* /*args*/, const Json& json,
grpc_error_handle* error) {
RetryServiceConfigParser::ParsePerMethodParams(const grpc_channel_args* args,
const Json& json,
grpc_error_handle* error) {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
// Parse retry policy.
auto it = json.object_value().find("retryPolicy");
@ -293,7 +303,7 @@ RetryServiceConfigParser::ParsePerMethodParams(
float backoff_multiplier = 0;
StatusCodeSet retryable_status_codes;
absl::optional<grpc_millis> per_attempt_recv_timeout;
*error = ParseRetryPolicy(it->second, &max_attempts, &initial_backoff,
*error = ParseRetryPolicy(args, it->second, &max_attempts, &initial_backoff,
&max_backoff, &backoff_multiplier,
&retryable_status_codes, &per_attempt_recv_timeout);
if (*error != GRPC_ERROR_NONE) return nullptr;

@ -83,7 +83,7 @@ class RetryServiceConfigParser : public ServiceConfigParser::Parser {
grpc_error_handle* error) override;
std::unique_ptr<ServiceConfigParser::ParsedConfig> ParsePerMethodParams(
const grpc_channel_args* /*args*/, const Json& json,
const grpc_channel_args* args, const Json& json,
grpc_error_handle* error) override;
static size_t ParserIndex();

@ -1189,8 +1189,7 @@ TEST_F(RetryParserTest, InvalidRetryPolicyEmptyRetryableStatusCodes) {
"Method Params.*referenced_errors.*"
"methodConfig.*referenced_errors.*"
"retryPolicy.*referenced_errors.*"
"field:retryableStatusCodes error:must be non-empty if "
"perAttemptRecvTimeout not present"));
"field:retryableStatusCodes error:must be non-empty"));
GRPC_ERROR_UNREF(error);
}
@ -1271,7 +1270,10 @@ TEST_F(RetryParserTest, ValidRetryPolicyWithPerAttemptRecvTimeout) {
" } ]\n"
"}";
grpc_error_handle error = GRPC_ERROR_NONE;
auto svc_cfg = ServiceConfig::Create(nullptr, test_json, &error);
grpc_arg arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_EXPERIMENTAL_ENABLE_HEDGING), 1);
grpc_channel_args args = {1, &arg};
auto svc_cfg = ServiceConfig::Create(&args, test_json, &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
const auto* vector_ptr = svc_cfg->GetMethodParsedConfigVector(
grpc_slice_from_static_string("/TestServ/TestMethod"));
@ -1289,6 +1291,43 @@ TEST_F(RetryParserTest, ValidRetryPolicyWithPerAttemptRecvTimeout) {
parsed_config->retryable_status_codes().Contains(GRPC_STATUS_ABORTED));
}
TEST_F(RetryParserTest,
ValidRetryPolicyWithPerAttemptRecvTimeoutIgnoredWhenHedgingDisabled) {
const char* test_json =
"{\n"
" \"methodConfig\": [ {\n"
" \"name\": [\n"
" { \"service\": \"TestServ\", \"method\": \"TestMethod\" }\n"
" ],\n"
" \"retryPolicy\": {\n"
" \"maxAttempts\": 2,\n"
" \"initialBackoff\": \"1s\",\n"
" \"maxBackoff\": \"120s\",\n"
" \"backoffMultiplier\": 1.6,\n"
" \"perAttemptRecvTimeout\": \"1s\",\n"
" \"retryableStatusCodes\": [\"ABORTED\"]\n"
" }\n"
" } ]\n"
"}";
grpc_error_handle error = GRPC_ERROR_NONE;
auto svc_cfg = ServiceConfig::Create(nullptr, test_json, &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
const auto* vector_ptr = svc_cfg->GetMethodParsedConfigVector(
grpc_slice_from_static_string("/TestServ/TestMethod"));
ASSERT_NE(vector_ptr, nullptr);
const auto* parsed_config =
static_cast<grpc_core::internal::RetryMethodConfig*>(
((*vector_ptr)[0]).get());
ASSERT_NE(parsed_config, nullptr);
EXPECT_EQ(parsed_config->max_attempts(), 2);
EXPECT_EQ(parsed_config->initial_backoff(), 1000);
EXPECT_EQ(parsed_config->max_backoff(), 120000);
EXPECT_EQ(parsed_config->backoff_multiplier(), 1.6f);
EXPECT_EQ(parsed_config->per_attempt_recv_timeout(), absl::nullopt);
EXPECT_TRUE(
parsed_config->retryable_status_codes().Contains(GRPC_STATUS_ABORTED));
}
TEST_F(RetryParserTest,
ValidRetryPolicyWithPerAttemptRecvTimeoutAndUnsetRetryableStatusCodes) {
const char* test_json =
@ -1307,7 +1346,10 @@ TEST_F(RetryParserTest,
" } ]\n"
"}";
grpc_error_handle error = GRPC_ERROR_NONE;
auto svc_cfg = ServiceConfig::Create(nullptr, test_json, &error);
grpc_arg arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_EXPERIMENTAL_ENABLE_HEDGING), 1);
grpc_channel_args args = {1, &arg};
auto svc_cfg = ServiceConfig::Create(&args, test_json, &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
const auto* vector_ptr = svc_cfg->GetMethodParsedConfigVector(
grpc_slice_from_static_string("/TestServ/TestMethod"));
@ -1342,7 +1384,10 @@ TEST_F(RetryParserTest, InvalidRetryPolicyPerAttemptRecvTimeoutUnparseable) {
" } ]\n"
"}";
grpc_error_handle error = GRPC_ERROR_NONE;
auto svc_cfg = ServiceConfig::Create(nullptr, test_json, &error);
grpc_arg arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_EXPERIMENTAL_ENABLE_HEDGING), 1);
grpc_channel_args args = {1, &arg};
auto svc_cfg = ServiceConfig::Create(&args, test_json, &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"Service config parsing error.*referenced_errors.*"
@ -1372,7 +1417,10 @@ TEST_F(RetryParserTest, InvalidRetryPolicyPerAttemptRecvTimeoutWrongType) {
" } ]\n"
"}";
grpc_error_handle error = GRPC_ERROR_NONE;
auto svc_cfg = ServiceConfig::Create(nullptr, test_json, &error);
grpc_arg arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_EXPERIMENTAL_ENABLE_HEDGING), 1);
grpc_channel_args args = {1, &arg};
auto svc_cfg = ServiceConfig::Create(&args, test_json, &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"Service config parsing error.*referenced_errors.*"
@ -1402,7 +1450,10 @@ TEST_F(RetryParserTest, InvalidRetryPolicyPerAttemptRecvTimeoutBadValue) {
" } ]\n"
"}";
grpc_error_handle error = GRPC_ERROR_NONE;
auto svc_cfg = ServiceConfig::Create(nullptr, test_json, &error);
grpc_arg arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_EXPERIMENTAL_ENABLE_HEDGING), 1);
grpc_channel_args args = {1, &arg};
auto svc_cfg = ServiceConfig::Create(&args, test_json, &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"Service config parsing error.*referenced_errors.*"

@ -159,11 +159,10 @@ static void end_test(grpc_end2end_test_fixture* f) {
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Tests that we don't retry when retries are disabled via the
// GRPC_ARG_ENABLE_RETRIES channel arg, even when there is retry
// configuration in the service config.
// - 1 retry allowed for ABORTED status
// - first attempt returns ABORTED but does not retry
// Tests that we don't retry when the LB policy drops a call,
// even when there is retry configuration in the service config.
// - 1 retry allowed for UNAVAILABLE status
// - first attempt returns UNAVAILABLE due to LB drop but does not retry
static void test_retry_lb_drop(grpc_end2end_test_config config) {
grpc_call* c;
grpc_op ops[6];

@ -124,6 +124,8 @@ static void test_retry_per_attempt_recv_timeout(
grpc_arg args[] = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 1),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_EXPERIMENTAL_ENABLE_HEDGING), 1),
grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
const_cast<char*>(

@ -120,6 +120,8 @@ static void test_retry_per_attempt_recv_timeout_on_last_attempt(
grpc_arg args[] = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 1),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_EXPERIMENTAL_ENABLE_HEDGING), 1),
grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
const_cast<char*>(

Loading…
Cancel
Save