Revert "[GSM Observability] "Revert Metadata Exchange Implementation"" (#34234)

Reverts grpc/grpc#34233
pull/33646/head
Yash Tibrewal 1 year ago committed by GitHub
parent 4f80a4f9aa
commit 0dd8a056b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      src/core/lib/transport/metadata_batch.h
  2. 11
      src/cpp/ext/gsm/BUILD
  3. 344
      src/cpp/ext/gsm/metadata_exchange.cc
  4. 64
      src/cpp/ext/gsm/metadata_exchange.h
  5. 4
      src/cpp/ext/otel/BUILD
  6. 109
      src/cpp/ext/otel/key_value_iterable.h
  7. 9
      src/cpp/ext/otel/otel_call_tracer.h
  8. 62
      src/cpp/ext/otel/otel_client_filter.cc
  9. 3
      src/cpp/ext/otel/otel_client_filter.h
  10. 7
      src/cpp/ext/otel/otel_plugin.cc
  11. 50
      src/cpp/ext/otel/otel_plugin.h
  12. 77
      src/cpp/ext/otel/otel_server_call_tracer.cc
  13. 21
      test/cpp/ext/gsm/BUILD
  14. 1
      test/cpp/ext/gsm/gsm_observability_test.cc
  15. 232
      test/cpp/ext/gsm/metadata_exchange_test.cc
  16. 28
      test/cpp/ext/otel/BUILD
  17. 143
      test/cpp/ext/otel/otel_plugin_test.cc
  18. 132
      test/cpp/ext/otel/otel_test_library.cc
  19. 94
      test/cpp/ext/otel/otel_test_library.h
  20. 6
      tools/distrib/fix_build_deps.py

@ -305,6 +305,13 @@ struct GrpcTagsBinMetadata : public SimpleSliceBasedMetadata {
static absl::string_view key() { return "grpc-tags-bin"; }
};
// XEnvoyPeerMetadata
struct XEnvoyPeerMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
using CompressionTraits = StableValueCompressor;
static absl::string_view key() { return "x-envoy-peer-metadata"; }
};
// :authority metadata trait.
struct HttpAuthorityMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
@ -1474,6 +1481,7 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap<
grpc_core::GrpcServerStatsBinMetadata, grpc_core::GrpcTraceBinMetadata,
grpc_core::GrpcTagsBinMetadata, grpc_core::GrpcLbClientStatsMetadata,
grpc_core::LbCostBinMetadata, grpc_core::LbTokenMetadata,
grpc_core::XEnvoyPeerMetadata,
// Non-encodable things
grpc_core::GrpcStreamNetworkState, grpc_core::PeerString,
grpc_core::GrpcStatusContext, grpc_core::GrpcStatusFromWire,

@ -32,21 +32,32 @@ grpc_cc_library(
name = "gsm_observability",
srcs = [
"gsm_observability.cc",
"metadata_exchange.cc",
],
hdrs = [
"gsm_observability.h",
"metadata_exchange.h",
],
external_deps = [
"absl/container:flat_hash_set",
"absl/meta:type_traits",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
"absl/types:variant",
"otel/sdk/src/metrics",
"otel/sdk:headers",
"upb_lib",
],
language = "c++",
visibility = ["//:__subpackages__"],
deps = [
"//:gpr_platform",
"//:grpc_base",
"//:protobuf_struct_upb",
"//src/core:env",
"//src/core:slice",
"//src/cpp/ext/otel:otel_plugin",
],
)

@ -0,0 +1,344 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/gsm/metadata_exchange.h"
#include <stddef.h>
#include <array>
#include <cstdint>
#include <unordered_map>
#include "absl/meta/type_traits.h"
#include "absl/strings/escaping.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "google/protobuf/struct.upb.h"
#include "opentelemetry/sdk/resource/semantic_conventions.h"
#include "upb/base/string_view.h"
#include "upb/mem/arena.h"
#include "upb/upb.hpp"
#include "src/core/lib/gprpp/env.h"
namespace grpc {
namespace internal {
namespace {
// The keys that will be used in the Metadata Exchange between local and remote.
constexpr absl::string_view kMetadataExchangeTypeKey = "type";
constexpr absl::string_view kMetadataExchangePodNameKey = "pod_name";
constexpr absl::string_view kMetadataExchangeContainerNameKey =
"container_name";
constexpr absl::string_view kMetadataExchangeNamespaceNameKey =
"namespace_name";
constexpr absl::string_view kMetadataExchangeClusterNameKey = "cluster_name";
constexpr absl::string_view kMetadataExchangeLocationKey = "location";
constexpr absl::string_view kMetadataExchangeProjectIdKey = "project_id";
constexpr absl::string_view kMetadataExchangeCanonicalServiceKey =
"canonical_service";
// The keys that will be used for the peer attributes when recording metrics.
constexpr absl::string_view kPeerTypeAttribute = "gsm.remote_workload_type";
constexpr absl::string_view kPeerPodNameAttribute =
"gsm.remote_workload_pod_name";
constexpr absl::string_view kPeerContainerNameAttribute =
"gsm.remote_workload_container_name";
constexpr absl::string_view kPeerNamespaceNameAttribute =
"gsm.remote_workload_namespace_name";
constexpr absl::string_view kPeerClusterNameAttribute =
"gsm.remote_workload_cluster_name";
constexpr absl::string_view kPeerLocationAttribute =
"gsm.remote_workload_location";
constexpr absl::string_view kPeerProjectIdAttribute =
"gsm.remote_workload_project_id";
constexpr absl::string_view kPeerCanonicalServiceAttribute =
"gsm.remote_workload_canonical_service";
// Type values used by Google Cloud Resource Detector
constexpr absl::string_view kGkeType = "gcp_kubernetes_engine";
enum class GcpResourceType : std::uint8_t { kGke, kUnknown };
GcpResourceType StringToGcpResourceType(absl::string_view type) {
if (type == kGkeType) {
return GcpResourceType::kGke;
}
return GcpResourceType::kUnknown;
}
upb_StringView AbslStrToUpbStr(absl::string_view str) {
return upb_StringView_FromDataAndSize(str.data(), str.size());
}
absl::string_view UpbStrToAbslStr(upb_StringView str) {
return absl::string_view(str.data, str.size);
}
void AddStringKeyValueToStructProto(google_protobuf_Struct* struct_pb,
absl::string_view key,
absl::string_view value, upb_Arena* arena) {
google_protobuf_Value* value_pb = google_protobuf_Value_new(arena);
google_protobuf_Value_set_string_value(value_pb, AbslStrToUpbStr(value));
google_protobuf_Struct_fields_set(struct_pb, AbslStrToUpbStr(key), value_pb,
arena);
}
absl::string_view GetStringValueFromAttributeMap(
const opentelemetry::sdk::common::AttributeMap& map,
absl::string_view key) {
const auto& attributes = map.GetAttributes();
const auto it = attributes.find(std::string(key));
if (it == attributes.end()) {
return "unknown";
}
const auto* string_value = absl::get_if<std::string>(&it->second);
if (string_value == nullptr) {
return "unknown";
}
return *string_value;
}
absl::string_view GetStringValueFromUpbStruct(google_protobuf_Struct* struct_pb,
absl::string_view key,
upb_Arena* arena) {
if (struct_pb == nullptr) {
return "unknown";
}
google_protobuf_Value* value_pb = google_protobuf_Value_new(arena);
bool present = google_protobuf_Struct_fields_get(
struct_pb, AbslStrToUpbStr(key), &value_pb);
if (present) {
if (google_protobuf_Value_has_string_value(value_pb)) {
return UpbStrToAbslStr(google_protobuf_Value_string_value(value_pb));
}
}
return "unknown";
}
class LocalLabelsIterable : public LabelsIterable {
public:
explicit LocalLabelsIterable(
const std::vector<std::pair<absl::string_view, std::string>>& labels)
: labels_(labels) {}
absl::optional<std::pair<absl::string_view, absl::string_view>> Next()
override {
if (pos_ >= labels_.size()) {
return absl::nullopt;
}
return labels_[pos_++];
}
size_t Size() const override { return labels_.size(); }
void ResetIteratorPosition() override { pos_ = 0; }
private:
size_t pos_ = 0;
const std::vector<std::pair<absl::string_view, std::string>>& labels_;
};
class PeerLabelsIterable : public LabelsIterable {
public:
explicit PeerLabelsIterable(grpc_core::Slice remote_metadata)
: metadata_(std::move(remote_metadata)) {}
absl::optional<std::pair<absl::string_view, absl::string_view>> Next()
override {
auto& struct_pb = GetDecodedMetadata();
if (struct_pb.struct_pb == nullptr) {
return absl::nullopt;
}
if (++pos_ == 1) {
return std::make_pair(kPeerTypeAttribute,
GetStringValueFromUpbStruct(
struct_pb.struct_pb, kMetadataExchangeTypeKey,
struct_pb.arena.ptr()));
}
// Only handle GKE type for now.
switch (type_) {
case GcpResourceType::kGke:
if (pos_ - 2 >= kGkeAttributeList.size()) {
return absl::nullopt;
}
return std::make_pair(
kGkeAttributeList[pos_ - 2].otel_attribute,
GetStringValueFromUpbStruct(
struct_pb.struct_pb,
kGkeAttributeList[pos_ - 2].metadata_attribute,
struct_pb.arena.ptr()));
case GcpResourceType::kUnknown:
return absl::nullopt;
}
}
size_t Size() const override {
auto& struct_pb = GetDecodedMetadata();
if (struct_pb.struct_pb == nullptr) {
return 0;
}
if (type_ != GcpResourceType::kGke) {
return 1;
}
return kGkeAttributeList.size();
}
void ResetIteratorPosition() override { pos_ = 0; }
private:
struct GkeAttribute {
absl::string_view otel_attribute;
absl::string_view metadata_attribute;
};
struct StructPb {
upb::Arena arena;
google_protobuf_Struct* struct_pb = nullptr;
};
static constexpr std::array<GkeAttribute, 7> kGkeAttributeList = {
GkeAttribute{kPeerPodNameAttribute, kMetadataExchangePodNameKey},
GkeAttribute{kPeerContainerNameAttribute,
kMetadataExchangeContainerNameKey},
GkeAttribute{kPeerNamespaceNameAttribute,
kMetadataExchangeNamespaceNameKey},
GkeAttribute{kPeerClusterNameAttribute, kMetadataExchangeClusterNameKey},
GkeAttribute{kPeerLocationAttribute, kMetadataExchangeLocationKey},
GkeAttribute{kPeerProjectIdAttribute, kMetadataExchangeProjectIdKey},
GkeAttribute{kPeerCanonicalServiceAttribute,
kMetadataExchangeCanonicalServiceKey},
};
StructPb& GetDecodedMetadata() const {
auto* slice = absl::get_if<grpc_core::Slice>(&metadata_);
if (slice == nullptr) {
return absl::get<StructPb>(metadata_);
}
std::string decoded_metadata;
bool metadata_decoded =
absl::Base64Unescape(slice->as_string_view(), &decoded_metadata);
metadata_ = StructPb{};
auto& struct_pb = absl::get<StructPb>(metadata_);
if (metadata_decoded) {
struct_pb.struct_pb = google_protobuf_Struct_parse(
decoded_metadata.c_str(), decoded_metadata.size(),
struct_pb.arena.ptr());
type_ = StringToGcpResourceType(GetStringValueFromUpbStruct(
struct_pb.struct_pb, kMetadataExchangeTypeKey,
struct_pb.arena.ptr()));
}
return struct_pb;
}
// Holds either the metadata slice or the decoded proto struct.
mutable absl::variant<grpc_core::Slice, StructPb> metadata_;
mutable GcpResourceType type_;
uint32_t pos_ = 0;
};
constexpr std::array<PeerLabelsIterable::GkeAttribute, 7>
PeerLabelsIterable::kGkeAttributeList;
} // namespace
ServiceMeshLabelsInjector::ServiceMeshLabelsInjector(
const opentelemetry::sdk::common::AttributeMap& map) {
upb::Arena arena;
auto* metadata = google_protobuf_Struct_new(arena.ptr());
// Assume kubernetes for now
absl::string_view type_value = GetStringValueFromAttributeMap(
map, opentelemetry::sdk::resource::SemanticConventions::kCloudPlatform);
absl::string_view pod_name_value = GetStringValueFromAttributeMap(
map, opentelemetry::sdk::resource::SemanticConventions::kK8sPodName);
absl::string_view container_name_value = GetStringValueFromAttributeMap(
map,
opentelemetry::sdk::resource::SemanticConventions::kK8sContainerName);
absl::string_view namespace_value = GetStringValueFromAttributeMap(
map,
opentelemetry::sdk::resource::SemanticConventions::kK8sNamespaceName);
absl::string_view cluster_name_value = GetStringValueFromAttributeMap(
map, opentelemetry::sdk::resource::SemanticConventions::kK8sClusterName);
absl::string_view cluster_location_value = GetStringValueFromAttributeMap(
map, opentelemetry::sdk::resource::SemanticConventions::
kCloudRegion); // if regional
if (cluster_location_value == "unknown") {
cluster_location_value = GetStringValueFromAttributeMap(
map, opentelemetry::sdk::resource::SemanticConventions::
kCloudAvailabilityZone); // if zonal
}
absl::string_view project_id_value = GetStringValueFromAttributeMap(
map, opentelemetry::sdk::resource::SemanticConventions::kCloudAccountId);
std::string canonical_service_value =
grpc_core::GetEnv("GSM_CANONICAL_SERVICE_NAME").value_or("unknown");
// Create metadata to be sent over wire.
AddStringKeyValueToStructProto(metadata, kMetadataExchangeTypeKey, type_value,
arena.ptr());
// Only handle GKE for now
if (type_value == kGkeType) {
AddStringKeyValueToStructProto(metadata, kMetadataExchangePodNameKey,
pod_name_value, arena.ptr());
AddStringKeyValueToStructProto(metadata, kMetadataExchangeContainerNameKey,
container_name_value, arena.ptr());
AddStringKeyValueToStructProto(metadata, kMetadataExchangeNamespaceNameKey,
namespace_value, arena.ptr());
AddStringKeyValueToStructProto(metadata, kMetadataExchangeClusterNameKey,
cluster_name_value, arena.ptr());
AddStringKeyValueToStructProto(metadata, kMetadataExchangeLocationKey,
cluster_location_value, arena.ptr());
AddStringKeyValueToStructProto(metadata, kMetadataExchangeProjectIdKey,
project_id_value, arena.ptr());
AddStringKeyValueToStructProto(metadata,
kMetadataExchangeCanonicalServiceKey,
canonical_service_value, arena.ptr());
}
size_t output_length;
char* output =
google_protobuf_Struct_serialize(metadata, arena.ptr(), &output_length);
serialized_labels_to_send_ = grpc_core::Slice::FromCopiedString(
absl::Base64Escape(absl::string_view(output, output_length)));
// Fill up local labels map. The rest we get from the detected Resource and
// from the peer.
// TODO(yashykt): Add mesh_id
}
std::unique_ptr<LabelsIterable> ServiceMeshLabelsInjector::GetPeerLabels(
grpc_metadata_batch* incoming_initial_metadata) {
auto peer_metadata =
incoming_initial_metadata->Take(grpc_core::XEnvoyPeerMetadata());
if (!peer_metadata.has_value()) {
return nullptr;
}
return std::make_unique<PeerLabelsIterable>(*std::move(peer_metadata));
}
std::unique_ptr<LabelsIterable> ServiceMeshLabelsInjector::GetLocalLabels() {
return std::make_unique<LocalLabelsIterable>(local_labels_);
}
void ServiceMeshLabelsInjector::AddLabels(
grpc_metadata_batch* outgoing_initial_metadata) {
outgoing_initial_metadata->Set(grpc_core::XEnvoyPeerMetadata(),
serialized_labels_to_send_.Ref());
}
} // namespace internal
} // namespace grpc

@ -0,0 +1,64 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CPP_EXT_GSM_METADATA_EXCHANGE_H
#define GRPC_SRC_CPP_EXT_GSM_METADATA_EXCHANGE_H
#include <grpc/support/port_platform.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
#include "opentelemetry/sdk/common/attribute_utils.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/cpp/ext/otel/otel_plugin.h"
namespace grpc {
namespace internal {
class ServiceMeshLabelsInjector : public LabelsInjector {
public:
explicit ServiceMeshLabelsInjector(
const opentelemetry::sdk::common::AttributeMap& map);
// Read the incoming initial metadata to get the set of labels to be added to
// metrics.
std::unique_ptr<LabelsIterable> GetPeerLabels(
grpc_metadata_batch* incoming_initial_metadata) override;
// Get the local labels to be added to metrics. To be used when the peer
// metadata is not available, for example, for started RPCs metric.
std::unique_ptr<LabelsIterable> GetLocalLabels() override;
// Modify the outgoing initial metadata with metadata information to be sent
// to the peer.
void AddLabels(grpc_metadata_batch* outgoing_initial_metadata) override;
private:
std::vector<std::pair<absl::string_view, std::string>> local_labels_;
grpc_core::Slice serialized_labels_to_send_;
};
} // namespace internal
} // namespace grpc
#endif // GRPC_SRC_CPP_EXT_GSM_METADATA_EXCHANGE_H

@ -36,6 +36,7 @@ grpc_cc_library(
"otel_server_call_tracer.cc",
],
hdrs = [
"key_value_iterable.h",
"otel_call_tracer.h",
"otel_client_filter.h",
"otel_plugin.h",
@ -44,13 +45,13 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/container:flat_hash_set",
"absl/container:inlined_vector",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/strings:str_format",
"absl/time",
"absl/types:optional",
"absl/types:span",
"otel/api",
],
language = "c++",
@ -62,6 +63,7 @@ grpc_cc_library(
"//:gpr_platform",
"//:grpc_base",
"//:grpc_client_channel",
"//:grpc_public_hdrs",
"//:legacy_context",
"//src/core:arena",
"//src/core:arena_promise",

@ -0,0 +1,109 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CPP_EXT_OTEL_KEY_VALUE_ITERABLE_H
#define GRPC_SRC_CPP_EXT_OTEL_KEY_VALUE_ITERABLE_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <utility>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "opentelemetry/common/attribute_value.h"
#include "opentelemetry/common/key_value_iterable.h"
#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/nostd/string_view.h"
#include "src/cpp/ext/otel/otel_plugin.h"
namespace grpc {
namespace internal {
inline opentelemetry::nostd::string_view AbslStrViewToOTelStrView(
absl::string_view str) {
return opentelemetry::nostd::string_view(str.data(), str.size());
}
// An iterable class based on opentelemetry::common::KeyValueIterable that
// allows gRPC to iterate on its various sources of attributes and avoid an
// allocation in cases wherever possible.
class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
public:
explicit KeyValueIterable(
LabelsIterable* local_labels_iterable,
LabelsIterable* peer_labels_iterable,
absl::Span<const std::pair<absl::string_view, absl::string_view>>
additional_labels)
: local_labels_iterable_(local_labels_iterable),
peer_labels_iterable_(peer_labels_iterable),
additional_labels_(additional_labels) {}
bool ForEachKeyValue(opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
callback) const noexcept override {
if (local_labels_iterable_ != nullptr) {
local_labels_iterable_->ResetIteratorPosition();
while (const auto& pair = local_labels_iterable_->Next()) {
if (!callback(AbslStrViewToOTelStrView(pair->first),
AbslStrViewToOTelStrView(pair->second))) {
return false;
}
}
}
if (peer_labels_iterable_ != nullptr) {
peer_labels_iterable_->ResetIteratorPosition();
while (const auto& pair = peer_labels_iterable_->Next()) {
if (!callback(AbslStrViewToOTelStrView(pair->first),
AbslStrViewToOTelStrView(pair->second))) {
return false;
}
}
}
for (const auto& pair : additional_labels_) {
if (!callback(AbslStrViewToOTelStrView(pair.first),
AbslStrViewToOTelStrView(pair.second))) {
return false;
}
}
return true;
}
size_t size() const noexcept override {
return (local_labels_iterable_ != nullptr ? local_labels_iterable_->Size()
: 0) +
(peer_labels_iterable_ != nullptr ? peer_labels_iterable_->Size()
: 0) +
additional_labels_.size();
}
private:
LabelsIterable* local_labels_iterable_;
LabelsIterable* peer_labels_iterable_;
absl::Span<const std::pair<absl::string_view, absl::string_view>>
additional_labels_;
};
} // namespace internal
} // namespace grpc
#endif // GRPC_SRC_CPP_EXT_OTEL_KEY_VALUE_ITERABLE_H

@ -23,6 +23,7 @@
#include <stdint.h>
#include <memory>
#include <string>
#include "absl/base/thread_annotations.h"
@ -41,6 +42,7 @@
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/cpp/ext/otel/otel_client_filter.h"
#include "src/cpp/ext/otel/otel_plugin.h"
namespace grpc {
namespace internal {
@ -68,14 +70,14 @@ class OpenTelemetryCallTracer : public grpc_core::ClientCallTracer {
}
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
grpc_metadata_batch* send_initial_metadata) override;
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override;
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) override;
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
grpc_metadata_batch* recv_initial_metadata) override;
void RecordReceivedMessage(
const grpc_core::SliceBuffer& recv_message) override;
void RecordReceivedDecompressedMessage(
@ -93,6 +95,8 @@ class OpenTelemetryCallTracer : public grpc_core::ClientCallTracer {
const bool arena_allocated_;
// Start time (for measuring latency).
absl::Time start_time_;
std::unique_ptr<LabelsIterable> local_labels_;
std::unique_ptr<LabelsIterable> peer_labels_;
};
explicit OpenTelemetryCallTracer(OpenTelemetryClientFilter* parent,
@ -124,7 +128,6 @@ class OpenTelemetryCallTracer : public grpc_core::ClientCallTracer {
const OpenTelemetryClientFilter* parent_;
// Client method.
grpc_core::Slice path_;
absl::string_view method_;
grpc_core::Arena* arena_;
grpc_core::Mutex mu_;
// Non-transparent attempts per call

@ -20,13 +20,13 @@
#include "src/cpp/ext/otel/otel_client_filter.h"
#include <array>
#include <functional>
#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
@ -34,21 +34,25 @@
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "opentelemetry/context/context.h"
#include "opentelemetry/metrics/sync_instruments.h"
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/cpp/ext/otel/key_value_iterable.h"
#include "src/cpp/ext/otel/otel_call_tracer.h"
#include "src/cpp/ext/otel/otel_plugin.h"
@ -100,11 +104,34 @@ OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
: parent_(parent),
arena_allocated_(arena_allocated),
start_time_(absl::Now()) {
// TODO(yashykt): Figure out how to get this to work with absl::string_view
// We don't have the peer labels at this point.
if (OTelPluginState().labels_injector != nullptr) {
local_labels_ = OTelPluginState().labels_injector->GetLocalLabels();
}
if (OTelPluginState().client.attempt.started != nullptr) {
OTelPluginState().client.attempt.started->Add(
1, {{std::string(OTelMethodKey()), std::string(parent_->method_)},
{std::string(OTelTargetKey()), parent_->parent_->target()}});
std::array<std::pair<absl::string_view, absl::string_view>, 2>
additional_labels = {
{{OTelMethodKey(),
absl::StripPrefix(parent_->path_.as_string_view(), "/")},
{OTelTargetKey(), parent_->parent_->target()}}};
KeyValueIterable labels(local_labels_.get(), peer_labels_.get(),
additional_labels);
OTelPluginState().client.attempt.started->Add(1, labels);
}
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
if (OTelPluginState().labels_injector != nullptr) {
peer_labels_ =
OTelPluginState().labels_injector->GetPeerLabels(recv_initial_metadata);
}
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
if (OTelPluginState().labels_injector != nullptr) {
OTelPluginState().labels_injector->AddLabels(send_initial_metadata);
}
}
@ -138,13 +165,19 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
const grpc_transport_stream_stats* transport_stream_stats) {
absl::InlinedVector<std::pair<std::string, std::string>, 2> attributes = {
{std::string(OTelMethodKey()), std::string(parent_->method_)},
{std::string(OTelStatusKey()), absl::StatusCodeToString(status.code())},
{std::string(OTelTargetKey()), parent_->parent_->target()}};
std::array<std::pair<absl::string_view, absl::string_view>, 3>
additional_labels = {
{{OTelMethodKey(),
absl::StripPrefix(parent_->path_.as_string_view(), "/")},
{OTelTargetKey(), parent_->parent_->target()},
{OTelStatusKey(),
grpc_status_code_to_string(
static_cast<grpc_status_code>(status.code()))}}};
KeyValueIterable labels(local_labels_.get(), peer_labels_.get(),
additional_labels);
if (OTelPluginState().client.attempt.duration != nullptr) {
OTelPluginState().client.attempt.duration->Record(
absl::ToDoubleSeconds(absl::Now() - start_time_), attributes,
absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
opentelemetry::context::Context{});
}
if (OTelPluginState().client.attempt.sent_total_compressed_message_size !=
@ -153,7 +186,7 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0,
attributes, opentelemetry::context::Context{});
labels, opentelemetry::context::Context{});
}
if (OTelPluginState().client.attempt.rcvd_total_compressed_message_size !=
nullptr) {
@ -161,7 +194,7 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0,
attributes, opentelemetry::context::Context{});
labels, opentelemetry::context::Context{});
}
}
@ -194,10 +227,7 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordAnnotation(
OpenTelemetryCallTracer::OpenTelemetryCallTracer(
OpenTelemetryClientFilter* parent, grpc_core::Slice path,
grpc_core::Arena* arena)
: parent_(parent),
path_(std::move(path)),
method_(absl::StripPrefix(path_.as_string_view(), "/")),
arena_(arena) {}
: parent_(parent), path_(std::move(path)), arena_(arena) {}
OpenTelemetryCallTracer::~OpenTelemetryCallTracer() {}

@ -25,6 +25,7 @@
#include <utility>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
@ -47,7 +48,7 @@ class OpenTelemetryClientFilter : public grpc_core::ChannelFilter {
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) override;
const std::string& target() const { return target_; }
absl::string_view target() const { return target_; }
private:
explicit OpenTelemetryClientFilter(std::string target)

@ -114,6 +114,12 @@ OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableMetrics(
return *this;
}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetLabelsInjector(
std::unique_ptr<LabelsInjector> labels_injector) {
labels_injector_ = std::move(labels_injector);
return *this;
}
void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider = meter_provider_;
@ -164,6 +170,7 @@ void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
meter->CreateUInt64Histogram(std::string(
OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName()));
}
g_otel_plugin_state_->labels_injector = std::move(labels_injector_);
g_otel_plugin_state_->meter_provider = std::move(meter_provider);
grpc_core::ServerCallTracerFactory::RegisterGlobal(
new grpc::internal::OpenTelemetryServerCallTracerFactory);

@ -21,20 +21,64 @@
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <memory>
#include <string>
#include <utility>
#include "absl/container/flat_hash_set.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "opentelemetry/metrics/meter_provider.h"
#include "opentelemetry/metrics/sync_instruments.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc {
namespace internal {
// An iterable container interface that can be used as a return type for the
// OTel plugin's label injector.
class LabelsIterable {
public:
virtual ~LabelsIterable() = default;
// Returns the key-value label at the current position or absl::nullopt if the
// iterator has reached the end.
virtual absl::optional<std::pair<absl::string_view, absl::string_view>>
Next() = 0;
virtual size_t Size() const = 0;
// Resets position of iterator to the start.
virtual void ResetIteratorPosition() = 0;
};
// An interface that allows you to add additional labels on the calls traced
// through the OTel plugin.
class LabelsInjector {
public:
virtual ~LabelsInjector() {}
// Read the incoming initial metadata to get the set of labels to be added to
// metrics. (Does not include the local labels.)
virtual std::unique_ptr<LabelsIterable> GetPeerLabels(
grpc_metadata_batch* incoming_initial_metadata) = 0;
// Get the local labels to be added to metrics. To be used when the peer
// metadata is not available, for example, for started RPCs metric.
// It is the responsibility of the implementation to make sure that the
// backing store for the absl::string_view remains valid for the lifetime of
// gRPC.
virtual std::unique_ptr<LabelsIterable> GetLocalLabels() = 0;
// Modify the outgoing initial metadata with metadata information to be sent
// to the peer.
virtual void AddLabels(grpc_metadata_batch* outgoing_initial_metadata) = 0;
};
struct OTelPluginState {
struct Client {
struct Attempt {
@ -58,6 +102,7 @@ struct OTelPluginState {
} server;
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider;
std::unique_ptr<LabelsInjector> labels_injector;
};
const struct OTelPluginState& OTelPluginState();
@ -90,6 +135,10 @@ class OpenTelemetryPluginBuilder {
OpenTelemetryPluginBuilder& DisableMetrics(
const absl::flat_hash_set<absl::string_view>& metric_names);
// Builds and registers the OTel Plugin
OpenTelemetryPluginBuilder& SetLabelsInjector(
std::unique_ptr<LabelsInjector> labels_injector);
void BuildAndRegisterGlobal();
// The base set of metrics -
@ -105,6 +154,7 @@ class OpenTelemetryPluginBuilder {
private:
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider_;
std::unique_ptr<LabelsInjector> labels_injector_;
absl::flat_hash_set<std::string> metrics_;
};

@ -20,27 +20,29 @@
#include "src/cpp/ext/otel/otel_server_call_tracer.h"
#include <array>
#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "opentelemetry/context/context.h"
#include "opentelemetry/metrics/sync_instruments.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/cpp/ext/otel/key_value_iterable.h"
#include "src/cpp/ext/otel/otel_plugin.h"
namespace grpc {
@ -52,7 +54,12 @@ namespace {
class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
public:
OpenTelemetryServerCallTracer() : start_time_(absl::Now()) {}
OpenTelemetryServerCallTracer() : start_time_(absl::Now()) {
// We don't have the peer labels at this point.
if (OTelPluginState().labels_injector != nullptr) {
local_labels_ = OTelPluginState().labels_injector->GetLocalLabels();
}
}
std::string TraceId() override {
// Not implemented
@ -72,7 +79,13 @@ class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
// Please refer to `grpc_transport_stream_op_batch_payload` for details on
// arguments.
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
grpc_metadata_batch* send_initial_metadata) override {
// Only add labels to outgoing metadata if labels were received from peer.
if (OTelPluginState().labels_injector != nullptr &&
peer_labels_ != nullptr) {
OTelPluginState().labels_injector->AddLabels(send_initial_metadata);
}
}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override;
@ -119,34 +132,28 @@ class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
}
private:
grpc_core::Slice path_;
absl::string_view method_;
std::string authority_;
absl::Time start_time_;
absl::Duration elapsed_time_;
grpc_core::Slice path_;
std::unique_ptr<LabelsIterable> local_labels_;
std::unique_ptr<LabelsIterable> peer_labels_;
};
void OpenTelemetryServerCallTracer::RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) {
const auto* path =
recv_initial_metadata->get_pointer(grpc_core::HttpPathMetadata());
if (path != nullptr) {
path_ = path->Ref();
}
method_ = absl::StripPrefix(path_.as_string_view(), "/");
const auto* authority =
recv_initial_metadata->get_pointer(grpc_core::HttpAuthorityMetadata());
// Override with host metadata if authority is absent.
if (authority == nullptr) {
authority = recv_initial_metadata->get_pointer(grpc_core::HostMetadata());
}
if (authority != nullptr) {
authority_ = std::string(authority->as_string_view());
}
// TODO(yashykt): Figure out how to get this to work with absl::string_view
path_ =
recv_initial_metadata->get_pointer(grpc_core::HttpPathMetadata())->Ref();
if (OTelPluginState().labels_injector != nullptr) {
peer_labels_ =
OTelPluginState().labels_injector->GetPeerLabels(recv_initial_metadata);
}
std::array<std::pair<absl::string_view, absl::string_view>, 1>
additional_labels = {
{{OTelMethodKey(), absl::StripPrefix(path_.as_string_view(), "/")}}};
if (OTelPluginState().server.call.started != nullptr) {
OTelPluginState().server.call.started->Add(
1, {{std::string(OTelMethodKey()), std::string(method_)}});
1, KeyValueIterable(local_labels_.get(), peer_labels_.get(),
additional_labels));
}
}
@ -159,27 +166,29 @@ void OpenTelemetryServerCallTracer::RecordSendTrailingMetadata(
void OpenTelemetryServerCallTracer::RecordEnd(
const grpc_call_final_info* final_info) {
absl::InlinedVector<std::pair<std::string, std::string>, 2> attributes = {
{std::string(OTelMethodKey()), std::string(method_)},
{std::string(OTelStatusKey()),
absl::StatusCodeToString(
static_cast<absl::StatusCode>(final_info->final_status))}};
std::array<std::pair<absl::string_view, absl::string_view>, 2>
additional_labels = {
{{OTelMethodKey(), absl::StripPrefix(path_.as_string_view(), "/")},
{OTelStatusKey(),
grpc_status_code_to_string(final_info->final_status)}}};
KeyValueIterable labels(local_labels_.get(), peer_labels_.get(),
additional_labels);
if (OTelPluginState().server.call.duration != nullptr) {
OTelPluginState().server.call.duration->Record(
absl::ToDoubleSeconds(elapsed_time_), attributes,
absl::ToDoubleSeconds(elapsed_time_), labels,
opentelemetry::context::Context{});
}
if (OTelPluginState().server.call.sent_total_compressed_message_size !=
nullptr) {
OTelPluginState().server.call.sent_total_compressed_message_size->Record(
final_info->stats.transport_stream_stats.outgoing.data_bytes,
attributes, opentelemetry::context::Context{});
final_info->stats.transport_stream_stats.outgoing.data_bytes, labels,
opentelemetry::context::Context{});
}
if (OTelPluginState().server.call.rcvd_total_compressed_message_size !=
nullptr) {
OTelPluginState().server.call.rcvd_total_compressed_message_size->Record(
final_info->stats.transport_stream_stats.incoming.data_bytes,
attributes, opentelemetry::context::Context{});
final_info->stats.transport_stream_stats.incoming.data_bytes, labels,
opentelemetry::context::Context{});
}
}

@ -40,3 +40,24 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "metadata_exchange_test",
srcs = [
"metadata_exchange_test.cc",
],
external_deps = [
"google_cloud_cpp:experimental-opentelemetry",
"gtest",
"otel/sdk/src/metrics",
],
language = "C++",
tags = [
],
deps = [
"//:grpc++",
"//src/cpp/ext/gsm:gsm_observability",
"//test/core/util:grpc_test_util",
"//test/cpp/ext/otel:otel_test_library",
],
)

@ -21,6 +21,7 @@
#include "google/cloud/opentelemetry/resource_detector.h"
#include "gtest/gtest.h"
#include "src/core/lib/gprpp/env.h"
#include "test/core/util/test_config.h"
namespace grpc {

@ -0,0 +1,232 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "src/cpp/ext/gsm/metadata_exchange.h"
#include "absl/functional/any_invocable.h"
#include "api/include/opentelemetry/metrics/provider.h"
#include "gmock/gmock.h"
#include "google/cloud/opentelemetry/resource_detector.h"
#include "gtest/gtest.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include <grpcpp/grpcpp.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/env.h"
#include "src/cpp/ext/gsm/gsm_observability.h"
#include "src/cpp/ext/otel/otel_plugin.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/ext/otel/otel_test_library.h"
namespace grpc {
namespace testing {
namespace {
class TestScenario {
public:
enum class Type : std::uint8_t { kGke, kUnknown };
explicit TestScenario(Type type) : type_(type) {}
opentelemetry::sdk::resource::Resource GetTestResource() const {
switch (type_) {
case Type::kGke:
return TestGkeResource();
case Type::kUnknown:
return TestUnknownResource();
}
}
static std::string Name(const ::testing::TestParamInfo<TestScenario>& info) {
switch (info.param.type_) {
case Type::kGke:
return "gke";
case Type::kUnknown:
return "unknown";
}
}
Type type() const { return type_; }
private:
static opentelemetry::sdk::resource::Resource TestGkeResource() {
opentelemetry::sdk::common::AttributeMap attributes;
attributes.SetAttribute("cloud.platform", "gcp_kubernetes_engine");
attributes.SetAttribute("k8s.pod.name", "pod");
attributes.SetAttribute("k8s.container.name", "container");
attributes.SetAttribute("k8s.namespace.name", "namespace");
attributes.SetAttribute("k8s.cluster.name", "cluster");
attributes.SetAttribute("cloud.region", "region");
attributes.SetAttribute("cloud.account.id", "id");
return opentelemetry::sdk::resource::Resource::Create(attributes);
}
static opentelemetry::sdk::resource::Resource TestUnknownResource() {
opentelemetry::sdk::common::AttributeMap attributes;
attributes.SetAttribute("cloud.platform", "random");
return opentelemetry::sdk::resource::Resource::Create(attributes);
}
Type type_;
};
class MetadataExchangeTest
: public OTelPluginEnd2EndTest,
public ::testing::WithParamInterface<TestScenario> {
protected:
void Init(const absl::flat_hash_set<absl::string_view>& metric_names) {
OTelPluginEnd2EndTest::Init(
metric_names, /*resource=*/GetParam().GetTestResource(),
/*labels_injector=*/
std::make_unique<grpc::internal::ServiceMeshLabelsInjector>(
GetParam().GetTestResource().GetAttributes()));
}
void VerifyGkeServiceMeshAttributes(
const std::map<std::string,
opentelemetry::sdk::common::OwnedAttributeValue>&
attributes,
bool local_only = false) {
if (!local_only) {
switch (GetParam().type()) {
case TestScenario::Type::kGke:
EXPECT_EQ(
absl::get<std::string>(attributes.at("gsm.remote_workload_type")),
"gcp_kubernetes_engine");
EXPECT_EQ(absl::get<std::string>(
attributes.at("gsm.remote_workload_pod_name")),
"pod");
EXPECT_EQ(absl::get<std::string>(
attributes.at("gsm.remote_workload_container_name")),
"container");
EXPECT_EQ(absl::get<std::string>(
attributes.at("gsm.remote_workload_namespace_name")),
"namespace");
EXPECT_EQ(absl::get<std::string>(
attributes.at("gsm.remote_workload_cluster_name")),
"cluster");
EXPECT_EQ(absl::get<std::string>(
attributes.at("gsm.remote_workload_location")),
"region");
EXPECT_EQ(absl::get<std::string>(
attributes.at("gsm.remote_workload_project_id")),
"id");
EXPECT_EQ(absl::get<std::string>(
attributes.at("gsm.remote_workload_canonical_service")),
"canonical_service");
break;
case TestScenario::Type::kUnknown:
EXPECT_EQ(
absl::get<std::string>(attributes.at("gsm.remote_workload_type")),
"random");
break;
}
}
}
};
TEST_P(MetadataExchangeTest, ClientAttemptStarted) {
Init(/*metric_names=*/{
grpc::internal::OTelClientAttemptStartedInstrumentName()});
SendRPC();
const char* kMetricName = "grpc.client.attempt.started";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data = absl::get_if<opentelemetry::sdk::metrics::SumPointData>(
&data[kMetricName][0].point_data);
ASSERT_NE(point_data, nullptr);
auto client_started_value = absl::get_if<int64_t>(&point_data->value_);
ASSERT_NE(client_started_value, nullptr);
EXPECT_EQ(*client_started_value, 1);
const auto& attributes = data[kMetricName][0].attributes.GetAttributes();
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.method")), kMethodName);
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.target")),
canonical_server_address_);
VerifyGkeServiceMeshAttributes(attributes, /*local_only=*/true);
}
TEST_P(MetadataExchangeTest, ClientAttemptDuration) {
Init(/*metric_names=*/{
grpc::internal::OTelClientAttemptDurationInstrumentName()});
SendRPC();
const char* kMetricName = "grpc.client.attempt.duration";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data =
absl::get_if<opentelemetry::sdk::metrics::HistogramPointData>(
&data[kMetricName][0].point_data);
ASSERT_NE(point_data, nullptr);
ASSERT_EQ(point_data->count_, 1);
const auto& attributes = data[kMetricName][0].attributes.GetAttributes();
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.method")), kMethodName);
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.target")),
canonical_server_address_);
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.status")), "OK");
VerifyGkeServiceMeshAttributes(attributes);
}
TEST_P(MetadataExchangeTest, ServerCallDuration) {
Init(/*metric_names=*/{
grpc::internal::OTelServerCallDurationInstrumentName()});
SendRPC();
const char* kMetricName = "grpc.server.call.duration";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data =
absl::get_if<opentelemetry::sdk::metrics::HistogramPointData>(
&data[kMetricName][0].point_data);
ASSERT_NE(point_data, nullptr);
ASSERT_EQ(point_data->count_, 1);
const auto& attributes = data[kMetricName][0].attributes.GetAttributes();
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.method")), kMethodName);
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.status")), "OK");
VerifyGkeServiceMeshAttributes(attributes);
}
INSTANTIATE_TEST_SUITE_P(
MetadataExchange, MetadataExchangeTest,
::testing::Values(TestScenario(TestScenario::Type::kGke),
TestScenario(TestScenario::Type::kUnknown)),
&TestScenario::Name);
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
grpc_core::SetEnv("GSM_CANONICAL_SERVICE_NAME", "canonical_service");
return RUN_ALL_TESTS();
}

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
licenses(["notice"])
@ -21,6 +21,31 @@ grpc_package(
visibility = "tests",
)
grpc_cc_library(
name = "otel_test_library",
testonly = 1,
srcs = [
"otel_test_library.cc",
],
hdrs = [
"otel_test_library.h",
],
external_deps = [
"gtest",
"otel/api",
"otel/sdk/src/metrics",
],
language = "C++",
tags = [
],
deps = [
"//:grpc++",
"//src/cpp/ext/otel:otel_plugin",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:test_service_impl",
],
)
grpc_cc_test(
name = "otel_plugin_test",
srcs = [
@ -35,6 +60,7 @@ grpc_cc_test(
tags = [
],
deps = [
":otel_test_library",
"//:grpc++",
"//src/cpp/ext/otel:otel_plugin",
"//test/core/util:grpc_test_util",

@ -31,6 +31,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/ext/otel/otel_test_library.h"
namespace grpc {
namespace testing {
@ -44,120 +45,8 @@ TEST(OTelPluginBuildTest, SdkDependency) {
opentelemetry::sdk::metrics::MeterProvider();
}
class MockMetricReader : public opentelemetry::sdk::metrics::MetricReader {
public:
opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality(
opentelemetry::sdk::metrics::InstrumentType) const noexcept override {
return opentelemetry::sdk::metrics::AggregationTemporality::kDelta;
}
bool OnForceFlush(std::chrono::microseconds) noexcept override {
return true;
}
bool OnShutDown(std::chrono::microseconds) noexcept override { return true; }
void OnInitialized() noexcept override {}
};
class OTelPluginEnd2EndTest : public ::testing::Test {
protected:
using ::testing::Test::SetUp;
void SetUp(const absl::flat_hash_set<absl::string_view>& metric_names,
bool test_no_meter_provider = false) {
// We are resetting the MeterProvider and OpenTelemetry plugin at the start
// of each test to avoid test results from one test carrying over to another
// test. (Some measurements can get arbitrarily delayed.)
grpc_core::CoreConfiguration::Reset();
grpc::internal::OpenTelemetryPluginBuilder ot_builder;
ot_builder.EnableMetrics(metric_names);
if (!test_no_meter_provider) {
auto meter_provider =
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
reader_.reset(new grpc::testing::MockMetricReader);
meter_provider->AddMetricReader(reader_);
ot_builder.SetMeterProvider(std::move(meter_provider));
}
ot_builder.BuildAndRegisterGlobal();
grpc_init();
grpc::ServerBuilder builder;
int port;
// Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
ASSERT_NE(nullptr, server_);
ASSERT_NE(0, port);
server_address_ = absl::StrCat("localhost:", port);
canonical_server_address_ = absl::StrCat("dns:///", server_address_);
stub_ = EchoTestService::NewStub(grpc::CreateChannel(
server_address_, grpc::InsecureChannelCredentials()));
}
void TearDown() override {
server_->Shutdown();
grpc_shutdown_blocking();
delete grpc_core::ServerCallTracerFactory::Get(grpc_core::ChannelArgs());
grpc_core::ServerCallTracerFactory::RegisterGlobal(nullptr);
}
void ResetStub(std::shared_ptr<Channel> channel) {
stub_ = EchoTestService::NewStub(channel);
}
void SendRPC() {
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
grpc::Status status = stub_->Echo(&context, request, &response);
}
absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
ReadCurrentMetricsData(
absl::AnyInvocable<
bool(const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
continue_predicate) {
absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
data;
auto deadline = absl::Now() + absl::Seconds(5);
do {
reader_->Collect([&](opentelemetry::sdk::metrics::ResourceMetrics& rm) {
for (const opentelemetry::sdk::metrics::ScopeMetrics& smd :
rm.scope_metric_data_) {
for (const opentelemetry::sdk::metrics::MetricData& md :
smd.metric_data_) {
for (const opentelemetry::sdk::metrics::PointDataAttributes& dp :
md.point_data_attr_) {
data[md.instrument_descriptor.name_].push_back(dp);
}
}
}
return true;
});
} while (continue_predicate(data) && deadline > absl::Now());
return data;
}
const absl::string_view kMethodName = "grpc.testing.EchoTestService/Echo";
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader_;
std::string server_address_;
std::string canonical_server_address_;
CallbackTestServiceImpl service_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<EchoTestService::Stub> stub_;
};
TEST_F(OTelPluginEnd2EndTest, ClientAttemptStarted) {
SetUp({grpc::internal::OTelClientAttemptStartedInstrumentName()});
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()});
SendRPC();
const char* kMetricName = "grpc.client.attempt.started";
auto data = ReadCurrentMetricsData(
@ -185,7 +74,7 @@ TEST_F(OTelPluginEnd2EndTest, ClientAttemptStarted) {
}
TEST_F(OTelPluginEnd2EndTest, ClientAttemptDuration) {
SetUp({grpc::internal::OTelClientAttemptDurationInstrumentName()});
Init({grpc::internal::OTelClientAttemptDurationInstrumentName()});
SendRPC();
const char* kMetricName = "grpc.client.attempt.duration";
auto data = ReadCurrentMetricsData(
@ -216,8 +105,8 @@ TEST_F(OTelPluginEnd2EndTest, ClientAttemptDuration) {
}
TEST_F(OTelPluginEnd2EndTest, ClientAttemptSentTotalCompressedMessageSize) {
SetUp({grpc::internal::
OTelClientAttemptSentTotalCompressedMessageSizeInstrumentName()});
Init({grpc::internal::
OTelClientAttemptSentTotalCompressedMessageSizeInstrumentName()});
SendRPC();
const char* kMetricName =
"grpc.client.attempt.sent_total_compressed_message_size";
@ -249,8 +138,8 @@ TEST_F(OTelPluginEnd2EndTest, ClientAttemptSentTotalCompressedMessageSize) {
}
TEST_F(OTelPluginEnd2EndTest, ClientAttemptRcvdTotalCompressedMessageSize) {
SetUp({grpc::internal::
OTelClientAttemptRcvdTotalCompressedMessageSizeInstrumentName()});
Init({grpc::internal::
OTelClientAttemptRcvdTotalCompressedMessageSizeInstrumentName()});
SendRPC();
const char* kMetricName =
"grpc.client.attempt.rcvd_total_compressed_message_size";
@ -282,7 +171,7 @@ TEST_F(OTelPluginEnd2EndTest, ClientAttemptRcvdTotalCompressedMessageSize) {
}
TEST_F(OTelPluginEnd2EndTest, ServerCallStarted) {
SetUp({grpc::internal::OTelServerCallStartedInstrumentName()});
Init({grpc::internal::OTelServerCallStartedInstrumentName()});
SendRPC();
const char* kMetricName = "grpc.server.call.started";
auto data = ReadCurrentMetricsData(
@ -306,7 +195,7 @@ TEST_F(OTelPluginEnd2EndTest, ServerCallStarted) {
}
TEST_F(OTelPluginEnd2EndTest, ServerCallDuration) {
SetUp({grpc::internal::OTelServerCallDurationInstrumentName()});
Init({grpc::internal::OTelServerCallDurationInstrumentName()});
SendRPC();
const char* kMetricName = "grpc.server.call.duration";
auto data = ReadCurrentMetricsData(
@ -333,8 +222,8 @@ TEST_F(OTelPluginEnd2EndTest, ServerCallDuration) {
}
TEST_F(OTelPluginEnd2EndTest, ServerCallSentTotalCompressedMessageSize) {
SetUp({grpc::internal::
OTelServerCallSentTotalCompressedMessageSizeInstrumentName()});
Init({grpc::internal::
OTelServerCallSentTotalCompressedMessageSizeInstrumentName()});
SendRPC();
const char* kMetricName =
"grpc.server.call.sent_total_compressed_message_size";
@ -362,8 +251,8 @@ TEST_F(OTelPluginEnd2EndTest, ServerCallSentTotalCompressedMessageSize) {
}
TEST_F(OTelPluginEnd2EndTest, ServerCallRcvdTotalCompressedMessageSize) {
SetUp({grpc::internal::
OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName()});
Init({grpc::internal::
OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName()});
SendRPC();
const char* kMetricName =
"grpc.server.call.rcvd_total_compressed_message_size";
@ -392,8 +281,10 @@ TEST_F(OTelPluginEnd2EndTest, ServerCallRcvdTotalCompressedMessageSize) {
// Make sure that no meter provider results in normal operations.
TEST_F(OTelPluginEnd2EndTest, NoMeterProviderRegistered) {
SetUp({grpc::internal::OTelClientAttemptStartedInstrumentName()},
/*test_no_meter_provider=*/true);
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()},
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/true);
SendRPC();
}

@ -0,0 +1,132 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "test/cpp/ext/otel/otel_test_library.h"
#include "absl/functional/any_invocable.h"
#include "api/include/opentelemetry/metrics/provider.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include <grpcpp/grpcpp.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/config/core_configuration.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
namespace grpc {
namespace testing {
void OTelPluginEnd2EndTest::Init(
const absl::flat_hash_set<absl::string_view>& metric_names,
opentelemetry::sdk::resource::Resource resource,
std::unique_ptr<grpc::internal::LabelsInjector> labels_injector,
bool test_no_meter_provider) {
// We are resetting the MeterProvider and OpenTelemetry plugin at the start
// of each test to avoid test results from one test carrying over to another
// test. (Some measurements can get arbitrarily delayed.)
auto meter_provider =
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>(
std::make_unique<opentelemetry::sdk::metrics::ViewRegistry>(),
std::move(resource));
reader_.reset(new grpc::testing::MockMetricReader);
meter_provider->AddMetricReader(reader_);
grpc_core::CoreConfiguration::Reset();
grpc::internal::OpenTelemetryPluginBuilder ot_builder;
ot_builder.EnableMetrics(metric_names);
if (!test_no_meter_provider) {
auto meter_provider =
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
reader_.reset(new grpc::testing::MockMetricReader);
meter_provider->AddMetricReader(reader_);
ot_builder.SetMeterProvider(std::move(meter_provider));
}
ot_builder.SetLabelsInjector(std::move(labels_injector));
ot_builder.BuildAndRegisterGlobal();
grpc_init();
grpc::ServerBuilder builder;
int port;
// Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
ASSERT_NE(nullptr, server_);
ASSERT_NE(0, port);
server_address_ = absl::StrCat("localhost:", port);
canonical_server_address_ = absl::StrCat("dns:///", server_address_);
stub_ = EchoTestService::NewStub(
grpc::CreateChannel(server_address_, grpc::InsecureChannelCredentials()));
}
void OTelPluginEnd2EndTest::TearDown() {
server_->Shutdown();
grpc_shutdown_blocking();
delete grpc_core::ServerCallTracerFactory::Get(grpc_core::ChannelArgs());
grpc_core::ServerCallTracerFactory::RegisterGlobal(nullptr);
}
void OTelPluginEnd2EndTest::ResetStub(std::shared_ptr<Channel> channel) {
stub_ = EchoTestService::NewStub(std::move(channel));
}
void OTelPluginEnd2EndTest::SendRPC() {
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
grpc::Status status = stub_->Echo(&context, request, &response);
}
absl::flat_hash_map<
std::string, std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
OTelPluginEnd2EndTest::ReadCurrentMetricsData(
absl::AnyInvocable<
bool(const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
continue_predicate) {
absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
data;
auto deadline = absl::Now() + absl::Seconds(5);
do {
reader_->Collect([&](opentelemetry::sdk::metrics::ResourceMetrics& rm) {
for (const opentelemetry::sdk::metrics::ScopeMetrics& smd :
rm.scope_metric_data_) {
for (const opentelemetry::sdk::metrics::MetricData& md :
smd.metric_data_) {
for (const opentelemetry::sdk::metrics::PointDataAttributes& dp :
md.point_data_attr_) {
data[md.instrument_descriptor.name_].push_back(dp);
}
}
}
return true;
});
} while (continue_predicate(data) && deadline > absl::Now());
return data;
}
} // namespace testing
} // namespace grpc

@ -0,0 +1,94 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_TEST_CPP_EXT_OTEL_OTEL_TEST_LIBRARY_H
#define GRPC_TEST_CPP_EXT_OTEL_OTEL_TEST_LIBRARY_H
#include "absl/functional/any_invocable.h"
#include "api/include/opentelemetry/metrics/provider.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include <grpcpp/grpcpp.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/cpp/ext/otel/otel_plugin.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
namespace grpc {
namespace testing {
class MockMetricReader : public opentelemetry::sdk::metrics::MetricReader {
public:
opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality(
opentelemetry::sdk::metrics::InstrumentType) const noexcept override {
return opentelemetry::sdk::metrics::AggregationTemporality::kDelta;
}
bool OnForceFlush(std::chrono::microseconds) noexcept override {
return true;
}
bool OnShutDown(std::chrono::microseconds) noexcept override { return true; }
void OnInitialized() noexcept override {}
};
class OTelPluginEnd2EndTest : public ::testing::Test {
protected:
// Note that we can't use SetUp() here since we want to send in parameters.
void Init(
const absl::flat_hash_set<absl::string_view>& metric_names,
opentelemetry::sdk::resource::Resource resource =
opentelemetry::sdk::resource::Resource::Create({}),
std::unique_ptr<grpc::internal::LabelsInjector> labels_injector = nullptr,
bool test_no_meter_provider = false);
void TearDown() override;
void ResetStub(std::shared_ptr<Channel> channel);
void SendRPC();
absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
ReadCurrentMetricsData(
absl::AnyInvocable<
bool(const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
continue_predicate);
const absl::string_view kMethodName = "grpc.testing.EchoTestService/Echo";
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader_;
std::string server_address_;
std::string canonical_server_address_;
CallbackTestServiceImpl service_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<EchoTestService::Stub> stub_;
};
} // namespace testing
} // namespace grpc
#endif // GRPC_TEST_CPP_EXT_OTEL_OTEL_TEST_LIBRARY_H

@ -94,6 +94,10 @@ EXTERNAL_DEPS = {
"absl/utility/utility.h": "absl/utility",
"address_sorting/address_sorting.h": "address_sorting",
"google/cloud/opentelemetry/resource_detector.h": "google_cloud_cpp:experimental-opentelemetry",
"opentelemetry/common/attribute_value.h": "otel/api",
"opentelemetry/common/key_value_iterable.h": "otel/api",
"opentelemetry/nostd/function_ref.h": "otel/api",
"opentelemetry/nostd/string_view.h": "otel/api",
"opentelemetry/context/context.h": "otel/api",
"opentelemetry/metrics/meter.h": "otel/api",
"opentelemetry/metrics/meter_provider.h": "otel/api",
@ -102,6 +106,8 @@ EXTERNAL_DEPS = {
"opentelemetry/nostd/shared_ptr.h": "otel/api",
"opentelemetry/nostd/unique_ptr.h": "otel/api",
"opentelemetry/sdk/metrics/meter_provider.h": "otel/sdk/src/metrics",
"opentelemetry/sdk/common/attribute_utils.h": "otel/sdk:headers",
"opentelemetry/sdk/resource/semantic_conventions.h": "otel/sdk:headers",
"ares.h": "cares",
"fuzztest/fuzztest.h": ["fuzztest", "fuzztest_main"],
"google/api/monitored_resource.pb.h": (

Loading…
Cancel
Save