From 281228407fe7b4ace33b4da798a89da37b3229c1 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 6 Dec 2022 14:52:02 -0800 Subject: [PATCH] xDS cluster: use absl::variant for cluster type (#31820) * xDS cluster: use absl::variant for cluster type * fix xds_cluster_resource_type test --- src/core/BUILD | 1 + .../client_channel/lb_policy/xds/cds.cc | 37 ++++--- src/core/ext/xds/xds_cluster.cc | 98 ++++++++++--------- src/core/ext/xds/xds_cluster.h | 58 ++++++----- .../xds/xds_cluster_resource_type_test.cc | 26 +++-- 5 files changed, 123 insertions(+), 97 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index 51ca51c268c..51fa539a5ea 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3858,6 +3858,7 @@ grpc_cc_library( "lb_policy", "lb_policy_factory", "lb_policy_registry", + "match", "pollset_set", "subchannel_interface", "time", diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index a516ef8d20a..de706db5451 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -46,6 +46,7 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" @@ -389,11 +390,11 @@ absl::StatusOr CdsLb::GenerateDiscoveryMechanismForCluster( // Don't have the update we need yet. if (!state.update.has_value()) return false; // For AGGREGATE clusters, recursively expand to child clusters. - if (state.update->cluster_type == - XdsClusterResource::ClusterType::AGGREGATE) { + auto* aggregate = + absl::get_if(&state.update->type); + if (aggregate != nullptr) { bool missing_cluster = false; - for (const std::string& child_name : - state.update->prioritized_cluster_names) { + for (const std::string& child_name : aggregate->prioritized_cluster_names) { auto result = GenerateDiscoveryMechanismForCluster( child_name, depth + 1, discovery_mechanisms, clusters_added); if (!result.ok()) return result; @@ -444,21 +445,19 @@ absl::StatusOr CdsLb::GenerateDiscoveryMechanismForCluster( } mechanism["outlierDetection"] = std::move(outlier_detection); } - switch (state.update->cluster_type) { - case XdsClusterResource::ClusterType::EDS: - mechanism["type"] = "EDS"; - if (!state.update->eds_service_name.empty()) { - mechanism["edsServiceName"] = state.update->eds_service_name; - } - break; - case XdsClusterResource::ClusterType::LOGICAL_DNS: - mechanism["type"] = "LOGICAL_DNS"; - mechanism["dnsHostname"] = state.update->dns_hostname; - break; - default: - GPR_ASSERT(0); - break; - } + Match( + state.update->type, + [&](const XdsClusterResource::Eds& eds) { + mechanism["type"] = "EDS"; + if (!eds.eds_service_name.empty()) { + mechanism["edsServiceName"] = eds.eds_service_name; + } + }, + [&](const XdsClusterResource::LogicalDns& logical_dns) { + mechanism["type"] = "LOGICAL_DNS"; + mechanism["dnsHostname"] = logical_dns.hostname; + }, + [&](const XdsClusterResource::Aggregate&) { GPR_ASSERT(0); }); if (state.update->lrs_load_reporting_server.has_value()) { mechanism["lrsLoadReportingServer"] = state.update->lrs_load_reporting_server->ToJson(); diff --git a/src/core/ext/xds/xds_cluster.cc b/src/core/ext/xds/xds_cluster.cc index 554abb363e8..52f809d3524 100644 --- a/src/core/ext/xds/xds_cluster.cc +++ b/src/core/ext/xds/xds_cluster.cc @@ -57,6 +57,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/env.h" #include "src/core/lib/gprpp/host_port.h" +#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/validation_errors.h" @@ -89,33 +90,35 @@ bool XdsHostOverrideEnabled() { std::string XdsClusterResource::ToString() const { std::vector contents; - switch (cluster_type) { - case EDS: - contents.push_back("cluster_type=EDS"); - if (!eds_service_name.empty()) { - contents.push_back(absl::StrCat("eds_service_name=", eds_service_name)); - } - break; - case LOGICAL_DNS: - contents.push_back("cluster_type=LOGICAL_DNS"); - contents.push_back(absl::StrCat("dns_hostname=", dns_hostname)); - break; - case AGGREGATE: - contents.push_back("cluster_type=AGGREGATE"); - contents.push_back( - absl::StrCat("prioritized_cluster_names=[", - absl::StrJoin(prioritized_cluster_names, ", "), "]")); + Match( + type, + [&](const Eds& eds) { + contents.push_back("type=EDS"); + if (!eds.eds_service_name.empty()) { + contents.push_back( + absl::StrCat("eds_service_name=", eds.eds_service_name)); + } + }, + [&](const LogicalDns& logical_dns) { + contents.push_back("type=LOGICAL_DNS"); + contents.push_back(absl::StrCat("dns_hostname=", logical_dns.hostname)); + }, + [&](const Aggregate& aggregate) { + contents.push_back("type=AGGREGATE"); + contents.push_back(absl::StrCat( + "prioritized_cluster_names=[", + absl::StrJoin(aggregate.prioritized_cluster_names, ", "), "]")); + }); + contents.push_back( + absl::StrCat("lb_policy_config=", Json{lb_policy_config}.Dump())); + if (lrs_load_reporting_server.has_value()) { + contents.push_back(absl::StrCat("lrs_load_reporting_server_name=", + lrs_load_reporting_server->server_uri())); } if (!common_tls_context.Empty()) { contents.push_back( absl::StrCat("common_tls_context=", common_tls_context.ToString())); } - if (lrs_load_reporting_server.has_value()) { - contents.push_back(absl::StrCat("lrs_load_reporting_server_name=", - lrs_load_reporting_server->server_uri())); - } - contents.push_back( - absl::StrCat("lb_policy_config=", Json{lb_policy_config}.Dump())); contents.push_back( absl::StrCat("max_concurrent_requests=", max_concurrent_requests)); return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); @@ -172,8 +175,9 @@ CommonTlsContext UpstreamTlsContextParse( return common_tls_context; } -void EdsConfigParse(const envoy_config_cluster_v3_Cluster* cluster, - XdsClusterResource* cds_update, ValidationErrors* errors) { +XdsClusterResource::Eds EdsConfigParse( + const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) { + XdsClusterResource::Eds eds; ValidationErrors::ScopedField field(errors, ".eds_cluster_config"); const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config = envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster); @@ -196,20 +200,22 @@ void EdsConfigParse(const envoy_config_cluster_v3_Cluster* cluster, envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name( eds_cluster_config); if (service_name.size != 0) { - cds_update->eds_service_name = UpbStringToStdString(service_name); + eds.eds_service_name = UpbStringToStdString(service_name); } } } + return eds; } -void LogicalDnsParse(const envoy_config_cluster_v3_Cluster* cluster, - XdsClusterResource* cds_update, ValidationErrors* errors) { +XdsClusterResource::LogicalDns LogicalDnsParse( + const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) { + XdsClusterResource::LogicalDns logical_dns; ValidationErrors::ScopedField field(errors, ".load_assignment"); const auto* load_assignment = envoy_config_cluster_v3_Cluster_load_assignment(cluster); if (load_assignment == nullptr) { errors->AddError("field not present for LOGICAL_DNS cluster"); - return; + return logical_dns; } ValidationErrors::ScopedField field2(errors, ".endpoints"); size_t num_localities; @@ -220,7 +226,7 @@ void LogicalDnsParse(const envoy_config_cluster_v3_Cluster* cluster, errors->AddError(absl::StrCat( "must contain exactly one locality for LOGICAL_DNS cluster, found ", num_localities)); - return; + return logical_dns; } ValidationErrors::ScopedField field3(errors, "[0].lb_endpoints"); size_t num_endpoints; @@ -231,27 +237,27 @@ void LogicalDnsParse(const envoy_config_cluster_v3_Cluster* cluster, errors->AddError(absl::StrCat( "must contain exactly one endpoint for LOGICAL_DNS cluster, found ", num_endpoints)); - return; + return logical_dns; } ValidationErrors::ScopedField field4(errors, "[0].endpoint"); const auto* endpoint = envoy_config_endpoint_v3_LbEndpoint_endpoint(endpoints[0]); if (endpoint == nullptr) { errors->AddError("field not present"); - return; + return logical_dns; } ValidationErrors::ScopedField field5(errors, ".address"); const auto* address = envoy_config_endpoint_v3_Endpoint_address(endpoint); if (address == nullptr) { errors->AddError("field not present"); - return; + return logical_dns; } ValidationErrors::ScopedField field6(errors, ".socket_address"); const auto* socket_address = envoy_config_core_v3_Address_socket_address(address); if (socket_address == nullptr) { errors->AddError("field not present"); - return; + return logical_dns; } if (envoy_config_core_v3_SocketAddress_resolver_name(socket_address).size != 0) { @@ -269,30 +275,32 @@ void LogicalDnsParse(const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors::ScopedField field(errors, ".port_value"); errors->AddError("field not present"); } - cds_update->dns_hostname = JoinHostPort( + logical_dns.hostname = JoinHostPort( address_str, envoy_config_core_v3_SocketAddress_port_value(socket_address)); + return logical_dns; } -void AggregateClusterParse(const XdsResourceType::DecodeContext& context, - absl::string_view serialized_config, - XdsClusterResource* cds_update, - ValidationErrors* errors) { +XdsClusterResource::Aggregate AggregateClusterParse( + const XdsResourceType::DecodeContext& context, + absl::string_view serialized_config, ValidationErrors* errors) { + XdsClusterResource::Aggregate aggregate; const auto* aggregate_cluster_config = envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse( serialized_config.data(), serialized_config.size(), context.arena); if (aggregate_cluster_config == nullptr) { errors->AddError("can't parse aggregate cluster config"); - return; + return aggregate; } size_t size; const upb_StringView* clusters = envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters( aggregate_cluster_config, &size); for (size_t i = 0; i < size; ++i) { - cds_update->prioritized_cluster_names.emplace_back( + aggregate.prioritized_cluster_names.emplace_back( UpbStringToStdString(clusters[i])); } + return aggregate; } void ParseLbPolicyConfig(const XdsResourceType::DecodeContext& context, @@ -402,12 +410,10 @@ absl::StatusOr CdsResourceParse( // Check the cluster discovery type. if (envoy_config_cluster_v3_Cluster_type(cluster) == envoy_config_cluster_v3_Cluster_EDS) { - cds_update.cluster_type = XdsClusterResource::ClusterType::EDS; - EdsConfigParse(cluster, &cds_update, &errors); + cds_update.type = EdsConfigParse(cluster, &errors); } else if (envoy_config_cluster_v3_Cluster_type(cluster) == envoy_config_cluster_v3_Cluster_LOGICAL_DNS) { - cds_update.cluster_type = XdsClusterResource::ClusterType::LOGICAL_DNS; - LogicalDnsParse(cluster, &cds_update, &errors); + cds_update.type = LogicalDnsParse(cluster, &errors); } else if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { ValidationErrors::ScopedField field(&errors, ".cluster_type"); const auto* custom_cluster_type = @@ -428,14 +434,14 @@ absl::StatusOr CdsResourceParse( errors.AddError( absl::StrCat("unknown cluster_type extension: ", type_url)); } else { - cds_update.cluster_type = XdsClusterResource::ClusterType::AGGREGATE; // Retrieve aggregate clusters. ValidationErrors::ScopedField field( &errors, ".value[envoy.extensions.clusters.aggregate.v3.ClusterConfig]"); absl::string_view serialized_config = UpbStringToAbsl(google_protobuf_Any_value(typed_config)); - AggregateClusterParse(context, serialized_config, &cds_update, &errors); + cds_update.type = + AggregateClusterParse(context, serialized_config, &errors); } } } else { diff --git a/src/core/ext/xds/xds_cluster.h b/src/core/ext/xds/xds_cluster.h index 8c29d8e3a75..a8812c1a28d 100644 --- a/src/core/ext/xds/xds_cluster.h +++ b/src/core/ext/xds/xds_cluster.h @@ -48,28 +48,46 @@ bool XdsCustomLbPolicyEnabled(); bool XdsHostOverrideEnabled(); struct XdsClusterResource : public XdsResourceType::ResourceData { - enum ClusterType { EDS, LOGICAL_DNS, AGGREGATE }; - ClusterType cluster_type; - // For cluster type EDS. - // The name to use in the EDS request. - // If empty, the cluster name will be used. - std::string eds_service_name; - // For cluster type LOGICAL_DNS. - // The hostname to lookup in DNS. - std::string dns_hostname; - // For cluster type AGGREGATE. - // The prioritized list of cluster names. - std::vector prioritized_cluster_names; + struct Eds { + // If empty, defaults to the cluster name. + std::string eds_service_name; - // Tls Context used by clients - CommonTlsContext common_tls_context; + bool operator==(const Eds& other) const { + return eds_service_name == other.eds_service_name; + } + }; + + struct LogicalDns { + // The hostname to lookup in DNS. + std::string hostname; + + bool operator==(const LogicalDns& other) const { + return hostname == other.hostname; + } + }; + + struct Aggregate { + // Prioritized list of cluster names. + std::vector prioritized_cluster_names; + + bool operator==(const Aggregate& other) const { + return prioritized_cluster_names == other.prioritized_cluster_names; + } + }; + + absl::variant type; + + // The LB policy to use for locality and endpoint picking. + Json::Array lb_policy_config; + + // Note: Remaining fields are not used for aggregate clusters. // The LRS server to use for load reporting. // If not set, load reporting will be disabled. absl::optional lrs_load_reporting_server; - // The LB policy to use for locality and endpoint picking. - Json::Array lb_policy_config; + // Tls Context used by clients + CommonTlsContext common_tls_context; // Maximum number of outstanding requests can be made to the upstream // cluster. @@ -78,13 +96,9 @@ struct XdsClusterResource : public XdsResourceType::ResourceData { absl::optional outlier_detection; bool operator==(const XdsClusterResource& other) const { - return cluster_type == other.cluster_type && - eds_service_name == other.eds_service_name && - dns_hostname == other.dns_hostname && - prioritized_cluster_names == other.prioritized_cluster_names && - common_tls_context == other.common_tls_context && + return type == other.type && lb_policy_config == other.lb_policy_config && lrs_load_reporting_server == other.lrs_load_reporting_server && - lb_policy_config == other.lb_policy_config && + common_tls_context == other.common_tls_context && max_concurrent_requests == other.max_concurrent_requests && outlier_detection == other.outlier_detection; } diff --git a/test/core/xds/xds_cluster_resource_type_test.cc b/test/core/xds/xds_cluster_resource_type_test.cc index ac8fba5c0e6..8aba85794c4 100644 --- a/test/core/xds/xds_cluster_resource_type_test.cc +++ b/test/core/xds/xds_cluster_resource_type_test.cc @@ -153,8 +153,9 @@ TEST_F(XdsClusterTest, MinimumValidConfig) { ASSERT_TRUE(decode_result.name.has_value()); EXPECT_EQ(*decode_result.name, "foo"); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.cluster_type, resource.EDS); - EXPECT_EQ(resource.eds_service_name, ""); + auto* eds = absl::get_if(&resource.type); + ASSERT_NE(eds, nullptr); + EXPECT_EQ(eds->eds_service_name, ""); // Check defaults. EXPECT_EQ(Json{resource.lb_policy_config}.Dump(), "[{\"xds_wrr_locality_experimental\":{\"childPolicy\":" @@ -184,8 +185,9 @@ TEST_F(ClusterTypeTest, EdsConfigSourceAds) { ASSERT_TRUE(decode_result.name.has_value()); EXPECT_EQ(*decode_result.name, "foo"); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.cluster_type, resource.EDS); - EXPECT_EQ(resource.eds_service_name, ""); + auto* eds = absl::get_if(&resource.type); + ASSERT_NE(eds, nullptr); + EXPECT_EQ(eds->eds_service_name, ""); } TEST_F(ClusterTypeTest, EdsServiceName) { @@ -204,8 +206,9 @@ TEST_F(ClusterTypeTest, EdsServiceName) { ASSERT_TRUE(decode_result.name.has_value()); EXPECT_EQ(*decode_result.name, "foo"); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.cluster_type, resource.EDS); - EXPECT_EQ(resource.eds_service_name, "bar"); + auto* eds = absl::get_if(&resource.type); + ASSERT_NE(eds, nullptr); + EXPECT_EQ(eds->eds_service_name, "bar"); } TEST_F(ClusterTypeTest, DiscoveryTypeNotPresent) { @@ -307,8 +310,10 @@ TEST_F(ClusterTypeTest, LogicalDnsValid) { ASSERT_TRUE(decode_result.name.has_value()); EXPECT_EQ(*decode_result.name, "foo"); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.cluster_type, resource.LOGICAL_DNS); - EXPECT_EQ(resource.dns_hostname, "server.example.com:443"); + auto* logical_dns = + absl::get_if(&resource.type); + ASSERT_NE(logical_dns, nullptr); + EXPECT_EQ(logical_dns->hostname, "server.example.com:443"); } TEST_F(ClusterTypeTest, LogicalDnsMissingLoadAssignment) { @@ -539,8 +544,9 @@ TEST_F(ClusterTypeTest, AggregateClusterValid) { ASSERT_TRUE(decode_result.name.has_value()); EXPECT_EQ(*decode_result.name, "foo"); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.cluster_type, resource.AGGREGATE); - EXPECT_THAT(resource.prioritized_cluster_names, + auto* aggregate = absl::get_if(&resource.type); + ASSERT_NE(aggregate, nullptr); + EXPECT_THAT(aggregate->prioritized_cluster_names, ::testing::ElementsAre("bar", "baz", "quux")); }