OpenCensus: Add annotations for messages (#32646)

This PR adds annotations to client attempt spans and server spans on
messages of the form -
* `Send message: 1026 bytes`
* `Send compressed message: 31 bytes` (if message was compressed)
* `Received message: 31 bytes`
* `Received decompressed message: 1026 bytes` (if message needed to be
decompressed)

Note that the compressed and decompressed annotations are not present if
compression/decompression was not performed.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/32687/head
Yash Tibrewal 2 years ago committed by GitHub
parent e4825a5aa2
commit f04e1a9589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 30
      src/cpp/ext/filters/census/client_filter.cc
  3. 9
      src/cpp/ext/filters/census/open_census_call_tracer.h
  4. 22
      src/cpp/ext/filters/census/server_call_tracer.cc
  5. 2
      test/cpp/ext/filters/census/library.h
  6. 118
      test/cpp/ext/filters/census/stats_plugin_end2end_test.cc

@ -2214,6 +2214,7 @@ grpc_cc_library(
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/strings:str_format",
"absl/time",
"absl/types:optional",
"opencensus-stats",

@ -25,6 +25,7 @@
#include <algorithm>
#include <functional>
#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
@ -32,6 +33,7 @@
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
@ -156,15 +158,33 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
const grpc_core::SliceBuffer& /*send_message*/) {
const grpc_core::SliceBuffer& send_message) {
RecordAnnotation(
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
++sent_message_count_;
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) {
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.Length()));
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
const grpc_core::SliceBuffer& /*recv_message*/) {
const grpc_core::SliceBuffer& recv_message) {
RecordAnnotation(
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
++recv_message_count_;
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) {
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
}
namespace {
void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
@ -298,9 +318,9 @@ OpenCensusCallTracer::~OpenCensusCallTracer() {
OpenCensusCallTracer::OpenCensusCallAttemptTracer*
OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
// We allocate the first attempt on the arena and all subsequent attempts on
// the heap, so that in the common case we don't require a heap allocation,
// nor do we unnecessarily grow the arena.
// We allocate the first attempt on the arena and all subsequent attempts
// on the heap, so that in the common case we don't require a heap
// allocation, nor do we unnecessarily grow the arena.
bool is_first_attempt = true;
uint64_t attempt_num;
{

@ -67,16 +67,15 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer {
grpc_metadata_batch* send_initial_metadata) override;
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(
const grpc_core::SliceBuffer& /*send_message*/) override;
void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override;
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& /*send_compressed_message*/) override {}
const grpc_core::SliceBuffer& send_compressed_message) override;
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(
const grpc_core::SliceBuffer& /*recv_message*/) override;
const grpc_core::SliceBuffer& recv_message) override;
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {}
const grpc_core::SliceBuffer& recv_decompressed_message) override;
void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) override;

@ -24,12 +24,14 @@
#include <string.h>
#include <algorithm>
#include <initializer_list>
#include <string>
#include <utility>
#include <vector>
#include "absl/meta/type_traits.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
@ -107,22 +109,32 @@ class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) override;
void RecordSendMessage(
const grpc_core::SliceBuffer& /*send_message*/) override {
void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override {
RecordAnnotation(
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
++sent_message_count_;
}
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& /*send_compressed_message*/) override {}
const grpc_core::SliceBuffer& send_compressed_message) override {
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.Length()));
}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) override;
void RecordReceivedMessage(
const grpc_core::SliceBuffer& /*recv_message*/) override {
const grpc_core::SliceBuffer& recv_message) override {
RecordAnnotation(
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
++recv_message_count_;
}
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {}
const grpc_core::SliceBuffer& recv_decompressed_message) override {
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
}
void RecordReceivedTrailingMetadata(
grpc_metadata_batch* /*recv_trailing_metadata*/) override {}

@ -59,6 +59,8 @@ class EchoServer final : public TestServiceImpl {
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
CheckMetadata(context);
// Enabled for compression trace annotation tests.
context->set_compression_algorithm(GRPC_COMPRESS_GZIP);
return TestServiceImpl::Echo(context, request, response);
}

@ -676,6 +676,124 @@ TEST_F(StatsPluginEnd2EndTest,
IsAnnotationPresent(attempt_span_data, "Delayed LB pick complete."));
}
// Tests that the message size trace annotations are present.
TEST_F(StatsPluginEnd2EndTest, TestMessageSizeAnnotations) {
{
// Client spans are ended when the ClientContext's destructor is invoked.
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
::opencensus::trace::AlwaysSampler always_sampler;
::opencensus::trace::StartSpanOptions options;
options.sampler = &always_sampler;
auto sampling_span =
::opencensus::trace::Span::StartSpan("sampling", nullptr, options);
grpc::CensusContext app_census_context("root", &sampling_span,
::opencensus::tags::TagMap{});
context.set_census_context(
reinterpret_cast<census_context*>(&app_census_context));
context.AddMetadata(kExpectedTraceIdKey,
app_census_context.Span().context().trace_id().ToHex());
traces_recorder_->StartRecording();
grpc::Status status = stub_->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
}
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
TestUtils::Flush();
::opencensus::trace::exporter::SpanExporterTestPeer::ExportForTesting();
traces_recorder_->StopRecording();
auto recorded_spans = traces_recorder_->GetAndClearSpans();
// Check presence of message size annotations in attempt span
auto attempt_span_data = GetSpanByName(
recorded_spans, absl::StrCat("Attempt.", client_method_name_));
ASSERT_NE(attempt_span_data, recorded_spans.end());
EXPECT_TRUE(IsAnnotationPresent(attempt_span_data, "Send message: 5 bytes"));
EXPECT_FALSE(IsAnnotationPresent(attempt_span_data,
"Send compressed message: 5 bytes"));
EXPECT_TRUE(
IsAnnotationPresent(attempt_span_data, "Received message: 5 bytes"));
EXPECT_FALSE(IsAnnotationPresent(attempt_span_data,
"Received decompressed message: 5 bytes"));
// Check presence of message size annotations in server span
auto server_span_data =
GetSpanByName(recorded_spans, absl::StrCat("Recv.", client_method_name_));
ASSERT_NE(attempt_span_data, recorded_spans.end());
EXPECT_TRUE(IsAnnotationPresent(server_span_data, "Send message: 5 bytes"));
EXPECT_FALSE(IsAnnotationPresent(attempt_span_data,
"Send compressed message: 5 bytes"));
EXPECT_TRUE(
IsAnnotationPresent(server_span_data, "Received message: 5 bytes"));
EXPECT_FALSE(IsAnnotationPresent(server_span_data,
"Received decompressed message: 5 bytes"));
}
std::string CreateLargeMessage() {
char str[1024];
for (int i = 0; i < 1023; ++i) {
str[i] = 'a';
}
str[1023] = '\0';
return std::string(str);
}
// Tests that the message size with compression trace annotations are present.
TEST_F(StatsPluginEnd2EndTest, TestMessageSizeWithCompressionAnnotations) {
{
// Client spans are ended when the ClientContext's destructor is invoked.
EchoRequest request;
request.set_message(CreateLargeMessage());
EchoResponse response;
grpc::ClientContext context;
context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
::opencensus::trace::AlwaysSampler always_sampler;
::opencensus::trace::StartSpanOptions options;
options.sampler = &always_sampler;
auto sampling_span =
::opencensus::trace::Span::StartSpan("sampling", nullptr, options);
grpc::CensusContext app_census_context("root", &sampling_span,
::opencensus::tags::TagMap{});
context.set_census_context(
reinterpret_cast<census_context*>(&app_census_context));
context.AddMetadata(kExpectedTraceIdKey,
app_census_context.Span().context().trace_id().ToHex());
traces_recorder_->StartRecording();
grpc::Status status = stub_->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
}
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
TestUtils::Flush();
::opencensus::trace::exporter::SpanExporterTestPeer::ExportForTesting();
traces_recorder_->StopRecording();
auto recorded_spans = traces_recorder_->GetAndClearSpans();
// Check presence of message size annotations in attempt span
auto attempt_span_data = GetSpanByName(
recorded_spans, absl::StrCat("Attempt.", client_method_name_));
ASSERT_NE(attempt_span_data, recorded_spans.end());
EXPECT_TRUE(
IsAnnotationPresent(attempt_span_data, "Send message: 1026 bytes"));
EXPECT_TRUE(IsAnnotationPresent(attempt_span_data,
"Send compressed message: 31 bytes"));
EXPECT_TRUE(
IsAnnotationPresent(attempt_span_data, "Received message: 31 bytes"));
EXPECT_TRUE(IsAnnotationPresent(attempt_span_data,
"Received decompressed message: 1026 bytes"));
// Check presence of message size annotations in server span
auto server_span_data =
GetSpanByName(recorded_spans, absl::StrCat("Recv.", client_method_name_));
ASSERT_NE(attempt_span_data, recorded_spans.end());
EXPECT_TRUE(
IsAnnotationPresent(server_span_data, "Send message: 1026 bytes"));
EXPECT_TRUE(IsAnnotationPresent(attempt_span_data,
"Send compressed message: 31 bytes"));
EXPECT_TRUE(
IsAnnotationPresent(server_span_data, "Received message: 31 bytes"));
EXPECT_TRUE(IsAnnotationPresent(server_span_data,
"Received decompressed message: 1026 bytes"));
}
// Test the working of GRPC_ARG_DISABLE_OBSERVABILITY.
TEST_F(StatsPluginEnd2EndTest, TestObservabilityDisabledChannelArg) {
{

Loading…
Cancel
Save