OpenCensus: Convert client filter to promises (#32303)

* OpenCensus: Convert client filter to promises

* Reviewer comments

* Remove unnecessary header
pull/32313/head
Yash Tibrewal 2 years ago committed by GitHub
parent 135f6e84b6
commit ff8c89f313
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      BUILD
  2. 88
      src/cpp/ext/filters/census/client_filter.cc
  3. 35
      src/cpp/ext/filters/census/client_filter.h
  4. 17
      src/cpp/ext/filters/census/grpc_plugin.cc
  5. 4
      src/cpp/ext/filters/census/open_census_call_tracer.h

@ -2206,6 +2206,8 @@ grpc_cc_library(
visibility = ["@grpc:grpc_opencensus_plugin"], visibility = ["@grpc:grpc_opencensus_plugin"],
deps = [ deps = [
"census", "census",
"channel_stack_builder",
"config",
"debug_location", "debug_location",
"gpr", "gpr",
"grpc++", "grpc++",
@ -2215,6 +2217,7 @@ grpc_cc_library(
"//src/core:channel_args", "//src/core:channel_args",
"//src/core:channel_stack_type", "//src/core:channel_stack_type",
"//src/core:closure", "//src/core:closure",
"//src/core:context",
"//src/core:experiments", "//src/core:experiments",
"//src/core:slice", "//src/core:slice",
"//src/core:slice_buffer", "//src/core:slice_buffer",

@ -24,6 +24,8 @@
#include <stdint.h> #include <stdint.h>
#include <algorithm> #include <algorithm>
#include <functional>
#include <memory>
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -47,9 +49,11 @@
#include <grpcpp/opencensus.h> #include <grpcpp/opencensus.h>
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h" #include "src/core/lib/channel/context.h"
#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_buffer.h"
@ -58,6 +62,7 @@
#include "src/cpp/ext/filters/census/context.h" #include "src/cpp/ext/filters/census/context.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h" #include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/measures.h" #include "src/cpp/ext/filters/census/measures.h"
#include "src/cpp/ext/filters/census/open_census_call_tracer.h"
namespace grpc { namespace grpc {
namespace internal { namespace internal {
@ -68,53 +73,42 @@ constexpr uint32_t
OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen; OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen;
// //
// OpenCensusClientChannelData // OpenCensusClientFilter
// //
grpc_error_handle OpenCensusClientChannelData::Init( const grpc_channel_filter OpenCensusClientFilter::kFilter =
grpc_channel_element* /*elem*/, grpc_channel_element_args* args) { grpc_core::MakePromiseBasedFilter<OpenCensusClientFilter,
grpc_core::FilterEndpoint::kClient, 0>(
"opencensus_client");
absl::StatusOr<OpenCensusClientFilter> OpenCensusClientFilter::Create(
const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
bool observability_enabled = bool observability_enabled =
args->channel_args.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY).value_or(true); args.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY).value_or(true);
// Only run the Post-Init Registry if observability is enabled to avoid // Only run the Post-Init Registry if observability is enabled to avoid
// running into a cyclic loop for exporter channels. // running into a cyclic loop for exporter channels.
if (observability_enabled) { if (observability_enabled) {
OpenCensusRegistry::Get().RunFunctionsPostInit(); OpenCensusRegistry::Get().RunFunctionsPostInit();
} }
tracing_enabled_ = observability_enabled; return OpenCensusClientFilter(/*tracing_enabled=*/observability_enabled);
return absl::OkStatus();
}
//
// OpenCensusClientChannelData::OpenCensusClientCallData
//
grpc_error_handle OpenCensusClientChannelData::OpenCensusClientCallData::Init(
grpc_call_element* elem, const grpc_call_element_args* args) {
tracer_ = args->arena->New<OpenCensusCallTracer>(
args, (static_cast<OpenCensusClientChannelData*>(elem->channel_data))
->tracing_enabled_);
GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer_;
args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) {
(static_cast<OpenCensusCallTracer*>(tracer))->~OpenCensusCallTracer();
};
return absl::OkStatus();
} }
void OpenCensusClientChannelData::OpenCensusClientCallData:: grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
StartTransportStreamOpBatch(grpc_call_element* elem, OpenCensusClientFilter::MakeCallPromise(
TransportStreamOpBatch* op) { grpc_core::CallArgs call_args,
// Note that we are generating the overall call context here instead of in grpc_core::NextPromiseFactory next_promise_factory) {
// the constructor of `OpenCensusCallTracer` due to the semantics of auto* call_context = grpc_core::GetContext<grpc_call_context_element>();
// `grpc_census_call_set_context` which allows the application to set the auto* path = call_args.client_initial_metadata->get_pointer(
// census context for a call anytime before the first call to grpc_core::HttpPathMetadata());
// `grpc_call_start_batch`. auto* tracer =
if (op->op()->send_initial_metadata && OpenCensusTracingEnabled() && grpc_core::GetContext<grpc_core::Arena>()
(static_cast<OpenCensusClientChannelData*>(elem->channel_data)) ->ManagedNew<OpenCensusCallTracer>(
->tracing_enabled_) { call_context, path != nullptr ? path->Ref() : grpc_core::Slice(),
tracer_->GenerateContext(); grpc_core::GetContext<grpc_core::Arena>(), tracing_enabled_);
} GPR_DEBUG_ASSERT(call_context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
grpc_call_next_op(elem, op->op()); call_context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
call_context[GRPC_CONTEXT_CALL_TRACER].destroy = nullptr;
return next_promise_factory(std::move(call_args));
} }
// //
@ -226,7 +220,7 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
} }
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel( void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
grpc_error_handle /*cancel_error*/) { absl::Status /*cancel_error*/) {
status_code_ = absl::StatusCode::kCancelled; status_code_ = absl::StatusCode::kCancelled;
} }
@ -273,13 +267,19 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordAnnotation(
// OpenCensusCallTracer // OpenCensusCallTracer
// //
OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args, OpenCensusCallTracer::OpenCensusCallTracer(
bool tracing_enabled) grpc_call_context_element* call_context, grpc_core::Slice path,
: call_context_(args->context), grpc_core::Arena* arena, bool tracing_enabled)
path_(grpc_slice_ref(args->path)), : call_context_(call_context),
path_(std::move(path)),
method_(GetMethod(path_)), method_(GetMethod(path_)),
arena_(args->arena), arena_(arena),
tracing_enabled_(tracing_enabled) {} tracing_enabled_(tracing_enabled) {
auto* parent_context = reinterpret_cast<CensusContext*>(
call_context_[GRPC_CONTEXT_TRACING].value);
GenerateClientContext(absl::StrCat("Sent.", method_), &context_,
(parent_context == nullptr) ? nullptr : parent_context);
}
OpenCensusCallTracer::~OpenCensusCallTracer() { OpenCensusCallTracer::~OpenCensusCallTracer() {
if (OpenCensusStatsEnabled()) { if (OpenCensusStatsEnabled()) {

@ -21,36 +21,31 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "absl/status/statusor.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/promise/arena_promise.h"
#include "src/cpp/common/channel_filter.h" #include "src/core/lib/transport/transport.h"
#include "src/cpp/ext/filters/census/open_census_call_tracer.h"
namespace grpc { namespace grpc {
namespace internal { namespace internal {
class OpenCensusClientChannelData : public ChannelData { class OpenCensusClientFilter : public grpc_core::ChannelFilter {
public:
// A CallData class will be created for every grpc call within a channel. It
// is used to store data and methods specific to that call.
// OpenCensusClientCallData is thread-compatible, however typically only 1
// thread should be interacting with a call at a time.
class OpenCensusClientCallData : public CallData {
public: public:
grpc_error_handle Init(grpc_call_element* /* elem */, static const grpc_channel_filter kFilter;
const grpc_call_element_args* args) override;
void StartTransportStreamOpBatch(grpc_call_element* elem,
TransportStreamOpBatch* op) override;
private: static absl::StatusOr<OpenCensusClientFilter> Create(
OpenCensusCallTracer* tracer_ = nullptr; const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/);
};
grpc_error_handle Init(grpc_channel_element* elem, grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
grpc_channel_element_args* args) override; grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) override;
private: private:
explicit OpenCensusClientFilter(bool tracing_enabled)
: tracing_enabled_(tracing_enabled) {}
bool tracing_enabled_ = true; bool tracing_enabled_ = true;
}; };

@ -32,6 +32,8 @@
#include <grpcpp/opencensus.h> #include <grpcpp/opencensus.h>
#include <grpcpp/server_context.h> #include <grpcpp/server_context.h>
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/surface/channel_stack_type.h"
#include "src/cpp/common/channel_filter.h" #include "src/cpp/common/channel_filter.h"
#include "src/cpp/ext/filters/census/channel_filter.h" #include "src/cpp/ext/filters/census/channel_filter.h"
@ -42,11 +44,16 @@
namespace grpc { namespace grpc {
void RegisterOpenCensusPlugin() { void RegisterOpenCensusPlugin() {
RegisterChannelFilter< grpc_core::CoreConfiguration::RegisterBuilder(
internal::OpenCensusClientChannelData, [](grpc_core::CoreConfiguration::Builder* builder) {
internal::OpenCensusClientChannelData::OpenCensusClientCallData>( builder->channel_init()->RegisterStage(
"opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */, GRPC_CLIENT_CHANNEL, /*priority=*/INT_MAX,
nullptr /* condition function */); [](grpc_core::ChannelStackBuilder* builder) {
builder->PrependFilter(
&grpc::internal::OpenCensusClientFilter::kFilter);
return true;
});
});
RegisterChannelFilter<internal::OpenCensusChannelData, RegisterChannelFilter<internal::OpenCensusChannelData,
internal::OpenCensusServerCallData>( internal::OpenCensusServerCallData>(
"opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */, "opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */,

@ -33,7 +33,6 @@
#include <grpcpp/opencensus.h> #include <grpcpp/opencensus.h>
#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h" #include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
@ -103,7 +102,8 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
absl::StatusCode status_code_; absl::StatusCode status_code_;
}; };
explicit OpenCensusCallTracer(const grpc_call_element_args* args, explicit OpenCensusCallTracer(grpc_call_context_element* call_context,
grpc_core::Slice path, grpc_core::Arena* arena,
bool tracing_enabled); bool tracing_enabled);
~OpenCensusCallTracer() override; ~OpenCensusCallTracer() override;

Loading…
Cancel
Save