Observability: Experimental arg to disable client side tracing (#31093)

* Observability: Experimental arg to disable client side tracing

* Fix IWYU

* Reviewer comments

* Reviewer comments

* Reviewer comment: Move experimental/internal arg to different file

* Fix build

* Fix IWYU
pull/31142/head
Yash Tibrewal 2 years ago committed by GitHub
parent ac585b8f7e
commit 6d249c0af2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 108
      src/cpp/ext/filters/census/client_filter.cc
  2. 29
      src/cpp/ext/filters/census/client_filter.h
  3. 3
      src/cpp/ext/filters/census/grpc_plugin.cc
  4. 18
      src/cpp/ext/filters/census/open_census_call_tracer.h
  5. 51
      test/cpp/ext/filters/census/stats_plugin_end2end_test.cc

@ -46,6 +46,7 @@
#include <grpc/support/log.h>
#include <grpcpp/support/config.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/resource_quota/arena.h"
@ -64,9 +65,27 @@ constexpr uint32_t
constexpr uint32_t
OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen;
grpc_error_handle CensusClientCallData::Init(
grpc_call_element* /* elem */, const grpc_call_element_args* args) {
tracer_ = args->arena->New<OpenCensusCallTracer>(args);
//
// CensusClientChannelData
//
// Initializes per-channel state: looks up GRPC_ARG_ENABLE_OBSERVABILITY in the
// channel args and records whether client-side tracing is enabled. When the
// arg is absent, tracing stays enabled.
grpc_error_handle CensusClientChannelData::Init(
    grpc_channel_element* /*elem*/, grpc_channel_element_args* args) {
  const auto channel_args = grpc_core::ChannelArgs::FromC(args->channel_args);
  const auto enable_observability =
      channel_args.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY);
  tracing_enabled_ = enable_observability.value_or(true);
  return GRPC_ERROR_NONE;
}
//
// CensusClientChannelData::CensusClientCallData
//
grpc_error_handle CensusClientChannelData::CensusClientCallData::Init(
grpc_call_element* elem, const grpc_call_element_args* args) {
tracer_ = args->arena->New<OpenCensusCallTracer>(
args, (static_cast<CensusClientChannelData*>(elem->channel_data))
->tracing_enabled_);
GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer_;
args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) {
@ -75,14 +94,16 @@ grpc_error_handle CensusClientCallData::Init(
return GRPC_ERROR_NONE;
}
void CensusClientCallData::StartTransportStreamOpBatch(
void CensusClientChannelData::CensusClientCallData::StartTransportStreamOpBatch(
grpc_call_element* elem, TransportStreamOpBatch* op) {
// Note that we are generating the overall call context here instead of in
// the constructor of `OpenCensusCallTracer` due to the semantics of
// `grpc_census_call_set_context` which allows the application to set the
// census context for a call anytime before the first call to
// `grpc_call_start_batch`.
if (op->op()->send_initial_metadata) {
if (op->op()->send_initial_metadata &&
(static_cast<CensusClientChannelData*>(elem->channel_data))
->tracing_enabled_) {
tracer_->GenerateContext();
}
grpc_call_next_op(elem, op->op());
@ -92,27 +113,17 @@ void CensusClientCallData::StartTransportStreamOpBatch(
// OpenCensusCallTracer::OpenCensusCallAttemptTracer
//
namespace {
CensusContext CreateCensusContextForCallAttempt(
absl::string_view method, const CensusContext& parent_context) {
GPR_DEBUG_ASSERT(parent_context.Context().IsValid());
return CensusContext(absl::StrCat("Attempt.", method), &parent_context.Span(),
parent_context.tags());
}
} // namespace
OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
OpenCensusCallTracer* parent, uint64_t attempt_num,
bool is_transparent_retry, bool arena_allocated)
: parent_(parent),
arena_allocated_(arena_allocated),
context_(CreateCensusContextForCallAttempt(parent_->method_,
parent_->context_)),
context_(parent_->CreateCensusContextForCallAttempt()),
start_time_(absl::Now()) {
context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
if (parent_->tracing_enabled_) {
context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
}
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
@ -121,20 +132,22 @@ OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
char tracing_buf[kMaxTraceContextLen];
size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf,
kMaxTraceContextLen);
if (tracing_len > 0) {
send_initial_metadata->Set(
grpc_core::GrpcTraceBinMetadata(),
grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
}
grpc_slice tags = grpc_empty_slice();
// TODO(unknown): Add in tagging serialization.
size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
if (encoded_tags_len > 0) {
send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
grpc_core::Slice(tags));
if (parent_->tracing_enabled_) {
char tracing_buf[kMaxTraceContextLen];
size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf,
kMaxTraceContextLen);
if (tracing_len > 0) {
send_initial_metadata->Set(
grpc_core::GrpcTraceBinMetadata(),
grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
}
grpc_slice tags = grpc_empty_slice();
// TODO(unknown): Add in tagging serialization.
size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
if (encoded_tags_len > 0) {
send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
grpc_core::Slice(tags));
}
}
}
@ -204,11 +217,13 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
{RpcClientSentMessagesPerRpc(), sent_message_count_},
{RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
tags);
if (status_code_ != absl::StatusCode::kOk) {
context_.Span().SetStatus(opencensus::trace::StatusCode(status_code_),
StatusCodeToString(status_code_));
if (parent_->tracing_enabled_) {
if (status_code_ != absl::StatusCode::kOk) {
context_.Span().SetStatus(opencensus::trace::StatusCode(status_code_),
StatusCodeToString(status_code_));
}
context_.EndSpan();
}
context_.EndSpan();
grpc_core::MutexLock lock(&parent_->mu_);
if (--parent_->num_active_rpcs_ == 0) {
parent_->time_at_last_attempt_end_ = absl::Now();
@ -224,11 +239,13 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
// OpenCensusCallTracer
//
OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args)
OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args,
bool tracing_enabled)
: call_context_(args->context),
path_(grpc_slice_ref(args->path)),
method_(GetMethod(path_)),
arena_(args->arena) {}
arena_(args->arena),
tracing_enabled_(tracing_enabled) {}
OpenCensusCallTracer::~OpenCensusCallTracer() {
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
@ -239,7 +256,9 @@ OpenCensusCallTracer::~OpenCensusCallTracer() {
{RpcClientTransparentRetriesPerCall(), transparent_retries_},
{RpcClientRetryDelayPerCall(), ToDoubleMilliseconds(retry_delay_)}},
tags);
context_.EndSpan();
if (tracing_enabled_) {
context_.EndSpan();
}
}
void OpenCensusCallTracer::GenerateContext() {
@ -280,4 +299,11 @@ OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
this, attempt_num, is_transparent_retry, false /* arena_allocated */);
}
// Builds the census context used by a single call attempt. With tracing
// disabled a default-constructed context is returned; otherwise the context is
// named "Attempt.<method>" and is constructed from this tracer's span and tags.
CensusContext OpenCensusCallTracer::CreateCensusContextForCallAttempt() {
  if (tracing_enabled_) {
    GPR_DEBUG_ASSERT(context_.Context().IsValid());
    const std::string attempt_span_name = absl::StrCat("Attempt.", method_);
    return CensusContext(attempt_span_name, &(context_.Span()),
                         context_.tags());
  }
  return CensusContext();
}
} // namespace grpc

@ -29,19 +29,28 @@
namespace grpc {
// A CallData class will be created for every grpc call within a channel. It is
// used to store data and methods specific to that call. CensusClientCallData is
// thread-compatible, however typically only 1 thread should be interacting with
// a call at a time.
class CensusClientCallData : public CallData {
class CensusClientChannelData : public ChannelData {
public:
grpc_error_handle Init(grpc_call_element* /* elem */,
const grpc_call_element_args* args) override;
void StartTransportStreamOpBatch(grpc_call_element* elem,
TransportStreamOpBatch* op) override;
// A CallData class will be created for every grpc call within a channel. It
// is used to store data and methods specific to that call.
// CensusClientCallData is thread-compatible, however typically only 1 thread
// should be interacting with a call at a time.
class CensusClientCallData : public CallData {
public:
grpc_error_handle Init(grpc_call_element* /* elem */,
const grpc_call_element_args* args) override;
void StartTransportStreamOpBatch(grpc_call_element* elem,
TransportStreamOpBatch* op) override;
private:
OpenCensusCallTracer* tracer_ = nullptr;
};
grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* args) override;
private:
OpenCensusCallTracer* tracer_ = nullptr;
bool tracing_enabled_ = true;
};
} // namespace grpc

@ -40,7 +40,8 @@
namespace grpc {
void RegisterOpenCensusPlugin() {
RegisterChannelFilter<CensusChannelData, CensusClientCallData>(
RegisterChannelFilter<CensusClientChannelData,
CensusClientChannelData::CensusClientCallData>(
"opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */,
nullptr /* condition function */);
RegisterChannelFilter<CensusChannelData, CensusServerCallData>(

@ -43,6 +43,18 @@
#include "src/core/lib/transport/transport.h"
#include "src/cpp/ext/filters/census/context.h"
// TODO(yashykt): This might not be the right place for this channel arg, but we
// don't have a better place for this right now.
// EXPERIMENTAL. If zero, disables observability tracing and observability
// logging (not yet implemented) on the client channel. Defaults to true
// (enabled). Note that this does not impact metrics/stats collection. This
// channel arg is intended as a way to avoid cyclic execution of observability
// logging and tracing, especially when the sampling rate of RPCs is very high,
// which would generate a lot of data.
//
#define GRPC_ARG_ENABLE_OBSERVABILITY "grpc.experimental.enable_observability"
namespace grpc {
class OpenCensusCallTracer : public grpc_core::CallTracer {
@ -89,7 +101,8 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
absl::StatusCode status_code_;
};
explicit OpenCensusCallTracer(const grpc_call_element_args* args);
explicit OpenCensusCallTracer(const grpc_call_element_args* args,
bool tracing_enabled);
~OpenCensusCallTracer() override;
void GenerateContext();
@ -97,12 +110,15 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
bool is_transparent_retry) override;
private:
CensusContext CreateCensusContextForCallAttempt();
const grpc_call_context_element* call_context_;
// Client method.
grpc_core::Slice path_;
absl::string_view method_;
CensusContext context_;
grpc_core::Arena* arena_;
bool tracing_enabled_;
grpc_core::Mutex mu_;
// Non-transparent attempts per call
uint64_t retries_ ABSL_GUARDED_BY(&mu_) = 0;

@ -34,8 +34,10 @@
#include <grpc++/grpc++.h>
#include <grpcpp/opencensus.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/cpp/ext/filters/census/context.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/open_census_call_tracer.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
@ -730,6 +732,55 @@ TEST_F(StatsPluginEnd2EndTest, TestAllSpansAreExported) {
recv_span_data->parent_span_id());
}
// Test that setting GRPC_ARG_ENABLE_OBSERVABILITY to 0 on a channel prevents
// client-side spans ("Sent.<method>" and "Attempt.<method>") from being
// recorded.
TEST_F(StatsPluginEnd2EndTest, TestObservabilityDisabledChannelArg) {
{
// Client spans are ended when the ClientContext's destructor is invoked.
ChannelArguments args;
// Turn observability off for this channel only.
args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
auto channel = CreateCustomChannel(server_address_,
InsecureChannelCredentials(), args);
ResetStub(channel);
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
// Attach an application-provided census context whose root span is started
// with AlwaysSampler — presumably so the RPC would be sampled if client-side
// tracing were active.
::opencensus::trace::AlwaysSampler always_sampler;
::opencensus::trace::StartSpanOptions options;
options.sampler = &always_sampler;
auto sampling_span =
::opencensus::trace::Span::StartSpan("sampling", nullptr, options);
grpc::CensusContext app_census_context("root", &sampling_span,
::opencensus::tags::TagMap{});
context.set_census_context(
reinterpret_cast<census_context*>(&app_census_context));
traces_recorder_->StartRecording();
grpc::Status status = stub_->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
}
// Wait before forcing an export — presumably to let any in-flight spans
// finish; the delay scales with grpc_test_slowdown_factor().
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
::opencensus::trace::exporter::SpanExporterTestPeer::ExportForTesting();
traces_recorder_->StopRecording();
auto recorded_spans = traces_recorder_->GetAndClearSpans();
// Returns an iterator to the first recorded span with the given name, or
// recorded_spans.end() when there is none.
auto GetSpanByName = [&recorded_spans](absl::string_view name) {
return std::find_if(
recorded_spans.begin(), recorded_spans.end(),
[name](auto const& span_data) { return span_data.name() == name; });
};
// The size might be 0 or 1, depending on whether the server-side ends up
// getting sampled or not.
ASSERT_LE(recorded_spans.size(), 1);
// Make sure that the client-side traces are not collected.
auto sent_span_data =
GetSpanByName(absl::StrCat("Sent.", client_method_name_));
ASSERT_EQ(sent_span_data, recorded_spans.end());
auto attempt_span_data =
GetSpanByName(absl::StrCat("Attempt.", client_method_name_));
ASSERT_EQ(attempt_span_data, recorded_spans.end());
}
} // namespace
} // namespace testing

Loading…
Cancel
Save