[promises] Convert http-client filter to v3 filter API (#35189)

Also start building a temporary wrapping layer so that the new style filters can execute as promise filters directly.

Closes #35189

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35189 from ctiller:cg-http-cli dbd73e8aca
PiperOrigin-RevId: 587060881
pull/35138/head
Craig Tiller 1 year ago committed by Copybara-Service
parent e481f6acc5
commit 49f7ee96d1
  1. 54
      src/core/ext/filters/http/client/http_client_filter.cc
  2. 14
      src/core/ext/filters/http/client/http_client_filter.h
  3. 285
      src/core/lib/channel/promise_based_filter.h

@ -51,6 +51,9 @@
namespace grpc_core {
const NoInterceptor HttpClientFilter::Call::OnServerToClientMessage;
const NoInterceptor HttpClientFilter::Call::OnClientToServerMessage;
const grpc_channel_filter HttpClientFilter::kFilter =
MakePromiseBasedFilter<HttpClientFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata>("http-client");
@ -105,40 +108,27 @@ Slice UserAgentFromArgs(const ChannelArgs& args,
}
} // namespace
ArenaPromise<ServerMetadataHandle> HttpClientFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto& md = call_args.client_initial_metadata;
if (test_only_use_put_requests_) {
md->Set(HttpMethodMetadata(), HttpMethodMetadata::kPut);
void HttpClientFilter::Call::OnClientInitialMetadata(ClientMetadata& md,
HttpClientFilter* filter) {
if (filter->test_only_use_put_requests_) {
md.Set(HttpMethodMetadata(), HttpMethodMetadata::kPut);
} else {
md->Set(HttpMethodMetadata(), HttpMethodMetadata::kPost);
md.Set(HttpMethodMetadata(), HttpMethodMetadata::kPost);
}
md->Set(HttpSchemeMetadata(), scheme_);
md->Set(TeMetadata(), TeMetadata::kTrailers);
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
md->Set(UserAgentMetadata(), user_agent_.Ref());
auto* initial_metadata_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.server_initial_metadata->InterceptAndMap(
[initial_metadata_err](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto r = CheckServerMetadata(md.get());
if (!r.ok()) {
initial_metadata_err->Set(ServerMetadataFromStatus(r));
return absl::nullopt;
}
return std::move(md);
});
return Race(initial_metadata_err->Wait(),
Map(next_promise_factory(std::move(call_args)),
[](ServerMetadataHandle md) -> ServerMetadataHandle {
auto r = CheckServerMetadata(md.get());
if (!r.ok()) return ServerMetadataFromStatus(r);
return md;
}));
md.Set(HttpSchemeMetadata(), filter->scheme_);
md.Set(TeMetadata(), TeMetadata::kTrailers);
md.Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
md.Set(UserAgentMetadata(), filter->user_agent_.Ref());
}
absl::Status HttpClientFilter::Call::OnServerInitialMetadata(
ServerMetadata& md) {
return CheckServerMetadata(&md);
}
absl::Status HttpClientFilter::Call::OnServerTrailingMetadata(
ServerMetadata& md) {
return CheckServerMetadata(&md);
}
HttpClientFilter::HttpClientFilter(HttpSchemeMetadata::ValueType scheme,

@ -25,23 +25,27 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
class HttpClientFilter : public ChannelFilter {
class HttpClientFilter : public ImplementChannelFilter<HttpClientFilter> {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<HttpClientFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& md, HttpClientFilter* filter);
absl::Status OnServerInitialMetadata(ServerMetadata& md);
absl::Status OnServerTrailingMetadata(ServerMetadata& md);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
};
private:
HttpClientFilter(HttpSchemeMetadata::ValueType scheme, Slice user_agent,

@ -43,6 +43,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
@ -60,6 +61,7 @@
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
@ -122,6 +124,289 @@ class ChannelFilter {
grpc_event_engine::experimental::GetDefaultEventEngine();
};
struct NoInterceptor {};
namespace promise_filter_detail {
// Determine if a list of interceptors has any that need to asyncronously error
// the promise. If so, we need to allocate a latch for the generated promise for
// the original promise stack polyfill code that's generated.
inline constexpr bool HasAsyncErrorInterceptor() { return false; }
inline constexpr bool HasAsyncErrorInterceptor(const NoInterceptor*) {
return false;
}
template <typename T, typename... A>
inline constexpr bool HasAsyncErrorInterceptor(absl::Status (T::*)(A...)) {
return true;
}
template <typename T, typename... A>
inline constexpr bool HasAsyncErrorInterceptor(void (T::*)(A...)) {
return false;
}
// For the list case we do two interceptors to avoid amiguities with the single
// argument forms above.
template <typename I1, typename I2, typename... Interceptors>
inline constexpr bool HasAsyncErrorInterceptor(I1 i1, I2 i2,
Interceptors... interceptors) {
return HasAsyncErrorInterceptor(i1) || HasAsyncErrorInterceptor(i2) ||
HasAsyncErrorInterceptor(interceptors...);
}
// Composite for a given channel type to determine if any of its interceptors
// fall into this category: later code should use this.
template <typename Derived>
inline constexpr bool CallHasAsyncErrorInterceptor() {
return HasAsyncErrorInterceptor(&Derived::Call::OnClientToServerMessage,
&Derived::Call::OnServerInitialMetadata,
&Derived::Call::OnServerToClientMessage);
}
// Determine if an interceptor needs to access the channel via one of its
// arguments. If so, we need to allocate a pointer to the channel for the
// generated polyfill promise for the original promise stack.
inline constexpr bool HasChannelAccess() { return false; }
inline constexpr bool HasChannelAccess(const NoInterceptor*) { return false; }
template <typename T, typename R, typename A>
inline constexpr bool HasChannelAccess(R (T::*)(A)) {
return false;
}
template <typename T, typename R, typename A, typename C>
inline constexpr bool HasChannelAccess(R (T::*)(A, C)) {
return true;
}
// For the list case we do two interceptors to avoid amiguities with the single
// argument forms above.
template <typename I1, typename I2, typename... Interceptors>
inline constexpr bool HasChannelAccess(I1 i1, I2 i2,
Interceptors... interceptors) {
return HasChannelAccess(i1) || HasChannelAccess(i2) ||
HasChannelAccess(interceptors...);
}
// Composite for a given channel type to determine if any of its interceptors
// fall into this category: later code should use this.
template <typename Derived>
inline constexpr bool CallHasChannelAccess() {
return HasChannelAccess(&Derived::Call::OnClientInitialMetadata,
&Derived::Call::OnClientToServerMessage,
&Derived::Call::OnServerInitialMetadata,
&Derived::Call::OnServerToClientMessage,
&Derived::Call::OnServerTrailingMetadata);
}
// Given a boolean X export a type:
// either T if X is true
// or an empty type if it is false
template <typename T, bool X>
struct TypeIfNeeded;
template <typename T>
struct TypeIfNeeded<T, false> {
struct Type {
Type() = default;
template <typename Whatever>
explicit Type(Whatever) : Type() {}
};
};
template <typename T>
struct TypeIfNeeded<T, true> {
using Type = T;
};
// For the original promise scheme polyfill:
// If a set of interceptors might fail asynchronously, wrap the main
// promise in a race with the cancellation latch.
// If not, just return the main promise.
template <bool X>
struct RaceAsyncCompletion;
template <>
struct RaceAsyncCompletion<false> {
template <typename Promise>
static Promise Run(Promise x, void*) {
return x;
}
};
template <>
struct RaceAsyncCompletion<true> {
template <typename Promise>
static Promise Run(Promise x, Latch<ServerMetadataHandle>* latch) {
return Race(latch->Wait(), std::move(x));
}
};
// For the original promise scheme polyfill: data associated with once call.
template <typename Derived>
struct FilterCallData {
explicit FilterCallData(Derived* channel) : channel(channel) {}
GPR_NO_UNIQUE_ADDRESS typename Derived::Call call;
GPR_NO_UNIQUE_ADDRESS
typename TypeIfNeeded<Latch<ServerMetadataHandle>,
CallHasAsyncErrorInterceptor<Derived>()>::Type
error_latch;
GPR_NO_UNIQUE_ADDRESS
typename TypeIfNeeded<Derived*, CallHasChannelAccess<Derived>()>::Type
channel;
};
template <typename Promise>
auto MapResult(const NoInterceptor*, Promise x, void*) {
return x;
}
template <typename Promise, typename Derived>
auto MapResult(absl::Status (Derived::Call::*fn)(ServerMetadata&), Promise x,
FilterCallData<Derived>* call_data) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
return Map(std::move(x), [call_data](ServerMetadataHandle md) {
auto status = call_data->call.OnServerTrailingMetadata(*md);
if (!status.ok()) return ServerMetadataFromStatus(status);
return md;
});
}
inline auto RunCall(const NoInterceptor*, CallArgs call_args,
NextPromiseFactory next_promise_factory, void*) {
return next_promise_factory(std::move(call_args));
}
template <typename Derived>
inline auto RunCall(void (Derived::Call::*fn)(ClientMetadata& md),
CallArgs call_args, NextPromiseFactory next_promise_factory,
FilterCallData<Derived>* call_data) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_data->call.OnClientInitialMetadata(*call_args.client_initial_metadata);
return next_promise_factory(std::move(call_args));
}
template <typename Derived>
inline auto RunCall(void (Derived::Call::*fn)(ClientMetadata& md,
Derived* channel),
CallArgs call_args, NextPromiseFactory next_promise_factory,
FilterCallData<Derived>* call_data) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_data->call.OnClientInitialMetadata(*call_args.client_initial_metadata,
call_data->channel);
return next_promise_factory(std::move(call_args));
}
inline void InterceptClientToServerMessage(const NoInterceptor*, void*,
CallArgs&) {}
inline void InterceptServerInitialMetadata(const NoInterceptor*, void*,
CallArgs&) {}
template <typename Derived>
inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&),
FilterCallData<Derived>* call_data, CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_args.server_initial_metadata->InterceptAndMap(
[call_data](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call_data->call.OnServerInitialMetadata(*md);
if (!status.ok() && !call_data->error_latch.is_set()) {
call_data->error_latch.Set(ServerMetadataFromStatus(status));
return absl::nullopt;
}
return std::move(md);
});
}
inline void InterceptServerToClientMessage(const NoInterceptor*, void*,
CallArgs&) {}
template <typename Derived>
absl::enable_if_t<std::is_empty<FilterCallData<Derived>>::value,
FilterCallData<Derived>*>
MakeFilterCall(Derived*) {
static FilterCallData<Derived> call{nullptr};
return &call;
}
template <typename Derived>
absl::enable_if_t<!std::is_empty<FilterCallData<Derived>>::value,
FilterCallData<Derived>*>
MakeFilterCall(Derived* derived) {
return GetContext<Arena>()->ManagedNew<FilterCallData<Derived>>(derived);
}
} // namespace promise_filter_detail
// Base class for promise-based channel filters.
// Eventually this machinery will move elsewhere (the interception logic will
// move directly into the channel stack, and so filters will just directly
// derive from `ChannelFilter`)
//
// Implements new-style call filters, and polyfills them into the previous
// scheme.
//
// Call filters:
// Derived types should declare a class `Call` with the following members:
// - OnClientInitialMetadata - $VALUE_TYPE = ClientMetadata
// - OnServerInitialMetadata - $VALUE_TYPE = ServerMetadata
// - OnServerToClientMessage - $VALUE_TYPE = Message
// - OnClientToServerMessage - $VALUE_TYPE = Message
// - OnServerTrailingMetadata - $VALUE_TYPE = ServerMetadata
// These members define an interception point for a particular event in
// the call lifecycle.
// The type of these members matters, and is selectable by the class
// author. For $INTERCEPTOR_NAME in the above list:
// - static const NoInterceptor $INTERCEPTOR_NAME:
// defines that this filter does not intercept this event.
// there is zero runtime cost added to handling that event by this filter.
// - void $INTERCEPTOR_NAME($VALUE_TYPE&):
// the filter intercepts this event, and can modify the value.
// it never fails.
// - absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&):
// the filter intercepts this event, and can modify the value.
// it can fail, in which case the call will be aborted.
// - void $INTERCEPTOR_NAME($VALUE_TYPE&, Derived*):
// the filter intercepts this event, and can modify the value.
// it can access the channel via the second argument.
// it never fails.
// - absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&, Derived*):
// the filter intercepts this event, and can modify the value.
// it can access the channel via the second argument.
// it can fail, in which case the call will be aborted.
template <typename Derived>
class ImplementChannelFilter : public ChannelFilter {
public:
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) final {
auto* call = promise_filter_detail::MakeFilterCall<Derived>(
static_cast<Derived*>(this));
promise_filter_detail::InterceptClientToServerMessage(
&Derived::Call::OnClientToServerMessage, call, call_args);
promise_filter_detail::InterceptServerInitialMetadata(
&Derived::Call::OnServerInitialMetadata, call, call_args);
promise_filter_detail::InterceptServerToClientMessage(
&Derived::Call::OnServerToClientMessage, call, call_args);
return promise_filter_detail::MapResult(
&Derived::Call::OnServerTrailingMetadata,
promise_filter_detail::RaceAsyncCompletion<
promise_filter_detail::CallHasAsyncErrorInterceptor<Derived>()>::
Run(promise_filter_detail::RunCall(
&Derived::Call::OnClientInitialMetadata,
std::move(call_args), std::move(next_promise_factory),
call),
&call->error_latch),
call);
}
};
// Designator for whether a filter is client side or server side.
// Please don't use this outside calls to MakePromiseBasedFilter - it's
// intended to be deleted once the promise conversion is complete.

Loading…
Cancel
Save