[call-v3] Eliminate asynchronous server trailing metadata handling (#36984)

Originally when I was implementing call-v3 I added async handling for server trailing metadata because it was symmetrical with all the other handling we had. Turns out that we really don't need this, and further it's probably harmful to our ability to reason about the stack - so I'm removing that capability.

Closes #36984

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36984 from ctiller:no-infallible-seq f3b3548685
PiperOrigin-RevId: 646146666
pull/37033/head
Craig Tiller 8 months ago committed by Copybara-Service
parent 147557ba1d
commit b90afd3017
  1. 80
      src/core/lib/transport/call_filters.cc
  2. 381
      src/core/lib/transport/call_filters.h
  3. 108
      test/core/transport/call_filters_test.cc

@ -35,6 +35,16 @@ void RunHalfClose(absl::Span<const HalfCloseOperator> ops, void* call_data) {
}
}
ServerMetadataHandle RunServerTrailingMetadata(
absl::Span<const ServerTrailingMetadataOperator> ops, void* call_data,
ServerMetadataHandle md) {
for (auto& op : ops) {
md = op.server_trailing_metadata(Offset(call_data, op.call_offset),
op.channel_data, std::move(md));
}
return md;
}
template <typename T>
OperationExecutor<T>::~OperationExecutor() {
if (promise_data_ != nullptr) {
@ -44,8 +54,8 @@ OperationExecutor<T>::~OperationExecutor() {
}
template <typename T>
Poll<ResultOr<T>> OperationExecutor<T>::Start(
const Layout<FallibleOperator<T>>* layout, T input, void* call_data) {
Poll<ResultOr<T>> OperationExecutor<T>::Start(const Layout<T>* layout, T input,
void* call_data) {
ops_ = layout->ops.data();
end_ops_ = ops_ + layout->ops.size();
if (layout->promise_size == 0) {
@ -101,75 +111,11 @@ Poll<ResultOr<T>> OperationExecutor<T>::ContinueStep(void* call_data) {
return Pending{};
}
template <typename T>
InfallibleOperationExecutor<T>::~InfallibleOperationExecutor() {
if (promise_data_ != nullptr) {
ops_->early_destroy(promise_data_);
gpr_free_aligned(promise_data_);
}
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::Start(
const Layout<InfallibleOperator<T>>* layout, T input, void* call_data) {
ops_ = layout->ops.data();
end_ops_ = ops_ + layout->ops.size();
if (layout->promise_size == 0) {
// No call state ==> instantaneously ready
auto r = InitStep(std::move(input), call_data);
CHECK(r.ready());
return r;
}
promise_data_ =
gpr_malloc_aligned(layout->promise_size, layout->promise_alignment);
return InitStep(std::move(input), call_data);
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::InitStep(T input, void* call_data) {
while (true) {
if (ops_ == end_ops_) {
return input;
}
auto p =
ops_->promise_init(promise_data_, Offset(call_data, ops_->call_offset),
ops_->channel_data, std::move(input));
if (auto* r = p.value_if_ready()) {
input = std::move(*r);
++ops_;
continue;
}
return Pending{};
}
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::Step(void* call_data) {
DCHECK_NE(promise_data_, nullptr);
auto p = ContinueStep(call_data);
if (p.ready()) {
gpr_free_aligned(promise_data_);
promise_data_ = nullptr;
}
return p;
}
template <typename T>
Poll<T> InfallibleOperationExecutor<T>::ContinueStep(void* call_data) {
auto p = ops_->poll(promise_data_);
if (auto* r = p.value_if_ready()) {
++ops_;
return InitStep(std::move(*r), call_data);
}
return Pending{};
}
// Explicit instantiations of some types used in filters.h
// We'll need to add ServerMetadataHandle to this when it becomes different
// to ClientMetadataHandle
template class OperationExecutor<ClientMetadataHandle>;
template class OperationExecutor<MessageHandle>;
template class InfallibleOperationExecutor<ServerMetadataHandle>;
} // namespace filters_detail
@ -285,7 +231,7 @@ RefCountedPtr<CallFilters::Stack> CallFilters::StackBuilder::Build() {
// in the same order
data_.server_initial_metadata.Reverse();
data_.server_to_client_messages.Reverse();
data_.server_trailing_metadata.Reverse();
absl::c_reverse(data_.server_trailing_metadata);
return RefCountedPtr<Stack>(new Stack(std::move(data_)));
}

@ -172,11 +172,10 @@ struct ResultOr {
};
// One filter operation metadata
// Given a value of type V, produces a promise of type R.
template <typename R, typename V>
// Given a value of type T, produces a promise of type ResultOr<T>.
template <typename T>
struct Operator {
using Result = R;
using Arg = V;
using Arg = T;
// Pointer to corresponding channel data for this filter
void* channel_data;
// Offset of the call data for this filter within the call data memory
@ -184,13 +183,13 @@ struct Operator {
// Initialize the promise data for this filter, and poll once.
// Return the result of the poll.
// If the promise finishes, also destroy the promise data!
Poll<R> (*promise_init)(void* promise_data, void* call_data,
void* channel_data, V value);
Poll<ResultOr<T>> (*promise_init)(void* promise_data, void* call_data,
void* channel_data, T value);
// Poll the promise data for this filter.
// If the promise finishes, also destroy the promise data!
// Note that if the promise always finishes on the first poll, then supplying
// this method is unnecessary (as it will never be called).
Poll<R> (*poll)(void* promise_data);
Poll<ResultOr<T>> (*poll)(void* promise_data);
// Destroy the promise data for this filter for an in-progress operation
// before the promise finishes.
// Note that if the promise always finishes on the first poll, then supplying
@ -206,24 +205,19 @@ struct HalfCloseOperator {
void (*half_close)(void* call_data, void* channel_data);
};
void RunHalfClose(absl::Span<const HalfCloseOperator> ops, void* call_data);
struct ServerTrailingMetadataOperator {
// Pointer to corresponding channel data for this filter
void* channel_data;
// Offset of the call data for this filter within the call data memory
size_t call_offset;
ServerMetadataHandle (*server_trailing_metadata)(
void* call_data, void* channel_data, ServerMetadataHandle metadata);
};
// We divide operations into fallible and infallible.
// Fallible operations can fail, and that failure terminates the call.
// Infallible operations cannot fail.
// Fallible operations are used for client initial, and server initial metadata,
// and messages.
// Infallible operations are used for server trailing metadata.
// (This is because server trailing metadata occurs when the call is finished -
// and so we couldn't possibly become more finished - and also because it's the
// preferred representation of failure anyway!)
// An operation that could fail: takes a T argument, produces a ResultOr<T>
template <typename T>
using FallibleOperator = Operator<ResultOr<T>, T>;
// And one that cannot: takes a T argument, produces a T
template <typename T>
using InfallibleOperator = Operator<T, T>;
void RunHalfClose(absl::Span<const HalfCloseOperator> ops, void* call_data);
ServerMetadataHandle RunServerTrailingMetadata(
absl::Span<const ServerTrailingMetadataOperator> ops, void* call_data,
ServerMetadataHandle md);
// One call finalizer
struct Finalizer {
@ -235,19 +229,20 @@ struct Finalizer {
// A layout of operations for a given filter stack
// This includes which operations, how much memory is required, what alignment.
template <typename Op>
template <typename T>
struct Layout {
size_t promise_size = 0;
size_t promise_alignment = 0;
std::vector<Op> ops;
std::vector<Operator<T>> ops;
void Add(size_t filter_promise_size, size_t filter_promise_alignment, Op op) {
void Add(size_t filter_promise_size, size_t filter_promise_alignment,
Operator<T> op) {
promise_size = std::max(promise_size, filter_promise_size);
promise_alignment = std::max(promise_alignment, filter_promise_alignment);
ops.push_back(op);
}
void Reverse() { std::reverse(ops.begin(), ops.end()); }
void Reverse() { absl::c_reverse(ops); }
};
// AddOp and friends
@ -267,16 +262,7 @@ struct AddOpImpl;
template <typename FunctionImpl, FunctionImpl impl, typename FilterType,
typename T>
void AddOp(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
AddOpImpl<FilterType, T, FunctionImpl, impl>::Add(channel_data, call_offset,
to);
}
template <typename FunctionImpl, FunctionImpl impl, typename FilterType,
typename T>
void AddOp(FilterType* channel_data, size_t call_offset,
Layout<InfallibleOperator<T>>& to) {
void AddOp(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
AddOpImpl<FilterType, T, FunctionImpl, impl>::Add(channel_data, call_offset,
to);
}
@ -307,13 +293,73 @@ template <typename FilterType>
void AddHalfClose(FilterType*, size_t, const NoInterceptor*,
std::vector<HalfCloseOperator>&) {}
template <typename FilterType>
void AddServerTrailingMetadata(
FilterType* channel_data, size_t call_offset,
void (FilterType::Call::*)(ServerMetadata&),
std::vector<ServerTrailingMetadataOperator>& to) {
to.push_back(ServerTrailingMetadataOperator{
channel_data, call_offset,
[](void* call_data, void*, ServerMetadataHandle metadata) {
static_cast<typename FilterType::Call*>(call_data)
->OnServerTrailingMetadata(*metadata);
return metadata;
}});
}
template <typename FilterType>
void AddServerTrailingMetadata(
FilterType* channel_data, size_t call_offset,
void (FilterType::Call::*)(ServerMetadata&, FilterType*),
std::vector<ServerTrailingMetadataOperator>& to) {
to.push_back(ServerTrailingMetadataOperator{
channel_data, call_offset,
[](void* call_data, void* channel_data, ServerMetadataHandle metadata) {
static_cast<typename FilterType::Call*>(call_data)
->OnServerTrailingMetadata(*metadata,
static_cast<FilterType*>(channel_data));
return metadata;
}});
}
template <typename FilterType>
void AddServerTrailingMetadata(
FilterType* channel_data, size_t call_offset,
absl::Status (FilterType::Call::*)(ServerMetadata&),
std::vector<ServerTrailingMetadataOperator>& to) {
to.push_back(ServerTrailingMetadataOperator{
channel_data, call_offset,
[](void* call_data, void*, ServerMetadataHandle metadata) {
auto r = static_cast<typename FilterType::Call*>(call_data)
->OnServerTrailingMetadata(*metadata);
if (r.ok()) return metadata;
return CancelledServerMetadataFromStatus(r);
}});
}
template <typename FilterType>
void AddServerTrailingMetadata(
FilterType* channel_data, size_t call_offset,
ServerMetadataHandle (FilterType::Call::*)(ServerMetadataHandle),
std::vector<ServerTrailingMetadataOperator>& to) {
to.push_back(ServerTrailingMetadataOperator{
channel_data, call_offset,
[](void* call_data, void*, ServerMetadataHandle metadata) {
return static_cast<typename FilterType::Call*>(call_data)
->OnServerTrailingMetadata(std::move(metadata));
}});
}
template <typename FilterType>
void AddServerTrailingMetadata(FilterType*, size_t, const NoInterceptor*,
std::vector<ServerTrailingMetadataOperator>&) {}
// const NoInterceptor $EVENT
// These do nothing, and specifically DO NOT add an operation to the layout.
// Supported for fallible & infallible operations.
template <typename FilterType, typename T, const NoInterceptor* which>
struct AddOpImpl<FilterType, T, const NoInterceptor*, which> {
static void Add(FilterType*, size_t, Layout<FallibleOperator<T>>&) {}
static void Add(FilterType*, size_t, Layout<InfallibleOperator<T>>&) {}
static void Add(FilterType*, size_t, Layout<T>&) {}
};
// void $INTERCEPTOR_NAME($VALUE_TYPE&)
@ -321,10 +367,9 @@ template <typename FilterType, typename T,
void (FilterType::Call::*impl)(typename T::element_type&)>
struct AddOpImpl<FilterType, T,
void (FilterType::Call::*)(typename T::element_type&), impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
@ -336,21 +381,6 @@ struct AddOpImpl<FilterType, T,
nullptr,
});
}
static void Add(FilterType* channel_data, size_t call_offset,
Layout<InfallibleOperator<T>>& to) {
to.Add(0, 0,
InfallibleOperator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<T> {
(static_cast<typename FilterType::Call*>(call_data)->*impl)(
*value);
return std::move(value);
},
nullptr,
nullptr,
});
}
};
// void $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*)
@ -360,10 +390,9 @@ template <typename FilterType, typename T,
struct AddOpImpl<
FilterType, T,
void (FilterType::Call::*)(typename T::element_type&, FilterType*), impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -376,33 +405,16 @@ struct AddOpImpl<
nullptr,
});
}
static void Add(FilterType* channel_data, size_t call_offset,
Layout<InfallibleOperator<T>>& to) {
to.Add(
0, 0,
InfallibleOperator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data, T value) -> Poll<T> {
(static_cast<typename FilterType::Call*>(call_data)->*impl)(
*value, static_cast<FilterType*>(channel_data));
return std::move(value);
},
nullptr,
nullptr,
});
}
};
// $VALUE_HANDLE $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*)
template <typename FilterType, typename T,
T (FilterType::Call::*impl)(T, FilterType*)>
struct AddOpImpl<FilterType, T, T (FilterType::Call::*)(T, FilterType*), impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -416,24 +428,6 @@ struct AddOpImpl<FilterType, T, T (FilterType::Call::*)(T, FilterType*), impl> {
nullptr,
});
}
static void Add(FilterType* channel_data, size_t call_offset,
Layout<InfallibleOperator<T>>& to) {
to.Add(
0, 0,
InfallibleOperator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data, T value) -> Poll<T> {
(static_cast<typename FilterType::Call*>(call_data)->*impl)(
*value, static_cast<FilterType*>(channel_data));
return (
static_cast<typename FilterType::Call*>(call_data)->*impl)(
std::move(value), static_cast<FilterType*>(channel_data));
},
nullptr,
nullptr,
});
}
};
// absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&)
@ -442,11 +436,10 @@ template <typename FilterType, typename T,
struct AddOpImpl<FilterType, T,
absl::Status (FilterType::Call::*)(typename T::element_type&),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
@ -461,24 +454,6 @@ struct AddOpImpl<FilterType, T,
nullptr,
});
}
static void Add(FilterType* channel_data, size_t call_offset,
Layout<InfallibleOperator<T>>& to) {
to.Add(
0, 0,
InfallibleOperator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<T> {
auto r =
(static_cast<typename FilterType::Call*>(call_data)->*impl)(
*value);
if (r.ok()) return std::move(value);
return StatusCast<ServerMetadataHandle>(std::move(r));
},
nullptr,
nullptr,
});
}
};
// absl::Status $INTERCEPTOR_NAME(const $VALUE_TYPE&)
@ -488,11 +463,10 @@ template <typename FilterType, typename T,
struct AddOpImpl<
FilterType, T,
absl::Status (FilterType::Call::*)(const typename T::element_type&), impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
@ -517,11 +491,10 @@ struct AddOpImpl<FilterType, T,
absl::Status (FilterType::Call::*)(typename T::element_type&,
FilterType*),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -547,11 +520,10 @@ struct AddOpImpl<FilterType, T,
absl::Status (FilterType::Call::*)(
const typename T::element_type&, FilterType*),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -575,11 +547,10 @@ template <typename FilterType, typename T,
struct AddOpImpl<FilterType, T,
absl::StatusOr<T> (FilterType::Call::*)(T, FilterType*),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -605,11 +576,10 @@ struct AddOpImpl<FilterType, T,
ServerMetadataHandle (FilterType::Call::*)(
typename T::element_type&),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
@ -634,11 +604,10 @@ struct AddOpImpl<FilterType, T,
ServerMetadataHandle (FilterType::Call::*)(
const typename T::element_type&),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> {
@ -663,11 +632,10 @@ struct AddOpImpl<FilterType, T,
ServerMetadataHandle (FilterType::Call::*)(
typename T::element_type&, FilterType*),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -693,11 +661,10 @@ struct AddOpImpl<FilterType, T,
ServerMetadataHandle (FilterType::Call::*)(
const typename T::element_type&, FilterType*),
impl> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
to.Add(
0, 0,
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void*, void* call_data, void* channel_data,
@ -721,8 +688,7 @@ template <typename FilterType, typename T, typename R,
struct AddOpImpl<
FilterType, T, R (FilterType::Call::*)(typename T::element_type&), impl,
absl::enable_if_t<std::is_same<absl::Status, PromiseResult<R>>::value>> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
class Promise {
public:
Promise(T value, typename FilterType::Call* call_data, FilterType*)
@ -745,7 +711,7 @@ struct AddOpImpl<
GPR_NO_UNIQUE_ADDRESS R impl_;
};
to.Add(sizeof(Promise), alignof(Promise),
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void* promise_data, void* call_data, void* channel_data,
@ -774,8 +740,7 @@ struct AddOpImpl<
R (FilterType::Call::*)(typename T::element_type&, FilterType*), impl,
absl::enable_if_t<!std::is_same<R, absl::Status>::value &&
std::is_same<absl::Status, PromiseResult<R>>::value>> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
class Promise {
public:
Promise(T value, typename FilterType::Call* call_data,
@ -800,7 +765,7 @@ struct AddOpImpl<
GPR_NO_UNIQUE_ADDRESS R impl_;
};
to.Add(sizeof(Promise), alignof(Promise),
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void* promise_data, void* call_data, void* channel_data,
@ -828,8 +793,7 @@ template <typename FilterType, typename T, typename R,
struct AddOpImpl<FilterType, T, R (FilterType::Call::*)(T, FilterType*), impl,
absl::enable_if_t<std::is_same<absl::StatusOr<T>,
PromiseResult<R>>::value>> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
static void Add(FilterType* channel_data, size_t call_offset, Layout<T>& to) {
class Promise {
public:
Promise(T value, typename FilterType::Call* call_data,
@ -850,7 +814,7 @@ struct AddOpImpl<FilterType, T, R (FilterType::Call::*)(T, FilterType*), impl,
GPR_NO_UNIQUE_ADDRESS R impl_;
};
to.Add(sizeof(Promise), alignof(Promise),
FallibleOperator<T>{
Operator<T>{
channel_data,
call_offset,
[](void* promise_data, void* call_data, void* channel_data,
@ -892,12 +856,12 @@ struct StackData {
// For each kind of operation, a layout of the operations for this call.
// (there's some duplicate data here, and that's ok: we want to avoid
// pointer chasing as much as possible when executing a call)
Layout<FallibleOperator<ClientMetadataHandle>> client_initial_metadata;
Layout<FallibleOperator<ServerMetadataHandle>> server_initial_metadata;
Layout<FallibleOperator<MessageHandle>> client_to_server_messages;
Layout<ClientMetadataHandle> client_initial_metadata;
Layout<ServerMetadataHandle> server_initial_metadata;
Layout<MessageHandle> client_to_server_messages;
std::vector<HalfCloseOperator> client_to_server_half_close;
Layout<FallibleOperator<MessageHandle>> server_to_client_messages;
Layout<InfallibleOperator<ServerMetadataHandle>> server_trailing_metadata;
Layout<MessageHandle> server_to_client_messages;
std::vector<ServerTrailingMetadataOperator> server_trailing_metadata;
// A list of finalizers for this call.
// We use a bespoke data structure here because finalizers can never be
// asynchronous.
@ -1035,9 +999,9 @@ struct StackData {
template <typename FilterType>
void AddServerTrailingMetadataOp(FilterType* channel_data,
size_t call_offset) {
AddOp<decltype(&FilterType::Call::OnServerTrailingMetadata),
&FilterType::Call::OnServerTrailingMetadata>(
channel_data, call_offset, server_trailing_metadata);
AddServerTrailingMetadata(channel_data, call_offset,
&FilterType::Call::OnServerTrailingMetadata,
server_trailing_metadata);
}
// Finalizer interception adders
@ -1109,8 +1073,7 @@ class OperationExecutor {
// Start executing a layout. May allocate space to store the relevant promise.
// Returns the result of the first poll.
// If the promise finishes, also destroy the promise data.
Poll<ResultOr<T>> Start(const Layout<FallibleOperator<T>>* layout, T input,
void* call_data);
Poll<ResultOr<T>> Start(const Layout<T>* layout, T input, void* call_data);
// Continue executing a layout. Returns the result of the next poll.
// If the promise finishes, also destroy the promise data.
Poll<ResultOr<T>> Step(void* call_data);
@ -1132,63 +1095,8 @@ class OperationExecutor {
Poll<ResultOr<T>> ContinueStep(void* call_data);
void* promise_data_ = nullptr;
const FallibleOperator<T>* ops_;
const FallibleOperator<T>* end_ops_;
};
// Per OperationExecutor, but for infallible operation sequences.
template <typename T>
class InfallibleOperationExecutor {
public:
InfallibleOperationExecutor() = default;
~InfallibleOperationExecutor();
InfallibleOperationExecutor(const InfallibleOperationExecutor&) = delete;
InfallibleOperationExecutor& operator=(const InfallibleOperationExecutor&) =
delete;
InfallibleOperationExecutor(InfallibleOperationExecutor&& other) noexcept
: ops_(other.ops_), end_ops_(other.end_ops_) {
// Movable iff we're not running.
DCHECK_EQ(other.promise_data_, nullptr);
}
InfallibleOperationExecutor& operator=(
InfallibleOperationExecutor&& other) noexcept {
DCHECK_EQ(other.promise_data_, nullptr);
DCHECK_EQ(promise_data_, nullptr);
ops_ = other.ops_;
end_ops_ = other.end_ops_;
return *this;
}
// IsRunning() is true if we're currently executing a sequence of operations.
bool IsRunning() const { return promise_data_ != nullptr; }
// Start executing a layout. May allocate space to store the relevant promise.
// Returns the result of the first poll.
// If the promise finishes, also destroy the promise data.
Poll<T> Start(const Layout<InfallibleOperator<T>>* layout, T input,
void* call_data);
// Continue executing a layout. Returns the result of the next poll.
// If the promise finishes, also destroy the promise data.
Poll<T> Step(void* call_data);
private:
// Start polling on the current step of the layout.
// `input` is the current value (either the input to the first step, or the
// so far transformed value)
// `call_data` is the call data for the filter stack.
// If this op finishes immediately then we iterative move to the next step.
// If we reach the end up the ops, we return the overall poll result,
// otherwise we return Pending.
Poll<T> InitStep(T input, void* call_data);
// Continue polling on the current step of the layout.
// Called on the next poll after InitStep returns pending.
// If the promise is still pending, returns this.
// If the promise completes we call into InitStep to continue execution
// through the filters.
Poll<T> ContinueStep(void* call_data);
void* promise_data_ = nullptr;
const InfallibleOperator<T>* ops_;
const InfallibleOperator<T>* end_ops_;
const Operator<T>* ops_;
const Operator<T>* end_ops_;
};
class CallState {
@ -1621,8 +1529,7 @@ class CallFilters {
template <typename Output, typename Input,
Input(CallFilters::*input_location),
filters_detail::Layout<filters_detail::FallibleOperator<Input>>(
filters_detail::StackData::*layout),
filters_detail::Layout<Input>(filters_detail::StackData::*layout),
void (filters_detail::CallState::*on_done)()>
auto RunExecutor() {
DCHECK_NE((this->*input_location).get(), nullptr);
@ -1755,33 +1662,15 @@ class CallFilters {
// Client: Fetch server trailing metadata
// Returns a promise that resolves to ServerMetadataHandle
GRPC_MUST_USE_RESULT auto PullServerTrailingMetadata() {
return Seq(
return Map(
[this]() { return call_state_.PollServerTrailingMetadataAvailable(); },
[this](Empty) {
filters_detail::InfallibleOperationExecutor<ServerMetadataHandle>
executor;
return [this, executor = std::move(
executor)]() mutable -> Poll<ServerMetadataHandle> {
auto finish_step = [this](Poll<ServerMetadataHandle> p)
-> Poll<ServerMetadataHandle> {
auto* r = p.value_if_ready();
if (r == nullptr) return Pending{};
call_state_.FinishPullServerTrailingMetadata();
return std::move(*r);
};
if (push_server_trailing_metadata_ != nullptr) {
// If no stack has been set, we can just return the result of the
// call
if (stack_ == nullptr) {
call_state_.FinishPullServerTrailingMetadata();
return std::move(push_server_trailing_metadata_);
}
return finish_step(executor.Start(
&(stack_->data_.server_trailing_metadata),
std::move(push_server_trailing_metadata_), call_data_));
}
return finish_step(executor.Step(call_data_));
};
auto result = std::move(push_server_trailing_metadata_);
call_state_.FinishPullServerTrailingMetadata();
if (call_data_ == nullptr) return result;
return filters_detail::RunServerTrailingMetadata(
stack_->data_.server_trailing_metadata, call_data_,
std::move(result));
});
}
// Server: Wait for server trailing metadata to have been sent

@ -72,17 +72,16 @@ class MockActivity : public Activity, public Wakeable {
namespace filters_detail {
TEST(LayoutTest, Empty) {
Layout<FallibleOperator<ClientMetadataHandle>> l;
Layout<ClientMetadataHandle> l;
ASSERT_EQ(l.ops.size(), 0u);
EXPECT_EQ(l.promise_size, 0u);
EXPECT_EQ(l.promise_alignment, 0u);
}
TEST(LayoutTest, Add) {
Layout<FallibleOperator<ClientMetadataHandle>> l;
Layout<ClientMetadataHandle> l;
l.Add(1, 4,
FallibleOperator<ClientMetadataHandle>{&l, 120, nullptr, nullptr,
nullptr});
Operator<ClientMetadataHandle>{&l, 120, nullptr, nullptr, nullptr});
ASSERT_EQ(l.ops.size(), 1u);
EXPECT_EQ(l.promise_size, 1u);
EXPECT_EQ(l.promise_alignment, 4u);
@ -872,7 +871,7 @@ TEST(StackDataTest, InstantServerToClientMessagesReturningVoid) {
EXPECT_EQ(r.value().ok->flags(), 1u);
}
TEST(StackDataTest, InstantServerTrailingMetadataReturningVoid) {
TEST(StackDataTest, ServerTrailingMetadataReturningVoid) {
struct Filter1 {
struct Call {
void OnServerTrailingMetadata(ServerMetadata& md) {
@ -888,27 +887,20 @@ TEST(StackDataTest, InstantServerTrailingMetadataReturningVoid) {
d.AddServerTrailingMetadataOp(&f1, call_offset);
ASSERT_EQ(d.filter_constructor.size(), 0u);
ASSERT_EQ(d.filter_destructor.size(), 0u);
ASSERT_EQ(d.server_trailing_metadata.ops.size(), 1u);
EXPECT_EQ(d.server_trailing_metadata.ops[0].call_offset, call_offset);
EXPECT_EQ(d.server_trailing_metadata.ops[0].channel_data, &f1);
// Instant => no poll/early destroy
EXPECT_EQ(d.server_trailing_metadata.ops[0].poll, nullptr);
EXPECT_EQ(d.server_trailing_metadata.ops[0].early_destroy, nullptr);
// Check promise init
ASSERT_EQ(d.server_trailing_metadata.size(), 1u);
EXPECT_EQ(d.server_trailing_metadata[0].call_offset, call_offset);
EXPECT_EQ(d.server_trailing_metadata[0].channel_data, &f1);
// Check operation
auto arena = SimpleArenaAllocator()->MakeArena();
auto md = Arena::MakePooled<ServerMetadata>();
EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
char call_data;
auto r = d.server_trailing_metadata.ops[0].promise_init(
nullptr, &call_data, d.server_trailing_metadata.ops[0].channel_data,
std::move(md));
EXPECT_TRUE(r.ready());
EXPECT_EQ(r.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"hello");
auto r = d.server_trailing_metadata[0].server_trailing_metadata(
&call_data, d.server_trailing_metadata[0].channel_data, std::move(md));
EXPECT_EQ(r->get_pointer(HttpPathMetadata())->as_string_view(), "hello");
}
TEST(StackDataTest,
InstantServerTrailingMetadataReturningVoidTakingChannelPtr) {
TEST(StackDataTest, ServerTrailingMetadataReturningVoidTakingChannelPtr) {
struct Filter1 {
struct Call {
void OnServerTrailingMetadata(ServerMetadata& md, Filter1* p) {
@ -926,23 +918,17 @@ TEST(StackDataTest,
d.AddServerTrailingMetadataOp(&f1, call_offset);
ASSERT_EQ(d.filter_constructor.size(), 0u);
ASSERT_EQ(d.filter_destructor.size(), 0u);
ASSERT_EQ(d.server_trailing_metadata.ops.size(), 1u);
EXPECT_EQ(d.server_trailing_metadata.ops[0].call_offset, call_offset);
EXPECT_EQ(d.server_trailing_metadata.ops[0].channel_data, &f1);
// Instant => no poll/early destroy
EXPECT_EQ(d.server_trailing_metadata.ops[0].poll, nullptr);
EXPECT_EQ(d.server_trailing_metadata.ops[0].early_destroy, nullptr);
// Check promise init
ASSERT_EQ(d.server_trailing_metadata.size(), 1u);
EXPECT_EQ(d.server_trailing_metadata[0].call_offset, call_offset);
EXPECT_EQ(d.server_trailing_metadata[0].channel_data, &f1);
// Check operation
auto arena = SimpleArenaAllocator()->MakeArena();
auto md = Arena::MakePooled<ServerMetadata>();
EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
char call_data;
auto r = d.server_trailing_metadata.ops[0].promise_init(
nullptr, &call_data, d.server_trailing_metadata.ops[0].channel_data,
std::move(md));
EXPECT_TRUE(r.ready());
EXPECT_EQ(r.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"hello");
auto r = d.server_trailing_metadata[0].server_trailing_metadata(
&call_data, d.server_trailing_metadata[0].channel_data, std::move(md));
EXPECT_EQ(r->get_pointer(HttpPathMetadata())->as_string_view(), "hello");
EXPECT_THAT(f1.v, ::testing::ElementsAre(42));
}
@ -964,13 +950,13 @@ TEST(StackBuilderTest, AddOnServerTrailingMetadata) {
[x = std::make_unique<int>(42)](ServerMetadata&) { EXPECT_EQ(*x, 42); });
auto stack = b.Build();
const auto& data = CallFilters::StackTestSpouse().StackDataFrom(*stack);
ASSERT_EQ(data.server_trailing_metadata.ops.size(), 1u);
ASSERT_EQ(data.server_trailing_metadata.size(), 1u);
ASSERT_EQ(data.client_initial_metadata.ops.size(), 0u);
ASSERT_EQ(data.client_to_server_messages.ops.size(), 0u);
ASSERT_EQ(data.server_to_client_messages.ops.size(), 0u);
ASSERT_EQ(data.server_initial_metadata.ops.size(), 0u);
EXPECT_EQ(data.server_trailing_metadata.ops[0].call_offset, 0);
EXPECT_NE(data.server_trailing_metadata.ops[0].channel_data, nullptr);
EXPECT_EQ(data.server_trailing_metadata[0].call_offset, 0);
EXPECT_NE(data.server_trailing_metadata[0].channel_data, nullptr);
}
///////////////////////////////////////////////////////////////////////////////
@ -1116,56 +1102,6 @@ TEST(OperationExecutorTest, PromiseTwo) {
gpr_free_aligned(call_data1);
}
} // namespace filters_detail
///////////////////////////////////////////////////////////////////////////////
// InfallibleOperationExecutor
namespace filters_detail {
TEST(InfallibleOperationExecutor, NoOp) {
OperationExecutor<ServerMetadataHandle> pipe;
EXPECT_FALSE(pipe.IsRunning());
}
TEST(InfallibleOperationExecutor, InstantTwo) {
class Filter1 {
public:
class Call {
public:
void OnServerTrailingMetadata(ClientMetadata& md) {
if (md.get_pointer(HttpPathMetadata()) != nullptr) {
md.Set(HttpPathMetadata(), Slice::FromStaticString("world"));
} else {
md.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
}
}
};
};
StackData d;
Filter1 f1;
Filter1 f2;
const size_t call_offset1 = d.AddFilter(&f1);
const size_t call_offset2 = d.AddFilter(&f2);
d.AddServerTrailingMetadataOp(&f1, call_offset1);
d.AddServerTrailingMetadataOp(&f2, call_offset2);
ASSERT_EQ(d.filter_constructor.size(), 0u);
ASSERT_EQ(d.filter_destructor.size(), 0u);
ASSERT_EQ(d.server_trailing_metadata.ops.size(), 2u);
void* call_data = gpr_malloc_aligned(d.call_data_size, d.call_data_alignment);
InfallibleOperationExecutor<ServerMetadataHandle> transformer;
auto arena = SimpleArenaAllocator()->MakeArena();
promise_detail::Context<Arena> ctx(arena.get());
auto md = Arena::MakePooled<ServerMetadata>();
EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
auto r =
transformer.Start(&d.server_trailing_metadata, std::move(md), call_data);
EXPECT_TRUE(r.ready());
EXPECT_EQ(r.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"world");
gpr_free_aligned(call_data);
}
///////////////////////////////////////////////////////////////////////////////
// CallState

Loading…
Cancel
Save