Adding retry policy (#26566)

pull/26770/head
donnadionne 3 years ago committed by GitHub
parent 1265cfd29a
commit 7eaf37bce5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      src/core/ext/filters/client_channel/client_channel.cc
  2. 1
      src/core/ext/filters/client_channel/client_channel.h
  3. 51
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  4. 165
      src/core/ext/xds/xds_api.cc
  5. 28
      src/core/ext/xds/xds_api.h
  6. 4
      src/core/lib/channel/status_util.h
  7. 16
      src/proto/grpc/testing/xds/v3/route.proto
  8. 343
      test/cpp/end2end/xds_end2end_test.cc

@ -1057,10 +1057,6 @@ void ClientChannel::Destroy(grpc_channel_element* elem) {
namespace {
bool GetEnableRetries(const grpc_channel_args* args) {
return grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, false);
}
RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
const grpc_channel_args* args) {
const bool use_local_subchannel_pool = grpc_channel_args_find_bool(
@ -1082,7 +1078,6 @@ ClientChannel::ClientChannel(grpc_channel_element_args* args,
grpc_error_handle* error)
: deadline_checking_enabled_(
grpc_deadline_checking_enabled(args->channel_args)),
enable_retries_(GetEnableRetries(args->channel_args)),
owning_stack_(args->channel_stack),
client_channel_factory_(
ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
@ -1507,14 +1502,6 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
config_selector =
MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
}
// Construct dynamic filter stack.
std::vector<const grpc_channel_filter*> filters =
config_selector->GetFilters();
if (enable_retries_) {
filters.push_back(&kRetryFilterVtable);
} else {
filters.push_back(&DynamicTerminationFilter::kFilterVtable);
}
absl::InlinedVector<grpc_arg, 2> args_to_add = {
grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL), this,
@ -1526,6 +1513,16 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
channel_args_, args_to_add.data(), args_to_add.size());
new_args = config_selector->ModifyChannelArgs(new_args);
bool enable_retries =
grpc_channel_args_find_bool(new_args, GRPC_ARG_ENABLE_RETRIES, false);
// Construct dynamic filter stack.
std::vector<const grpc_channel_filter*> filters =
config_selector->GetFilters();
if (enable_retries) {
filters.push_back(&kRetryFilterVtable);
} else {
filters.push_back(&DynamicTerminationFilter::kFilterVtable);
}
RefCountedPtr<DynamicFilters> dynamic_filters =
DynamicFilters::Create(new_args, std::move(filters));
GPR_ASSERT(dynamic_filters != nullptr);

@ -276,7 +276,6 @@ class ClientChannel {
// Fields set at construction and never modified.
//
const bool deadline_checking_enabled_;
const bool enable_retries_;
grpc_channel_stack* owning_stack_;
ClientChannelFactory* client_channel_factory_;
const grpc_channel_args* channel_args_;

@ -242,6 +242,7 @@ class XdsResolver : public Resolver {
RouteTable route_table_;
std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
std::vector<const grpc_channel_filter*> filters_;
bool retry_enabled_ = false;
};
void OnListenerUpdate(XdsApi::LdsUpdate listener);
@ -468,6 +469,43 @@ grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
const XdsApi::Route::ClusterWeight* cluster_weight,
RefCountedPtr<ServiceConfig>* method_config) {
std::vector<std::string> fields;
// Set retry policy if any.
if (route.retry_policy.has_value()) {
retry_enabled_ = true;
std::vector<std::string> retry_parts;
retry_parts.push_back(absl::StrFormat(
"\"retryPolicy\": {\n"
" \"maxAttempts\": %d,\n"
" \"initialBackoff\": \"%d.%09ds\",\n"
" \"maxBackoff\": \"%d.%09ds\",\n"
" \"backoffMultiplier\": 2,\n",
route.retry_policy->num_retries + 1,
route.retry_policy->retry_back_off.base_interval.seconds,
route.retry_policy->retry_back_off.base_interval.nanos,
route.retry_policy->retry_back_off.max_interval.seconds,
route.retry_policy->retry_back_off.max_interval.nanos));
std::vector<std::string> code_parts;
if (route.retry_policy->retry_on.Contains(GRPC_STATUS_CANCELLED)) {
code_parts.push_back(" \"CANCELLED\"");
}
if (route.retry_policy->retry_on.Contains(GRPC_STATUS_DEADLINE_EXCEEDED)) {
code_parts.push_back(" \"DEADLINE_EXCEEDED\"");
}
if (route.retry_policy->retry_on.Contains(GRPC_STATUS_INTERNAL)) {
code_parts.push_back(" \"INTERNAL\"");
}
if (route.retry_policy->retry_on.Contains(GRPC_STATUS_RESOURCE_EXHAUSTED)) {
code_parts.push_back(" \"RESOURCE_EXHAUSTED\"");
}
if (route.retry_policy->retry_on.Contains(GRPC_STATUS_UNAVAILABLE)) {
code_parts.push_back(" \"UNAVAILABLE\"");
}
retry_parts.push_back(
absl::StrFormat(" \"retryableStatusCodes\": [\n %s ]\n",
absl::StrJoin(code_parts, ",\n")));
retry_parts.push_back(absl::StrFormat(" }"));
fields.emplace_back(absl::StrJoin(retry_parts, ""));
}
// Set timeout.
if (route.max_stream_duration.has_value() &&
(route.max_stream_duration->seconds != 0 ||
@ -537,7 +575,18 @@ grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
grpc_channel_args* XdsResolver::XdsConfigSelector::ModifyChannelArgs(
grpc_channel_args* args) {
return args;
// The max number of args to add is 1 so far; when more args need to be added
// we will increase the size of args_to_add accordingly;
absl::InlinedVector<grpc_arg, 1> args_to_add;
if (retry_enabled_) {
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 1));
}
if (args_to_add.empty()) return args;
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
args, args_to_add.data(), args_to_add.size());
grpc_channel_args_destroy(args);
return new_args;
}
void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {

@ -137,6 +137,17 @@ bool XdsSecurityEnabled() {
return parse_succeeded && parsed_value;
}
// TODO(donnadionne): Check to see if retry policy is enabled, this will be
// removed once retry functionality is fully integration-tested and enabled by
// default.
bool XdsRetryEnabled() {
char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY");
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
gpr_free(value);
return parse_succeeded && parsed_value;
}
//
// XdsApi::Route::HashPolicy
//
@ -214,6 +225,25 @@ std::string XdsApi::Route::HashPolicy::ToString() const {
return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
}
//
// XdsApi::Route::RetryPolicy
//
std::string XdsApi::Route::RetryPolicy::RetryBackOff::ToString() const {
std::vector<std::string> contents;
contents.push_back(
absl::StrCat("RetryBackOff Base: ", base_interval.ToString()));
contents.push_back(
absl::StrCat("RetryBackOff max: ", max_interval.ToString()));
return absl::StrJoin(contents, ",");
}
std::string XdsApi::Route::RetryPolicy::ToString() const {
std::vector<std::string> contents;
contents.push_back(absl::StrFormat("num_retries=%d", num_retries));
contents.push_back(retry_back_off.ToString());
return absl::StrJoin(contents, ",");
}
//
// XdsApi::Route
//
@ -255,6 +285,10 @@ std::string XdsApi::Route::ToString() const {
for (const HashPolicy& hash_policy : hash_policies) {
contents.push_back(absl::StrCat("hash_policy=", hash_policy.ToString()));
}
if (retry_policy.has_value()) {
contents.push_back(
absl::StrCat("retry_policy={", retry_policy->ToString(), "}"));
}
if (!cluster_name.empty()) {
contents.push_back(absl::StrFormat("Cluster name: %s", cluster_name));
}
@ -1536,6 +1570,100 @@ grpc_error_handle ParseTypedPerFilterConfig(
return GRPC_ERROR_NONE;
}
XdsApi::Duration DurationParse(const google_protobuf_Duration* proto_duration) {
XdsApi::Duration duration;
duration.seconds = google_protobuf_Duration_seconds(proto_duration);
duration.nanos = google_protobuf_Duration_nanos(proto_duration);
return duration;
}
grpc_error_handle RetryPolicyParse(
const EncodingContext& context,
const envoy_config_route_v3_RetryPolicy* retry_policy,
absl::optional<XdsApi::Route::RetryPolicy>* retry) {
std::vector<grpc_error_handle> errors;
XdsApi::Route::RetryPolicy retry_to_return;
auto retry_on = UpbStringToStdString(
envoy_config_route_v3_RetryPolicy_retry_on(retry_policy));
std::vector<absl::string_view> codes = absl::StrSplit(retry_on, ',');
for (const auto& code : codes) {
if (code == "cancelled") {
retry_to_return.retry_on.Add(GRPC_STATUS_CANCELLED);
} else if (code == "deadline-exceeded") {
retry_to_return.retry_on.Add(GRPC_STATUS_DEADLINE_EXCEEDED);
} else if (code == "internal") {
retry_to_return.retry_on.Add(GRPC_STATUS_INTERNAL);
} else if (code == "resource-exhausted") {
retry_to_return.retry_on.Add(GRPC_STATUS_RESOURCE_EXHAUSTED);
} else if (code == "unavailable") {
retry_to_return.retry_on.Add(GRPC_STATUS_UNAVAILABLE);
} else {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "Unsupported retry_on policy %s.",
std::string(code).c_str());
}
}
}
// TODO(donnadionne): when we add support for per_try_timeout, we will need to
// return a policy if per_try_timeout is set even if retry_on specified no
// supported policies.
if (retry_to_return.retry_on.Empty()) return GRPC_ERROR_NONE;
const google_protobuf_UInt32Value* num_retries =
envoy_config_route_v3_RetryPolicy_num_retries(retry_policy);
if (num_retries != nullptr) {
uint32_t num_retries_value = google_protobuf_UInt32Value_value(num_retries);
if (num_retries_value == 0) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
"RouteAction RetryPolicy num_retries set to invalid value 0."));
} else {
retry_to_return.num_retries = num_retries_value;
}
} else {
retry_to_return.num_retries = 1;
}
const envoy_config_route_v3_RetryPolicy_RetryBackOff* backoff =
envoy_config_route_v3_RetryPolicy_retry_back_off(retry_policy);
if (backoff != nullptr) {
const google_protobuf_Duration* base_interval =
envoy_config_route_v3_RetryPolicy_RetryBackOff_base_interval(backoff);
if (base_interval == nullptr) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
"RouteAction RetryPolicy RetryBackoff missing base interval."));
} else {
retry_to_return.retry_back_off.base_interval =
DurationParse(base_interval);
}
const google_protobuf_Duration* max_interval =
envoy_config_route_v3_RetryPolicy_RetryBackOff_max_interval(backoff);
XdsApi::Duration max;
if (max_interval != nullptr) {
max = DurationParse(max_interval);
} else {
// if max interval is not set, it is 10x the base, if the value in nanos
// can yield another second, adjust the value in seconds accordingly.
max.seconds = retry_to_return.retry_back_off.base_interval.seconds * 10;
max.nanos = retry_to_return.retry_back_off.base_interval.nanos * 10;
if (max.nanos > 1000000000) {
max.seconds += max.nanos / 1000000000;
max.nanos = max.nanos % 1000000000;
}
}
retry_to_return.retry_back_off.max_interval = max;
} else {
retry_to_return.retry_back_off.base_interval.seconds = 0;
retry_to_return.retry_back_off.base_interval.nanos = 25000000;
retry_to_return.retry_back_off.max_interval.seconds = 0;
retry_to_return.retry_back_off.max_interval.nanos = 250000000;
}
if (errors.empty()) {
*retry = retry_to_return;
return GRPC_ERROR_NONE;
} else {
return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing retry policy",
&errors);
}
}
grpc_error_handle RouteActionParse(const EncodingContext& context,
const envoy_config_route_v3_Route* route_msg,
XdsApi::Route* route, bool* ignore_route) {
@ -1629,10 +1757,7 @@ grpc_error_handle RouteActionParse(const EncodingContext& context,
max_stream_duration);
}
if (duration != nullptr) {
XdsApi::Duration duration_in_route;
duration_in_route.seconds = google_protobuf_Duration_seconds(duration);
duration_in_route.nanos = google_protobuf_Duration_nanos(duration);
route->max_stream_duration = duration_in_route;
route->max_stream_duration = DurationParse(duration);
}
}
}
@ -1713,6 +1838,17 @@ grpc_error_handle RouteActionParse(const EncodingContext& context,
route->hash_policies.emplace_back(std::move(policy));
}
}
// Get retry policy
if (XdsRetryEnabled()) {
const envoy_config_route_v3_RetryPolicy* retry_policy =
envoy_config_route_v3_RouteAction_retry_policy(route_action);
if (retry_policy != nullptr) {
absl::optional<XdsApi::Route::RetryPolicy> retry;
grpc_error_handle error = RetryPolicyParse(context, retry_policy, &retry);
if (error != GRPC_ERROR_NONE) return error;
route->retry_policy = retry;
}
}
return GRPC_ERROR_NONE;
}
@ -1758,6 +1894,17 @@ grpc_error_handle RouteConfigParse(
&vhost.typed_per_filter_config);
if (error != GRPC_ERROR_NONE) return error;
}
// Parse retry policy.
absl::optional<XdsApi::Route::RetryPolicy> virtual_host_retry_policy;
const envoy_config_route_v3_RetryPolicy* retry_policy =
envoy_config_route_v3_VirtualHost_retry_policy(virtual_hosts[i]);
if (XdsRetryEnabled()) {
if (retry_policy != nullptr) {
grpc_error_handle error =
RetryPolicyParse(context, retry_policy, &virtual_host_retry_policy);
if (error != GRPC_ERROR_NONE) return error;
}
}
// Parse routes.
size_t num_routes;
const envoy_config_route_v3_Route* const* routes =
@ -1792,6 +1939,10 @@ grpc_error_handle RouteConfigParse(
error = RouteActionParse(context, routes[j], &route, &ignore_route);
if (error != GRPC_ERROR_NONE) return error;
if (ignore_route) continue;
if (XdsRetryEnabled() && route.retry_policy == absl::nullopt &&
retry_policy != nullptr) {
route.retry_policy = virtual_host_retry_policy;
}
if (context.use_v3) {
grpc_error_handle error = ParseTypedPerFilterConfig<
envoy_config_route_v3_Route,
@ -1949,10 +2100,8 @@ grpc_error_handle HttpConnectionManagerParse(
const google_protobuf_Duration* duration =
envoy_config_core_v3_HttpProtocolOptions_max_stream_duration(options);
if (duration != nullptr) {
http_connection_manager->http_max_stream_duration.seconds =
google_protobuf_Duration_seconds(duration);
http_connection_manager->http_max_stream_duration.nanos =
google_protobuf_Duration_nanos(duration);
http_connection_manager->http_max_stream_duration =
DurationParse(duration);
}
}
// Parse filters.

@ -38,6 +38,7 @@
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/matchers/matchers.h"
namespace grpc_core {
@ -109,10 +110,34 @@ class XdsApi {
bool operator==(const HashPolicy& other) const;
std::string ToString() const;
};
Matchers matchers;
std::vector<HashPolicy> hash_policies;
struct RetryPolicy {
internal::StatusCodeSet retry_on;
uint32_t num_retries;
struct RetryBackOff {
Duration base_interval;
Duration max_interval;
bool operator==(const RetryBackOff& other) const {
return base_interval == other.base_interval &&
max_interval == other.max_interval;
}
std::string ToString() const;
};
RetryBackOff retry_back_off;
bool operator==(const RetryPolicy& other) const {
return (retry_on == other.retry_on &&
num_retries == other.num_retries &&
retry_back_off == other.retry_back_off);
}
std::string ToString() const;
};
absl::optional<RetryPolicy> retry_policy;
// Action for this route.
// TODO(roth): When we can use absl::variant<>, consider using that
// here, to enforce the fact that only one of the two fields can be set.
@ -139,6 +164,7 @@ class XdsApi {
bool operator==(const Route& other) const {
return matchers == other.matchers && cluster_name == other.cluster_name &&
retry_policy == other.retry_policy &&
weighted_clusters == other.weighted_clusters &&
max_stream_duration == other.max_stream_duration &&
typed_per_filter_config == other.typed_per_filter_config;

@ -53,6 +53,10 @@ class StatusCodeSet {
return status_code_mask_ & (1 << status);
}
bool operator==(const StatusCodeSet& other) const {
return status_code_mask_ == other.status_code_mask_;
}
private:
int status_code_mask_ = 0; // A bitfield of status codes in the set.
};

@ -31,6 +31,18 @@ import "google/protobuf/wrappers.proto";
// * Routing :ref:`architecture overview <arch_overview_http_routing>`
// * HTTP :ref:`router filter <config_http_filters_router>`
message RetryPolicy {
string retry_on = 1;
google.protobuf.UInt32Value num_retries = 2;
message RetryBackOff {
google.protobuf.Duration base_interval = 1;
google.protobuf.Duration max_interval = 2;
}
RetryBackOff retry_back_off = 8;
}
// The top level element in the routing configuration is a virtual host. Each virtual host has
// a logical name as well as a set of domains that get routed to it based on the incoming request's
// host header. This allows a single listener to service multiple top level domain path trees. Once
@ -72,6 +84,8 @@ message VirtualHost {
// specific; see the :ref:`HTTP filter documentation <config_http_filters>`
// for if and how it is utilized.
map<string, google.protobuf.Any> typed_per_filter_config = 15;
RetryPolicy retry_policy = 16;
}
// A route is both a specification of how to match a request as well as an indication of what to do
@ -318,6 +332,8 @@ message RouteAction {
repeated HashPolicy hash_policy = 15;
RetryPolicy retry_policy = 9;
// Specifies the maximum stream duration for this route.
MaxStreamDuration max_stream_duration = 36;
}

@ -1878,8 +1878,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
bool wait_for_ready = false;
bool server_fail = false;
std::vector<std::pair<std::string, std::string>> metadata;
int server_sleep_us = 0;
int client_cancel_after_us = 0;
bool skip_cancelled_check = false;
StatusCode server_expected_error = StatusCode::OK;
RpcOptions() {}
@ -1919,11 +1921,21 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
return *this;
}
RpcOptions& set_server_sleep_us(int rpc_server_sleep_us) {
server_sleep_us = rpc_server_sleep_us;
return *this;
}
RpcOptions& set_client_cancel_after_us(int rpc_client_cancel_after_us) {
client_cancel_after_us = rpc_client_cancel_after_us;
return *this;
}
RpcOptions& set_server_expected_error(StatusCode code) {
server_expected_error = code;
return *this;
}
// Populates context and request.
void SetupRpc(ClientContext* context, EchoRequest* request) const {
for (const auto& item : metadata) {
@ -1939,6 +1951,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
request->mutable_param()->mutable_expected_error()->set_code(
GRPC_STATUS_FAILED_PRECONDITION);
}
if (server_sleep_us != 0) {
request->mutable_param()->set_server_sleep_us(server_sleep_us);
}
if (client_cancel_after_us != 0) {
request->mutable_param()->set_client_cancel_after_us(
client_cancel_after_us);
@ -2186,6 +2201,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
if (local_response) response = new EchoResponse;
ClientContext context;
EchoRequest request;
if (rpc_options.server_expected_error != StatusCode::OK) {
auto* error = request.mutable_param()->mutable_expected_error();
error->set_code(rpc_options.server_expected_error);
}
rpc_options.SetupRpc(&context, &request);
Status status;
switch (rpc_options.service) {
@ -5595,6 +5614,330 @@ TEST_P(LdsRdsTest, XdsRoutingWithOnlyApplicationTimeout) {
kTimeoutApplicationSecond * 1000000000);
}
TEST_P(LdsRdsTest, XdsRetryPolicyNumRetries) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY", "true");
const size_t kNumRetries = 3;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
// Construct route config to set retry policy.
RouteConfiguration new_route_config = default_route_config_;
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* retry_policy = route1->mutable_route()->mutable_retry_policy();
retry_policy->set_retry_on(
"5xx,cancelled,deadline-exceeded,internal,resource-exhausted,"
"unavailable");
retry_policy->mutable_num_retries()->set_value(kNumRetries);
SetRouteConfiguration(0, new_route_config);
// Ensure we retried the correct number of times on all supported status.
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(
RpcOptions().set_server_expected_error(StatusCode::CANCELLED))
.set_expected_error_code(StatusCode::CANCELLED));
EXPECT_EQ(kNumRetries + 1, backends_[0]->backend_service()->request_count());
ResetBackendCounters();
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(RpcOptions().set_server_expected_error(
StatusCode::DEADLINE_EXCEEDED))
.set_expected_error_code(StatusCode::DEADLINE_EXCEEDED));
EXPECT_EQ(kNumRetries + 1, backends_[0]->backend_service()->request_count());
ResetBackendCounters();
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(
RpcOptions().set_server_expected_error(StatusCode::INTERNAL))
.set_expected_error_code(StatusCode::INTERNAL));
EXPECT_EQ(kNumRetries + 1, backends_[0]->backend_service()->request_count());
ResetBackendCounters();
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(RpcOptions().set_server_expected_error(
StatusCode::RESOURCE_EXHAUSTED))
.set_expected_error_code(StatusCode::RESOURCE_EXHAUSTED));
EXPECT_EQ(kNumRetries + 1, backends_[0]->backend_service()->request_count());
ResetBackendCounters();
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(
RpcOptions().set_server_expected_error(StatusCode::UNAVAILABLE))
.set_expected_error_code(StatusCode::UNAVAILABLE));
EXPECT_EQ(kNumRetries + 1, backends_[0]->backend_service()->request_count());
ResetBackendCounters();
// Ensure we don't retry on an unsupported status.
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(RpcOptions().set_server_expected_error(
StatusCode::UNAUTHENTICATED))
.set_expected_error_code(StatusCode::UNAUTHENTICATED));
EXPECT_EQ(1, backends_[0]->backend_service()->request_count());
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY");
}
TEST_P(LdsRdsTest, XdsRetryPolicyAtVirtualHostLevel) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY", "true");
const size_t kNumRetries = 3;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
// Construct route config to set retry policy.
RouteConfiguration new_route_config = default_route_config_;
auto* retry_policy =
new_route_config.mutable_virtual_hosts(0)->mutable_retry_policy();
retry_policy->set_retry_on(
"cancelled,deadline-exceeded,internal,resource-exhausted,unavailable");
retry_policy->mutable_num_retries()->set_value(kNumRetries);
SetRouteConfiguration(0, new_route_config);
// Ensure we retried the correct number of times on a supported status.
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(RpcOptions().set_server_expected_error(
StatusCode::DEADLINE_EXCEEDED))
.set_expected_error_code(StatusCode::DEADLINE_EXCEEDED));
EXPECT_EQ(kNumRetries + 1, backends_[0]->backend_service()->request_count());
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY");
}
TEST_P(LdsRdsTest, XdsRetryPolicyLongBackOff) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY", "true");
// Set num retries to 3, but due to longer back off, we expect only 1 retry
// will take place.
const size_t kNumRetries = 3;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
// Construct route config to set retry policy.
RouteConfiguration new_route_config = default_route_config_;
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* retry_policy = route1->mutable_route()->mutable_retry_policy();
retry_policy->set_retry_on(
"5xx,cancelled,deadline-exceeded,internal,resource-exhausted,"
"unavailable");
retry_policy->mutable_num_retries()->set_value(kNumRetries);
auto base_interval =
retry_policy->mutable_retry_back_off()->mutable_base_interval();
// Set backoff to 1 second, 1/2 of rpc timeout of 2 second.
base_interval->set_seconds(1 * grpc_test_slowdown_factor());
base_interval->set_nanos(0);
SetRouteConfiguration(0, new_route_config);
// No need to set max interval and just let it be the default of 10x of base.
// We expect 1 retry before the RPC times out with DEADLINE_EXCEEDED.
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(
RpcOptions().set_timeout_ms(2500).set_server_expected_error(
StatusCode::CANCELLED))
.set_expected_error_code(StatusCode::DEADLINE_EXCEEDED));
EXPECT_EQ(1 + 1, backends_[0]->backend_service()->request_count());
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY");
}
TEST_P(LdsRdsTest, XdsRetryPolicyMaxBackOff) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY", "true");
// Set num retries to 3, but due to longer back off, we expect only 2 retry
// will take place, while the 2nd one will obey the max backoff.
const size_t kNumRetries = 3;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
// Construct route config to set retry policy.
RouteConfiguration new_route_config = default_route_config_;
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* retry_policy = route1->mutable_route()->mutable_retry_policy();
retry_policy->set_retry_on(
"5xx,cancelled,deadline-exceeded,internal,resource-exhausted,"
"unavailable");
retry_policy->mutable_num_retries()->set_value(kNumRetries);
auto base_interval =
retry_policy->mutable_retry_back_off()->mutable_base_interval();
// Set backoff to 1 second.
base_interval->set_seconds(1 * grpc_test_slowdown_factor());
base_interval->set_nanos(0);
auto max_interval =
retry_policy->mutable_retry_back_off()->mutable_max_interval();
// Set max interval to be the same as base, so 2 retries will take 2 seconds
// and both retries will take place before the 2.5 seconds rpc timeout.
// Tested to ensure if max is not set, this test will be the same as
// XdsRetryPolicyLongBackOff and we will only see 1 retry in that case.
max_interval->set_seconds(1 * grpc_test_slowdown_factor());
max_interval->set_nanos(0);
SetRouteConfiguration(0, new_route_config);
// We expect 2 retry before the RPC times out with DEADLINE_EXCEEDED.
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(
RpcOptions().set_timeout_ms(2500).set_server_expected_error(
StatusCode::CANCELLED))
.set_expected_error_code(StatusCode::DEADLINE_EXCEEDED));
EXPECT_EQ(2 + 1, backends_[0]->backend_service()->request_count());
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY");
}
TEST_P(LdsRdsTest, XdsRetryPolicyUnsupportedStatusCode) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY", "true");
const size_t kNumRetries = 3;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
// Construct route config to set retry policy.
RouteConfiguration new_route_config = default_route_config_;
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* retry_policy = route1->mutable_route()->mutable_retry_policy();
retry_policy->set_retry_on("5xx");
retry_policy->mutable_num_retries()->set_value(kNumRetries);
SetRouteConfiguration(0, new_route_config);
// We expect no retry.
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(RpcOptions().set_server_expected_error(
StatusCode::DEADLINE_EXCEEDED))
.set_expected_error_code(StatusCode::DEADLINE_EXCEEDED));
EXPECT_EQ(1, backends_[0]->backend_service()->request_count());
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY");
}
TEST_P(LdsRdsTest, XdsRetryPolicyInvalidNumRetriesZero) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY", "true");
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
// Construct route config to set retry policy.
RouteConfiguration new_route_config = default_route_config_;
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* retry_policy = route1->mutable_route()->mutable_retry_policy();
retry_policy->set_retry_on("deadline-exceeded");
// Setting num_retries to zero is not valid.
retry_policy->mutable_num_retries()->set_value(0);
SetRouteConfiguration(0, new_route_config);
ASSERT_TRUE(WaitForRdsNack()) << "timed out waiting for NACK";
const auto response_state = RouteConfigurationResponseState(0);
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_THAT(
response_state.error_message,
::testing::HasSubstr(
"RouteAction RetryPolicy num_retries set to invalid value 0."));
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY");
}
TEST_P(LdsRdsTest, XdsRetryPolicyRetryBackOffMissingBaseInterval) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY", "true");
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
// Construct route config to set retry policy.
RouteConfiguration new_route_config = default_route_config_;
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* retry_policy = route1->mutable_route()->mutable_retry_policy();
retry_policy->set_retry_on("deadline-exceeded");
retry_policy->mutable_num_retries()->set_value(1);
// RetryBackoff is there but base interval is missing.
auto max_interval =
retry_policy->mutable_retry_back_off()->mutable_max_interval();
max_interval->set_seconds(0);
max_interval->set_nanos(250000000);
SetRouteConfiguration(0, new_route_config);
ASSERT_TRUE(WaitForRdsNack()) << "timed out waiting for NACK";
const auto response_state = RouteConfigurationResponseState(0);
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_THAT(
response_state.error_message,
::testing::HasSubstr(
"RouteAction RetryPolicy RetryBackoff missing base interval."));
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY");
}
TEST_P(LdsRdsTest, XdsRetryPolicyDisabled) {
const size_t kNumRetries = 3;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
// Construct route config to set retry policy.
RouteConfiguration new_route_config = default_route_config_;
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* retry_policy = route1->mutable_route()->mutable_retry_policy();
retry_policy->set_retry_on(
"5xx,cancelled,deadline-exceeded,internal,resource-exhausted,"
"unavailable");
retry_policy->mutable_num_retries()->set_value(kNumRetries);
SetRouteConfiguration(0, new_route_config);
// Ensure we don't retry on supported statuses.
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(
RpcOptions().set_server_expected_error(StatusCode::CANCELLED))
.set_expected_error_code(StatusCode::CANCELLED));
EXPECT_EQ(1, backends_[0]->backend_service()->request_count());
ResetBackendCounters();
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(RpcOptions().set_server_expected_error(
StatusCode::DEADLINE_EXCEEDED))
.set_expected_error_code(StatusCode::DEADLINE_EXCEEDED));
EXPECT_EQ(1, backends_[0]->backend_service()->request_count());
ResetBackendCounters();
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(
RpcOptions().set_server_expected_error(StatusCode::INTERNAL))
.set_expected_error_code(StatusCode::INTERNAL));
EXPECT_EQ(1, backends_[0]->backend_service()->request_count());
ResetBackendCounters();
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(RpcOptions().set_server_expected_error(
StatusCode::RESOURCE_EXHAUSTED))
.set_expected_error_code(StatusCode::RESOURCE_EXHAUSTED));
EXPECT_EQ(1, backends_[0]->backend_service()->request_count());
ResetBackendCounters();
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(
RpcOptions().set_server_expected_error(StatusCode::UNAVAILABLE))
.set_expected_error_code(StatusCode::UNAVAILABLE));
EXPECT_EQ(1, backends_[0]->backend_service()->request_count());
ResetBackendCounters();
// Ensure we don't retry on an unsupported status.
CheckRpcSendFailure(
CheckRpcSendFailureOptions()
.set_rpc_options(RpcOptions().set_server_expected_error(
StatusCode::UNAUTHENTICATED))
.set_expected_error_code(StatusCode::UNAUTHENTICATED));
EXPECT_EQ(1, backends_[0]->backend_service()->request_count());
}
TEST_P(LdsRdsTest, XdsRoutingHeadersMatching) {
const char* kNewClusterName = "new_cluster";
const char* kNewEdsServiceName = "new_eds_service_name";

Loading…
Cancel
Save