Gcp Observabiliy : Add constant labels support for stats and tracing (#32128)

* Gcp Observabiliy : Add constant labels support for stats and tracing

* Register GCP Observability labels

* sanity

* Fix build for CI

* TEST_TAG_KEY fix

* Remove duplicitous constant label setting
pull/32146/head^2
Yash Tibrewal 2 years ago committed by GitHub
parent ee1d980832
commit 05491fb2f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      include/grpcpp/opencensus.h
  2. 2
      src/cpp/ext/filters/census/channel_filter.cc
  3. 13
      src/cpp/ext/filters/census/client_filter.cc
  4. 31
      src/cpp/ext/filters/census/context.cc
  5. 24
      src/cpp/ext/filters/census/grpc_plugin.cc
  6. 47
      src/cpp/ext/filters/census/grpc_plugin.h
  7. 18
      src/cpp/ext/filters/census/server_filter.cc
  8. 81
      src/cpp/ext/filters/census/views.cc
  9. 6
      src/cpp/ext/gcp/observability.cc
  10. 55
      test/cpp/ext/filters/census/BUILD
  11. 139
      test/cpp/ext/filters/census/constant_labels_test.cc
  12. 33
      test/cpp/ext/filters/census/library.cc
  13. 194
      test/cpp/ext/filters/census/library.h
  14. 165
      test/cpp/ext/filters/census/stats_plugin_end2end_test.cc

@ -183,6 +183,13 @@ class CensusContext {
name, parent_ctxt)),
tags_({}) {}
CensusContext(absl::string_view name,
const ::opencensus::trace::SpanContext& parent_ctxt,
const ::opencensus::tags::TagMap& tags)
: span_(::opencensus::trace::Span::StartSpanWithRemoteParent(
name, parent_ctxt)),
tags_(tags) {}
void AddSpanAttribute(absl::string_view key,
opencensus::trace::AttributeValueRef attribute) {
span_.AddAttribute(key, attribute);

@ -29,7 +29,7 @@ namespace internal {
grpc_error_handle OpenCensusChannelData::Init(
grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) {
OpenCensusExporterRegistry::Get().RunRegistryPostInit();
OpenCensusRegistry::Get().RunFunctionsPostInit();
return absl::OkStatus();
}

@ -72,7 +72,7 @@ constexpr uint32_t
grpc_error_handle OpenCensusClientChannelData::Init(
grpc_channel_element* /*elem*/, grpc_channel_element_args* args) {
OpenCensusExporterRegistry::Get().RunRegistryPostInit();
OpenCensusRegistry::Get().RunFunctionsPostInit();
tracing_enabled_ = grpc_core::ChannelArgs::FromC(args->channel_args)
.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY)
.value_or(true);
@ -268,9 +268,9 @@ OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args,
tracing_enabled_(tracing_enabled) {}
OpenCensusCallTracer::~OpenCensusCallTracer() {
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
if (OpenCensusStatsEnabled()) {
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
tags.emplace_back(ClientMethodTagKey(), std::string(method_));
::opencensus::stats::Record(
{{RpcClientRetriesPerCall(), retries_ - 1}, // exclude first attempt
@ -329,8 +329,11 @@ void OpenCensusCallTracer::RecordAnnotation(absl::string_view annotation) {
CensusContext OpenCensusCallTracer::CreateCensusContextForCallAttempt() {
if (!OpenCensusTracingEnabled() || !tracing_enabled_) return CensusContext();
GPR_DEBUG_ASSERT(context_.Context().IsValid());
return CensusContext(absl::StrCat("Attempt.", method_), &(context_.Span()),
context_.tags());
auto context = CensusContext(absl::StrCat("Attempt.", method_),
&(context_.Span()), context_.tags());
grpc::internal::OpenCensusRegistry::Get()
.PopulateCensusContextWithConstantAttributes(&context);
return context;
}
} // namespace internal

@ -28,6 +28,7 @@
#include "opencensus/trace/propagation/grpc_trace_bin.h"
#include "src/core/lib/transport/transport.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/rpc_encoding.h"
namespace grpc {
@ -44,10 +45,16 @@ void GenerateServerContext(absl::string_view tracing, absl::string_view method,
SpanContext parent_ctx =
opencensus::trace::propagation::FromGrpcTraceBinHeader(tracing);
if (parent_ctx.IsValid()) {
new (context) CensusContext(method, parent_ctx);
return;
new (context) CensusContext(method, parent_ctx,
grpc::internal::OpenCensusRegistry::Get()
.PopulateTagMapWithConstantLabels({}));
} else {
new (context)
CensusContext(method, grpc::internal::OpenCensusRegistry::Get()
.PopulateTagMapWithConstantLabels({}));
}
new (context) CensusContext(method, TagMap{});
grpc::internal::OpenCensusRegistry::Get()
.PopulateCensusContextWithConstantAttributes(context);
}
void GenerateClientContext(absl::string_view method, CensusContext* ctxt,
@ -59,19 +66,27 @@ void GenerateClientContext(absl::string_view method, CensusContext* ctxt,
SpanContext span_ctxt = parent_ctxt->Context();
Span span = parent_ctxt->Span();
if (span_ctxt.IsValid()) {
new (ctxt) CensusContext(method, &span, TagMap{});
new (ctxt) CensusContext(method, &span,
grpc::internal::OpenCensusRegistry::Get()
.PopulateTagMapWithConstantLabels({}));
grpc::internal::OpenCensusRegistry::Get()
.PopulateCensusContextWithConstantAttributes(ctxt);
return;
}
}
const Span& span = opencensus::trace::GetCurrentSpan();
const TagMap& tags = opencensus::tags::GetCurrentTagMap();
const TagMap& tags = grpc::internal::OpenCensusRegistry::Get()
.PopulateTagMapWithConstantLabels(
opencensus::tags::GetCurrentTagMap());
if (span.context().IsValid()) {
// Create span with parent.
new (ctxt) CensusContext(method, &span, tags);
return;
} else {
// Create span without parent.
new (ctxt) CensusContext(method, tags);
}
// Create span without parent.
new (ctxt) CensusContext(method, tags);
grpc::internal::OpenCensusRegistry::Get()
.PopulateCensusContextWithConstantAttributes(ctxt);
}
size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context,

@ -178,14 +178,32 @@ std::atomic<bool> g_open_census_tracing_enabled(true);
} // namespace
//
// OpenCensusExporterRegistry
// OpenCensusRegistry
//
OpenCensusExporterRegistry& OpenCensusExporterRegistry::Get() {
static OpenCensusExporterRegistry* registry = new OpenCensusExporterRegistry;
OpenCensusRegistry& OpenCensusRegistry::Get() {
static OpenCensusRegistry* registry = new OpenCensusRegistry;
return *registry;
}
::opencensus::tags::TagMap OpenCensusRegistry::PopulateTagMapWithConstantLabels(
const ::opencensus::tags::TagMap& tag_map) {
std::vector<std::pair<::opencensus::tags::TagKey, std::string>> tags =
tag_map.tags();
for (const auto& label : constant_labels_) {
tags.emplace_back(label.tag_key, label.value);
}
return ::opencensus::tags::TagMap(std::move(tags));
}
void OpenCensusRegistry::PopulateCensusContextWithConstantAttributes(
grpc::experimental::CensusContext* context) {
// We reuse the constant labels for the attributes
for (const auto& label : constant_labels_) {
context->AddSpanAttribute(label.key, label.value);
}
}
void EnableOpenCensusStats(bool enable) {
g_open_census_stats_enabled = enable;
}

@ -23,10 +23,14 @@
#include <algorithm>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/call_once.h"
#include "opencensus/tags/tag_key.h"
#include "opencensus/tags/tag_map.h"
#include <grpcpp/opencensus.h>
@ -133,34 +137,57 @@ void EnableOpenCensusTracing(bool enable);
bool OpenCensusStatsEnabled();
bool OpenCensusTracingEnabled();
// Registers a function that would initialize an OpenCensus exporter. This
// function would be run before the first time the OpenCensus plugin is run.
class OpenCensusExporterRegistry {
// Registers various things for the OpenCensus plugin.
class OpenCensusRegistry {
public:
static OpenCensusExporterRegistry& Get();
struct Label {
std::string key;
opencensus::tags::TagKey tag_key;
std::string value;
};
static OpenCensusRegistry& Get();
// Registers the functions to be run post-init.
void Register(std::function<void()> f) {
void RegisterFunctions(std::function<void()> f) {
exporter_registry_.push_back(std::move(f));
}
// Runs the registry post-init exactly once. Protected with an absl::CallOnce.
void RunRegistryPostInit() {
absl::call_once(
once_, &OpenCensusExporterRegistry::RunRegistryPostInitHelper, this);
void RunFunctionsPostInit() {
absl::call_once(once_, &OpenCensusRegistry::RunFunctionsPostInitHelper,
this);
}
void RegisterConstantLabels(
const std::map<std::string, std::string>& labels) {
constant_labels_.reserve(labels.size());
for (const auto& label : labels) {
auto tag_key = opencensus::tags::TagKey::Register(label.first);
constant_labels_.emplace_back(Label{label.first, tag_key, label.second});
}
}
::opencensus::tags::TagMap PopulateTagMapWithConstantLabels(
const ::opencensus::tags::TagMap& tag_map);
void PopulateCensusContextWithConstantAttributes(
grpc::experimental::CensusContext* context);
const std::vector<Label>& constant_labels() const { return constant_labels_; }
private:
void RunRegistryPostInitHelper() {
void RunFunctionsPostInitHelper() {
for (const auto& f : exporter_registry_) {
f();
}
}
OpenCensusExporterRegistry() = default;
OpenCensusRegistry() = default;
std::vector<std::function<void()>> exporter_registry_;
absl::once_flag once_;
std::vector<Label> constant_labels_;
};
} // namespace internal

@ -20,7 +20,9 @@
#include "src/cpp/ext/filters/census/server_filter.h"
#include <algorithm>
#include <utility>
#include <vector>
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
@ -31,6 +33,7 @@
#include "absl/types/optional.h"
#include "opencensus/stats/stats.h"
#include "opencensus/tags/tag_key.h"
#include "opencensus/tags/tag_map.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
@ -117,8 +120,10 @@ void OpenCensusServerCallData::OnDoneRecvInitialMetadataCb(
calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
}
if (OpenCensusStatsEnabled()) {
::opencensus::stats::Record({{RpcServerStartedRpcs(), 1}},
{{ServerMethodTagKey(), calld->method_}});
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
calld->context_.tags().tags();
tags.emplace_back(ServerMethodTagKey(), std::string(calld->method_));
::opencensus::stats::Record({{RpcServerStartedRpcs(), 1}}, tags);
}
}
grpc_core::Closure::Run(DEBUG_LOCATION,
@ -180,14 +185,19 @@ void OpenCensusServerCallData::Destroy(grpc_call_element* /*elem*/,
const uint64_t request_size = GetOutgoingDataSize(final_info);
const uint64_t response_size = GetIncomingDataSize(final_info);
double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
tags.emplace_back(ServerMethodTagKey(), std::string(method_));
tags.emplace_back(
ServerStatusTagKey(),
std::string(StatusCodeToString(final_info->final_status)));
::opencensus::stats::Record(
{{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
{RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
{RpcServerServerLatency(), elapsed_time_ms},
{RpcServerSentMessagesPerRpc(), sent_message_count_},
{RpcServerReceivedMessagesPerRpc(), recv_message_count_}},
{{ServerMethodTagKey(), method_},
{ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}});
tags);
}
if (OpenCensusTracingEnabled()) {
context_.EndSpan();

@ -18,13 +18,15 @@
#include <grpc/support/port_platform.h>
#include <algorithm>
#include <vector>
#include "absl/time/time.h"
#include "opencensus/stats/stats.h"
#include <grpcpp/opencensus.h>
#include "src/cpp/ext/filters/census/grpc_plugin.h"
namespace grpc {
using ::opencensus::stats::Aggregation;
@ -51,19 +53,32 @@ Aggregation MillisDistributionAggregation() {
800, 1000, 2000, 5000, 10000, 20000, 50000, 100000}));
}
void SetConstantLabels(ViewDescriptor* descriptor) {
for (const auto& label :
grpc::internal::OpenCensusRegistry::Get().constant_labels()) {
descriptor->add_column(label.tag_key);
}
}
Aggregation CountDistributionAggregation() {
return Aggregation::Distribution(BucketBoundaries::Exponential(17, 1.0, 2.0));
}
ViewDescriptor MinuteDescriptor() {
ViewDescriptor DefaultViewDescriptor() {
auto descriptor = ViewDescriptor();
SetConstantLabels(&descriptor);
return descriptor;
}
ViewDescriptor MinuteDescriptor() {
auto descriptor = DefaultViewDescriptor();
SetAggregationWindow(AggregationWindow::Interval(absl::Minutes(1)),
&descriptor);
return descriptor;
}
ViewDescriptor HourDescriptor() {
auto descriptor = ViewDescriptor();
auto descriptor = DefaultViewDescriptor();
SetAggregationWindow(AggregationWindow::Interval(absl::Hours(1)),
&descriptor);
return descriptor;
@ -91,7 +106,7 @@ namespace experimental {
// client
const ViewDescriptor& ClientStartedRpcs() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/started_rpcs")
.set_measure(kRpcClientStartedRpcsMeasureName)
.set_aggregation(Aggregation::Count())
@ -101,7 +116,7 @@ const ViewDescriptor& ClientStartedRpcs() {
const ViewDescriptor& ClientCompletedRpcs() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/completed_rpcs")
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_aggregation(Aggregation::Count())
@ -112,7 +127,7 @@ const ViewDescriptor& ClientCompletedRpcs() {
const ViewDescriptor& ClientRoundtripLatency() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/roundtrip_latency")
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
@ -122,7 +137,7 @@ const ViewDescriptor& ClientRoundtripLatency() {
const ViewDescriptor& ClientSentCompressedMessageBytesPerRpc() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/sent_compressed_message_bytes_per_rpc")
.set_measure(kRpcClientSentBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
@ -133,7 +148,7 @@ const ViewDescriptor& ClientSentCompressedMessageBytesPerRpc() {
const ViewDescriptor& ClientReceivedCompressedMessageBytesPerRpc() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/received_compressed_message_bytes_per_rpc")
.set_measure(kRpcClientReceivedBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
@ -145,7 +160,7 @@ const ViewDescriptor& ClientReceivedCompressedMessageBytesPerRpc() {
// server
const ViewDescriptor& ServerStartedRpcs() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/started_rpcs")
.set_measure(kRpcServerStartedRpcsMeasureName)
.set_aggregation(Aggregation::Count())
@ -155,7 +170,7 @@ const ViewDescriptor& ServerStartedRpcs() {
const ViewDescriptor& ServerCompletedRpcs() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/completed_rpcs")
.set_measure(kRpcServerServerLatencyMeasureName)
.set_aggregation(Aggregation::Count())
@ -166,7 +181,7 @@ const ViewDescriptor& ServerCompletedRpcs() {
const ViewDescriptor& ServerSentCompressedMessageBytesPerRpc() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/sent_compressed_message_bytes_per_rpc")
.set_measure(kRpcServerSentBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
@ -177,7 +192,7 @@ const ViewDescriptor& ServerSentCompressedMessageBytesPerRpc() {
const ViewDescriptor& ServerReceivedCompressedMessageBytesPerRpc() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/received_compressed_message_bytes_per_rpc")
.set_measure(kRpcServerReceivedBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
@ -188,7 +203,7 @@ const ViewDescriptor& ServerReceivedCompressedMessageBytesPerRpc() {
const ViewDescriptor& ServerServerLatency() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/server_latency")
.set_measure(kRpcServerServerLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
@ -200,7 +215,7 @@ const ViewDescriptor& ServerServerLatency() {
// client cumulative
const ViewDescriptor& ClientSentBytesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/sent_bytes_per_rpc/cumulative")
.set_measure(kRpcClientSentBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
@ -210,7 +225,7 @@ const ViewDescriptor& ClientSentBytesPerRpcCumulative() {
const ViewDescriptor& ClientReceivedBytesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/received_bytes_per_rpc/cumulative")
.set_measure(kRpcClientReceivedBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
@ -220,7 +235,7 @@ const ViewDescriptor& ClientReceivedBytesPerRpcCumulative() {
const ViewDescriptor& ClientRoundtripLatencyCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/roundtrip_latency/cumulative")
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
@ -230,7 +245,7 @@ const ViewDescriptor& ClientRoundtripLatencyCumulative() {
const ViewDescriptor& ClientServerLatencyCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/server_latency/cumulative")
.set_measure(kRpcClientServerLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
@ -240,7 +255,7 @@ const ViewDescriptor& ClientServerLatencyCumulative() {
const ViewDescriptor& ClientStartedRpcsCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/started_rpcs/cumulative")
.set_measure(kRpcClientStartedRpcsMeasureName)
.set_aggregation(Aggregation::Count())
@ -250,7 +265,7 @@ const ViewDescriptor& ClientStartedRpcsCumulative() {
const ViewDescriptor& ClientCompletedRpcsCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/completed_rpcs/cumulative")
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_aggregation(Aggregation::Count())
@ -261,7 +276,7 @@ const ViewDescriptor& ClientCompletedRpcsCumulative() {
const ViewDescriptor& ClientSentMessagesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/received_messages_per_rpc/cumulative")
.set_measure(kRpcClientSentMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
@ -271,7 +286,7 @@ const ViewDescriptor& ClientSentMessagesPerRpcCumulative() {
const ViewDescriptor& ClientReceivedMessagesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/sent_messages_per_rpc/cumulative")
.set_measure(kRpcClientReceivedMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
@ -281,7 +296,7 @@ const ViewDescriptor& ClientReceivedMessagesPerRpcCumulative() {
const ViewDescriptor& ClientRetriesPerCallCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/retries_per_call/cumulative")
.set_measure(kRpcClientRetriesPerCallMeasureName)
.set_aggregation(CountDistributionAggregation())
@ -291,7 +306,7 @@ const ViewDescriptor& ClientRetriesPerCallCumulative() {
const ViewDescriptor& ClientRetriesCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/retries/cumulative")
.set_measure(kRpcClientRetriesPerCallMeasureName)
.set_aggregation(Aggregation::Sum())
@ -301,7 +316,7 @@ const ViewDescriptor& ClientRetriesCumulative() {
const ViewDescriptor& ClientTransparentRetriesPerCallCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/transparent_retries_per_call/cumulative")
.set_measure(kRpcClientTransparentRetriesPerCallMeasureName)
.set_aggregation(CountDistributionAggregation())
@ -311,7 +326,7 @@ const ViewDescriptor& ClientTransparentRetriesPerCallCumulative() {
const ViewDescriptor& ClientTransparentRetriesCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/transparent_retries/cumulative")
.set_measure(kRpcClientTransparentRetriesPerCallMeasureName)
.set_aggregation(Aggregation::Sum())
@ -321,7 +336,7 @@ const ViewDescriptor& ClientTransparentRetriesCumulative() {
const ViewDescriptor& ClientRetryDelayPerCallCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/client/retry_delay_per_call/cumulative")
.set_measure(kRpcClientRetryDelayPerCallMeasureName)
.set_aggregation(MillisDistributionAggregation())
@ -332,7 +347,7 @@ const ViewDescriptor& ClientRetryDelayPerCallCumulative() {
// server cumulative
const ViewDescriptor& ServerSentBytesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/received_bytes_per_rpc/cumulative")
.set_measure(kRpcServerSentBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
@ -342,7 +357,7 @@ const ViewDescriptor& ServerSentBytesPerRpcCumulative() {
const ViewDescriptor& ServerReceivedBytesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/sent_bytes_per_rpc/cumulative")
.set_measure(kRpcServerReceivedBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
@ -352,7 +367,7 @@ const ViewDescriptor& ServerReceivedBytesPerRpcCumulative() {
const ViewDescriptor& ServerServerLatencyCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/elapsed_time/cumulative")
.set_measure(kRpcServerServerLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
@ -362,7 +377,7 @@ const ViewDescriptor& ServerServerLatencyCumulative() {
const ViewDescriptor& ServerStartedRpcsCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/started_rpcs/cumulative")
.set_measure(kRpcServerStartedRpcsMeasureName)
.set_aggregation(Aggregation::Count())
@ -372,7 +387,7 @@ const ViewDescriptor& ServerStartedRpcsCumulative() {
const ViewDescriptor& ServerCompletedRpcsCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/completed_rpcs/cumulative")
.set_measure(kRpcServerServerLatencyMeasureName)
.set_aggregation(Aggregation::Count())
@ -383,7 +398,7 @@ const ViewDescriptor& ServerCompletedRpcsCumulative() {
const ViewDescriptor& ServerSentMessagesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/received_messages_per_rpc/cumulative")
.set_measure(kRpcServerSentMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
@ -393,7 +408,7 @@ const ViewDescriptor& ServerSentMessagesPerRpcCumulative() {
const ViewDescriptor& ServerReceivedMessagesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
DefaultViewDescriptor()
.set_name("grpc.io/server/sent_messages_per_rpc/cumulative")
.set_measure(kRpcServerReceivedMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())

@ -86,10 +86,12 @@ absl::Status GcpObservabilityInit() {
!config->cloud_logging.has_value()) {
return absl::OkStatus();
}
grpc::internal::OpenCensusRegistry::Get().RegisterConstantLabels(
config->labels);
grpc::RegisterOpenCensusPlugin();
RegisterOpenCensusViewsForGcpObservability();
if (config->cloud_trace.has_value()) {
grpc::internal::OpenCensusExporterRegistry::Get().Register(
grpc::internal::OpenCensusRegistry::Get().RegisterFunctions(
[cloud_trace = config->cloud_trace.value(),
project_id = config->project_id]() mutable {
opencensus::trace::TraceConfig::SetCurrentTraceParams(
@ -112,7 +114,7 @@ absl::Status GcpObservabilityInit() {
grpc::internal::EnableOpenCensusTracing(false);
}
if (config->cloud_monitoring.has_value()) {
grpc::internal::OpenCensusExporterRegistry::Get().Register(
grpc::internal::OpenCensusRegistry::Get().RegisterFunctions(
[project_id = config->project_id]() mutable {
opencensus::exporters::stats::StackdriverOptions stats_opts;
stats_opts.project_id = std::move(project_id);

@ -12,12 +12,36 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
licenses(["notice"])
grpc_package(name = "test/cpp/ext/filters/census")
grpc_cc_library(
name = "library",
testonly = 1,
srcs = ["library.cc"],
hdrs = ["library.h"],
external_deps = [
"gtest",
"opencensus-stats-test",
"opencensus-tags",
"opencensus-with-tag-map",
],
language = "C++",
deps = [
"//:grpc++",
"//:grpc_opencensus_plugin",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util",
"//test/core/util:test_lb_policies",
"//test/cpp/end2end:test_service_impl",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],
)
grpc_cc_test(
name = "grpc_opencensus_plugin_test",
srcs = [
@ -34,12 +58,37 @@ grpc_cc_test(
linkstatic = True,
tags = ["no_windows"], # TODO(jtattermusch): fix test on windows
deps = [
"library",
"//:grpc++",
"//:grpc_opencensus_plugin",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],
)
grpc_cc_test(
name = "constant_labels_test",
srcs = [
"constant_labels_test.cc",
],
external_deps = [
"gtest",
"opencensus-stats-test",
"opencensus-tags",
"opencensus-with-tag-map",
],
flaky = True,
language = "C++",
linkstatic = True,
tags = ["no_windows"], # TODO(jtattermusch): fix test on windows
deps = [
"library",
"//:grpc++",
"//:grpc_opencensus_plugin",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util",
"//test/core/util:test_lb_policies",
"//test/cpp/end2end:test_service_impl",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],

@ -0,0 +1,139 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "absl/strings/string_view.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "opencensus/stats/stats.h"
#include "opencensus/stats/testing/test_utils.h"
#include "opencensus/tags/tag_map.h"
#include <grpc++/grpc++.h>
#include <grpcpp/opencensus.h>
#include "src/cpp/ext/filters/census/context.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/test_config.h"
#include "test/cpp/ext/filters/census/library.h"
namespace grpc {
namespace testing {
namespace {
using ::opencensus::stats::View;
using ::opencensus::stats::testing::TestUtils;
class ConstantLabelsTest : public StatsPluginEnd2EndTest {
protected:
static void SetUpTestSuite() {
grpc::internal::OpenCensusRegistry::Get().RegisterConstantLabels(
{{"key", "value"}});
StatsPluginEnd2EndTest::SetUpTestSuite();
}
};
// Check that constant labels registered to OpenCensus are exported.
TEST_F(ConstantLabelsTest, ConstantLabelsTest) {
View client_completed_rpcs_view(ClientCompletedRpcsCumulative());
View server_completed_rpcs_view(ServerCompletedRpcsCumulative());
EchoRequest request;
request.set_message("foo");
EchoResponse response;
{
grpc::ClientContext context;
grpc::Status status = stub_->Echo(&context, request, &response);
ASSERT_TRUE(status.ok());
EXPECT_EQ("foo", response.message());
}
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
TestUtils::Flush();
EXPECT_THAT(
client_completed_rpcs_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre("value", client_method_name_, "OK"), 1)));
EXPECT_THAT(
server_completed_rpcs_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre("value", server_method_name_, "OK"), 1)));
}
TEST_F(ConstantLabelsTest, ConstantAttributesTest) {
{
// Client spans are ended when the ClientContext's destructor is invoked.
auto channel = CreateChannel(server_address_, InsecureChannelCredentials());
ResetStub(channel);
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
::opencensus::trace::AlwaysSampler always_sampler;
::opencensus::trace::StartSpanOptions options;
options.sampler = &always_sampler;
auto sampling_span =
::opencensus::trace::Span::StartSpan("sampling", nullptr, options);
grpc::CensusContext app_census_context("root", &sampling_span,
::opencensus::tags::TagMap{});
context.set_census_context(
reinterpret_cast<census_context*>(&app_census_context));
context.AddMetadata(kExpectedTraceIdKey,
app_census_context.Span().context().trace_id().ToHex());
traces_recorder_->StartRecording();
grpc::Status status = stub_->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
}
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
TestUtils::Flush();
::opencensus::trace::exporter::SpanExporterTestPeer::ExportForTesting();
traces_recorder_->StopRecording();
auto recorded_spans = traces_recorder_->GetAndClearSpans();
// We never ended the two spans created in the scope above, so we don't
// expect them to be exported.
for (const auto& span : recorded_spans) {
bool found = false;
for (const auto& attribute : span.attributes()) {
if (attribute.first == "key" &&
attribute.second.string_value() == "value") {
found = true;
break;
}
}
EXPECT_TRUE(found);
}
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -0,0 +1,33 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "test/cpp/ext/filters/census/library.h"
namespace grpc {
namespace testing {
const ::opencensus::tags::TagKey TEST_TAG_KEY =
::opencensus::tags::TagKey::Register("my_key");
const char* TEST_TAG_VALUE = "my_value";
const char* kExpectedTraceIdKey = "expected_trace_id";
ExportedTracesRecorder* StatsPluginEnd2EndTest::traces_recorder_ =
new ExportedTracesRecorder();
} // namespace testing
} // namespace grpc

@ -0,0 +1,194 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "absl/strings/str_cat.h"
#include "gtest/gtest.h"
#include "opencensus/stats/stats.h"
#include "opencensus/trace/exporter/span_exporter.h"
#include <grpc++/grpc++.h>
#include <grpcpp/opencensus.h>
#include "src/core/lib/config/core_configuration.h"
#include "src/cpp/ext/filters/census/context.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/test_lb_policies.h"
#include "test/cpp/end2end/test_service_impl.h"
namespace opencensus {
namespace trace {
namespace exporter {
class SpanExporterTestPeer {
public:
static constexpr auto& ExportForTesting = SpanExporter::ExportForTesting;
};
} // namespace exporter
} // namespace trace
} // namespace opencensus
namespace grpc {
namespace testing {
extern const ::opencensus::tags::TagKey TEST_TAG_KEY;
extern const char* TEST_TAG_VALUE;
extern const char* kExpectedTraceIdKey;
class EchoServer final : public TestServiceImpl {
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
CheckMetadata(context);
return TestServiceImpl::Echo(context, request, response);
}
Status BidiStream(
ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
CheckMetadata(context);
return TestServiceImpl::BidiStream(context, stream);
}
private:
void CheckMetadata(ServerContext* context) {
for (const auto& metadata : context->client_metadata()) {
if (metadata.first == kExpectedTraceIdKey) {
EXPECT_EQ(metadata.second, reinterpret_cast<const CensusContext*>(
context->census_context())
->Span()
.context()
.trace_id()
.ToHex());
break;
}
}
}
};
// A handler that records exported traces. Traces can later be retrieved and
// inspected.
class ExportedTracesRecorder
: public ::opencensus::trace::exporter::SpanExporter::Handler {
public:
ExportedTracesRecorder() : is_recording_(false) {}
void Export(const std::vector<::opencensus::trace::exporter::SpanData>& spans)
override {
absl::MutexLock lock(&mutex_);
if (is_recording_) {
for (auto const& span : spans) {
recorded_spans_.push_back(span);
}
}
}
void StartRecording() {
absl::MutexLock lock(&mutex_);
ASSERT_FALSE(is_recording_);
is_recording_ = true;
}
void StopRecording() {
absl::MutexLock lock(&mutex_);
ASSERT_TRUE(is_recording_);
is_recording_ = false;
}
std::vector<::opencensus::trace::exporter::SpanData> GetAndClearSpans() {
absl::MutexLock lock(&mutex_);
return std::move(recorded_spans_);
}
private:
// This mutex is necessary as the SpanExporter runs a loop on a separate
// thread which periodically exports spans.
absl::Mutex mutex_;
bool is_recording_ ABSL_GUARDED_BY(mutex_);
std::vector<::opencensus::trace::exporter::SpanData> recorded_spans_
ABSL_GUARDED_BY(mutex_);
};
class StatsPluginEnd2EndTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
grpc_core::CoreConfiguration::Reset();
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) {
grpc_core::RegisterQueueOnceLoadBalancingPolicy(builder);
});
RegisterOpenCensusPlugin();
// OpenCensus C++ has no API to unregister a previously-registered handler,
// therefore we register this handler once, and enable/disable recording in
// the individual tests.
::opencensus::trace::exporter::SpanExporter::RegisterHandler(
absl::WrapUnique(traces_recorder_));
}
static void TearDownTestSuite() {
grpc_shutdown();
grpc_core::CoreConfiguration::Reset();
}
void SetUp() override {
// Set up a synchronous server on a different thread to avoid the asynch
// interface.
grpc::ServerBuilder builder;
int port;
// Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
ASSERT_NE(nullptr, server_);
ASSERT_NE(0, port);
server_address_ = absl::StrCat("localhost:", port);
server_thread_ = std::thread(&StatsPluginEnd2EndTest::RunServerLoop, this);
stub_ = EchoTestService::NewStub(grpc::CreateChannel(
server_address_, grpc::InsecureChannelCredentials()));
// Clear out any previous spans
::opencensus::trace::exporter::SpanExporterTestPeer::ExportForTesting();
}
void ResetStub(std::shared_ptr<Channel> channel) {
stub_ = EchoTestService::NewStub(channel);
}
void TearDown() override {
server_->Shutdown();
server_thread_.join();
}
void RunServerLoop() { server_->Wait(); }
const std::string client_method_name_ = "grpc.testing.EchoTestService/Echo";
const std::string server_method_name_ = "grpc.testing.EchoTestService/Echo";
std::string server_address_;
EchoServer service_;
std::unique_ptr<grpc::Server> server_;
std::thread server_thread_;
std::unique_ptr<EchoTestService::Stub> stub_;
static ExportedTracesRecorder* traces_recorder_;
};
} // namespace testing
} // namespace grpc

@ -25,36 +25,19 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "opencensus/stats/stats.h"
#include "opencensus/stats/tag_key.h"
#include "opencensus/stats/testing/test_utils.h"
#include "opencensus/tags/tag_map.h"
#include "opencensus/tags/with_tag_map.h"
#include "opencensus/trace/exporter/span_exporter.h"
#include <grpc++/grpc++.h>
#include <grpcpp/opencensus.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/cpp/ext/filters/census/context.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/open_census_call_tracer.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/test_config.h"
#include "test/core/util/test_lb_policies.h"
#include "test/cpp/end2end/test_service_impl.h"
namespace opencensus {
namespace trace {
namespace exporter {
class SpanExporterTestPeer {
public:
static constexpr auto& ExportForTesting = SpanExporter::ExportForTesting;
};
} // namespace exporter
} // namespace trace
} // namespace opencensus
#include "test/cpp/ext/filters/census/library.h"
namespace grpc {
namespace testing {
@ -65,154 +48,8 @@ using ::opencensus::stats::Distribution;
using ::opencensus::stats::View;
using ::opencensus::stats::ViewDescriptor;
using ::opencensus::stats::testing::TestUtils;
using ::opencensus::tags::TagKey;
using ::opencensus::tags::WithTagMap;
const auto TEST_TAG_KEY = TagKey::Register("my_key");
const auto TEST_TAG_VALUE = "my_value";
const char* kExpectedTraceIdKey = "expected_trace_id";
class EchoServer final : public TestServiceImpl {
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
CheckMetadata(context);
return TestServiceImpl::Echo(context, request, response);
}
Status BidiStream(
ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
CheckMetadata(context);
return TestServiceImpl::BidiStream(context, stream);
}
private:
void CheckMetadata(ServerContext* context) {
for (const auto& metadata : context->client_metadata()) {
if (metadata.first == kExpectedTraceIdKey) {
EXPECT_EQ(metadata.second, reinterpret_cast<const CensusContext*>(
context->census_context())
->Span()
.context()
.trace_id()
.ToHex());
break;
}
}
}
};
// A handler that records exported traces. Traces can later be retrieved and
// inspected.
class ExportedTracesRecorder
: public ::opencensus::trace::exporter::SpanExporter::Handler {
public:
ExportedTracesRecorder() : is_recording_(false) {}
void Export(const std::vector<::opencensus::trace::exporter::SpanData>& spans)
override {
absl::MutexLock lock(&mutex_);
if (is_recording_) {
for (auto const& span : spans) {
recorded_spans_.push_back(span);
}
}
}
void StartRecording() {
absl::MutexLock lock(&mutex_);
ASSERT_FALSE(is_recording_);
is_recording_ = true;
}
void StopRecording() {
absl::MutexLock lock(&mutex_);
ASSERT_TRUE(is_recording_);
is_recording_ = false;
}
std::vector<::opencensus::trace::exporter::SpanData> GetAndClearSpans() {
absl::MutexLock lock(&mutex_);
return std::move(recorded_spans_);
}
private:
// This mutex is necessary as the SpanExporter runs a loop on a separate
// thread which periodically exports spans.
absl::Mutex mutex_;
bool is_recording_ ABSL_GUARDED_BY(mutex_);
std::vector<::opencensus::trace::exporter::SpanData> recorded_spans_
ABSL_GUARDED_BY(mutex_);
};
class StatsPluginEnd2EndTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
grpc_core::CoreConfiguration::Reset();
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) {
grpc_core::RegisterQueueOnceLoadBalancingPolicy(builder);
});
RegisterOpenCensusPlugin();
// OpenCensus C++ has no API to unregister a previously-registered handler,
// therefore we register this handler once, and enable/disable recording in
// the individual tests.
::opencensus::trace::exporter::SpanExporter::RegisterHandler(
absl::WrapUnique(traces_recorder_));
}
static void TearDownTestSuite() {
grpc_shutdown();
grpc_core::CoreConfiguration::Reset();
}
void SetUp() override {
// Set up a synchronous server on a different thread to avoid the asynch
// interface.
grpc::ServerBuilder builder;
int port;
// Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
ASSERT_NE(nullptr, server_);
ASSERT_NE(0, port);
server_address_ = absl::StrCat("localhost:", port);
server_thread_ = std::thread(&StatsPluginEnd2EndTest::RunServerLoop, this);
stub_ = EchoTestService::NewStub(grpc::CreateChannel(
server_address_, grpc::InsecureChannelCredentials()));
// Clear out any previous spans
::opencensus::trace::exporter::SpanExporterTestPeer::ExportForTesting();
}
void ResetStub(std::shared_ptr<Channel> channel) {
stub_ = EchoTestService::NewStub(channel);
}
void TearDown() override {
server_->Shutdown();
server_thread_.join();
}
void RunServerLoop() { server_->Wait(); }
const std::string client_method_name_ = "grpc.testing.EchoTestService/Echo";
const std::string server_method_name_ = "grpc.testing.EchoTestService/Echo";
std::string server_address_;
EchoServer service_;
std::unique_ptr<grpc::Server> server_;
std::thread server_thread_;
std::unique_ptr<EchoTestService::Stub> stub_;
static ExportedTracesRecorder* traces_recorder_;
};
ExportedTracesRecorder* StatsPluginEnd2EndTest::traces_recorder_ =
new ExportedTracesRecorder();
TEST_F(StatsPluginEnd2EndTest, ErrorCount) {
const auto client_method_descriptor =
ViewDescriptor()

Loading…
Cancel
Save