Adding is_optional case to RLS (#29259)

* Adding is_optional case to RLS

* integrated with the updated envoy data-plane

* Fixing an old bug and adding test

* Use the same plugin map for ignore

* Remove ignore set

* Fixed another test.

* addressing code review comments.

* clean up!
pull/29271/head^2
donnadionne 3 years ago committed by GitHub
parent f40fe16aab
commit 2fd632a4c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 54
      src/core/ext/xds/xds_cluster_specifier_plugin.cc
  2. 10
      src/core/ext/xds/xds_cluster_specifier_plugin.h
  3. 47
      src/core/ext/xds/xds_route_config.cc
  4. 7
      src/proto/grpc/testing/xds/v3/route.proto
  5. 105
      test/cpp/end2end/xds/xds_end2end_test.cc

@ -39,7 +39,7 @@ void XdsRouteLookupClusterSpecifierPlugin::PopulateSymtab(
grpc_lookup_v1_RouteLookupConfig_getmsgdef(symtab);
}
absl::StatusOr<Json>
absl::StatusOr<std::string>
XdsRouteLookupClusterSpecifierPlugin::GenerateLoadBalancingPolicyConfig(
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab) const {
@ -83,7 +83,23 @@ XdsRouteLookupClusterSpecifierPlugin::GenerateLoadBalancingPolicyConfig(
policy["rls_experimental"] = std::move(rls_policy);
Json::Array policies;
policies.emplace_back(std::move(policy));
return Json(policies);
Json lb_policy_config(std::move(policies));
grpc_error_handle parse_error = GRPC_ERROR_NONE;
// TODO(roth): If/when we ever add a second plugin, refactor this code
// somehow such that we automatically validate the resulting config against
// the gRPC LB policy registry instead of requiring each plugin to do that
// itself.
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(lb_policy_config,
&parse_error);
if (parse_error != GRPC_ERROR_NONE) {
absl::Status status = absl::InvalidArgumentError(absl::StrCat(
kXdsRouteLookupClusterSpecifierPluginConfigName,
" ClusterSpecifierPlugin returned invalid LB policy config: ",
grpc_error_std_string(parse_error)));
GRPC_ERROR_UNREF(parse_error);
return status;
}
return lb_policy_config.Dump();
}
namespace {
@ -95,6 +111,14 @@ PluginRegistryMap* g_plugin_registry = nullptr;
} // namespace
const XdsClusterSpecifierPluginImpl*
XdsClusterSpecifierPluginRegistry::GetPluginForType(
absl::string_view config_proto_type_name) {
auto it = g_plugin_registry->find(config_proto_type_name);
if (it == g_plugin_registry->end()) return nullptr;
return it->second.get();
}
void XdsClusterSpecifierPluginRegistry::PopulateSymtab(upb_DefPool* symtab) {
for (const auto& p : *g_plugin_registry) {
p.second->PopulateSymtab(symtab);
@ -107,32 +131,6 @@ void XdsClusterSpecifierPluginRegistry::RegisterPlugin(
(*g_plugin_registry)[config_proto_type_name] = std::move(plugin);
}
absl::StatusOr<std::string>
XdsClusterSpecifierPluginRegistry::GenerateLoadBalancingPolicyConfig(
absl::string_view proto_type_name, upb_StringView serialized_plugin_config,
upb_Arena* arena, upb_DefPool* symtab) {
auto it = g_plugin_registry->find(proto_type_name);
if (it == g_plugin_registry->end()) {
return absl::InvalidArgumentError(
"Unable to locate the cluster specifier plugin in the registry");
}
auto lb_policy_config = it->second->GenerateLoadBalancingPolicyConfig(
serialized_plugin_config, arena, symtab);
if (!lb_policy_config.ok()) return lb_policy_config.status();
grpc_error_handle parse_error = GRPC_ERROR_NONE;
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(*lb_policy_config,
&parse_error);
if (parse_error != GRPC_ERROR_NONE) {
absl::Status status = absl::InvalidArgumentError(absl::StrCat(
proto_type_name,
" ClusterSpecifierPlugin returned invalid LB policy config: ",
grpc_error_std_string(parse_error)));
GRPC_ERROR_UNREF(parse_error);
return status;
}
return lb_policy_config->Dump();
}
void XdsClusterSpecifierPluginRegistry::Init() {
g_plugin_registry = new PluginRegistryMap;
RegisterPlugin(absl::make_unique<XdsRouteLookupClusterSpecifierPlugin>(),

@ -44,7 +44,7 @@ class XdsClusterSpecifierPluginImpl {
virtual void PopulateSymtab(upb_DefPool* symtab) const = 0;
// Returns the LB policy config in JSON form.
virtual absl::StatusOr<Json> GenerateLoadBalancingPolicyConfig(
virtual absl::StatusOr<std::string> GenerateLoadBalancingPolicyConfig(
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab) const = 0;
};
@ -53,7 +53,7 @@ class XdsRouteLookupClusterSpecifierPlugin
: public XdsClusterSpecifierPluginImpl {
void PopulateSymtab(upb_DefPool* symtab) const override;
absl::StatusOr<Json> GenerateLoadBalancingPolicyConfig(
absl::StatusOr<std::string> GenerateLoadBalancingPolicyConfig(
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab) const override;
};
@ -66,10 +66,8 @@ class XdsClusterSpecifierPluginRegistry {
static void PopulateSymtab(upb_DefPool* symtab);
static absl::StatusOr<std::string> GenerateLoadBalancingPolicyConfig(
absl::string_view proto_type_name,
upb_StringView serialized_plugin_config, upb_Arena* arena,
upb_DefPool* symtab);
static const XdsClusterSpecifierPluginImpl* GetPluginForType(
absl::string_view config_proto_type_name);
// Global init and shutdown.
static void Init();

@ -325,26 +325,43 @@ grpc_error_handle ClusterSpecifierPluginParse(
cluster_specifier_plugin[i]);
std::string name = UpbStringToStdString(
envoy_config_core_v3_TypedExtensionConfig_name(extension));
if (rds_update->cluster_specifier_plugin_map.find(name) !=
rds_update->cluster_specifier_plugin_map.end()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"Duplicated definition of cluster_specifier_plugin ", name));
}
const google_protobuf_Any* any =
envoy_config_core_v3_TypedExtensionConfig_typed_config(extension);
if (any == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"could not obtrain TypedExtensionConfig for plugin config");
"Could not obtrain TypedExtensionConfig for plugin config.");
}
absl::string_view plugin_type;
grpc_error_handle error =
ExtractExtensionTypeName(context, any, &plugin_type);
if (error != GRPC_ERROR_NONE) return error;
// Find the plugin and generate the policy.
auto lb_policy_config =
XdsClusterSpecifierPluginRegistry::GenerateLoadBalancingPolicyConfig(
plugin_type, google_protobuf_Any_value(any), context.arena,
context.symtab);
if (!lb_policy_config.ok()) {
return absl_status_to_grpc_error(lb_policy_config.status());
bool is_optional = envoy_config_route_v3_ClusterSpecifierPlugin_is_optional(
cluster_specifier_plugin[i]);
const XdsClusterSpecifierPluginImpl* cluster_specifier_plugin_impl =
XdsClusterSpecifierPluginRegistry::GetPluginForType(plugin_type);
std::string lb_policy_config;
if (cluster_specifier_plugin_impl == nullptr) {
if (!is_optional) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("Unknown ClusterSpecifierPlugin type ", plugin_type));
}
// Optional plugin, leave lb_policy_config empty.
} else {
auto config =
cluster_specifier_plugin_impl->GenerateLoadBalancingPolicyConfig(
google_protobuf_Any_value(any), context.arena, context.symtab);
if (!config.ok()) {
return absl_status_to_grpc_error(config.status());
}
lb_policy_config = std::move(*config);
}
rds_update->cluster_specifier_plugin_map[std::move(name)] =
std::move(lb_policy_config.value());
std::move(lb_policy_config);
}
return GRPC_ERROR_NONE;
}
@ -773,12 +790,14 @@ grpc_error_handle RouteActionParse(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction cluster contains empty cluster specifier plugin name.");
}
if (cluster_specifier_plugin_map.find(plugin_name) ==
cluster_specifier_plugin_map.end()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction cluster contains cluster specifier plugin name not "
"configured.");
auto it = cluster_specifier_plugin_map.find(plugin_name);
if (it == cluster_specifier_plugin_map.end()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("RouteAction cluster contains cluster specifier plugin "
"name not configured: ",
plugin_name));
}
if (it->second.empty()) *ignore_route = true;
route->action.emplace<XdsRouteConfigResource::Route::RouteAction::
kClusterSpecifierPluginIndex>(
std::move(plugin_name));

@ -447,6 +447,13 @@ message QueryParameterMatcher {
message ClusterSpecifierPlugin {
// The name of the plugin and its opaque configuration.
core.v3.TypedExtensionConfig extension = 1;
// If is_optional is not set and the plugin defined by this message is not
// a supported type, the containing resource is NACKed. If is_optional is
// set, the resource would not be NACKed for this reason. In this case,
// routes referencing this plugin's name would not be treated as an illegal
// configuration, but would result in a failure if the route is selected.
bool is_optional = 2;
}
// [#protodoc-title: HTTP route configuration]

@ -7745,24 +7745,6 @@ TEST_P(RlsTest, XdsRoutingClusterSpecifierPlugin) {
TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksUndefinedSpecifier) {
gpr_setenv("GRPC_EXPERIMENTAL_XDS_RLS_LB", "true");
const char* kNewClusterName = "new_cluster";
const char* kNewEdsServiceName = "new_eds_service_name";
// Populate new EDS resources.
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
EdsResourceArgs args1({
{"locality0", CreateEndpointsForBackends(1, 2)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args1, kNewEdsServiceName));
// Populate new CDS resources.
Cluster new_cluster = default_cluster_;
new_cluster.set_name(kNewClusterName);
new_cluster.mutable_eds_cluster_config()->set_service_name(
kNewEdsServiceName);
balancer_->ads_service()->SetCdsResource(new_cluster);
RouteConfiguration new_route_config = default_route_config_;
auto* default_route =
new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
@ -7774,16 +7756,51 @@ TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksUndefinedSpecifier) {
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr("RouteAction cluster contains cluster "
"specifier plugin name not configured."));
"specifier plugin name not configured:"));
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_XDS_RLS_LB");
}
TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksDuplicateSpecifier) {
gpr_setenv("GRPC_EXPERIMENTAL_XDS_RLS_LB", "true");
// Prepare the RLSLookupConfig: change route configurations to use cluster
// specifier plugin.
RouteLookupConfig route_lookup_config;
auto* key_builder = route_lookup_config.add_grpc_keybuilders();
auto* name = key_builder->add_names();
name->set_service(kRlsServiceValue);
name->set_method(kRlsMethodValue);
auto* header = key_builder->add_headers();
header->set_key(kRlsTestKey);
header->add_names(kRlsTestKey1);
route_lookup_config.set_lookup_service(
absl::StrCat("localhost:", rls_server_->port()));
route_lookup_config.set_cache_size_bytes(5000);
RouteLookupClusterSpecifier rls;
*rls.mutable_route_lookup_config() = std::move(route_lookup_config);
RouteConfiguration new_route_config = default_route_config_;
auto* plugin = new_route_config.add_cluster_specifier_plugins();
plugin->mutable_extension()->set_name(kRlsClusterSpecifierPluginInstanceName);
plugin->mutable_extension()->mutable_typed_config()->PackFrom(rls);
auto* duplicate_plugin = new_route_config.add_cluster_specifier_plugins();
duplicate_plugin->mutable_extension()->set_name(
kRlsClusterSpecifierPluginInstanceName);
duplicate_plugin->mutable_extension()->mutable_typed_config()->PackFrom(rls);
auto* default_route =
new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
default_route->mutable_route()->set_cluster_specifier_plugin(
kRlsClusterSpecifierPluginInstanceName);
SetRouteConfiguration(balancer_.get(), new_route_config);
const auto response_state = WaitForRdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr(absl::StrCat(
"Duplicated definition of cluster_specifier_plugin ",
kRlsClusterSpecifierPluginInstanceName)));
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_XDS_RLS_LB");
}
TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksUnknownSpecifierProto) {
// TODO(donnadionne): Doug is working on adding a new is_optional field to
// ClusterSpecifierPlugin in envoyproxy/envoy#20301. Once that goes in, the
// behavior we want in this case is that if is_optional is true, then we
// ignore that plugin and ignore any routes that refer to that plugin.
// However, if is_optional is false, then we want to NACK.
TEST_P(RlsTest,
XdsRoutingClusterSpecifierPluginNacksUnknownSpecifierProtoNotOptional) {
gpr_setenv("GRPC_EXPERIMENTAL_XDS_RLS_LB", "true");
const char* kNewClusterName = "new_cluster";
const char* kNewEdsServiceName = "new_eds_service_name";
@ -7820,10 +7837,40 @@ TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginNacksUnknownSpecifierProto) {
SetRouteConfiguration(balancer_.get(), new_route_config);
const auto response_state = WaitForRdsNack();
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(
response_state->error_message,
::testing::HasSubstr(
"Unable to locate the cluster specifier plugin in the registry"));
EXPECT_THAT(response_state->error_message,
::testing::HasSubstr("Unknown ClusterSpecifierPlugin type "
"grpc.lookup.v1.RouteLookupConfig"));
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_XDS_RLS_LB");
}
TEST_P(RlsTest,
XdsRoutingClusterSpecifierPluginIgnoreUnknownSpecifierProtoOptional) {
gpr_setenv("GRPC_EXPERIMENTAL_XDS_RLS_LB", "true");
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Prepare the RLSLookupConfig: change route configurations to use cluster
// specifier plugin.
RouteLookupConfig route_lookup_config;
RouteConfiguration new_route_config = default_route_config_;
auto* plugin = new_route_config.add_cluster_specifier_plugins();
plugin->mutable_extension()->set_name(kRlsClusterSpecifierPluginInstanceName);
// Instead of grpc.lookup.v1.RouteLookupClusterSpecifier, let's say we
// mistakenly packed the inner RouteLookupConfig instead.
plugin->mutable_extension()->mutable_typed_config()->PackFrom(
route_lookup_config);
plugin->set_is_optional(true);
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route->mutable_route()->set_cluster_specifier_plugin(
kRlsClusterSpecifierPluginInstanceName);
auto* default_route = new_route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultClusterName);
SetRouteConfiguration(balancer_.get(), new_route_config);
// Ensure we ignore the cluster specifier plugin and send traffic according to
// the default route.
WaitForAllBackends(0, 1);
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_XDS_RLS_LB");
}

Loading…
Cancel
Save