[xDS] generalize CDS metadata handling (#37468)

This is a step toward gRFC A83 (https://github.com/grpc/proposal/pull/438).

Closes #37468

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37468 from markdroth:cds_metadata 2d17c46cc8
PiperOrigin-RevId: 664910293
pull/37532/head
Mark D. Roth 3 months ago committed by Copybara-Service
parent 688cccec75
commit 41da29972f
  1. 1
      src/core/BUILD
  2. 20
      src/core/load_balancing/xds/xds_cluster_impl.cc
  3. 12
      src/core/xds/grpc/xds_cluster.cc
  4. 7
      src/core/xds/grpc/xds_cluster.h
  5. 63
      src/core/xds/grpc/xds_cluster_parser.cc
  6. 8
      src/core/xds/grpc/xds_common_types_parser.cc
  7. 5
      src/core/xds/grpc/xds_common_types_parser.h
  8. 99
      test/core/xds/xds_cluster_resource_type_test.cc

@ -5418,6 +5418,7 @@ grpc_cc_library(
"xds/grpc/xds_cluster.h", "xds/grpc/xds_cluster.h",
], ],
external_deps = [ external_deps = [
"absl/container:flat_hash_map",
"absl/strings", "absl/strings",
"absl/types:optional", "absl/types:optional",
"absl/types:variant", "absl/types:variant",

@ -292,6 +292,8 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
// Current config from the resolver. // Current config from the resolver.
RefCountedPtr<XdsClusterImplLbConfig> config_; RefCountedPtr<XdsClusterImplLbConfig> config_;
std::shared_ptr<const XdsClusterResource> cluster_resource_; std::shared_ptr<const XdsClusterResource> cluster_resource_;
RefCountedStringValue service_telemetry_label_;
RefCountedStringValue namespace_telemetry_label_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_; RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
// Current concurrent number of requests. // Current concurrent number of requests.
@ -396,10 +398,9 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
: call_counter_(xds_cluster_impl_lb->call_counter_), : call_counter_(xds_cluster_impl_lb->call_counter_),
max_concurrent_requests_( max_concurrent_requests_(
xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests), xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests),
service_telemetry_label_( service_telemetry_label_(xds_cluster_impl_lb->service_telemetry_label_),
xds_cluster_impl_lb->cluster_resource_->service_telemetry_label),
namespace_telemetry_label_( namespace_telemetry_label_(
xds_cluster_impl_lb->cluster_resource_->namespace_telemetry_label), xds_cluster_impl_lb->namespace_telemetry_label_),
drop_config_(xds_cluster_impl_lb->drop_config_), drop_config_(xds_cluster_impl_lb->drop_config_),
drop_stats_(xds_cluster_impl_lb->drop_stats_), drop_stats_(xds_cluster_impl_lb->drop_stats_),
picker_(std::move(picker)) { picker_(std::move(picker)) {
@ -647,6 +648,19 @@ absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
// Update config state, now that we're done comparing old and new fields. // Update config state, now that we're done comparing old and new fields.
config_ = std::move(new_config); config_ = std::move(new_config);
cluster_resource_ = new_cluster_config.cluster; cluster_resource_ = new_cluster_config.cluster;
auto it2 =
cluster_resource_->metadata.find("com.google.csm.telemetry_labels");
if (it2 != cluster_resource_->metadata.end()) {
auto& json_object = it2->second.object();
auto it3 = json_object.find("service_name");
if (it3 != json_object.end() && it3->second.type() == Json::Type::kString) {
service_telemetry_label_ = RefCountedStringValue(it3->second.string());
}
it3 = json_object.find("service_namespace");
if (it3 != json_object.end() && it3->second.type() == Json::Type::kString) {
namespace_telemetry_label_ = RefCountedStringValue(it3->second.string());
}
}
drop_config_ = endpoint_config->endpoints != nullptr drop_config_ = endpoint_config->endpoints != nullptr
? endpoint_config->endpoints->drop_config ? endpoint_config->endpoints->drop_config
: nullptr; : nullptr;

@ -65,14 +65,14 @@ std::string XdsClusterResource::ToString() const {
absl::StrCat("max_concurrent_requests=", max_concurrent_requests)); absl::StrCat("max_concurrent_requests=", max_concurrent_requests));
contents.push_back(absl::StrCat("override_host_statuses=", contents.push_back(absl::StrCat("override_host_statuses=",
override_host_statuses.ToString())); override_host_statuses.ToString()));
if (!service_telemetry_label.as_string_view().empty()) { if (!metadata.empty()) {
contents.push_back(absl::StrCat("service_name_telemetry_label=", std::vector<std::string> metadata_entries;
service_telemetry_label.as_string_view())); for (const auto& p : metadata) {
metadata_entries.push_back(
absl::StrCat(p.first, "=", JsonDump(p.second)));
} }
if (!namespace_telemetry_label.as_string_view().empty()) {
contents.push_back( contents.push_back(
absl::StrCat("service_namespace_telemetry_label=", absl::StrCat("metadata={", absl::StrJoin(metadata_entries, ", "), "}"));
namespace_telemetry_label.as_string_view()));
} }
return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
} }

@ -20,6 +20,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "absl/types/variant.h" #include "absl/types/variant.h"
@ -86,8 +87,7 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
XdsHealthStatusSet override_host_statuses; XdsHealthStatusSet override_host_statuses;
RefCountedStringValue service_telemetry_label; absl::flat_hash_map<std::string, Json> metadata;
RefCountedStringValue namespace_telemetry_label;
bool operator==(const XdsClusterResource& other) const { bool operator==(const XdsClusterResource& other) const {
return type == other.type && lb_policy_config == other.lb_policy_config && return type == other.type && lb_policy_config == other.lb_policy_config &&
@ -97,8 +97,7 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
max_concurrent_requests == other.max_concurrent_requests && max_concurrent_requests == other.max_concurrent_requests &&
outlier_detection == other.outlier_detection && outlier_detection == other.outlier_detection &&
override_host_statuses == other.override_host_statuses && override_host_statuses == other.override_host_statuses &&
service_telemetry_label == other.service_telemetry_label && metadata == other.metadata;
namespace_telemetry_label == other.namespace_telemetry_label;
} }
std::string ToString() const; std::string ToString() const;

@ -641,34 +641,49 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
cds_update->override_host_statuses.Add( cds_update->override_host_statuses.Add(
XdsHealthStatus(XdsHealthStatus::kHealthy)); XdsHealthStatus(XdsHealthStatus::kHealthy));
} }
// Record telemetry labels (if any). // Parse metadata.
const envoy_config_core_v3_Metadata* metadata = const envoy_config_core_v3_Metadata* metadata =
envoy_config_cluster_v3_Cluster_metadata(cluster); envoy_config_cluster_v3_Cluster_metadata(cluster);
if (metadata != nullptr) { if (metadata != nullptr) {
google_protobuf_Struct* telemetry_labels_struct; // First, try typed_filter_metadata.
if (envoy_config_core_v3_Metadata_filter_metadata_get(
metadata,
StdStringToUpbString(
absl::string_view("com.google.csm.telemetry_labels")),
&telemetry_labels_struct)) {
size_t iter = kUpb_Map_Begin; size_t iter = kUpb_Map_Begin;
const google_protobuf_Struct_FieldsEntry* fields_entry; const envoy_config_core_v3_Metadata_TypedFilterMetadataEntry* typed_entry;
while ((fields_entry = google_protobuf_Struct_fields_next( while (
telemetry_labels_struct, &iter)) != nullptr) { (typed_entry = envoy_config_core_v3_Metadata_typed_filter_metadata_next(
// Adds any entry whose value is a string to telemetry_labels. metadata, &iter)) != nullptr) {
const google_protobuf_Value* value = absl::string_view key = UpbStringToAbsl(
google_protobuf_Struct_FieldsEntry_value(fields_entry); envoy_config_core_v3_Metadata_TypedFilterMetadataEntry_key(
if (google_protobuf_Value_has_string_value(value)) { typed_entry));
if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key( ValidationErrors::ScopedField field(
fields_entry)) == "service_name") { &errors, absl::StrCat(".metadata.typed_filter_metadata[", key, "]"));
cds_update->service_telemetry_label = RefCountedStringValue( auto extension = ExtractXdsExtension(
UpbStringToAbsl(google_protobuf_Value_string_value(value))); context,
} else if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key( envoy_config_core_v3_Metadata_TypedFilterMetadataEntry_value(
fields_entry)) == "service_namespace") { typed_entry),
cds_update->namespace_telemetry_label = RefCountedStringValue( &errors);
UpbStringToAbsl(google_protobuf_Value_string_value(value))); if (!extension.has_value()) continue;
} // TODO(roth): If we ever need to support another type here, refactor
} // this into a separate registry.
if (extension->type == "extensions.filters.http.gcp_authn.v3.Audience") {
// TODO(roth): In a subsequent PR, add parsing here.
}
}
// Then, try filter_metadata.
iter = kUpb_Map_Begin;
const envoy_config_core_v3_Metadata_FilterMetadataEntry* entry;
while ((entry = envoy_config_core_v3_Metadata_filter_metadata_next(
metadata, &iter)) != nullptr) {
absl::string_view key = UpbStringToAbsl(
envoy_config_core_v3_Metadata_FilterMetadataEntry_key(entry));
auto json = ParseProtobufStructToJson(
context,
envoy_config_core_v3_Metadata_FilterMetadataEntry_value(entry));
if (!json.ok()) {
ValidationErrors::ScopedField field(
&errors, absl::StrCat(".metadata.filter_metadata[", key, "]"));
errors.AddError(json.status().message());
} else if (!cds_update->metadata.contains(key)) {
cds_update->metadata[key] = std::move(*json);
} }
} }
} }

@ -372,11 +372,9 @@ CommonTlsContext CommonTlsContextParse(
} }
// //
// ExtractXdsExtension // ParseProtobufStructToJson()
// //
namespace {
absl::StatusOr<Json> ParseProtobufStructToJson( absl::StatusOr<Json> ParseProtobufStructToJson(
const XdsResourceType::DecodeContext& context, const XdsResourceType::DecodeContext& context,
const google_protobuf_Struct* resource) { const google_protobuf_Struct* resource) {
@ -405,7 +403,9 @@ absl::StatusOr<Json> ParseProtobufStructToJson(
return std::move(*json); return std::move(*json);
} }
} // namespace //
// ExtractXdsExtension()
//
absl::optional<XdsExtension> ExtractXdsExtension( absl::optional<XdsExtension> ExtractXdsExtension(
const XdsResourceType::DecodeContext& context, const XdsResourceType::DecodeContext& context,

@ -21,6 +21,7 @@
#include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h" #include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h"
#include "google/protobuf/any.upb.h" #include "google/protobuf/any.upb.h"
#include "google/protobuf/duration.upb.h" #include "google/protobuf/duration.upb.h"
#include "google/protobuf/struct.upb.h"
#include "google/protobuf/wrappers.upb.h" #include "google/protobuf/wrappers.upb.h"
#include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/time.h"
@ -45,6 +46,10 @@ CommonTlsContext CommonTlsContextParse(
common_tls_context_proto, common_tls_context_proto,
ValidationErrors* errors); ValidationErrors* errors);
absl::StatusOr<Json> ParseProtobufStructToJson(
const XdsResourceType::DecodeContext& context,
const google_protobuf_Struct* resource);
absl::optional<XdsExtension> ExtractXdsExtension( absl::optional<XdsExtension> ExtractXdsExtension(
const XdsResourceType::DecodeContext& context, const XdsResourceType::DecodeContext& context,
const google_protobuf_Any* any, ValidationErrors* errors); const google_protobuf_Any* any, ValidationErrors* errors);

@ -1643,54 +1643,32 @@ TEST_F(HostOverrideStatusTest, CanExplicitlySetToEmpty) {
EXPECT_EQ(resource.override_host_statuses.ToString(), "{}"); EXPECT_EQ(resource.override_host_statuses.ToString(), "{}");
} }
using TelemetryLabelTest = XdsClusterTest; using MetadataTest = XdsClusterTest;
TEST_F(TelemetryLabelTest, ValidServiceLabelsConfig) { MATCHER_P(JsonEq, json_str, "") {
Cluster cluster; std::string actual = JsonDump(arg);
cluster.set_type(cluster.EDS); bool ok = ::testing::ExplainMatchResult(json_str, actual, result_listener);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self(); if (!ok) *result_listener << "Actual: " << actual;
auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata(); return ok;
auto& label_map =
*filter_map["com.google.csm.telemetry_labels"].mutable_fields();
*label_map["service_name"].mutable_string_value() = "abc";
*label_map["service_namespace"].mutable_string_value() = "xyz";
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(resource.service_telemetry_label.as_string_view(), "abc");
EXPECT_EQ(resource.namespace_telemetry_label.as_string_view(), "xyz");
}
TEST_F(TelemetryLabelTest, MissingMetadataField) {
Cluster cluster;
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(resource.service_telemetry_label.as_string_view(),
::testing::IsEmpty());
EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(),
::testing::IsEmpty());
} }
TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) { TEST_F(MetadataTest, MetadataSet) {
Cluster cluster; Cluster cluster;
cluster.set_type(cluster.EDS); cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self(); cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata(); auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata();
auto& label_map = *filter_map["some_key"].mutable_fields(); auto& label_map = *filter_map["filter_key"].mutable_fields();
*label_map["some_value"].mutable_string_value() = "abc"; *label_map["string_value"].mutable_string_value() = "abc";
label_map["bool_value"].set_bool_value(true);
label_map["number_value"].set_number_value(3.14);
label_map["null_value"].set_null_value(::google::protobuf::NULL_VALUE);
auto& list_value_values =
*label_map["list_value"].mutable_list_value()->mutable_values();
*list_value_values.Add()->mutable_string_value() = "efg";
list_value_values.Add()->set_number_value(3.14);
auto& struct_value_fields =
*label_map["struct_value"].mutable_struct_value()->mutable_fields();
struct_value_fields["bool_value"].set_bool_value(false);
std::string serialized_resource; std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource)); ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get(); auto* resource_type = XdsClusterResourceType::Get();
@ -1699,31 +1677,22 @@ TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status(); ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource = auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource); static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(resource.service_telemetry_label.as_string_view(), EXPECT_THAT(resource.metadata,
::testing::IsEmpty()); ::testing::ElementsAre(::testing::Pair(
EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(), "filter_key", JsonEq("{"
::testing::IsEmpty()); "\"bool_value\":true,"
} "\"list_value\":[\"efg\",3.14],"
"\"null_value\":null,"
TEST_F(TelemetryLabelTest, IgnoreNonServiceLabelEntries) { "\"number_value\":3.14,"
"\"string_value\":\"abc\","
"\"struct_value\":{\"bool_value\":false}"
"}"))));
}
TEST_F(MetadataTest, MetadataUnset) {
Cluster cluster; Cluster cluster;
cluster.set_type(cluster.EDS); cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self(); cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata();
auto& label_map =
*filter_map["com.google.csm.telemetry_labels"].mutable_fields();
label_map["bool_value"].set_bool_value(true);
label_map["number_value"].set_number_value(3.14);
*label_map["string_value"].mutable_string_value() = "abc";
*label_map["service_name"].mutable_string_value() = "service";
label_map["null_value"].set_null_value(::google::protobuf::NULL_VALUE);
auto& list_value_values =
*label_map["list_value"].mutable_list_value()->mutable_values();
*list_value_values.Add()->mutable_string_value() = "efg";
list_value_values.Add()->set_number_value(3.14);
auto& struct_value_fields =
*label_map["struct_value"].mutable_struct_value()->mutable_fields();
struct_value_fields["bool_value"].set_bool_value(false);
std::string serialized_resource; std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource)); ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get(); auto* resource_type = XdsClusterResourceType::Get();
@ -1732,9 +1701,7 @@ TEST_F(TelemetryLabelTest, IgnoreNonServiceLabelEntries) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status(); ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource = auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource); static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(resource.service_telemetry_label.as_string_view(), "service"); EXPECT_THAT(resource.metadata, ::testing::ElementsAre());
EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(),
::testing::IsEmpty());
} }
} // namespace } // namespace

Loading…
Cancel
Save