diff --git a/BUILD b/BUILD index ae04a406d63..6df06670d43 100644 --- a/BUILD +++ b/BUILD @@ -2206,6 +2206,8 @@ grpc_cc_library( visibility = ["@grpc:grpc_opencensus_plugin"], deps = [ "census", + "channel_stack_builder", + "config", "debug_location", "gpr", "grpc++", @@ -2215,6 +2217,7 @@ grpc_cc_library( "//src/core:channel_args", "//src/core:channel_stack_type", "//src/core:closure", + "//src/core:context", "//src/core:experiments", "//src/core:slice", "//src/core:slice_buffer", diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc index 1d06892980e..564738a8437 100644 --- a/src/cpp/ext/filters/census/client_filter.cc +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -24,6 +24,8 @@ #include #include +#include +#include #include #include #include @@ -47,9 +49,11 @@ #include #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/promise/context.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" @@ -58,6 +62,7 @@ #include "src/cpp/ext/filters/census/context.h" #include "src/cpp/ext/filters/census/grpc_plugin.h" #include "src/cpp/ext/filters/census/measures.h" +#include "src/cpp/ext/filters/census/open_census_call_tracer.h" namespace grpc { namespace internal { @@ -68,53 +73,42 @@ constexpr uint32_t OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen; // -// OpenCensusClientChannelData +// OpenCensusClientFilter // -grpc_error_handle OpenCensusClientChannelData::Init( - grpc_channel_element* /*elem*/, grpc_channel_element_args* args) { +const grpc_channel_filter OpenCensusClientFilter::kFilter = + grpc_core::MakePromiseBasedFilter( + "opencensus_client"); + +absl::StatusOr OpenCensusClientFilter::Create( + const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) { bool observability_enabled = - args->channel_args.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY).value_or(true); + args.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY).value_or(true); // Only run the Post-Init Registry if observability is enabled to avoid // running into a cyclic loop for exporter channels. if (observability_enabled) { OpenCensusRegistry::Get().RunFunctionsPostInit(); } - tracing_enabled_ = observability_enabled; - return absl::OkStatus(); -} - -// -// OpenCensusClientChannelData::OpenCensusClientCallData -// - -grpc_error_handle OpenCensusClientChannelData::OpenCensusClientCallData::Init( - grpc_call_element* elem, const grpc_call_element_args* args) { - tracer_ = args->arena->New( - args, (static_cast(elem->channel_data)) - ->tracing_enabled_); - GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr); - args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer_; - args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) { - (static_cast(tracer))->~OpenCensusCallTracer(); - }; - return absl::OkStatus(); + return OpenCensusClientFilter(/*tracing_enabled=*/observability_enabled); } -void OpenCensusClientChannelData::OpenCensusClientCallData:: - StartTransportStreamOpBatch(grpc_call_element* elem, - TransportStreamOpBatch* op) { - // Note that we are generating the overall call context here instead of in - // the constructor of `OpenCensusCallTracer` due to the semantics of - // `grpc_census_call_set_context` which allows the application to set the - // census context for a call anytime before the first call to - // `grpc_call_start_batch`. - if (op->op()->send_initial_metadata && OpenCensusTracingEnabled() && - (static_cast(elem->channel_data)) - ->tracing_enabled_) { - tracer_->GenerateContext(); - } - grpc_call_next_op(elem, op->op()); +grpc_core::ArenaPromise +OpenCensusClientFilter::MakeCallPromise( + grpc_core::CallArgs call_args, + grpc_core::NextPromiseFactory next_promise_factory) { + auto* call_context = grpc_core::GetContext(); + auto* path = call_args.client_initial_metadata->get_pointer( + grpc_core::HttpPathMetadata()); + auto* tracer = + grpc_core::GetContext() + ->ManagedNew( + call_context, path != nullptr ? path->Ref() : grpc_core::Slice(), + grpc_core::GetContext(), tracing_enabled_); + GPR_DEBUG_ASSERT(call_context[GRPC_CONTEXT_CALL_TRACER].value == nullptr); + call_context[GRPC_CONTEXT_CALL_TRACER].value = tracer; + call_context[GRPC_CONTEXT_CALL_TRACER].destroy = nullptr; + return next_promise_factory(std::move(call_args)); } // @@ -226,7 +220,7 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer:: } void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel( - grpc_error_handle /*cancel_error*/) { + absl::Status /*cancel_error*/) { status_code_ = absl::StatusCode::kCancelled; } @@ -273,13 +267,19 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordAnnotation( // OpenCensusCallTracer // -OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args, - bool tracing_enabled) - : call_context_(args->context), - path_(grpc_slice_ref(args->path)), +OpenCensusCallTracer::OpenCensusCallTracer( + grpc_call_context_element* call_context, grpc_core::Slice path, + grpc_core::Arena* arena, bool tracing_enabled) + : call_context_(call_context), + path_(std::move(path)), method_(GetMethod(path_)), - arena_(args->arena), - tracing_enabled_(tracing_enabled) {} + arena_(arena), + tracing_enabled_(tracing_enabled) { + auto* parent_context = reinterpret_cast( + call_context_[GRPC_CONTEXT_TRACING].value); + GenerateClientContext(absl::StrCat("Sent.", method_), &context_, + (parent_context == nullptr) ? nullptr : parent_context); +} OpenCensusCallTracer::~OpenCensusCallTracer() { if (OpenCensusStatsEnabled()) { diff --git a/src/cpp/ext/filters/census/client_filter.h b/src/cpp/ext/filters/census/client_filter.h index bf0c5079819..85db1efc5cd 100644 --- a/src/cpp/ext/filters/census/client_filter.h +++ b/src/cpp/ext/filters/census/client_filter.h @@ -21,36 +21,31 @@ #include +#include "absl/status/statusor.h" + +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" -#include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/iomgr/error.h" -#include "src/cpp/common/channel_filter.h" -#include "src/cpp/ext/filters/census/open_census_call_tracer.h" +#include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/promise/arena_promise.h" +#include "src/core/lib/transport/transport.h" namespace grpc { namespace internal { -class OpenCensusClientChannelData : public ChannelData { +class OpenCensusClientFilter : public grpc_core::ChannelFilter { public: - // A CallData class will be created for every grpc call within a channel. It - // is used to store data and methods specific to that call. - // OpenCensusClientCallData is thread-compatible, however typically only 1 - // thread should be interacting with a call at a time. - class OpenCensusClientCallData : public CallData { - public: - grpc_error_handle Init(grpc_call_element* /* elem */, - const grpc_call_element_args* args) override; - void StartTransportStreamOpBatch(grpc_call_element* elem, - TransportStreamOpBatch* op) override; - - private: - OpenCensusCallTracer* tracer_ = nullptr; - }; - - grpc_error_handle Init(grpc_channel_element* elem, - grpc_channel_element_args* args) override; + static const grpc_channel_filter kFilter; + + static absl::StatusOr Create( + const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/); + + grpc_core::ArenaPromise MakeCallPromise( + grpc_core::CallArgs call_args, + grpc_core::NextPromiseFactory next_promise_factory) override; private: + explicit OpenCensusClientFilter(bool tracing_enabled) + : tracing_enabled_(tracing_enabled) {} bool tracing_enabled_ = true; }; diff --git a/src/cpp/ext/filters/census/grpc_plugin.cc b/src/cpp/ext/filters/census/grpc_plugin.cc index 85f8150f72c..91d274b4f64 100644 --- a/src/cpp/ext/filters/census/grpc_plugin.cc +++ b/src/cpp/ext/filters/census/grpc_plugin.cc @@ -32,6 +32,8 @@ #include #include +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/surface/channel_stack_type.h" #include "src/cpp/common/channel_filter.h" #include "src/cpp/ext/filters/census/channel_filter.h" @@ -42,11 +44,16 @@ namespace grpc { void RegisterOpenCensusPlugin() { - RegisterChannelFilter< - internal::OpenCensusClientChannelData, - internal::OpenCensusClientChannelData::OpenCensusClientCallData>( - "opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */, - nullptr /* condition function */); + grpc_core::CoreConfiguration::RegisterBuilder( + [](grpc_core::CoreConfiguration::Builder* builder) { + builder->channel_init()->RegisterStage( + GRPC_CLIENT_CHANNEL, /*priority=*/INT_MAX, + [](grpc_core::ChannelStackBuilder* builder) { + builder->PrependFilter( + &grpc::internal::OpenCensusClientFilter::kFilter); + return true; + }); + }); RegisterChannelFilter( "opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */, diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h index b34ce982dac..d393910101a 100644 --- a/src/cpp/ext/filters/census/open_census_call_tracer.h +++ b/src/cpp/ext/filters/census/open_census_call_tracer.h @@ -33,7 +33,6 @@ #include #include "src/core/lib/channel/call_tracer.h" -#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/error.h" @@ -103,7 +102,8 @@ class OpenCensusCallTracer : public grpc_core::CallTracer { absl::StatusCode status_code_; }; - explicit OpenCensusCallTracer(const grpc_call_element_args* args, + explicit OpenCensusCallTracer(grpc_call_context_element* call_context, + grpc_core::Slice path, grpc_core::Arena* arena, bool tracing_enabled); ~OpenCensusCallTracer() override;