[CSM C++] Fix behavior when peer does not send metadata (#34692)

@stanley-cheung noticed a bug where CSM labels were not being added on
metrics if the peer was not also CSM Observability enabled.

This PR fixes the behavior to add in the local labels in this case, as
well as add the remote workload type label with the value of unknown.
pull/34708/head
Yash Tibrewal 1 year ago committed by GitHub
parent 1c4da38d40
commit 5fd09c1aff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      src/cpp/ext/csm/metadata_exchange.cc
  2. 3
      src/cpp/ext/csm/metadata_exchange.h
  3. 3
      src/cpp/ext/otel/otel_client_filter.cc
  4. 7
      src/cpp/ext/otel/otel_plugin.h
  5. 7
      src/cpp/ext/otel/otel_server_call_tracer.cc
  6. 41
      test/cpp/ext/csm/metadata_exchange_test.cc

@ -215,9 +215,6 @@ class MeshLabelsIterable : public LabelsIterable {
if (pos_ < local_labels_size) {
return local_labels_[pos_++];
}
if (struct_pb.struct_pb == nullptr) {
return absl::nullopt;
}
if (++pos_ == local_labels_size + 1) {
return std::make_pair(kPeerTypeAttribute,
GetStringValueFromUpbStruct(
@ -225,7 +222,7 @@ class MeshLabelsIterable : public LabelsIterable {
struct_pb.arena.ptr()));
}
// Only handle GKE type for now.
switch (type_) {
switch (remote_type_) {
case GcpResourceType::kGke:
if ((pos_ - 2 - local_labels_size) >= kGkeAttributeList.size()) {
return absl::nullopt;
@ -247,7 +244,7 @@ class MeshLabelsIterable : public LabelsIterable {
if (struct_pb.struct_pb == nullptr) {
return local_labels_.size();
}
if (type_ != GcpResourceType::kGke) {
if (remote_type_ != GcpResourceType::kGke) {
return local_labels_.size() + 1;
}
return local_labels_.size() + kGkeAttributeList.size() + 1;
@ -255,6 +252,12 @@ class MeshLabelsIterable : public LabelsIterable {
void ResetIteratorPosition() override { pos_ = 0; }
// Returns true if the peer sent a non-empty base64 encoded
// "x-envoy-peer-metadata" metadata.
bool GotRemoteLabels() const {
return GetDecodedMetadata().struct_pb != nullptr;
}
private:
struct GkeAttribute {
absl::string_view otel_attribute;
@ -283,6 +286,12 @@ class MeshLabelsIterable : public LabelsIterable {
if (slice == nullptr) {
return absl::get<StructPb>(metadata_);
}
// Treat an empty slice as an invalid metadata value.
if (slice->empty()) {
metadata_ = StructPb{};
auto& struct_pb = absl::get<StructPb>(metadata_);
return struct_pb;
}
std::string decoded_metadata;
bool metadata_decoded =
absl::Base64Unescape(slice->as_string_view(), &decoded_metadata);
@ -292,7 +301,7 @@ class MeshLabelsIterable : public LabelsIterable {
struct_pb.struct_pb = google_protobuf_Struct_parse(
decoded_metadata.c_str(), decoded_metadata.size(),
struct_pb.arena.ptr());
type_ = StringToGcpResourceType(GetStringValueFromUpbStruct(
remote_type_ = StringToGcpResourceType(GetStringValueFromUpbStruct(
struct_pb.struct_pb, kMetadataExchangeTypeKey,
struct_pb.arena.ptr()));
}
@ -302,7 +311,7 @@ class MeshLabelsIterable : public LabelsIterable {
const std::vector<std::pair<absl::string_view, std::string>>& local_labels_;
// Holds either the metadata slice or the decoded proto struct.
mutable absl::variant<grpc_core::Slice, StructPb> metadata_;
mutable GcpResourceType type_;
mutable GcpResourceType remote_type_ = GcpResourceType::kUnknown;
uint32_t pos_ = 0;
};
@ -398,15 +407,22 @@ std::unique_ptr<LabelsIterable> ServiceMeshLabelsInjector::GetLabels(
grpc_metadata_batch* incoming_initial_metadata) {
auto peer_metadata =
incoming_initial_metadata->Take(grpc_core::XEnvoyPeerMetadata());
if (!peer_metadata.has_value()) {
return nullptr;
}
return std::make_unique<MeshLabelsIterable>(local_labels_,
*std::move(peer_metadata));
return std::make_unique<MeshLabelsIterable>(
local_labels_, peer_metadata.has_value() ? *std::move(peer_metadata)
: grpc_core::Slice());
}
void ServiceMeshLabelsInjector::AddLabels(
grpc_metadata_batch* outgoing_initial_metadata) {
grpc_metadata_batch* outgoing_initial_metadata,
LabelsIterable* labels_from_incoming_metadata) {
// On the server, if the labels from incoming metadata did not have a
// non-empty base64 encoded "x-envoy-peer-metadata", do not perform metadata
// exchange.
if (labels_from_incoming_metadata != nullptr &&
!static_cast<MeshLabelsIterable*>(labels_from_incoming_metadata)
->GotRemoteLabels()) {
return;
}
outgoing_initial_metadata->Set(grpc_core::XEnvoyPeerMetadata(),
serialized_labels_to_send_.Ref());
}

@ -47,7 +47,8 @@ class ServiceMeshLabelsInjector : public LabelsInjector {
// Modify the outgoing initial metadata with metadata information to be sent
// to the peer.
void AddLabels(grpc_metadata_batch* outgoing_initial_metadata) override;
void AddLabels(grpc_metadata_batch* outgoing_initial_metadata,
LabelsIterable* labels_from_incoming_metadata) override;
private:
std::vector<std::pair<absl::string_view, std::string>> local_labels_;

@ -138,7 +138,8 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
if (OTelPluginState().labels_injector != nullptr) {
OTelPluginState().labels_injector->AddLabels(send_initial_metadata);
OTelPluginState().labels_injector->AddLabels(send_initial_metadata,
nullptr);
}
}

@ -70,8 +70,11 @@ class LabelsInjector {
grpc_metadata_batch* incoming_initial_metadata) = 0;
// Modify the outgoing initial metadata with metadata information to be sent
// to the peer.
virtual void AddLabels(grpc_metadata_batch* outgoing_initial_metadata) = 0;
// to the peer. On the server side, \a labels_from_incoming_metadata returned
// from `GetLabels` should be provided as input here. On the client side, this
// should be nullptr.
virtual void AddLabels(grpc_metadata_batch* outgoing_initial_metadata,
LabelsIterable* labels_from_incoming_metadata) = 0;
};
struct OTelPluginState {

@ -76,10 +76,9 @@ class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
// arguments.
void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) override {
// Only add labels to outgoing metadata if labels were received from peer.
if (OTelPluginState().labels_injector != nullptr &&
injected_labels_ != nullptr) {
OTelPluginState().labels_injector->AddLabels(send_initial_metadata);
if (OTelPluginState().labels_injector != nullptr) {
OTelPluginState().labels_injector->AddLabels(send_initial_metadata,
injected_labels_.get());
}
}

@ -111,7 +111,8 @@ class MetadataExchangeTest
: public OTelPluginEnd2EndTest,
public ::testing::WithParamInterface<TestScenario> {
protected:
void Init(const absl::flat_hash_set<absl::string_view>& metric_names) {
void Init(const absl::flat_hash_set<absl::string_view>& metric_names,
bool enable_client_side_injector = true) {
const char* kBootstrap =
"{\"node\": {\"id\": "
"\"projects/1234567890/networks/mesh:mesh-id/nodes/"
@ -134,7 +135,12 @@ class MetadataExchangeTest
metric_names, /*resource=*/GetParam().GetTestResource(),
/*labels_injector=*/
std::make_unique<grpc::internal::ServiceMeshLabelsInjector>(
GetParam().GetTestResource().GetAttributes()));
GetParam().GetTestResource().GetAttributes()),
/*test_no_meter_provider=*/false,
/*target_selector=*/
[enable_client_side_injector](absl::string_view /*target*/) {
return enable_client_side_injector;
});
}
~MetadataExchangeTest() override {
@ -197,6 +203,7 @@ class MetadataExchangeTest
char* bootstrap_file_name_ = nullptr;
};
// Verify that grpc.client.attempt.started does not get service mesh attributes
TEST_P(MetadataExchangeTest, ClientAttemptStarted) {
Init(/*metric_names=*/{
grpc::internal::OTelClientAttemptStartedInstrumentName()});
@ -245,6 +252,7 @@ TEST_P(MetadataExchangeTest, ClientAttemptDuration) {
VerifyServiceMeshAttributes(attributes);
}
// Verify that grpc.server.call.started does not get service mesh attributes
TEST_P(MetadataExchangeTest, ServerCallStarted) {
Init(
/*metric_names=*/{grpc::internal::OTelServerCallStartedInstrumentName()});
@ -287,6 +295,35 @@ TEST_P(MetadataExchangeTest, ServerCallDuration) {
VerifyServiceMeshAttributes(attributes);
}
// Test that the server records unknown when the client does not send metadata
TEST_P(MetadataExchangeTest, ClientDoesNotSendMetadata) {
Init(
/*metric_names=*/{grpc::internal::OTelServerCallDurationInstrumentName()},
/*enable_client_side_injector=*/false);
SendRPC();
const char* kMetricName = "grpc.server.call.duration";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data =
absl::get_if<opentelemetry::sdk::metrics::HistogramPointData>(
&data[kMetricName][0].point_data);
ASSERT_NE(point_data, nullptr);
ASSERT_EQ(point_data->count_, 1);
const auto& attributes = data[kMetricName][0].attributes.GetAttributes();
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.method")), kMethodName);
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.status")), "OK");
EXPECT_EQ(
absl::get<std::string>(attributes.at("csm.workload_canonical_service")),
"canonical_service");
EXPECT_EQ(absl::get<std::string>(attributes.at("csm.mesh_id")), "mesh-id");
EXPECT_EQ(absl::get<std::string>(attributes.at("csm.remote_workload_type")),
"unknown");
}
INSTANTIATE_TEST_SUITE_P(
MetadataExchange, MetadataExchangeTest,
::testing::Values(

Loading…
Cancel
Save