From f04e1a95895c5b31526d4495d68c755921e4bef6 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 22 Mar 2023 13:41:05 -0700 Subject: [PATCH] OpenCensus: Add annotations for messages (#32646) This PR adds annotations to client attempt spans and server spans on messages of the form - * `Send message: 1026 bytes` * `Send compressed message: 31 bytes` (if message was compressed) * `Received message: 31 bytes` * `Received decompressed message: 1026 bytes` (if message needed to be decompressed) Note that the compressed and decompressed annotations are not present if compression/decompression was not performed. --- BUILD | 1 + src/cpp/ext/filters/census/client_filter.cc | 30 ++++- .../filters/census/open_census_call_tracer.h | 9 +- .../ext/filters/census/server_call_tracer.cc | 22 +++- test/cpp/ext/filters/census/library.h | 2 + .../census/stats_plugin_end2end_test.cc | 118 ++++++++++++++++++ 6 files changed, 167 insertions(+), 15 deletions(-) diff --git a/BUILD b/BUILD index 7cdea65f6d6..b7c1f14fb75 100644 --- a/BUILD +++ b/BUILD @@ -2214,6 +2214,7 @@ grpc_cc_library( "absl/status", "absl/status:statusor", "absl/strings", + "absl/strings:str_format", "absl/time", "absl/types:optional", "opencensus-stats", diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc index 0fdcc884a8f..c8448cbaad1 100644 --- a/src/cpp/ext/filters/census/client_filter.cc +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include "absl/status/status.h" #include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" #include "absl/strings/string_view.h" #include "absl/time/clock.h" #include "absl/time/time.h" @@ -156,15 +158,33 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: } void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage( - const grpc_core::SliceBuffer& /*send_message*/) { + const grpc_core::SliceBuffer& send_message) { + RecordAnnotation( + absl::StrFormat("Send message: %ld bytes", send_message.Length())); ++sent_message_count_; } +void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: + RecordSendCompressedMessage( + const grpc_core::SliceBuffer& send_compressed_message) { + RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.Length())); +} + void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage( - const grpc_core::SliceBuffer& /*recv_message*/) { + const grpc_core::SliceBuffer& recv_message) { + RecordAnnotation( + absl::StrFormat("Received message: %ld bytes", recv_message.Length())); ++recv_message_count_; } +void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: + RecordReceivedDecompressedMessage( + const grpc_core::SliceBuffer& recv_decompressed_message) { + RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.Length())); +} + namespace { void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) { @@ -298,9 +318,9 @@ OpenCensusCallTracer::~OpenCensusCallTracer() { OpenCensusCallTracer::OpenCensusCallAttemptTracer* OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) { - // We allocate the first attempt on the arena and all subsequent attempts on - // the heap, so that in the common case we don't require a heap allocation, - // nor do we unnecessarily grow the arena. + // We allocate the first attempt on the arena and all subsequent attempts + // on the heap, so that in the common case we don't require a heap + // allocation, nor do we unnecessarily grow the arena. bool is_first_attempt = true; uint64_t attempt_num; { diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h index 9385794f2d4..56192e809ed 100644 --- a/src/cpp/ext/filters/census/open_census_call_tracer.h +++ b/src/cpp/ext/filters/census/open_census_call_tracer.h @@ -67,16 +67,15 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer { grpc_metadata_batch* send_initial_metadata) override; void RecordSendTrailingMetadata( grpc_metadata_batch* /*send_trailing_metadata*/) override {} - void RecordSendMessage( - const grpc_core::SliceBuffer& /*send_message*/) override; + void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override; void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& /*send_compressed_message*/) override {} + const grpc_core::SliceBuffer& send_compressed_message) override; void RecordReceivedInitialMetadata( grpc_metadata_batch* /*recv_initial_metadata*/) override {} void RecordReceivedMessage( - const grpc_core::SliceBuffer& /*recv_message*/) override; + const grpc_core::SliceBuffer& recv_message) override; void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {} + const grpc_core::SliceBuffer& recv_decompressed_message) override; void RecordReceivedTrailingMetadata( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, const grpc_transport_stream_stats* transport_stream_stats) override; diff --git a/src/cpp/ext/filters/census/server_call_tracer.cc b/src/cpp/ext/filters/census/server_call_tracer.cc index af7f3865dda..c5351f04bea 100644 --- a/src/cpp/ext/filters/census/server_call_tracer.cc +++ b/src/cpp/ext/filters/census/server_call_tracer.cc @@ -24,12 +24,14 @@ #include #include +#include #include #include #include #include "absl/meta/type_traits.h" #include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" #include "absl/strings/string_view.h" #include "absl/time/clock.h" #include "absl/time/time.h" @@ -107,22 +109,32 @@ class OpenCensusServerCallTracer : public grpc_core::ServerCallTracer { void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) override; - void RecordSendMessage( - const grpc_core::SliceBuffer& /*send_message*/) override { + void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override { + RecordAnnotation( + absl::StrFormat("Send message: %ld bytes", send_message.Length())); ++sent_message_count_; } void RecordSendCompressedMessage( - const grpc_core::SliceBuffer& /*send_compressed_message*/) override {} + const grpc_core::SliceBuffer& send_compressed_message) override { + RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", + send_compressed_message.Length())); + } void RecordReceivedInitialMetadata( grpc_metadata_batch* recv_initial_metadata) override; void RecordReceivedMessage( - const grpc_core::SliceBuffer& /*recv_message*/) override { + const grpc_core::SliceBuffer& recv_message) override { + RecordAnnotation( + absl::StrFormat("Received message: %ld bytes", recv_message.Length())); ++recv_message_count_; } void RecordReceivedDecompressedMessage( - const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {} + const grpc_core::SliceBuffer& recv_decompressed_message) override { + RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", + recv_decompressed_message.Length())); + } + void RecordReceivedTrailingMetadata( grpc_metadata_batch* /*recv_trailing_metadata*/) override {} diff --git a/test/cpp/ext/filters/census/library.h b/test/cpp/ext/filters/census/library.h index 4d6e328c543..c3c6fa94a3f 100644 --- a/test/cpp/ext/filters/census/library.h +++ b/test/cpp/ext/filters/census/library.h @@ -59,6 +59,8 @@ class EchoServer final : public TestServiceImpl { Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { CheckMetadata(context); + // Enabled for compression trace annotation tests. + context->set_compression_algorithm(GRPC_COMPRESS_GZIP); return TestServiceImpl::Echo(context, request, response); } diff --git a/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc index 9ffe97ccd4d..a2d777560b9 100644 --- a/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc +++ b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc @@ -676,6 +676,124 @@ TEST_F(StatsPluginEnd2EndTest, IsAnnotationPresent(attempt_span_data, "Delayed LB pick complete.")); } +// Tests that the message size trace annotations are present. +TEST_F(StatsPluginEnd2EndTest, TestMessageSizeAnnotations) { + { + // Client spans are ended when the ClientContext's destructor is invoked. + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + + grpc::ClientContext context; + ::opencensus::trace::AlwaysSampler always_sampler; + ::opencensus::trace::StartSpanOptions options; + options.sampler = &always_sampler; + auto sampling_span = + ::opencensus::trace::Span::StartSpan("sampling", nullptr, options); + grpc::CensusContext app_census_context("root", &sampling_span, + ::opencensus::tags::TagMap{}); + context.set_census_context( + reinterpret_cast(&app_census_context)); + context.AddMetadata(kExpectedTraceIdKey, + app_census_context.Span().context().trace_id().ToHex()); + traces_recorder_->StartRecording(); + grpc::Status status = stub_->Echo(&context, request, &response); + EXPECT_TRUE(status.ok()); + } + absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor())); + TestUtils::Flush(); + ::opencensus::trace::exporter::SpanExporterTestPeer::ExportForTesting(); + traces_recorder_->StopRecording(); + auto recorded_spans = traces_recorder_->GetAndClearSpans(); + // Check presence of message size annotations in attempt span + auto attempt_span_data = GetSpanByName( + recorded_spans, absl::StrCat("Attempt.", client_method_name_)); + ASSERT_NE(attempt_span_data, recorded_spans.end()); + EXPECT_TRUE(IsAnnotationPresent(attempt_span_data, "Send message: 5 bytes")); + EXPECT_FALSE(IsAnnotationPresent(attempt_span_data, + "Send compressed message: 5 bytes")); + EXPECT_TRUE( + IsAnnotationPresent(attempt_span_data, "Received message: 5 bytes")); + EXPECT_FALSE(IsAnnotationPresent(attempt_span_data, + "Received decompressed message: 5 bytes")); + // Check presence of message size annotations in server span + auto server_span_data = + GetSpanByName(recorded_spans, absl::StrCat("Recv.", client_method_name_)); + ASSERT_NE(attempt_span_data, recorded_spans.end()); + EXPECT_TRUE(IsAnnotationPresent(server_span_data, "Send message: 5 bytes")); + EXPECT_FALSE(IsAnnotationPresent(attempt_span_data, + "Send compressed message: 5 bytes")); + EXPECT_TRUE( + IsAnnotationPresent(server_span_data, "Received message: 5 bytes")); + EXPECT_FALSE(IsAnnotationPresent(server_span_data, + "Received decompressed message: 5 bytes")); +} + +std::string CreateLargeMessage() { + char str[1024]; + for (int i = 0; i < 1023; ++i) { + str[i] = 'a'; + } + str[1023] = '\0'; + return std::string(str); +} + +// Tests that the message size with compression trace annotations are present. +TEST_F(StatsPluginEnd2EndTest, TestMessageSizeWithCompressionAnnotations) { + { + // Client spans are ended when the ClientContext's destructor is invoked. + EchoRequest request; + request.set_message(CreateLargeMessage()); + EchoResponse response; + + grpc::ClientContext context; + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + ::opencensus::trace::AlwaysSampler always_sampler; + ::opencensus::trace::StartSpanOptions options; + options.sampler = &always_sampler; + auto sampling_span = + ::opencensus::trace::Span::StartSpan("sampling", nullptr, options); + grpc::CensusContext app_census_context("root", &sampling_span, + ::opencensus::tags::TagMap{}); + context.set_census_context( + reinterpret_cast(&app_census_context)); + context.AddMetadata(kExpectedTraceIdKey, + app_census_context.Span().context().trace_id().ToHex()); + traces_recorder_->StartRecording(); + grpc::Status status = stub_->Echo(&context, request, &response); + EXPECT_TRUE(status.ok()); + } + absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor())); + TestUtils::Flush(); + ::opencensus::trace::exporter::SpanExporterTestPeer::ExportForTesting(); + traces_recorder_->StopRecording(); + auto recorded_spans = traces_recorder_->GetAndClearSpans(); + // Check presence of message size annotations in attempt span + auto attempt_span_data = GetSpanByName( + recorded_spans, absl::StrCat("Attempt.", client_method_name_)); + ASSERT_NE(attempt_span_data, recorded_spans.end()); + EXPECT_TRUE( + IsAnnotationPresent(attempt_span_data, "Send message: 1026 bytes")); + EXPECT_TRUE(IsAnnotationPresent(attempt_span_data, + "Send compressed message: 31 bytes")); + EXPECT_TRUE( + IsAnnotationPresent(attempt_span_data, "Received message: 31 bytes")); + EXPECT_TRUE(IsAnnotationPresent(attempt_span_data, + "Received decompressed message: 1026 bytes")); + // Check presence of message size annotations in server span + auto server_span_data = + GetSpanByName(recorded_spans, absl::StrCat("Recv.", client_method_name_)); + ASSERT_NE(attempt_span_data, recorded_spans.end()); + EXPECT_TRUE( + IsAnnotationPresent(server_span_data, "Send message: 1026 bytes")); + EXPECT_TRUE(IsAnnotationPresent(attempt_span_data, + "Send compressed message: 31 bytes")); + EXPECT_TRUE( + IsAnnotationPresent(server_span_data, "Received message: 31 bytes")); + EXPECT_TRUE(IsAnnotationPresent(server_span_data, + "Received decompressed message: 1026 bytes")); +} + // Test the working of GRPC_ARG_DISABLE_OBSERVABILITY. TEST_F(StatsPluginEnd2EndTest, TestObservabilityDisabledChannelArg) { {