From 6d249c0af29b39b050c22d01c65fcf7d5e9af8e9 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 27 Sep 2022 14:34:07 -0700 Subject: [PATCH] Observability: Experimental arg to disable client side tracing (#31093) * Observability: Experimental arg to disable client side tracing * Fix IWYU * Reviewer comments * Reviewer comments * Reviewer comment: Move experimental/internal arg to different file * Fix build * Fix IWYU --- src/cpp/ext/filters/census/client_filter.cc | 108 +++++++++++------- src/cpp/ext/filters/census/client_filter.h | 29 +++-- src/cpp/ext/filters/census/grpc_plugin.cc | 3 +- .../filters/census/open_census_call_tracer.h | 18 ++- .../census/stats_plugin_end2end_test.cc | 51 +++++++++ 5 files changed, 156 insertions(+), 53 deletions(-) diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc index 26da86247f9..fadcbbd20b2 100644 --- a/src/cpp/ext/filters/census/client_filter.cc +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -46,6 +46,7 @@ #include #include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/resource_quota/arena.h" @@ -64,9 +65,27 @@ constexpr uint32_t constexpr uint32_t OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen; -grpc_error_handle CensusClientCallData::Init( - grpc_call_element* /* elem */, const grpc_call_element_args* args) { - tracer_ = args->arena->New(args); +// +// CensusClientChannelData +// + +grpc_error_handle CensusClientChannelData::Init( + grpc_channel_element* /*elem*/, grpc_channel_element_args* args) { + tracing_enabled_ = grpc_core::ChannelArgs::FromC(args->channel_args) + .GetInt(GRPC_ARG_ENABLE_OBSERVABILITY) + .value_or(true); + return GRPC_ERROR_NONE; +} + +// +// CensusClientChannelData::CensusClientCallData +// + +grpc_error_handle CensusClientChannelData::CensusClientCallData::Init( + grpc_call_element* elem, const grpc_call_element_args* args) { + tracer_ = args->arena->New( + args, (static_cast(elem->channel_data)) + ->tracing_enabled_); GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr); args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer_; args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) { @@ -75,14 +94,16 @@ grpc_error_handle CensusClientCallData::Init( return GRPC_ERROR_NONE; } -void CensusClientCallData::StartTransportStreamOpBatch( +void CensusClientChannelData::CensusClientCallData::StartTransportStreamOpBatch( grpc_call_element* elem, TransportStreamOpBatch* op) { // Note that we are generating the overall call context here instead of in // the constructor of `OpenCensusCallTracer` due to the semantics of // `grpc_census_call_set_context` which allows the application to set the // census context for a call anytime before the first call to // `grpc_call_start_batch`. - if (op->op()->send_initial_metadata) { + if (op->op()->send_initial_metadata && + (static_cast(elem->channel_data)) + ->tracing_enabled_) { tracer_->GenerateContext(); } grpc_call_next_op(elem, op->op()); @@ -92,27 +113,17 @@ void CensusClientCallData::StartTransportStreamOpBatch( // OpenCensusCallTracer::OpenCensusCallAttemptTracer // -namespace { - -CensusContext CreateCensusContextForCallAttempt( - absl::string_view method, const CensusContext& parent_context) { - GPR_DEBUG_ASSERT(parent_context.Context().IsValid()); - return CensusContext(absl::StrCat("Attempt.", method), &parent_context.Span(), - parent_context.tags()); -} - -} // namespace - OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer( OpenCensusCallTracer* parent, uint64_t attempt_num, bool is_transparent_retry, bool arena_allocated) : parent_(parent), arena_allocated_(arena_allocated), - context_(CreateCensusContextForCallAttempt(parent_->method_, - parent_->context_)), + context_(parent_->CreateCensusContextForCallAttempt()), start_time_(absl::Now()) { - context_.AddSpanAttribute("previous-rpc-attempts", attempt_num); - context_.AddSpanAttribute("transparent-retry", is_transparent_retry); + if (parent_->tracing_enabled_) { + context_.AddSpanAttribute("previous-rpc-attempts", attempt_num); + context_.AddSpanAttribute("transparent-retry", is_transparent_retry); + } std::vector> tags = context_.tags().tags(); tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_)); @@ -121,20 +132,22 @@ OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer( void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) { - char tracing_buf[kMaxTraceContextLen]; - size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf, - kMaxTraceContextLen); - if (tracing_len > 0) { - send_initial_metadata->Set( - grpc_core::GrpcTraceBinMetadata(), - grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len)); - } - grpc_slice tags = grpc_empty_slice(); - // TODO(unknown): Add in tagging serialization. - size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags); - if (encoded_tags_len > 0) { - send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(), - grpc_core::Slice(tags)); + if (parent_->tracing_enabled_) { + char tracing_buf[kMaxTraceContextLen]; + size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf, + kMaxTraceContextLen); + if (tracing_len > 0) { + send_initial_metadata->Set( + grpc_core::GrpcTraceBinMetadata(), + grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len)); + } + grpc_slice tags = grpc_empty_slice(); + // TODO(unknown): Add in tagging serialization. + size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags); + if (encoded_tags_len > 0) { + send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(), + grpc_core::Slice(tags)); + } } } @@ -204,11 +217,13 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd( {RpcClientSentMessagesPerRpc(), sent_message_count_}, {RpcClientReceivedMessagesPerRpc(), recv_message_count_}}, tags); - if (status_code_ != absl::StatusCode::kOk) { - context_.Span().SetStatus(opencensus::trace::StatusCode(status_code_), - StatusCodeToString(status_code_)); + if (parent_->tracing_enabled_) { + if (status_code_ != absl::StatusCode::kOk) { + context_.Span().SetStatus(opencensus::trace::StatusCode(status_code_), + StatusCodeToString(status_code_)); + } + context_.EndSpan(); } - context_.EndSpan(); grpc_core::MutexLock lock(&parent_->mu_); if (--parent_->num_active_rpcs_ == 0) { parent_->time_at_last_attempt_end_ = absl::Now(); @@ -224,11 +239,13 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd( // OpenCensusCallTracer // -OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args) +OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args, + bool tracing_enabled) : call_context_(args->context), path_(grpc_slice_ref(args->path)), method_(GetMethod(path_)), - arena_(args->arena) {} + arena_(args->arena), + tracing_enabled_(tracing_enabled) {} OpenCensusCallTracer::~OpenCensusCallTracer() { std::vector> tags = @@ -239,7 +256,9 @@ OpenCensusCallTracer::~OpenCensusCallTracer() { {RpcClientTransparentRetriesPerCall(), transparent_retries_}, {RpcClientRetryDelayPerCall(), ToDoubleMilliseconds(retry_delay_)}}, tags); - context_.EndSpan(); + if (tracing_enabled_) { + context_.EndSpan(); + } } void OpenCensusCallTracer::GenerateContext() { @@ -280,4 +299,11 @@ OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) { this, attempt_num, is_transparent_retry, false /* arena_allocated */); } +CensusContext OpenCensusCallTracer::CreateCensusContextForCallAttempt() { + if (!tracing_enabled_) return CensusContext(); + GPR_DEBUG_ASSERT(context_.Context().IsValid()); + return CensusContext(absl::StrCat("Attempt.", method_), &(context_.Span()), + context_.tags()); +} + } // namespace grpc diff --git a/src/cpp/ext/filters/census/client_filter.h b/src/cpp/ext/filters/census/client_filter.h index 54342e19794..e88cd69efc9 100644 --- a/src/cpp/ext/filters/census/client_filter.h +++ b/src/cpp/ext/filters/census/client_filter.h @@ -29,19 +29,28 @@ namespace grpc { -// A CallData class will be created for every grpc call within a channel. It is -// used to store data and methods specific to that call. CensusClientCallData is -// thread-compatible, however typically only 1 thread should be interacting with -// a call at a time. -class CensusClientCallData : public CallData { +class CensusClientChannelData : public ChannelData { public: - grpc_error_handle Init(grpc_call_element* /* elem */, - const grpc_call_element_args* args) override; - void StartTransportStreamOpBatch(grpc_call_element* elem, - TransportStreamOpBatch* op) override; + // A CallData class will be created for every grpc call within a channel. It + // is used to store data and methods specific to that call. + // CensusClientCallData is thread-compatible, however typically only 1 thread + // should be interacting with a call at a time. + class CensusClientCallData : public CallData { + public: + grpc_error_handle Init(grpc_call_element* /* elem */, + const grpc_call_element_args* args) override; + void StartTransportStreamOpBatch(grpc_call_element* elem, + TransportStreamOpBatch* op) override; + + private: + OpenCensusCallTracer* tracer_ = nullptr; + }; + + grpc_error_handle Init(grpc_channel_element* elem, + grpc_channel_element_args* args) override; private: - OpenCensusCallTracer* tracer_ = nullptr; + bool tracing_enabled_ = true; }; } // namespace grpc diff --git a/src/cpp/ext/filters/census/grpc_plugin.cc b/src/cpp/ext/filters/census/grpc_plugin.cc index a4a06873fa3..cda77a18fd3 100644 --- a/src/cpp/ext/filters/census/grpc_plugin.cc +++ b/src/cpp/ext/filters/census/grpc_plugin.cc @@ -40,7 +40,8 @@ namespace grpc { void RegisterOpenCensusPlugin() { - RegisterChannelFilter( + RegisterChannelFilter( "opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */, nullptr /* condition function */); RegisterChannelFilter( diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h index f07b06ba503..1c8fd265c64 100644 --- a/src/cpp/ext/filters/census/open_census_call_tracer.h +++ b/src/cpp/ext/filters/census/open_census_call_tracer.h @@ -43,6 +43,18 @@ #include "src/core/lib/transport/transport.h" #include "src/cpp/ext/filters/census/context.h" +// TODO(yashykt): This might not be the right place for this channel arg, but we +// don't have a better place for this right now. + +// EXPERIMENTAL. If zero, disables observability tracing and observability +// logging (not yet implemented) on the client channel, defaults to true. Note +// that this does not impact metrics/stats collection. This channel arg is +// intended as a way to avoid cyclic execution of observability logging and +// trace especially when the sampling rate of RPCs is very high which would +// generate a lot of data. +// +#define GRPC_ARG_ENABLE_OBSERVABILITY "grpc.experimental.enable_observability" + namespace grpc { class OpenCensusCallTracer : public grpc_core::CallTracer { @@ -89,7 +101,8 @@ class OpenCensusCallTracer : public grpc_core::CallTracer { absl::StatusCode status_code_; }; - explicit OpenCensusCallTracer(const grpc_call_element_args* args); + explicit OpenCensusCallTracer(const grpc_call_element_args* args, + bool tracing_enabled); ~OpenCensusCallTracer() override; void GenerateContext(); @@ -97,12 +110,15 @@ class OpenCensusCallTracer : public grpc_core::CallTracer { bool is_transparent_retry) override; private: + CensusContext CreateCensusContextForCallAttempt(); + const grpc_call_context_element* call_context_; // Client method. grpc_core::Slice path_; absl::string_view method_; CensusContext context_; grpc_core::Arena* arena_; + bool tracing_enabled_; grpc_core::Mutex mu_; // Non-transparent attempts per call uint64_t retries_ ABSL_GUARDED_BY(&mu_) = 0; diff --git a/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc index 62beb7560af..3ee0ffd85fe 100644 --- a/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc +++ b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc @@ -34,8 +34,10 @@ #include #include +#include "src/core/lib/channel/call_tracer.h" #include "src/cpp/ext/filters/census/context.h" #include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/cpp/ext/filters/census/open_census_call_tracer.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/test_config.h" #include "test/cpp/end2end/test_service_impl.h" @@ -730,6 +732,55 @@ TEST_F(StatsPluginEnd2EndTest, TestAllSpansAreExported) { recv_span_data->parent_span_id()); } +// Test the working of GRPC_ARG_DISABLE_OBSERVABILITY. +TEST_F(StatsPluginEnd2EndTest, TestObservabilityDisabledChannelArg) { + { + // Client spans are ended when the ClientContext's destructor is invoked. + ChannelArguments args; + args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0); + auto channel = CreateCustomChannel(server_address_, + InsecureChannelCredentials(), args); + ResetStub(channel); + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + + grpc::ClientContext context; + ::opencensus::trace::AlwaysSampler always_sampler; + ::opencensus::trace::StartSpanOptions options; + options.sampler = &always_sampler; + auto sampling_span = + ::opencensus::trace::Span::StartSpan("sampling", nullptr, options); + grpc::CensusContext app_census_context("root", &sampling_span, + ::opencensus::tags::TagMap{}); + context.set_census_context( + reinterpret_cast(&app_census_context)); + traces_recorder_->StartRecording(); + grpc::Status status = stub_->Echo(&context, request, &response); + EXPECT_TRUE(status.ok()); + } + absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor())); + ::opencensus::trace::exporter::SpanExporterTestPeer::ExportForTesting(); + traces_recorder_->StopRecording(); + auto recorded_spans = traces_recorder_->GetAndClearSpans(); + auto GetSpanByName = [&recorded_spans](absl::string_view name) { + return std::find_if( + recorded_spans.begin(), recorded_spans.end(), + [name](auto const& span_data) { return span_data.name() == name; }); + }; + + // The size might be 0 or 1, depending on whether the server-side ends up + // getting sampled or not. + ASSERT_LE(recorded_spans.size(), 1); + // Make sure that the client-side traces are not collected. + auto sent_span_data = + GetSpanByName(absl::StrCat("Sent.", client_method_name_)); + ASSERT_EQ(sent_span_data, recorded_spans.end()); + auto attempt_span_data = + GetSpanByName(absl::StrCat("Attempt.", client_method_name_)); + ASSERT_EQ(attempt_span_data, recorded_spans.end()); +} + } // namespace } // namespace testing