OpenCensus server filter: Convert to promises (#32318)

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/32308/head^2
Yash Tibrewal 2 years ago committed by GitHub
parent f2d5c47ff3
commit b58b5cf3a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      BUILD
  2. 37
      src/cpp/ext/filters/census/channel_filter.cc
  3. 41
      src/cpp/ext/filters/census/channel_filter.h
  4. 13
      src/cpp/ext/filters/census/grpc_plugin.cc
  5. 239
      src/cpp/ext/filters/census/server_filter.cc
  6. 89
      src/cpp/ext/filters/census/server_filter.h

@ -2165,7 +2165,6 @@ grpc_cc_library(
grpc_cc_library(
name = "grpc_opencensus_plugin",
srcs = [
"src/cpp/ext/filters/census/channel_filter.cc",
"src/cpp/ext/filters/census/client_filter.cc",
"src/cpp/ext/filters/census/context.cc",
"src/cpp/ext/filters/census/grpc_plugin.cc",
@ -2176,7 +2175,6 @@ grpc_cc_library(
],
hdrs = [
"include/grpcpp/opencensus.h",
"src/cpp/ext/filters/census/channel_filter.h",
"src/cpp/ext/filters/census/client_filter.h",
"src/cpp/ext/filters/census/context.h",
"src/cpp/ext/filters/census/grpc_plugin.h",
@ -2215,11 +2213,14 @@ grpc_cc_library(
"grpc++_base",
"grpc_base",
"//src/core:arena",
"//src/core:cancel_callback",
"//src/core:channel_args",
"//src/core:channel_stack_type",
"//src/core:closure",
"//src/core:context",
"//src/core:experiments",
"//src/core:map",
"//src/core:pipe",
"//src/core:poll",
"//src/core:slice",
"//src/core:slice_buffer",
"//src/core:slice_refcount",

@ -1,37 +0,0 @@
//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/census/channel_filter.h"
#include "absl/status/status.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
namespace grpc {
namespace internal {
grpc_error_handle OpenCensusChannelData::Init(
grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) {
OpenCensusRegistry::Get().RunFunctionsPostInit();
return absl::OkStatus();
}
} // namespace internal
} // namespace grpc

@ -1,41 +0,0 @@
//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H
#define GRPC_SRC_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/error.h"
#include "src/cpp/common/channel_filter.h"
namespace grpc {
namespace internal {
class OpenCensusChannelData : public ChannelData {
public:
grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* args) override;
};
} // namespace internal
} // namespace grpc
#endif // GRPC_SRC_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H

@ -35,8 +35,6 @@
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/cpp/common/channel_filter.h"
#include "src/cpp/ext/filters/census/channel_filter.h"
#include "src/cpp/ext/filters/census/client_filter.h"
#include "src/cpp/ext/filters/census/measures.h"
#include "src/cpp/ext/filters/census/server_filter.h"
@ -53,11 +51,14 @@ void RegisterOpenCensusPlugin() {
&grpc::internal::OpenCensusClientFilter::kFilter);
return true;
});
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, /*priority=*/INT_MAX,
[](grpc_core::ChannelStackBuilder* builder) {
builder->PrependFilter(
&grpc::internal::OpenCensusServerFilter::kFilter);
return true;
});
});
RegisterChannelFilter<internal::OpenCensusChannelData,
internal::OpenCensusServerCallData>(
"opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */,
nullptr /* condition function */);
// Access measures to ensure they are initialized. Otherwise, creating a view
// before the first RPC would cause an error.

@ -20,12 +20,16 @@
#include "src/cpp/ext/filters/census/server_filter.h"
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <functional>
#include <string>
#include <utility>
#include <vector>
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
@ -35,13 +39,20 @@
#include "opencensus/tags/tag_key.h"
#include "opencensus/tags/tag_map.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/surface/call.h"
#include <grpcpp/opencensus.h>
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/cpp/ext/filters/census/channel_filter.h"
#include "src/cpp/ext/filters/census/context.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/measures.h"
@ -49,8 +60,6 @@
namespace grpc {
namespace internal {
constexpr uint32_t OpenCensusServerCallData::kMaxServerStatsLen;
namespace {
// server metadata elements
@ -82,105 +91,68 @@ void FilterInitialMetadata(grpc_metadata_batch* b,
} // namespace
void OpenCensusServerCallData::OnDoneRecvMessageCb(void* user_data,
grpc_error_handle error) {
grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
OpenCensusServerCallData* calld =
reinterpret_cast<OpenCensusServerCallData*>(elem->call_data);
OpenCensusChannelData* channeld =
reinterpret_cast<OpenCensusChannelData*>(elem->channel_data);
GPR_ASSERT(calld != nullptr);
GPR_ASSERT(channeld != nullptr);
// Stream messages are no longer valid after receiving trailing metadata.
if (calld->recv_message_->has_value()) {
++calld->recv_message_count_;
}
grpc_core::Closure::Run(DEBUG_LOCATION, calld->initial_on_done_recv_message_,
error);
}
// An OpenCensusServerCallData class will be created for every grpc call within
// a channel. It is used to store data and methods specific to that call.
// OpenCensusServerCallData is thread-compatible, however typically only 1
// thread should be interacting with a call at a time.
class OpenCensusServerCallData {
public:
// Maximum size of server stats that are sent on the wire.
static constexpr uint32_t kMaxServerStatsLen = 16;
explicit OpenCensusServerCallData(
grpc_metadata_batch* client_initial_metadata);
void OnSendMessage() { ++sent_message_count_; }
void OnRecvMessage() { ++recv_message_count_; }
void OnServerTrailingMetadata(grpc_metadata_batch* server_trailing_metadata);
void OnCancel() { elapsed_time_ = absl::Now() - start_time_; }
void Finalize(const grpc_call_final_info* final_info);
private:
experimental::CensusContext context_;
// server method
grpc_core::Slice path_;
absl::string_view method_;
// recv message
absl::Time start_time_;
absl::Duration elapsed_time_;
uint64_t recv_message_count_;
uint64_t sent_message_count_;
// Buffer needed for grpc_slice to reference it when adding metatdata to
// response.
char stats_buf_[kMaxServerStatsLen];
};
void OpenCensusServerCallData::OnDoneRecvInitialMetadataCb(
void* user_data, grpc_error_handle error) {
grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
OpenCensusServerCallData* calld =
reinterpret_cast<OpenCensusServerCallData*>(elem->call_data);
GPR_ASSERT(calld != nullptr);
if (error.ok()) {
grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_;
GPR_ASSERT(initial_metadata != nullptr);
ServerMetadataElements sml;
FilterInitialMetadata(initial_metadata, &sml);
calld->path_ = std::move(sml.path);
calld->method_ = GetMethod(calld->path_);
if (OpenCensusTracingEnabled()) {
calld->qualified_method_ = absl::StrCat("Recv.", calld->method_);
GenerateServerContext(sml.tracing_slice.as_string_view(),
calld->qualified_method_, &calld->context_);
grpc_census_call_set_context(
calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
}
if (OpenCensusStatsEnabled()) {
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
calld->context_.tags().tags();
tags.emplace_back(ServerMethodTagKey(), std::string(calld->method_));
::opencensus::stats::Record({{RpcServerStartedRpcs(), 1}}, tags);
}
}
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->initial_on_done_recv_initial_metadata_, error);
}
constexpr uint32_t OpenCensusServerCallData::kMaxServerStatsLen;
void OpenCensusServerCallData::StartTransportStreamOpBatch(
grpc_call_element* elem, TransportStreamOpBatch* op) {
if (op->recv_initial_metadata() != nullptr) {
// substitute our callback for the op callback
recv_initial_metadata_ = op->recv_initial_metadata()->batch();
initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready();
op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_);
}
if (op->send_message() != nullptr) {
++sent_message_count_;
}
if (op->recv_message() != nullptr) {
recv_message_ = op->op()->payload->recv_message.recv_message;
initial_on_done_recv_message_ =
op->op()->payload->recv_message.recv_message_ready;
op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
OpenCensusServerCallData::OpenCensusServerCallData(
grpc_metadata_batch* client_initial_metadata)
: start_time_(absl::Now()), recv_message_count_(0), sent_message_count_(0) {
ServerMetadataElements sml;
FilterInitialMetadata(client_initial_metadata, &sml);
path_ = std::move(sml.path);
method_ = GetMethod(path_);
if (OpenCensusTracingEnabled()) {
GenerateServerContext(sml.tracing_slice.as_string_view(),
absl::StrCat("Recv.", method_), &context_);
auto* call_context = grpc_core::GetContext<grpc_call_context_element>();
call_context[GRPC_CONTEXT_TRACING].value = &context_;
}
// We need to record the time when the trailing metadata was sent to mark the
// completeness of the request.
if (OpenCensusStatsEnabled() && op->send_trailing_metadata() != nullptr) {
elapsed_time_ = absl::Now() - start_time_;
size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
stats_buf_, kMaxServerStatsLen);
if (len > 0) {
op->send_trailing_metadata()->batch()->Set(
grpc_core::GrpcServerStatsBinMetadata(),
grpc_core::Slice::FromCopiedBuffer(stats_buf_, len));
}
if (OpenCensusStatsEnabled()) {
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
tags.emplace_back(ServerMethodTagKey(), std::string(method_));
::opencensus::stats::Record({{RpcServerStartedRpcs(), 1}}, tags);
}
// Call next op.
grpc_call_next_op(elem, op->op());
}
grpc_error_handle OpenCensusServerCallData::Init(
grpc_call_element* elem, const grpc_call_element_args* args) {
start_time_ = absl::Now();
gc_ =
grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0));
GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_,
OnDoneRecvInitialMetadataCb, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
grpc_schedule_on_exec_ctx);
auth_context_ = grpc_call_auth_context(gc_);
return absl::OkStatus();
}
void OpenCensusServerCallData::Destroy(grpc_call_element* /*elem*/,
const grpc_call_final_info* final_info,
grpc_closure* /*then_call_closure*/) {
grpc_auth_context_release(auth_context_);
void OpenCensusServerCallData::Finalize(
const grpc_call_final_info* final_info) {
if (OpenCensusStatsEnabled()) {
const uint64_t request_size = GetOutgoingDataSize(final_info);
const uint64_t response_size = GetIncomingDataSize(final_info);
@ -204,5 +176,68 @@ void OpenCensusServerCallData::Destroy(grpc_call_element* /*elem*/,
}
}
void OpenCensusServerCallData::OnServerTrailingMetadata(
grpc_metadata_batch* server_trailing_metadata) {
// We need to record the time when the trailing metadata was sent to
// mark the completeness of the request.
elapsed_time_ = absl::Now() - start_time_;
if (OpenCensusStatsEnabled() && server_trailing_metadata != nullptr) {
size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
stats_buf_, kMaxServerStatsLen);
if (len > 0) {
server_trailing_metadata->Set(
grpc_core::GrpcServerStatsBinMetadata(),
grpc_core::Slice::FromCopiedBuffer(stats_buf_, len));
}
}
}
//
// OpenCensusServerFilter
//
const grpc_channel_filter OpenCensusServerFilter::kFilter =
grpc_core::MakePromiseBasedFilter<
OpenCensusServerFilter, grpc_core::FilterEndpoint::kServer,
grpc_core::kFilterExaminesServerInitialMetadata |
grpc_core::kFilterExaminesInboundMessages |
grpc_core::kFilterExaminesOutboundMessages>("opencensus_server");
absl::StatusOr<OpenCensusServerFilter> OpenCensusServerFilter::Create(
const grpc_core::ChannelArgs& /*args*/,
grpc_core::ChannelFilter::Args /*filter_args*/) {
OpenCensusRegistry::Get().RunFunctionsPostInit();
return OpenCensusServerFilter();
}
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
OpenCensusServerFilter::MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) {
auto* calld = grpc_core::GetContext<grpc_core::Arena>()
->ManagedNew<OpenCensusServerCallData>(
call_args.client_initial_metadata.get());
call_args.client_to_server_messages->InterceptAndMap(
[calld](grpc_core::MessageHandle message) {
calld->OnRecvMessage();
return message;
});
call_args.server_to_client_messages->InterceptAndMap(
[calld](grpc_core::MessageHandle message) {
calld->OnSendMessage();
return message;
});
grpc_core::GetContext<grpc_core::CallFinalization>()->Add(
[calld](const grpc_call_final_info* final_info) {
calld->Finalize(final_info);
});
return grpc_core::OnCancel(Map(next_promise_factory(std::move(call_args)),
[calld](grpc_core::ServerMetadataHandle md) {
calld->OnServerTrailingMetadata(md.get());
return md;
}),
[calld]() { calld->OnCancel(); });
}
} // namespace internal
} // namespace grpc

@ -21,92 +21,31 @@
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <string.h>
#include <string>
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpcpp/opencensus.h>
#include "absl/status/statusor.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/cpp/common/channel_filter.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/transport/transport.h"
namespace grpc {
namespace internal {
// A CallData class will be created for every grpc call within a channel. It is
// used to store data and methods specific to that call.
// OpenCensusServerCallData is thread-compatible, however typically only 1
// thread should be interacting with a call at a time.
class OpenCensusServerCallData : public CallData {
class OpenCensusServerFilter : public grpc_core::ChannelFilter {
public:
// Maximum size of server stats that are sent on the wire.
static constexpr uint32_t kMaxServerStatsLen = 16;
OpenCensusServerCallData()
: gc_(nullptr),
auth_context_(nullptr),
recv_initial_metadata_(nullptr),
initial_on_done_recv_initial_metadata_(nullptr),
initial_on_done_recv_message_(nullptr),
recv_message_(nullptr),
recv_message_count_(0),
sent_message_count_(0) {
memset(&on_done_recv_initial_metadata_, 0, sizeof(grpc_closure));
memset(&on_done_recv_message_, 0, sizeof(grpc_closure));
}
grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args) override;
void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info,
grpc_closure* then_call_closure) override;
void StartTransportStreamOpBatch(grpc_call_element* elem,
TransportStreamOpBatch* op) override;
static const grpc_channel_filter kFilter;
static void OnDoneRecvInitialMetadataCb(void* user_data,
grpc_error_handle error);
static absl::StatusOr<OpenCensusServerFilter> Create(
const grpc_core::ChannelArgs& /*args*/,
grpc_core::ChannelFilter::Args /*filter_args*/);
static void OnDoneRecvMessageCb(void* user_data, grpc_error_handle error);
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) override;
private:
experimental::CensusContext context_;
// server method
absl::string_view method_;
std::string qualified_method_;
grpc_core::Slice path_;
// Pointer to the grpc_call element
grpc_call* gc_;
// Authorization context for the call.
grpc_auth_context* auth_context_;
// recv callback
grpc_metadata_batch* recv_initial_metadata_;
grpc_closure* initial_on_done_recv_initial_metadata_;
grpc_closure on_done_recv_initial_metadata_;
// recv message
grpc_closure* initial_on_done_recv_message_;
grpc_closure on_done_recv_message_;
absl::Time start_time_;
absl::Duration elapsed_time_;
absl::optional<grpc_core::SliceBuffer>* recv_message_;
uint64_t recv_message_count_;
uint64_t sent_message_count_;
// Buffer needed for grpc_slice to reference it when adding metatdata to
// response.
char stats_buf_[kMaxServerStatsLen];
OpenCensusServerFilter() = default;
};
} // namespace internal

Loading…
Cancel
Save