Update RDS parsing for use on servers (#27715)

* Update RDS parsing for use on servers

* Unused variable

* Reviewer comments

* Automated change: Fix sanity tests

* Fix tests

* Reviewer comments

* Reviewer comments

* clang-tidy

* Reviewer comments

* Fix test

* Reviewer comments

* Reviewer comments

Co-authored-by: yashykt <yashykt@users.noreply.github.com>
pull/25992/head
Yash Tibrewal 3 years ago committed by GitHub
parent b8c38d6bc2
commit 9ac9a013fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 128
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  2. 226
      src/core/ext/xds/xds_api.cc
  3. 138
      src/core/ext/xds/xds_api.h
  4. 9
      src/proto/grpc/testing/xds/v3/route.proto
  5. 807
      test/cpp/end2end/xds/xds_end2end_test.cc

@ -235,7 +235,7 @@ class XdsResolver : public Resolver {
void MaybeAddCluster(const std::string& name);
grpc_error_handle CreateMethodConfig(
const XdsApi::Route& route,
const XdsApi::Route::ClusterWeight* cluster_weight,
const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight,
RefCountedPtr<ServiceConfig>* method_config);
RefCountedPtr<XdsResolver> resolver_;
@ -392,30 +392,34 @@ XdsResolver::XdsConfigSelector::XdsConfigSelector(
route_table_.emplace_back();
auto& route_entry = route_table_.back();
route_entry.route = route;
// If the route doesn't specify a timeout, set its timeout to the global
// one.
if (!route.max_stream_duration.has_value()) {
route_entry.route.max_stream_duration =
resolver_->current_listener_.http_connection_manager
.http_max_stream_duration;
}
if (route.weighted_clusters.empty()) {
*error = CreateMethodConfig(route_entry.route, nullptr,
&route_entry.method_config);
MaybeAddCluster(route.cluster_name);
} else {
uint32_t end = 0;
for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
Route::ClusterWeightState cluster_weight_state;
*error = CreateMethodConfig(route_entry.route, &weighted_cluster,
&cluster_weight_state.method_config);
if (*error != GRPC_ERROR_NONE) return;
end += weighted_cluster.weight;
cluster_weight_state.range_end = end;
cluster_weight_state.cluster = weighted_cluster.name;
route_entry.weighted_cluster_state.push_back(
std::move(cluster_weight_state));
MaybeAddCluster(weighted_cluster.name);
auto* route_action =
absl::get_if<XdsApi::Route::RouteAction>(&route_entry.route.action);
if (route_action != nullptr) {
// If the route doesn't specify a timeout, set its timeout to the global
// one.
if (!route_action->max_stream_duration.has_value()) {
route_action->max_stream_duration =
resolver_->current_listener_.http_connection_manager
.http_max_stream_duration;
}
if (route_action->weighted_clusters.empty()) {
*error = CreateMethodConfig(route_entry.route, nullptr,
&route_entry.method_config);
MaybeAddCluster(route_action->cluster_name);
} else {
uint32_t end = 0;
for (const auto& weighted_cluster : route_action->weighted_clusters) {
Route::ClusterWeightState cluster_weight_state;
*error = CreateMethodConfig(route_entry.route, &weighted_cluster,
&cluster_weight_state.method_config);
if (*error != GRPC_ERROR_NONE) return;
end += weighted_cluster.weight;
cluster_weight_state.range_end = end;
cluster_weight_state.cluster = weighted_cluster.name;
route_entry.weighted_cluster_state.push_back(
std::move(cluster_weight_state));
MaybeAddCluster(weighted_cluster.name);
}
}
}
}
@ -447,7 +451,7 @@ XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
const XdsHttpFilterImpl::FilterConfig* FindFilterConfigOverride(
const std::string& instance_name,
const XdsApi::RdsUpdate::VirtualHost& vhost, const XdsApi::Route& route,
const XdsApi::Route::ClusterWeight* cluster_weight) {
const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight) {
// Check ClusterWeight, if any.
if (cluster_weight != nullptr) {
auto it = cluster_weight->typed_per_filter_config.find(instance_name);
@ -465,11 +469,14 @@ const XdsHttpFilterImpl::FilterConfig* FindFilterConfigOverride(
grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
const XdsApi::Route& route,
const XdsApi::Route::ClusterWeight* cluster_weight,
const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight,
RefCountedPtr<ServiceConfig>* method_config) {
std::vector<std::string> fields;
const auto& route_action =
absl::get<XdsApi::Route::RouteAction>(route.action);
// Set retry policy if any.
if (route.retry_policy.has_value() && !route.retry_policy->retry_on.Empty()) {
if (route_action.retry_policy.has_value() &&
!route_action.retry_policy->retry_on.Empty()) {
std::vector<std::string> retry_parts;
retry_parts.push_back(absl::StrFormat(
"\"retryPolicy\": {\n"
@ -477,25 +484,27 @@ grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
" \"initialBackoff\": \"%d.%09ds\",\n"
" \"maxBackoff\": \"%d.%09ds\",\n"
" \"backoffMultiplier\": 2,\n",
route.retry_policy->num_retries + 1,
route.retry_policy->retry_back_off.base_interval.seconds,
route.retry_policy->retry_back_off.base_interval.nanos,
route.retry_policy->retry_back_off.max_interval.seconds,
route.retry_policy->retry_back_off.max_interval.nanos));
route_action.retry_policy->num_retries + 1,
route_action.retry_policy->retry_back_off.base_interval.seconds,
route_action.retry_policy->retry_back_off.base_interval.nanos,
route_action.retry_policy->retry_back_off.max_interval.seconds,
route_action.retry_policy->retry_back_off.max_interval.nanos));
std::vector<std::string> code_parts;
if (route.retry_policy->retry_on.Contains(GRPC_STATUS_CANCELLED)) {
if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_CANCELLED)) {
code_parts.push_back(" \"CANCELLED\"");
}
if (route.retry_policy->retry_on.Contains(GRPC_STATUS_DEADLINE_EXCEEDED)) {
if (route_action.retry_policy->retry_on.Contains(
GRPC_STATUS_DEADLINE_EXCEEDED)) {
code_parts.push_back(" \"DEADLINE_EXCEEDED\"");
}
if (route.retry_policy->retry_on.Contains(GRPC_STATUS_INTERNAL)) {
if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_INTERNAL)) {
code_parts.push_back(" \"INTERNAL\"");
}
if (route.retry_policy->retry_on.Contains(GRPC_STATUS_RESOURCE_EXHAUSTED)) {
if (route_action.retry_policy->retry_on.Contains(
GRPC_STATUS_RESOURCE_EXHAUSTED)) {
code_parts.push_back(" \"RESOURCE_EXHAUSTED\"");
}
if (route.retry_policy->retry_on.Contains(GRPC_STATUS_UNAVAILABLE)) {
if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_UNAVAILABLE)) {
code_parts.push_back(" \"UNAVAILABLE\"");
}
retry_parts.push_back(
@ -505,12 +514,13 @@ grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
fields.emplace_back(absl::StrJoin(retry_parts, ""));
}
// Set timeout.
if (route.max_stream_duration.has_value() &&
(route.max_stream_duration->seconds != 0 ||
route.max_stream_duration->nanos != 0)) {
fields.emplace_back(absl::StrFormat(" \"timeout\": \"%d.%09ds\"",
route.max_stream_duration->seconds,
route.max_stream_duration->nanos));
if (route_action.max_stream_duration.has_value() &&
(route_action.max_stream_duration->seconds != 0 ||
route_action.max_stream_duration->nanos != 0)) {
fields.emplace_back(
absl::StrFormat(" \"timeout\": \"%d.%09ds\"",
route_action.max_stream_duration->seconds,
route_action.max_stream_duration->nanos));
}
// Handle xDS HTTP filters.
std::map<std::string, std::vector<std::string>> per_filter_configs;
@ -613,9 +623,9 @@ bool HeadersMatch(const std::vector<HeaderMatcher>& header_matchers,
}
absl::optional<uint64_t> HeaderHashHelper(
const XdsApi::Route::HashPolicy& policy,
const XdsApi::Route::RouteAction::HashPolicy& policy,
grpc_metadata_batch* initial_metadata) {
GPR_ASSERT(policy.type == XdsApi::Route::HashPolicy::HEADER);
GPR_ASSERT(policy.type == XdsApi::Route::RouteAction::HashPolicy::HEADER);
std::string value_buffer;
absl::optional<absl::string_view> header_value =
GetHeaderValue(initial_metadata, policy.header_name, &value_buffer);
@ -659,10 +669,20 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
continue;
}
// Found a route match
const auto* route_action =
absl::get_if<XdsApi::Route::RouteAction>(&entry.route.action);
if (route_action == nullptr) {
CallConfig call_config;
call_config.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Matching route has inappropriate action"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
return call_config;
}
absl::string_view cluster_name;
RefCountedPtr<ServiceConfig> method_config;
if (entry.route.weighted_clusters.empty()) {
cluster_name = entry.route.cluster_name;
if (route_action->weighted_clusters.empty()) {
cluster_name = route_action->cluster_name;
method_config = entry.method_config;
} else {
const uint32_t key =
@ -694,13 +714,13 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
GPR_ASSERT(it != clusters_.end());
// Generate a hash.
absl::optional<uint64_t> hash;
for (const auto& hash_policy : entry.route.hash_policies) {
for (const auto& hash_policy : route_action->hash_policies) {
absl::optional<uint64_t> new_hash;
switch (hash_policy.type) {
case XdsApi::Route::HashPolicy::HEADER:
case XdsApi::Route::RouteAction::HashPolicy::HEADER:
new_hash = HeaderHashHelper(hash_policy, args.initial_metadata);
break;
case XdsApi::Route::HashPolicy::CHANNEL_ID:
case XdsApi::Route::RouteAction::HashPolicy::CHANNEL_ID:
new_hash = static_cast<uint64_t>(
reinterpret_cast<uintptr_t>(resolver_.get()));
break;
@ -907,13 +927,15 @@ void XdsResolver::GenerateResult() {
grpc_error_handle error = GRPC_ERROR_NONE;
auto config_selector = MakeRefCounted<XdsConfigSelector>(Ref(), &error);
if (error != GRPC_ERROR_NONE) {
OnError(error);
OnError(grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE));
return;
}
Result result;
error = CreateServiceConfig(&result.service_config);
if (error != GRPC_ERROR_NONE) {
OnError(error);
OnError(grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE));
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {

@ -115,11 +115,58 @@ bool XdsAggregateAndLogicalDnsClusterEnabled() {
return parse_succeeded && parsed_value;
}
// TODO(yashykt): Remove once RBAC is no longer experimental
bool XdsRbacEnabled() {
char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_RBAC");
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
gpr_free(value);
return parse_succeeded && parsed_value;
}
//
// XdsApi::RetryPolicy
//
std::string XdsApi::RetryPolicy::RetryBackOff::ToString() const {
std::vector<std::string> contents;
contents.push_back(
absl::StrCat("RetryBackOff Base: ", base_interval.ToString()));
contents.push_back(
absl::StrCat("RetryBackOff max: ", max_interval.ToString()));
return absl::StrJoin(contents, ",");
}
std::string XdsApi::RetryPolicy::ToString() const {
std::vector<std::string> contents;
contents.push_back(absl::StrFormat("num_retries=%d", num_retries));
contents.push_back(retry_back_off.ToString());
return absl::StrCat("{", absl::StrJoin(contents, ","), "}");
}
//
// XdsApi::Route::Matchers
//
std::string XdsApi::Route::Matchers::ToString() const {
std::vector<std::string> contents;
contents.push_back(
absl::StrFormat("PathMatcher{%s}", path_matcher.ToString()));
for (const HeaderMatcher& header_matcher : header_matchers) {
contents.push_back(header_matcher.ToString());
}
if (fraction_per_million.has_value()) {
contents.push_back(absl::StrFormat("Fraction Per Million %d",
fraction_per_million.value()));
}
return absl::StrJoin(contents, "\n");
}
//
// XdsApi::Route::HashPolicy
// XdsApi::Route::RouteAction::HashPolicy
//
XdsApi::Route::HashPolicy::HashPolicy(const HashPolicy& other)
XdsApi::Route::RouteAction::HashPolicy::HashPolicy(const HashPolicy& other)
: type(other.type),
header_name(other.header_name),
regex_substitution(other.regex_substitution) {
@ -129,8 +176,8 @@ XdsApi::Route::HashPolicy::HashPolicy(const HashPolicy& other)
}
}
XdsApi::Route::HashPolicy& XdsApi::Route::HashPolicy::operator=(
const HashPolicy& other) {
XdsApi::Route::RouteAction::HashPolicy&
XdsApi::Route::RouteAction::HashPolicy::operator=(const HashPolicy& other) {
type = other.type;
header_name = other.header_name;
if (other.regex != nullptr) {
@ -141,14 +188,14 @@ XdsApi::Route::HashPolicy& XdsApi::Route::HashPolicy::operator=(
return *this;
}
XdsApi::Route::HashPolicy::HashPolicy(HashPolicy&& other) noexcept
XdsApi::Route::RouteAction::HashPolicy::HashPolicy(HashPolicy&& other) noexcept
: type(other.type),
header_name(std::move(other.header_name)),
regex(std::move(other.regex)),
regex_substitution(std::move(other.regex_substitution)) {}
XdsApi::Route::HashPolicy& XdsApi::Route::HashPolicy::operator=(
HashPolicy&& other) noexcept {
XdsApi::Route::RouteAction::HashPolicy&
XdsApi::Route::RouteAction::HashPolicy::operator=(HashPolicy&& other) noexcept {
type = other.type;
header_name = std::move(other.header_name);
regex = std::move(other.regex);
@ -156,7 +203,7 @@ XdsApi::Route::HashPolicy& XdsApi::Route::HashPolicy::operator=(
return *this;
}
bool XdsApi::Route::HashPolicy::HashPolicy::operator==(
bool XdsApi::Route::RouteAction::HashPolicy::HashPolicy::operator==(
const HashPolicy& other) const {
if (type != other.type) return false;
if (type == Type::HEADER) {
@ -172,7 +219,7 @@ bool XdsApi::Route::HashPolicy::HashPolicy::operator==(
return true;
}
std::string XdsApi::Route::HashPolicy::ToString() const {
std::string XdsApi::Route::RouteAction::HashPolicy::ToString() const {
std::vector<std::string> contents;
switch (type) {
case Type::HEADER:
@ -193,43 +240,10 @@ std::string XdsApi::Route::HashPolicy::ToString() const {
}
//
// XdsApi::Route::RetryPolicy
//
std::string XdsApi::Route::RetryPolicy::RetryBackOff::ToString() const {
std::vector<std::string> contents;
contents.push_back(
absl::StrCat("RetryBackOff Base: ", base_interval.ToString()));
contents.push_back(
absl::StrCat("RetryBackOff max: ", max_interval.ToString()));
return absl::StrJoin(contents, ",");
}
std::string XdsApi::Route::RetryPolicy::ToString() const {
std::vector<std::string> contents;
contents.push_back(absl::StrFormat("num_retries=%d", num_retries));
contents.push_back(retry_back_off.ToString());
return absl::StrJoin(contents, ",");
}
//
// XdsApi::Route
// XdsApi::Route::RouteAction::ClusterWeight
//
std::string XdsApi::Route::Matchers::ToString() const {
std::vector<std::string> contents;
contents.push_back(
absl::StrFormat("PathMatcher{%s}", path_matcher.ToString()));
for (const HeaderMatcher& header_matcher : header_matchers) {
contents.push_back(header_matcher.ToString());
}
if (fraction_per_million.has_value()) {
contents.push_back(absl::StrFormat("Fraction Per Million %d",
fraction_per_million.value()));
}
return absl::StrJoin(contents, "\n");
}
std::string XdsApi::Route::ClusterWeight::ToString() const {
std::string XdsApi::Route::RouteAction::ClusterWeight::ToString() const {
std::vector<std::string> contents;
contents.push_back(absl::StrCat("cluster=", name));
contents.push_back(absl::StrCat("weight=", weight));
@ -246,15 +260,17 @@ std::string XdsApi::Route::ClusterWeight::ToString() const {
return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
}
std::string XdsApi::Route::ToString() const {
//
// XdsApi::Route::RouteAction
//
std::string XdsApi::Route::RouteAction::ToString() const {
std::vector<std::string> contents;
contents.push_back(matchers.ToString());
for (const HashPolicy& hash_policy : hash_policies) {
contents.push_back(absl::StrCat("hash_policy=", hash_policy.ToString()));
}
if (retry_policy.has_value()) {
contents.push_back(
absl::StrCat("retry_policy={", retry_policy->ToString(), "}"));
contents.push_back(absl::StrCat("retry_policy=", retry_policy->ToString()));
}
if (!cluster_name.empty()) {
contents.push_back(absl::StrFormat("Cluster name: %s", cluster_name));
@ -265,6 +281,25 @@ std::string XdsApi::Route::ToString() const {
if (max_stream_duration.has_value()) {
contents.push_back(max_stream_duration->ToString());
}
return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
}
//
// XdsApi::Route
//
std::string XdsApi::Route::ToString() const {
std::vector<std::string> contents;
contents.push_back(matchers.ToString());
auto* route_action = absl::get_if<XdsApi::Route::RouteAction>(&action);
if (route_action != nullptr) {
contents.push_back(absl::StrCat("route=", route_action->ToString()));
} else if (absl::holds_alternative<XdsApi::Route::NonForwardingAction>(
action)) {
contents.push_back("non_forwarding_action={}");
} else {
contents.push_back("unknown_action={}");
}
if (!typed_per_filter_config.empty()) {
contents.push_back("typed_per_filter_config={");
for (const auto& p : typed_per_filter_config) {
@ -1528,9 +1563,9 @@ XdsApi::Duration DurationParse(const google_protobuf_Duration* proto_duration) {
grpc_error_handle RetryPolicyParse(
const EncodingContext& context,
const envoy_config_route_v3_RetryPolicy* retry_policy,
absl::optional<XdsApi::Route::RetryPolicy>* retry) {
absl::optional<XdsApi::RetryPolicy>* retry) {
std::vector<grpc_error_handle> errors;
XdsApi::Route::RetryPolicy retry_to_return;
XdsApi::RetryPolicy retry_to_return;
auto retry_on = UpbStringToStdString(
envoy_config_route_v3_RetryPolicy_retry_on(retry_policy));
std::vector<absl::string_view> codes = absl::StrSplit(retry_on, ',');
@ -1610,11 +1645,8 @@ grpc_error_handle RetryPolicyParse(
grpc_error_handle RouteActionParse(const EncodingContext& context,
const envoy_config_route_v3_Route* route_msg,
XdsApi::Route* route, bool* ignore_route) {
if (!envoy_config_route_v3_Route_has_route(route_msg)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No RouteAction found in route.");
}
XdsApi::Route::RouteAction* route,
bool* ignore_route) {
const envoy_config_route_v3_RouteAction* route_action =
envoy_config_route_v3_Route_route(route_msg);
// Get the cluster or weighted_clusters in the RouteAction.
@ -1643,7 +1675,7 @@ grpc_error_handle RouteActionParse(const EncodingContext& context,
for (size_t j = 0; j < clusters_size; ++j) {
const envoy_config_route_v3_WeightedCluster_ClusterWeight*
cluster_weight = clusters[j];
XdsApi::Route::ClusterWeight cluster;
XdsApi::Route::RouteAction::ClusterWeight cluster;
cluster.name = UpbStringToStdString(
envoy_config_route_v3_WeightedCluster_ClusterWeight_name(
cluster_weight));
@ -1712,7 +1744,7 @@ grpc_error_handle RouteActionParse(const EncodingContext& context,
for (size_t i = 0; i < size; ++i) {
const envoy_config_route_v3_RouteAction_HashPolicy* hash_policy =
hash_policies[i];
XdsApi::Route::HashPolicy policy;
XdsApi::Route::RouteAction::HashPolicy policy;
policy.terminal =
envoy_config_route_v3_RouteAction_HashPolicy_terminal(hash_policy);
const envoy_config_route_v3_RouteAction_HashPolicy_Header* header;
@ -1720,7 +1752,7 @@ grpc_error_handle RouteActionParse(const EncodingContext& context,
filter_state;
if ((header = envoy_config_route_v3_RouteAction_HashPolicy_header(
hash_policy)) != nullptr) {
policy.type = XdsApi::Route::HashPolicy::Type::HEADER;
policy.type = XdsApi::Route::RouteAction::HashPolicy::Type::HEADER;
policy.header_name = UpbStringToStdString(
envoy_config_route_v3_RouteAction_HashPolicy_Header_header_name(
header));
@ -1764,7 +1796,7 @@ grpc_error_handle RouteActionParse(const EncodingContext& context,
envoy_config_route_v3_RouteAction_HashPolicy_FilterState_key(
filter_state));
if (key == "io.grpc.channel_id") {
policy.type = XdsApi::Route::HashPolicy::Type::CHANNEL_ID;
policy.type = XdsApi::Route::RouteAction::HashPolicy::Type::CHANNEL_ID;
} else {
gpr_log(GPR_DEBUG,
"RouteAction HashPolicy contains policy specifier "
@ -1783,7 +1815,7 @@ grpc_error_handle RouteActionParse(const EncodingContext& context,
const envoy_config_route_v3_RetryPolicy* retry_policy =
envoy_config_route_v3_RouteAction_retry_policy(route_action);
if (retry_policy != nullptr) {
absl::optional<XdsApi::Route::RetryPolicy> retry;
absl::optional<XdsApi::RetryPolicy> retry;
grpc_error_handle error = RetryPolicyParse(context, retry_policy, &retry);
if (error != GRPC_ERROR_NONE) return error;
route->retry_policy = retry;
@ -1833,7 +1865,7 @@ grpc_error_handle RouteConfigParse(
if (error != GRPC_ERROR_NONE) return error;
}
// Parse retry policy.
absl::optional<XdsApi::Route::RetryPolicy> virtual_host_retry_policy;
absl::optional<XdsApi::RetryPolicy> virtual_host_retry_policy;
const envoy_config_route_v3_RetryPolicy* retry_policy =
envoy_config_route_v3_VirtualHost_retry_policy(virtual_hosts[i]);
if (retry_policy != nullptr) {
@ -1872,11 +1904,21 @@ grpc_error_handle RouteConfigParse(
if (error != GRPC_ERROR_NONE) return error;
error = RouteRuntimeFractionParse(match, &route);
if (error != GRPC_ERROR_NONE) return error;
error = RouteActionParse(context, routes[j], &route, &ignore_route);
if (error != GRPC_ERROR_NONE) return error;
if (ignore_route) continue;
if (route.retry_policy == absl::nullopt && retry_policy != nullptr) {
route.retry_policy = virtual_host_retry_policy;
if (envoy_config_route_v3_Route_has_route(routes[j])) {
route.action.emplace<XdsApi::Route::RouteAction>();
auto& route_action =
absl::get<XdsApi::Route::RouteAction>(route.action);
error =
RouteActionParse(context, routes[j], &route_action, &ignore_route);
if (error != GRPC_ERROR_NONE) return error;
if (ignore_route) continue;
if (route_action.retry_policy == absl::nullopt &&
retry_policy != nullptr) {
route_action.retry_policy = virtual_host_retry_policy;
}
} else if (envoy_config_route_v3_Route_has_non_forwarding_action(
routes[j])) {
route.action.emplace<XdsApi::Route::NonForwardingAction>();
}
if (context.use_v3) {
grpc_error_handle error = ParseTypedPerFilterConfig<
@ -2233,32 +2275,47 @@ grpc_error_handle HttpConnectionManagerParse(
absl::StrFormat("Filter %s is not supported on %s", filter_type,
is_client ? "clients" : "servers"));
}
if (i < num_filters - 1) {
absl::StatusOr<XdsHttpFilterImpl::FilterConfig> filter_config =
filter_impl->GenerateFilterConfig(google_protobuf_Any_value(any),
context.arena);
if (!filter_config.ok()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"filter config for type ", filter_type,
" failed to parse: ", filter_config.status().ToString()));
}
http_connection_manager->http_filters.emplace_back(
XdsApi::LdsUpdate::HttpConnectionManager::HttpFilter{
std::string(name), std::move(*filter_config)});
}
if (http_connection_manager->http_filters.empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Expected at least one HTTP filter");
}
// Make sure that the last filter is terminal and non-last filters are
// non-terminal. Note that this check is being performed in a separate loop
// to take care of the case where there are two terminal filters in the list
// out of which only one gets added in the final list.
for (const auto& http_filter : http_connection_manager->http_filters) {
const XdsHttpFilterImpl* filter_impl =
XdsHttpFilterRegistry::GetFilterForType(
http_filter.config.config_proto_type_name);
if (&http_filter != &http_connection_manager->http_filters.back()) {
// Filters before the last filter must not be terminal.
if (filter_impl->IsTerminalFilter()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("terminal filter for config type ", filter_type,
absl::StrCat("terminal filter for config type ",
http_filter.config.config_proto_type_name,
" must be the last filter in the chain"));
}
} else {
// The last filter must be terminal.
if (!filter_impl->IsTerminalFilter()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("non-terminal filter for config type ", filter_type,
absl::StrCat("non-terminal filter for config type ",
http_filter.config.config_proto_type_name,
" is the last filter in the chain"));
}
}
absl::StatusOr<XdsHttpFilterImpl::FilterConfig> filter_config =
filter_impl->GenerateFilterConfig(google_protobuf_Any_value(any),
context.arena);
if (!filter_config.ok()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"filter config for type ", filter_type,
" failed to parse: ", filter_config.status().ToString()));
}
http_connection_manager->http_filters.emplace_back(
XdsApi::LdsUpdate::HttpConnectionManager::HttpFilter{
std::string(name), std::move(*filter_config)});
}
} else {
// If using a v2 config, we just hard-code a list containing only the
@ -2269,7 +2326,10 @@ grpc_error_handle HttpConnectionManagerParse(
XdsApi::LdsUpdate::HttpConnectionManager::HttpFilter{
"router", {kXdsHttpRouterFilterConfigName, Json()}});
}
if (is_client) {
// Guarding parsing of RouteConfig on the server side with the environmental
// variable since that's the first feature on the server side that will be
// using this.
if (is_client || XdsRbacEnabled()) {
// Found inlined route_config. Parse it to find the cluster_name.
if (envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_has_route_config(
http_connection_manager_proto)) {
@ -2504,6 +2564,8 @@ grpc_error_handle FilterChainParse(
filter_chain_match, &filter_chain->filter_chain_match);
if (error != GRPC_ERROR_NONE) errors.push_back(error);
}
filter_chain->filter_chain_data =
std::make_shared<XdsApi::LdsUpdate::FilterChainData>();
// Parse the filters list. Currently we only support HttpConnectionManager.
size_t size = 0;
auto* filters =
@ -2539,8 +2601,6 @@ grpc_error_handle FilterChainParse(
"Could not parse HttpConnectionManager config from filter "
"typed_config"));
} else {
filter_chain->filter_chain_data =
std::make_shared<XdsApi::LdsUpdate::FilterChainData>();
grpc_error_handle error = HttpConnectionManagerParse(
false /* is_client */, context, http_connection_manager, is_v2,
&filter_chain->filter_chain_data->http_connection_manager);

@ -27,6 +27,7 @@
#include "absl/container/inlined_vector.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "envoy/admin/v3/config_dump.upb.h"
#include "re2/re2.h"
#include "upb/def.hpp"
@ -65,6 +66,29 @@ class XdsApi {
using TypedPerFilterConfig =
std::map<std::string, XdsHttpFilterImpl::FilterConfig>;
struct RetryPolicy {
internal::StatusCodeSet retry_on;
uint32_t num_retries;
struct RetryBackOff {
Duration base_interval;
Duration max_interval;
bool operator==(const RetryBackOff& other) const {
return base_interval == other.base_interval &&
max_interval == other.max_interval;
}
std::string ToString() const;
};
RetryBackOff retry_back_off;
bool operator==(const RetryPolicy& other) const {
return (retry_on == other.retry_on && num_retries == other.num_retries &&
retry_back_off == other.retry_back_off);
}
std::string ToString() const;
};
// TODO(donnadionne): When we can use absl::variant<>, consider using that
// for: PathMatcher, HeaderMatcher, cluster_name and weighted_clusters
struct Route {
@ -82,85 +106,83 @@ class XdsApi {
std::string ToString() const;
};
struct HashPolicy {
enum Type { HEADER, CHANNEL_ID };
Type type;
bool terminal = false;
// Fields used for type HEADER.
std::string header_name;
std::unique_ptr<RE2> regex = nullptr;
std::string regex_substitution;
Matchers matchers;
struct UnknownAction {
bool operator==(const UnknownAction& /* other */) const { return true; }
};
HashPolicy() {}
struct RouteAction {
struct HashPolicy {
enum Type { HEADER, CHANNEL_ID };
Type type;
bool terminal = false;
// Fields used for type HEADER.
std::string header_name;
std::unique_ptr<RE2> regex = nullptr;
std::string regex_substitution;
// Copyable.
HashPolicy(const HashPolicy& other);
HashPolicy& operator=(const HashPolicy& other);
HashPolicy() {}
// Moveable.
HashPolicy(HashPolicy&& other) noexcept;
HashPolicy& operator=(HashPolicy&& other) noexcept;
// Copyable.
HashPolicy(const HashPolicy& other);
HashPolicy& operator=(const HashPolicy& other);
bool operator==(const HashPolicy& other) const;
std::string ToString() const;
};
Matchers matchers;
std::vector<HashPolicy> hash_policies;
// Moveable.
HashPolicy(HashPolicy&& other) noexcept;
HashPolicy& operator=(HashPolicy&& other) noexcept;
struct RetryPolicy {
internal::StatusCodeSet retry_on;
uint32_t num_retries;
bool operator==(const HashPolicy& other) const;
std::string ToString() const;
};
struct RetryBackOff {
Duration base_interval;
Duration max_interval;
struct ClusterWeight {
std::string name;
uint32_t weight;
TypedPerFilterConfig typed_per_filter_config;
bool operator==(const RetryBackOff& other) const {
return base_interval == other.base_interval &&
max_interval == other.max_interval;
bool operator==(const ClusterWeight& other) const {
return name == other.name && weight == other.weight &&
typed_per_filter_config == other.typed_per_filter_config;
}
std::string ToString() const;
};
RetryBackOff retry_back_off;
bool operator==(const RetryPolicy& other) const {
return (retry_on == other.retry_on &&
num_retries == other.num_retries &&
retry_back_off == other.retry_back_off);
std::vector<HashPolicy> hash_policies;
absl::optional<RetryPolicy> retry_policy;
// Action for this route.
// TODO(roth): When we can use absl::variant<>, consider using that
// here, to enforce the fact that only one of the two fields can be set.
std::string cluster_name;
std::vector<ClusterWeight> weighted_clusters;
// Storing the timeout duration from route action:
// RouteAction.max_stream_duration.grpc_timeout_header_max or
// RouteAction.max_stream_duration.max_stream_duration if the former is
// not set.
absl::optional<Duration> max_stream_duration;
bool operator==(const RouteAction& other) const {
return hash_policies == other.hash_policies &&
retry_policy == other.retry_policy &&
cluster_name == other.cluster_name &&
weighted_clusters == other.weighted_clusters &&
max_stream_duration == other.max_stream_duration;
}
std::string ToString() const;
};
absl::optional<RetryPolicy> retry_policy;
// Action for this route.
// TODO(roth): When we can use absl::variant<>, consider using that
// here, to enforce the fact that only one of the two fields can be set.
std::string cluster_name;
struct ClusterWeight {
std::string name;
uint32_t weight;
TypedPerFilterConfig typed_per_filter_config;
bool operator==(const ClusterWeight& other) const {
return name == other.name && weight == other.weight &&
typed_per_filter_config == other.typed_per_filter_config;
struct NonForwardingAction {
bool operator==(const NonForwardingAction& /* other */) const {
return true;
}
std::string ToString() const;
};
std::vector<ClusterWeight> weighted_clusters;
// Storing the timeout duration from route action:
// RouteAction.max_stream_duration.grpc_timeout_header_max or
// RouteAction.max_stream_duration.max_stream_duration if the former is
// not set.
absl::optional<Duration> max_stream_duration;
absl::variant<UnknownAction, RouteAction, NonForwardingAction> action;
TypedPerFilterConfig typed_per_filter_config;
bool operator==(const Route& other) const {
return matchers == other.matchers && cluster_name == other.cluster_name &&
retry_policy == other.retry_policy &&
weighted_clusters == other.weighted_clusters &&
max_stream_duration == other.max_stream_duration &&
return matchers == other.matchers && action == other.action &&
typed_per_filter_config == other.typed_per_filter_config;
}
std::string ToString() const;

@ -103,12 +103,21 @@ message Route {
// Route matching parameters.
RouteMatch match = 1;
message NonForwardingAction {
}
oneof action {
// Route request to some upstream cluster.
RouteAction route = 2;
// Return a redirect.
RedirectAction redirect = 3;
// An action used when the route will generate a response directly,
// without forwarding to an upstream host. This will be used in non-proxy
// xDS clients like the gRPC server. It could also be used in the future
// in Envoy for a filter that directly generates responses for requests.
NonForwardingAction non_forwarding_action = 18;
}
// The typed_per_filter_config field can be used to provide route-specific

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save