[call-v3] Convert server-call-tracer filter (#35249)

Closes #35249

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35249 from ctiller:cg-server-call-tracer ad52acb64f
PiperOrigin-RevId: 588885615
pull/35253/head
Craig Tiller 1 year ago committed by Copybara-Service
parent de6c437c89
commit 22537cb193
  1. 1
      src/core/ext/filters/http/client/http_client_filter.cc
  2. 1
      src/core/ext/filters/http/client/http_client_filter.h
  3. 1
      src/core/ext/filters/http/server/http_server_filter.cc
  4. 1
      src/core/ext/filters/http/server/http_server_filter.h
  5. 2
      src/core/ext/filters/message_size/message_size_filter.cc
  6. 2
      src/core/ext/filters/message_size/message_size_filter.h
  7. 31
      src/core/lib/channel/promise_based_filter.h
  8. 74
      src/core/lib/channel/server_call_tracer_filter.cc

@ -53,6 +53,7 @@ namespace grpc_core {
const NoInterceptor HttpClientFilter::Call::OnServerToClientMessage;
const NoInterceptor HttpClientFilter::Call::OnClientToServerMessage;
const NoInterceptor HttpClientFilter::Call::OnFinalize;
const grpc_channel_filter HttpClientFilter::kFilter =
MakePromiseBasedFilter<HttpClientFilter, FilterEndpoint::kClient,

@ -45,6 +45,7 @@ class HttpClientFilter : public ImplementChannelFilter<HttpClientFilter> {
absl::Status OnServerTrailingMetadata(ServerMetadata& md);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};
private:

@ -51,6 +51,7 @@ namespace grpc_core {
const NoInterceptor HttpServerFilter::Call::OnClientToServerMessage;
const NoInterceptor HttpServerFilter::Call::OnServerToClientMessage;
const NoInterceptor HttpServerFilter::Call::OnFinalize;
const grpc_channel_filter HttpServerFilter::kFilter =
MakePromiseBasedFilter<HttpServerFilter, FilterEndpoint::kServer,

@ -47,6 +47,7 @@ class HttpServerFilter : public ImplementChannelFilter<HttpServerFilter> {
void OnServerTrailingMetadata(ServerMetadata& md);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};
private:

@ -53,9 +53,11 @@ namespace grpc_core {
const NoInterceptor ClientMessageSizeFilter::Call::OnClientInitialMetadata;
const NoInterceptor ClientMessageSizeFilter::Call::OnServerInitialMetadata;
const NoInterceptor ClientMessageSizeFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ClientMessageSizeFilter::Call::OnFinalize;
const NoInterceptor ServerMessageSizeFilter::Call::OnClientInitialMetadata;
const NoInterceptor ServerMessageSizeFilter::Call::OnServerInitialMetadata;
const NoInterceptor ServerMessageSizeFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ServerMessageSizeFilter::Call::OnFinalize;
//
// MessageSizeParsedConfig

@ -99,6 +99,7 @@ class ServerMessageSizeFilter final
static const NoInterceptor OnClientInitialMetadata;
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
ServerMetadataHandle OnClientToServerMessage(
const Message& message, ServerMessageSizeFilter* filter);
ServerMetadataHandle OnServerToClientMessage(
@ -126,6 +127,7 @@ class ClientMessageSizeFilter final
static const NoInterceptor OnClientInitialMetadata;
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
ServerMetadataHandle OnClientToServerMessage(const Message& message);
ServerMetadataHandle OnServerToClientMessage(const Message& message);

@ -186,6 +186,11 @@ inline constexpr bool HasChannelAccess(R (T::*)(A)) {
return false;
}
template <typename T, typename R, typename A>
inline constexpr bool HasChannelAccess(R (T::*)()) {
return false;
}
template <typename T, typename R, typename A, typename C>
inline constexpr bool HasChannelAccess(R (T::*)(A, C)) {
return true;
@ -208,7 +213,8 @@ inline constexpr bool CallHasChannelAccess() {
&Derived::Call::OnClientToServerMessage,
&Derived::Call::OnServerInitialMetadata,
&Derived::Call::OnServerToClientMessage,
&Derived::Call::OnServerTrailingMetadata);
&Derived::Call::OnServerTrailingMetadata,
&Derived::Call::OnFinalize);
}
// Given a boolean X export a type:
@ -642,6 +648,18 @@ inline void InterceptServerTrailingMetadata(
});
}
inline void InterceptFinalize(const NoInterceptor*, void*) {}
template <class Call>
inline void InterceptFinalize(void (Call::*fn)(const grpc_call_final_info*),
Call* call) {
GPR_DEBUG_ASSERT(fn == &Call::OnFinalize);
GetContext<CallFinalization>()->Add(
[call](const grpc_call_final_info* final_info) {
call->OnFinalize(final_info);
});
}
template <typename Derived>
absl::enable_if_t<std::is_empty<FilterCallData<Derived>>::value,
FilterCallData<Derived>*>
@ -674,6 +692,7 @@ MakeFilterCall(Derived* derived) {
// - OnServerToClientMessage - $VALUE_TYPE = Message
// - OnClientToServerMessage - $VALUE_TYPE = Message
// - OnServerTrailingMetadata - $VALUE_TYPE = ServerMetadata
// - OnFinalize - special, see below
// These members define an interception point for a particular event in
// the call lifecycle.
// The type of these members matters, and is selectable by the class
@ -706,6 +725,12 @@ MakeFilterCall(Derived* derived) {
// the filter can return nullptr for success, or a metadata handle for
// failure (in which case the call will be aborted).
// useful for cases where the exact metadata returned needs to be customized.
// Finally, OnFinalize can be added to intecept call finalization.
// It must have one of the signatures:
// - static const NoInterceptor OnFinalize:
// the filter does not intercept call finalization.
// - void OnFinalize(const grpc_call_final_info*):
// the filter intercepts call finalization.
template <typename Derived>
class ImplementChannelFilter : public ChannelFilter {
public:
@ -730,6 +755,7 @@ class ImplementChannelFilter : public ChannelFilter {
promise_filter_detail::InterceptServerTrailingMetadata(
&Derived::Call::OnServerTrailingMetadata, call,
static_cast<Derived*>(this), call_spine);
promise_filter_detail::InterceptFinalize(&Derived::Call::OnFinalize, call);
}
// Polyfill for the original promise scheme.
@ -745,6 +771,9 @@ class ImplementChannelFilter : public ChannelFilter {
&Derived::Call::OnServerInitialMetadata, call, call_args);
promise_filter_detail::InterceptServerToClientMessage(
&Derived::Call::OnServerToClientMessage, call, call_args);
promise_filter_detail::InterceptFinalize(
&Derived::Call::OnFinalize,
static_cast<typename Derived::Call*>(&call->call));
return promise_filter_detail::MapResult(
&Derived::Call::OnServerTrailingMetadata,
promise_filter_detail::RaceAsyncCompletion<

@ -42,19 +42,55 @@ namespace grpc_core {
namespace {
// TODO(yashykt): This filter is not really needed. We should be able to move
// this to the connected filter.
class ServerCallTracerFilter : public ChannelFilter {
class ServerCallTracerFilter
: public ImplementChannelFilter<ServerCallTracerFilter> {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ServerCallTracerFilter> Create(
const ChannelArgs& /*args*/, ChannelFilter::Args /*filter_args*/);
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& client_initial_metadata) {
auto* call_tracer = CallTracer();
if (call_tracer == nullptr) return;
call_tracer->RecordReceivedInitialMetadata(&client_initial_metadata);
}
void OnServerInitialMetadata(ServerMetadata& server_initial_metadata) {
auto* call_tracer = CallTracer();
if (call_tracer == nullptr) return;
call_tracer->RecordSendInitialMetadata(&server_initial_metadata);
}
void OnFinalize(const grpc_call_final_info* final_info) {
auto* call_tracer = CallTracer();
if (call_tracer == nullptr) return;
call_tracer->RecordEnd(final_info);
}
void OnServerTrailingMetadata(ServerMetadata& server_trailing_metadata) {
auto* call_tracer = CallTracer();
if (call_tracer == nullptr) return;
call_tracer->RecordSendTrailingMetadata(&server_trailing_metadata);
}
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
private:
static ServerCallTracer* CallTracer() {
auto* call_context = GetContext<grpc_call_context_element>();
return static_cast<ServerCallTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
}
};
};
const NoInterceptor ServerCallTracerFilter::Call::OnClientToServerMessage;
const NoInterceptor ServerCallTracerFilter::Call::OnServerToClientMessage;
const grpc_channel_filter ServerCallTracerFilter::kFilter =
MakePromiseBasedFilter<ServerCallTracerFilter, FilterEndpoint::kServer,
kFilterExaminesServerInitialMetadata>(
@ -65,34 +101,6 @@ absl::StatusOr<ServerCallTracerFilter> ServerCallTracerFilter::Create(
return ServerCallTracerFilter();
}
ArenaPromise<ServerMetadataHandle> ServerCallTracerFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<ServerCallTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer == nullptr) {
return next_promise_factory(std::move(call_args));
}
call_tracer->RecordReceivedInitialMetadata(
call_args.client_initial_metadata.get());
call_args.server_initial_metadata->InterceptAndMap(
[call_tracer](ServerMetadataHandle metadata) {
call_tracer->RecordSendInitialMetadata(metadata.get());
return metadata;
});
GetContext<CallFinalization>()->Add(
[call_tracer](const grpc_call_final_info* final_info) {
call_tracer->RecordEnd(final_info);
});
return OnCancel(
Map(next_promise_factory(std::move(call_args)),
[call_tracer](ServerMetadataHandle md) {
call_tracer->RecordSendTrailingMetadata(md.get());
return md;
}),
[call_tracer]() { call_tracer->RecordCancel(absl::CancelledError()); });
}
} // namespace
void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder) {

Loading…
Cancel
Save