From 8f581c8b43b9125887505663c3558eb25919f7a5 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 14 Aug 2024 00:18:08 +0000 Subject: [PATCH] [xDS] generalize CDS metadata handling --- src/core/BUILD | 1 + .../load_balancing/xds/xds_cluster_impl.cc | 20 +++- src/core/xds/grpc/xds_cluster.cc | 14 +-- src/core/xds/grpc/xds_cluster.h | 7 +- src/core/xds/grpc/xds_cluster_parser.cc | 65 ++++++++----- src/core/xds/grpc/xds_common_types_parser.cc | 8 +- src/core/xds/grpc/xds_common_types_parser.h | 5 + .../xds/xds_cluster_resource_type_test.cc | 93 ++++++------------- 8 files changed, 107 insertions(+), 106 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index e762447cfbc..a42c305e0f3 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -5415,6 +5415,7 @@ grpc_cc_library( "xds/grpc/xds_cluster.h", ], external_deps = [ + "absl/container:flat_hash_map", "absl/strings", "absl/types:optional", "absl/types:variant", diff --git a/src/core/load_balancing/xds/xds_cluster_impl.cc b/src/core/load_balancing/xds/xds_cluster_impl.cc index 9a06ab8eeb1..877508ba8c5 100644 --- a/src/core/load_balancing/xds/xds_cluster_impl.cc +++ b/src/core/load_balancing/xds/xds_cluster_impl.cc @@ -293,6 +293,8 @@ class XdsClusterImplLb final : public LoadBalancingPolicy { // Current config from the resolver. RefCountedPtr config_; std::shared_ptr cluster_resource_; + RefCountedStringValue service_telemetry_label_; + RefCountedStringValue namespace_telemetry_label_; RefCountedPtr drop_config_; // Current concurrent number of requests. @@ -397,10 +399,9 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb, : call_counter_(xds_cluster_impl_lb->call_counter_), max_concurrent_requests_( xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests), - service_telemetry_label_( - xds_cluster_impl_lb->cluster_resource_->service_telemetry_label), + service_telemetry_label_(xds_cluster_impl_lb->service_telemetry_label_), namespace_telemetry_label_( - xds_cluster_impl_lb->cluster_resource_->namespace_telemetry_label), + xds_cluster_impl_lb->namespace_telemetry_label_), drop_config_(xds_cluster_impl_lb->drop_config_), drop_stats_(xds_cluster_impl_lb->drop_stats_), picker_(std::move(picker)) { @@ -648,6 +649,19 @@ absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) { // Update config state, now that we're done comparing old and new fields. config_ = std::move(new_config); cluster_resource_ = new_cluster_config.cluster; + auto it2 = + cluster_resource_->metadata.find("com.google.csm.telemetry_labels"); + if (it2 != cluster_resource_->metadata.end()) { + auto& json_object = it2->second.object(); + auto it3 = json_object.find("service_name"); + if (it3 != json_object.end() && it3->second.type() == Json::Type::kString) { + service_telemetry_label_ = RefCountedStringValue(it3->second.string()); + } + it3 = json_object.find("service_namespace"); + if (it3 != json_object.end() && it3->second.type() == Json::Type::kString) { + namespace_telemetry_label_ = RefCountedStringValue(it3->second.string()); + } + } drop_config_ = endpoint_config->endpoints != nullptr ? endpoint_config->endpoints->drop_config : nullptr; diff --git a/src/core/xds/grpc/xds_cluster.cc b/src/core/xds/grpc/xds_cluster.cc index 300528f5a16..eae5a95d398 100644 --- a/src/core/xds/grpc/xds_cluster.cc +++ b/src/core/xds/grpc/xds_cluster.cc @@ -65,14 +65,14 @@ std::string XdsClusterResource::ToString() const { absl::StrCat("max_concurrent_requests=", max_concurrent_requests)); contents.push_back(absl::StrCat("override_host_statuses=", override_host_statuses.ToString())); - if (!service_telemetry_label.as_string_view().empty()) { - contents.push_back(absl::StrCat("service_name_telemetry_label=", - service_telemetry_label.as_string_view())); - } - if (!namespace_telemetry_label.as_string_view().empty()) { + if (!metadata.empty()) { + std::vector metadata_entries; + for (const auto& p : metadata) { + metadata_entries.push_back( + absl::StrCat(p.first, "=", JsonDump(p.second))); + } contents.push_back( - absl::StrCat("service_namespace_telemetry_label=", - namespace_telemetry_label.as_string_view())); + absl::StrCat("metadata={", absl::StrJoin(metadata_entries, ", "), "}")); } return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); } diff --git a/src/core/xds/grpc/xds_cluster.h b/src/core/xds/grpc/xds_cluster.h index 601593541e5..a0d087d8196 100644 --- a/src/core/xds/grpc/xds_cluster.h +++ b/src/core/xds/grpc/xds_cluster.h @@ -20,6 +20,7 @@ #include #include +#include "absl/container/flat_hash_map.h" #include "absl/types/optional.h" #include "absl/types/variant.h" @@ -86,8 +87,7 @@ struct XdsClusterResource : public XdsResourceType::ResourceData { XdsHealthStatusSet override_host_statuses; - RefCountedStringValue service_telemetry_label; - RefCountedStringValue namespace_telemetry_label; + absl::flat_hash_map metadata; bool operator==(const XdsClusterResource& other) const { return type == other.type && lb_policy_config == other.lb_policy_config && @@ -97,8 +97,7 @@ struct XdsClusterResource : public XdsResourceType::ResourceData { max_concurrent_requests == other.max_concurrent_requests && outlier_detection == other.outlier_detection && override_host_statuses == other.override_host_statuses && - service_telemetry_label == other.service_telemetry_label && - namespace_telemetry_label == other.namespace_telemetry_label; + metadata == other.metadata; } std::string ToString() const; diff --git a/src/core/xds/grpc/xds_cluster_parser.cc b/src/core/xds/grpc/xds_cluster_parser.cc index c26c62c458e..a6a465c5c13 100644 --- a/src/core/xds/grpc/xds_cluster_parser.cc +++ b/src/core/xds/grpc/xds_cluster_parser.cc @@ -641,34 +641,49 @@ absl::StatusOr> CdsResourceParse( cds_update->override_host_statuses.Add( XdsHealthStatus(XdsHealthStatus::kHealthy)); } - // Record telemetry labels (if any). + // Parse metadata. const envoy_config_core_v3_Metadata* metadata = envoy_config_cluster_v3_Cluster_metadata(cluster); if (metadata != nullptr) { - google_protobuf_Struct* telemetry_labels_struct; - if (envoy_config_core_v3_Metadata_filter_metadata_get( - metadata, - StdStringToUpbString( - absl::string_view("com.google.csm.telemetry_labels")), - &telemetry_labels_struct)) { - size_t iter = kUpb_Map_Begin; - const google_protobuf_Struct_FieldsEntry* fields_entry; - while ((fields_entry = google_protobuf_Struct_fields_next( - telemetry_labels_struct, &iter)) != nullptr) { - // Adds any entry whose value is a string to telemetry_labels. - const google_protobuf_Value* value = - google_protobuf_Struct_FieldsEntry_value(fields_entry); - if (google_protobuf_Value_has_string_value(value)) { - if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key( - fields_entry)) == "service_name") { - cds_update->service_telemetry_label = RefCountedStringValue( - UpbStringToAbsl(google_protobuf_Value_string_value(value))); - } else if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key( - fields_entry)) == "service_namespace") { - cds_update->namespace_telemetry_label = RefCountedStringValue( - UpbStringToAbsl(google_protobuf_Value_string_value(value))); - } - } + // First, try typed_filter_metadata. + size_t iter = kUpb_Map_Begin; + const envoy_config_core_v3_Metadata_TypedFilterMetadataEntry* typed_entry; + while ( + (typed_entry = envoy_config_core_v3_Metadata_typed_filter_metadata_next( + metadata, &iter)) != nullptr) { + absl::string_view key = UpbStringToAbsl( + envoy_config_core_v3_Metadata_TypedFilterMetadataEntry_key( + typed_entry)); + ValidationErrors::ScopedField field( + &errors, absl::StrCat(".metadata.typed_filter_metadata[", key, "]")); + auto extension = ExtractXdsExtension( + context, + envoy_config_core_v3_Metadata_TypedFilterMetadataEntry_value( + typed_entry), + &errors); + if (!extension.has_value()) continue; + // TODO(roth): If we ever need to support another type here, refactor + // this into a separate registry. + if (extension->type == "extensions.filters.http.gcp_authn.v3.Audience") { + // TODO(roth): In a subsequent PR, add parsing here. + } + } + // Then, try filter_metadata. + iter = kUpb_Map_Begin; + const envoy_config_core_v3_Metadata_FilterMetadataEntry* entry; + while ((entry = envoy_config_core_v3_Metadata_filter_metadata_next( + metadata, &iter)) != nullptr) { + absl::string_view key = UpbStringToAbsl( + envoy_config_core_v3_Metadata_FilterMetadataEntry_key(entry)); + auto json = ParseProtobufStructToJson( + context, + envoy_config_core_v3_Metadata_FilterMetadataEntry_value(entry)); + if (!json.ok()) { + ValidationErrors::ScopedField field( + &errors, absl::StrCat(".metadata.filter_metadata[", key, "]")); + errors.AddError(json.status().message()); + } else if (!cds_update->metadata.contains(key)) { + cds_update->metadata[std::move(key)] = std::move(*json); } } } diff --git a/src/core/xds/grpc/xds_common_types_parser.cc b/src/core/xds/grpc/xds_common_types_parser.cc index 3da948fe503..8b617a6143d 100644 --- a/src/core/xds/grpc/xds_common_types_parser.cc +++ b/src/core/xds/grpc/xds_common_types_parser.cc @@ -372,11 +372,9 @@ CommonTlsContext CommonTlsContextParse( } // -// ExtractXdsExtension +// ParseProtobufStructToJson() // -namespace { - absl::StatusOr ParseProtobufStructToJson( const XdsResourceType::DecodeContext& context, const google_protobuf_Struct* resource) { @@ -405,7 +403,9 @@ absl::StatusOr ParseProtobufStructToJson( return std::move(*json); } -} // namespace +// +// ExtractXdsExtension() +// absl::optional ExtractXdsExtension( const XdsResourceType::DecodeContext& context, diff --git a/src/core/xds/grpc/xds_common_types_parser.h b/src/core/xds/grpc/xds_common_types_parser.h index 880e50b7ded..b73a04ec99a 100644 --- a/src/core/xds/grpc/xds_common_types_parser.h +++ b/src/core/xds/grpc/xds_common_types_parser.h @@ -21,6 +21,7 @@ #include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h" #include "google/protobuf/any.upb.h" #include "google/protobuf/duration.upb.h" +#include "google/protobuf/struct.upb.h" #include "google/protobuf/wrappers.upb.h" #include "src/core/lib/gprpp/time.h" @@ -45,6 +46,10 @@ CommonTlsContext CommonTlsContextParse( common_tls_context_proto, ValidationErrors* errors); +absl::StatusOr ParseProtobufStructToJson( + const XdsResourceType::DecodeContext& context, + const google_protobuf_Struct* resource); + absl::optional ExtractXdsExtension( const XdsResourceType::DecodeContext& context, const google_protobuf_Any* any, ValidationErrors* errors); diff --git a/test/core/xds/xds_cluster_resource_type_test.cc b/test/core/xds/xds_cluster_resource_type_test.cc index 7cf4c192681..b07dfd8af36 100644 --- a/test/core/xds/xds_cluster_resource_type_test.cc +++ b/test/core/xds/xds_cluster_resource_type_test.cc @@ -1645,52 +1645,30 @@ TEST_F(HostOverrideStatusTest, CanExplicitlySetToEmpty) { using TelemetryLabelTest = XdsClusterTest; -TEST_F(TelemetryLabelTest, ValidServiceLabelsConfig) { - Cluster cluster; - cluster.set_type(cluster.EDS); - cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self(); - auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata(); - auto& label_map = - *filter_map["com.google.csm.telemetry_labels"].mutable_fields(); - *label_map["service_name"].mutable_string_value() = "abc"; - *label_map["service_namespace"].mutable_string_value() = "xyz"; - std::string serialized_resource; - ASSERT_TRUE(cluster.SerializeToString(&serialized_resource)); - auto* resource_type = XdsClusterResourceType::Get(); - auto decode_result = - resource_type->Decode(decode_context_, serialized_resource); - ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status(); - auto& resource = - static_cast(**decode_result.resource); - EXPECT_EQ(resource.service_telemetry_label.as_string_view(), "abc"); - EXPECT_EQ(resource.namespace_telemetry_label.as_string_view(), "xyz"); -} - -TEST_F(TelemetryLabelTest, MissingMetadataField) { - Cluster cluster; - cluster.set_type(cluster.EDS); - cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self(); - std::string serialized_resource; - ASSERT_TRUE(cluster.SerializeToString(&serialized_resource)); - auto* resource_type = XdsClusterResourceType::Get(); - auto decode_result = - resource_type->Decode(decode_context_, serialized_resource); - ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status(); - auto& resource = - static_cast(**decode_result.resource); - EXPECT_THAT(resource.service_telemetry_label.as_string_view(), - ::testing::IsEmpty()); - EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(), - ::testing::IsEmpty()); +MATCHER_P(JsonEq, json_str, "") { + std::string actual = JsonDump(arg); + bool ok = ::testing::ExplainMatchResult(json_str, actual, result_listener); + if (!ok) *result_listener << "Actual: " << actual; + return ok; } -TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) { +TEST_F(TelemetryLabelTest, ValidServiceLabelsConfig) { Cluster cluster; cluster.set_type(cluster.EDS); cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self(); auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata(); - auto& label_map = *filter_map["some_key"].mutable_fields(); - *label_map["some_value"].mutable_string_value() = "abc"; + auto& label_map = *filter_map["filter_key"].mutable_fields(); + *label_map["string_value"].mutable_string_value() = "abc"; + label_map["bool_value"].set_bool_value(true); + label_map["number_value"].set_number_value(3.14); + label_map["null_value"].set_null_value(::google::protobuf::NULL_VALUE); + auto& list_value_values = + *label_map["list_value"].mutable_list_value()->mutable_values(); + *list_value_values.Add()->mutable_string_value() = "efg"; + list_value_values.Add()->set_number_value(3.14); + auto& struct_value_fields = + *label_map["struct_value"].mutable_struct_value()->mutable_fields(); + struct_value_fields["bool_value"].set_bool_value(false); std::string serialized_resource; ASSERT_TRUE(cluster.SerializeToString(&serialized_resource)); auto* resource_type = XdsClusterResourceType::Get(); @@ -1699,31 +1677,22 @@ TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) { ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status(); auto& resource = static_cast(**decode_result.resource); - EXPECT_THAT(resource.service_telemetry_label.as_string_view(), - ::testing::IsEmpty()); - EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(), - ::testing::IsEmpty()); + EXPECT_THAT(resource.metadata, + ::testing::ElementsAre(::testing::Pair( + "filter_key", JsonEq("{" + "\"bool_value\":true," + "\"list_value\":[\"efg\",3.14]," + "\"null_value\":null," + "\"number_value\":3.14," + "\"string_value\":\"abc\"," + "\"struct_value\":{\"bool_value\":false}" + "}")))); } -TEST_F(TelemetryLabelTest, IgnoreNonServiceLabelEntries) { +TEST_F(TelemetryLabelTest, MissingMetadataField) { Cluster cluster; cluster.set_type(cluster.EDS); cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self(); - auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata(); - auto& label_map = - *filter_map["com.google.csm.telemetry_labels"].mutable_fields(); - label_map["bool_value"].set_bool_value(true); - label_map["number_value"].set_number_value(3.14); - *label_map["string_value"].mutable_string_value() = "abc"; - *label_map["service_name"].mutable_string_value() = "service"; - label_map["null_value"].set_null_value(::google::protobuf::NULL_VALUE); - auto& list_value_values = - *label_map["list_value"].mutable_list_value()->mutable_values(); - *list_value_values.Add()->mutable_string_value() = "efg"; - list_value_values.Add()->set_number_value(3.14); - auto& struct_value_fields = - *label_map["struct_value"].mutable_struct_value()->mutable_fields(); - struct_value_fields["bool_value"].set_bool_value(false); std::string serialized_resource; ASSERT_TRUE(cluster.SerializeToString(&serialized_resource)); auto* resource_type = XdsClusterResourceType::Get(); @@ -1732,9 +1701,7 @@ TEST_F(TelemetryLabelTest, IgnoreNonServiceLabelEntries) { ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status(); auto& resource = static_cast(**decode_result.resource); - EXPECT_THAT(resource.service_telemetry_label.as_string_view(), "service"); - EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(), - ::testing::IsEmpty()); + EXPECT_THAT(resource.metadata, ::testing::ElementsAre()); } } // namespace