|
|
|
@ -23,6 +23,8 @@ |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/gprpp/ref_counted.h" |
|
|
|
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
|
|
|
#include "src/core/lib/promise/latch.h" |
|
|
|
|
#include "src/core/lib/promise/map.h" |
|
|
|
|
#include "src/core/lib/promise/promise.h" |
|
|
|
|
#include "src/core/lib/promise/status_flag.h" |
|
|
|
|
#include "src/core/lib/transport/call_final_info.h" |
|
|
|
@ -771,6 +773,55 @@ struct AddOpImpl< |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// PROMISE_RETURNING(absl::StatusOr<$VALUE_HANDLE>)
|
|
|
|
|
// $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*)
|
|
|
|
|
template <typename FilterType, typename T, typename R, |
|
|
|
|
R (FilterType::Call::*impl)(T, FilterType*)> |
|
|
|
|
struct AddOpImpl<FilterType, T, R (FilterType::Call::*)(T, FilterType*), impl, |
|
|
|
|
absl::enable_if_t<std::is_same<absl::StatusOr<T>, |
|
|
|
|
PromiseResult<R>>::value>> { |
|
|
|
|
static void Add(FilterType* channel_data, size_t call_offset, |
|
|
|
|
Layout<FallibleOperator<T>>& to) { |
|
|
|
|
class Promise { |
|
|
|
|
public: |
|
|
|
|
Promise(T value, typename FilterType::Call* call_data, |
|
|
|
|
FilterType* channel_data) |
|
|
|
|
: impl_((call_data->*impl)(std::move(value), channel_data)) {} |
|
|
|
|
|
|
|
|
|
Poll<ResultOr<T>> PollOnce() { |
|
|
|
|
auto p = impl_(); |
|
|
|
|
auto* r = p.value_if_ready(); |
|
|
|
|
if (r == nullptr) return Pending{}; |
|
|
|
|
this->~Promise(); |
|
|
|
|
if (r->ok()) return ResultOr<T>{std::move(**r), nullptr}; |
|
|
|
|
return ResultOr<T>{nullptr, ServerMetadataFromStatus(r->status())}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
GPR_NO_UNIQUE_ADDRESS R impl_; |
|
|
|
|
}; |
|
|
|
|
to.Add(sizeof(Promise), alignof(Promise), |
|
|
|
|
FallibleOperator<T>{ |
|
|
|
|
channel_data, |
|
|
|
|
call_offset, |
|
|
|
|
[](void* promise_data, void* call_data, void* channel_data, |
|
|
|
|
T value) -> Poll<ResultOr<T>> { |
|
|
|
|
auto* promise = new (promise_data) |
|
|
|
|
Promise(std::move(value), |
|
|
|
|
static_cast<typename FilterType::Call*>(call_data), |
|
|
|
|
static_cast<FilterType*>(channel_data)); |
|
|
|
|
return promise->PollOnce(); |
|
|
|
|
}, |
|
|
|
|
[](void* promise_data) { |
|
|
|
|
return static_cast<Promise*>(promise_data)->PollOnce(); |
|
|
|
|
}, |
|
|
|
|
[](void* promise_data) { |
|
|
|
|
static_cast<Promise*>(promise_data)->~Promise(); |
|
|
|
|
}, |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
struct ChannelDataDestructor { |
|
|
|
|
void (*destroy)(void* channel_data); |
|
|
|
|
void* channel_data; |
|
|
|
@ -783,7 +834,7 @@ struct ChannelDataDestructor { |
|
|
|
|
// in-flight calls.
|
|
|
|
|
struct StackData { |
|
|
|
|
// Overall size and alignment of the call data for this stack.
|
|
|
|
|
size_t call_data_alignment = 0; |
|
|
|
|
size_t call_data_alignment = 1; |
|
|
|
|
size_t call_data_size = 0; |
|
|
|
|
// A complete list of filters for this call, so that we can construct the
|
|
|
|
|
// call data for each filter.
|
|
|
|
@ -1104,14 +1155,25 @@ class PipeState { |
|
|
|
|
void DropPush(); |
|
|
|
|
// Poll for push completion: occurs after the corresponding Pull()
|
|
|
|
|
Poll<StatusFlag> PollPush(); |
|
|
|
|
Poll<StatusFlag> PollPull(); |
|
|
|
|
// Poll for pull completion; returns Failure{} if closed with error,
|
|
|
|
|
// true if a value is available, or false if the pipe was closed without
|
|
|
|
|
// error.
|
|
|
|
|
Poll<ValueOrFailure<bool>> PollPull(); |
|
|
|
|
// A pulled value has been consumed: we can unblock the push
|
|
|
|
|
void AckPull(); |
|
|
|
|
// A previously started pull operation has completed
|
|
|
|
|
void DropPull(); |
|
|
|
|
// Close sending
|
|
|
|
|
void CloseSending(); |
|
|
|
|
// Close sending with error
|
|
|
|
|
void CloseWithError(); |
|
|
|
|
// Poll for closedness - if true, closed with error
|
|
|
|
|
Poll<bool> PollClosed(); |
|
|
|
|
|
|
|
|
|
bool holds_error() const { return state_ == ValueState::kError; } |
|
|
|
|
|
|
|
|
|
std::string DebugString() const; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
enum class ValueState : uint8_t { |
|
|
|
|
// Nothing sending nor receiving
|
|
|
|
@ -1248,6 +1310,44 @@ class CallFilters { |
|
|
|
|
filters_detail::StackData data_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class NextMessage { |
|
|
|
|
public: |
|
|
|
|
NextMessage() : has_value_(false), cancelled_(false) {} |
|
|
|
|
explicit NextMessage(MessageHandle value) |
|
|
|
|
: has_value_(true), value_(std::move(value)) {} |
|
|
|
|
explicit NextMessage(bool cancelled) |
|
|
|
|
: has_value_(false), cancelled_(cancelled) {} |
|
|
|
|
NextMessage(const NextMessage&) = delete; |
|
|
|
|
NextMessage& operator=(const NextMessage&) = delete; |
|
|
|
|
NextMessage(NextMessage&& other) noexcept = default; |
|
|
|
|
NextMessage& operator=(NextMessage&& other) = default; |
|
|
|
|
|
|
|
|
|
using value_type = MessageHandle; |
|
|
|
|
|
|
|
|
|
void reset() { |
|
|
|
|
has_value_ = false; |
|
|
|
|
cancelled_ = false; |
|
|
|
|
value_.reset(); |
|
|
|
|
} |
|
|
|
|
bool has_value() const { return has_value_; } |
|
|
|
|
const MessageHandle& value() const { |
|
|
|
|
GPR_DEBUG_ASSERT(has_value_); |
|
|
|
|
return value_; |
|
|
|
|
} |
|
|
|
|
MessageHandle& value() { |
|
|
|
|
GPR_DEBUG_ASSERT(has_value_); |
|
|
|
|
return value_; |
|
|
|
|
} |
|
|
|
|
const MessageHandle& operator*() const { return value(); } |
|
|
|
|
MessageHandle& operator*() { return value(); } |
|
|
|
|
bool cancelled() const { return !has_value_ && cancelled_; } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
bool has_value_; |
|
|
|
|
bool cancelled_; |
|
|
|
|
MessageHandle value_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
explicit CallFilters(ClientMetadataHandle client_initial_metadata); |
|
|
|
|
~CallFilters(); |
|
|
|
|
|
|
|
|
@ -1258,25 +1358,59 @@ class CallFilters { |
|
|
|
|
|
|
|
|
|
void SetStack(RefCountedPtr<Stack> stack); |
|
|
|
|
|
|
|
|
|
// Access client initial metadata before it's processed
|
|
|
|
|
ClientMetadata* unprocessed_client_initial_metadata() { |
|
|
|
|
return client_initial_metadata_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Client: Fetch client initial metadata
|
|
|
|
|
// Returns a promise that resolves to ValueOrFailure<ClientMetadataHandle>
|
|
|
|
|
GRPC_MUST_USE_RESULT auto PullClientInitialMetadata(); |
|
|
|
|
// Server: Indicate that no server initial metadata will be sent
|
|
|
|
|
void NoServerInitialMetadata() { |
|
|
|
|
server_initial_metadata_state_.CloseSending(); |
|
|
|
|
} |
|
|
|
|
// Server: Push server initial metadata
|
|
|
|
|
// Returns a promise that resolves to a StatusFlag indicating success
|
|
|
|
|
GRPC_MUST_USE_RESULT auto PushServerInitialMetadata(ServerMetadataHandle md); |
|
|
|
|
// Client: Fetch server initial metadata
|
|
|
|
|
// Returns a promise that resolves to ValueOrFailure<ServerMetadataHandle>
|
|
|
|
|
GRPC_MUST_USE_RESULT auto PullServerInitialMetadata(); |
|
|
|
|
// Client: Push client to server message
|
|
|
|
|
// Returns a promise that resolves to a StatusFlag indicating success
|
|
|
|
|
GRPC_MUST_USE_RESULT auto PushClientToServerMessage(MessageHandle message); |
|
|
|
|
// Client: Indicate that no more messages will be sent
|
|
|
|
|
void FinishClientToServerSends() { |
|
|
|
|
client_to_server_message_state_.CloseSending(); |
|
|
|
|
} |
|
|
|
|
// Server: Fetch client to server message
|
|
|
|
|
// Returns a promise that resolves to ValueOrFailure<MessageHandle>
|
|
|
|
|
GRPC_MUST_USE_RESULT auto PullClientToServerMessage(); |
|
|
|
|
// Server: Push server to client message
|
|
|
|
|
// Returns a promise that resolves to a StatusFlag indicating success
|
|
|
|
|
GRPC_MUST_USE_RESULT auto PushServerToClientMessage(MessageHandle message); |
|
|
|
|
// Server: Fetch server to client message
|
|
|
|
|
// Returns a promise that resolves to ValueOrFailure<MessageHandle>
|
|
|
|
|
GRPC_MUST_USE_RESULT auto PullServerToClientMessage(); |
|
|
|
|
void PushServerTrailingMetadata(ServerMetadataHandle md) { |
|
|
|
|
GPR_ASSERT(md != nullptr); |
|
|
|
|
if (server_trailing_metadata_ != nullptr) return; |
|
|
|
|
server_trailing_metadata_ = std::move(md); |
|
|
|
|
server_trailing_metadata_waiter_.Wake(); |
|
|
|
|
} |
|
|
|
|
// Server: Indicate end of response
|
|
|
|
|
// Closes the request entirely - no messages can be sent/received
|
|
|
|
|
// If no server initial metadata has been sent, implies
|
|
|
|
|
// NoServerInitialMetadata() called.
|
|
|
|
|
void PushServerTrailingMetadata(ServerMetadataHandle md); |
|
|
|
|
// Client: Fetch server trailing metadata
|
|
|
|
|
// Returns a promise that resolves to ServerMetadataHandle
|
|
|
|
|
GRPC_MUST_USE_RESULT auto PullServerTrailingMetadata(); |
|
|
|
|
// Server: Wait for server trailing metadata to have been sent
|
|
|
|
|
// Returns a promise that resolves to a StatusFlag indicating whether the
|
|
|
|
|
// request was cancelled or not -- failure to send trailing metadata is
|
|
|
|
|
// considered a cancellation, as is actual cancellation -- but not application
|
|
|
|
|
// errors.
|
|
|
|
|
GRPC_MUST_USE_RESULT auto WasCancelled(); |
|
|
|
|
// Client & server: fill in final_info with the final status of the call.
|
|
|
|
|
void Finalize(const grpc_call_final_info* final_info); |
|
|
|
|
|
|
|
|
|
std::string DebugString() const; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
template <filters_detail::PipeState(CallFilters::*state_ptr), |
|
|
|
|
void*(CallFilters::*push_ptr), typename T, |
|
|
|
@ -1315,6 +1449,10 @@ class CallFilters { |
|
|
|
|
|
|
|
|
|
T TakeValue() { return std::move(value_); } |
|
|
|
|
|
|
|
|
|
absl::string_view DebugString() const { |
|
|
|
|
return value_ != nullptr ? " (not pulled)" : ""; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
filters_detail::PipeState& state() { return filters_->*state_ptr; } |
|
|
|
|
void*& push_slot() { return filters_->*push_ptr; } |
|
|
|
@ -1323,24 +1461,36 @@ class CallFilters { |
|
|
|
|
T value_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class Pull { |
|
|
|
|
static std::string DebugString(absl::string_view name, |
|
|
|
|
const CallFilters* filters) { |
|
|
|
|
auto* push = static_cast<Push*>(filters->*push_ptr); |
|
|
|
|
return absl::StrCat(name, ":", (filters->*state_ptr).DebugString(), |
|
|
|
|
push == nullptr ? "" : push->DebugString()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class PullMaybe { |
|
|
|
|
public: |
|
|
|
|
explicit Pull(CallFilters* filters) : filters_(filters) {} |
|
|
|
|
~Pull() { |
|
|
|
|
explicit PullMaybe(CallFilters* filters) : filters_(filters) {} |
|
|
|
|
~PullMaybe() { |
|
|
|
|
if (filters_ != nullptr) { |
|
|
|
|
state().DropPull(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Pull(const Pull&) = delete; |
|
|
|
|
Pull& operator=(const Pull&) = delete; |
|
|
|
|
Pull(Pull&& other) noexcept |
|
|
|
|
PullMaybe(const PullMaybe&) = delete; |
|
|
|
|
PullMaybe& operator=(const PullMaybe&) = delete; |
|
|
|
|
PullMaybe(PullMaybe&& other) noexcept |
|
|
|
|
: filters_(std::exchange(other.filters_, nullptr)), |
|
|
|
|
executor_(std::move(other.executor_)) {} |
|
|
|
|
Pull& operator=(Pull&&) = delete; |
|
|
|
|
PullMaybe& operator=(PullMaybe&&) = delete; |
|
|
|
|
|
|
|
|
|
Poll<ValueOrFailure<T>> operator()() { |
|
|
|
|
Poll<ValueOrFailure<absl::optional<T>>> operator()() { |
|
|
|
|
if (executor_.IsRunning()) { |
|
|
|
|
auto c = state().PollClosed(); |
|
|
|
|
if (c.ready() && c.value()) { |
|
|
|
|
filters_->CancelDueToFailedPipeOperation(); |
|
|
|
|
return Failure{}; |
|
|
|
|
} |
|
|
|
|
return FinishOperationExecutor(executor_.Step(filters_->call_data_)); |
|
|
|
|
} |
|
|
|
|
auto p = state().PollPull(); |
|
|
|
@ -1350,6 +1500,7 @@ class CallFilters { |
|
|
|
|
filters_->CancelDueToFailedPipeOperation(); |
|
|
|
|
return Failure{}; |
|
|
|
|
} |
|
|
|
|
if (!**r) return absl::nullopt; |
|
|
|
|
return FinishOperationExecutor(executor_.Start( |
|
|
|
|
layout(), push()->TakeValue(), filters_->call_data_)); |
|
|
|
|
} |
|
|
|
@ -1362,7 +1513,7 @@ class CallFilters { |
|
|
|
|
return &(filters_->stack_->data_.*layout_ptr); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Poll<ValueOrFailure<T>> FinishOperationExecutor( |
|
|
|
|
Poll<ValueOrFailure<absl::optional<T>>> FinishOperationExecutor( |
|
|
|
|
Poll<filters_detail::ResultOr<T>> p) { |
|
|
|
|
auto* r = p.value_if_ready(); |
|
|
|
|
if (r == nullptr) return Pending{}; |
|
|
|
@ -1376,6 +1527,66 @@ class CallFilters { |
|
|
|
|
CallFilters* filters_; |
|
|
|
|
filters_detail::OperationExecutor<T> executor_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class PullMessage { |
|
|
|
|
public: |
|
|
|
|
explicit PullMessage(CallFilters* filters) : filters_(filters) {} |
|
|
|
|
~PullMessage() { |
|
|
|
|
if (filters_ != nullptr) { |
|
|
|
|
state().DropPull(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
PullMessage(const PullMessage&) = delete; |
|
|
|
|
PullMessage& operator=(const PullMessage&) = delete; |
|
|
|
|
PullMessage(PullMessage&& other) noexcept |
|
|
|
|
: filters_(std::exchange(other.filters_, nullptr)), |
|
|
|
|
executor_(std::move(other.executor_)) {} |
|
|
|
|
PullMessage& operator=(PullMessage&&) = delete; |
|
|
|
|
|
|
|
|
|
Poll<NextMessage> operator()() { |
|
|
|
|
if (executor_.IsRunning()) { |
|
|
|
|
auto c = state().PollClosed(); |
|
|
|
|
if (c.ready() && c.value()) { |
|
|
|
|
filters_->CancelDueToFailedPipeOperation(); |
|
|
|
|
return NextMessage(true); |
|
|
|
|
} |
|
|
|
|
return FinishOperationExecutor(executor_.Step(filters_->call_data_)); |
|
|
|
|
} |
|
|
|
|
auto p = state().PollPull(); |
|
|
|
|
auto* r = p.value_if_ready(); |
|
|
|
|
if (r == nullptr) return Pending{}; |
|
|
|
|
if (!r->ok()) { |
|
|
|
|
filters_->CancelDueToFailedPipeOperation(); |
|
|
|
|
return NextMessage(true); |
|
|
|
|
} |
|
|
|
|
if (!**r) return NextMessage(false); |
|
|
|
|
return FinishOperationExecutor(executor_.Start( |
|
|
|
|
layout(), push()->TakeValue(), filters_->call_data_)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
filters_detail::PipeState& state() { return filters_->*state_ptr; } |
|
|
|
|
Push* push() { return static_cast<Push*>(filters_->*push_ptr); } |
|
|
|
|
const filters_detail::Layout<filters_detail::FallibleOperator<T>>* |
|
|
|
|
layout() { |
|
|
|
|
return &(filters_->stack_->data_.*layout_ptr); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Poll<NextMessage> FinishOperationExecutor( |
|
|
|
|
Poll<filters_detail::ResultOr<T>> p) { |
|
|
|
|
auto* r = p.value_if_ready(); |
|
|
|
|
if (r == nullptr) return Pending{}; |
|
|
|
|
GPR_DEBUG_ASSERT(!executor_.IsRunning()); |
|
|
|
|
state().AckPull(); |
|
|
|
|
if (r->ok != nullptr) return NextMessage(std::move(r->ok)); |
|
|
|
|
filters_->PushServerTrailingMetadata(std::move(r->error)); |
|
|
|
|
return NextMessage(true); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
CallFilters* filters_; |
|
|
|
|
filters_detail::OperationExecutor<T> executor_; |
|
|
|
|
}; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class PullClientInitialMetadataPromise { |
|
|
|
@ -1400,7 +1611,12 @@ class CallFilters { |
|
|
|
|
} |
|
|
|
|
auto p = state().PollPull(); |
|
|
|
|
auto* r = p.value_if_ready(); |
|
|
|
|
gpr_log(GPR_INFO, "%s", r == nullptr ? "PENDING" : r->ToString().c_str()); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) { |
|
|
|
|
gpr_log(GPR_INFO, "%s", |
|
|
|
|
r == nullptr |
|
|
|
|
? "PENDING" |
|
|
|
|
: (r->ok() ? (r->value() ? "TRUE" : "FALSE") : "FAILURE")); |
|
|
|
|
} |
|
|
|
|
if (r == nullptr) return Pending{}; |
|
|
|
|
if (!r->ok()) { |
|
|
|
|
filters_->CancelDueToFailedPipeOperation(); |
|
|
|
@ -1450,11 +1666,39 @@ class CallFilters { |
|
|
|
|
|
|
|
|
|
Poll<ServerMetadataHandle> operator()() { |
|
|
|
|
if (executor_.IsRunning()) { |
|
|
|
|
return executor_.Step(filters_->call_data_); |
|
|
|
|
auto r = executor_.Step(filters_->call_data_); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) { |
|
|
|
|
if (r.pending()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"%s PullServerTrailingMetadata[%p]: Pending(but executing)", |
|
|
|
|
GetContext<Activity>()->DebugTag().c_str(), filters_); |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_INFO, "%s PullServerTrailingMetadata[%p]: Ready: %s", |
|
|
|
|
GetContext<Activity>()->DebugTag().c_str(), filters_, |
|
|
|
|
r.value()->DebugString().c_str()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return r; |
|
|
|
|
} |
|
|
|
|
if (filters_->server_trailing_metadata_ == nullptr) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"%s PullServerTrailingMetadata[%p]: Pending(not pushed)", |
|
|
|
|
GetContext<Activity>()->DebugTag().c_str(), filters_); |
|
|
|
|
} |
|
|
|
|
return filters_->server_trailing_metadata_waiter_.pending(); |
|
|
|
|
} |
|
|
|
|
// If no stack has been set, we can just return the result of the call
|
|
|
|
|
if (filters_->stack_ == nullptr) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"%s PullServerTrailingMetadata[%p]: Ready(no-stack): %s", |
|
|
|
|
GetContext<Activity>()->DebugTag().c_str(), filters_, |
|
|
|
|
filters_->server_trailing_metadata_->DebugString().c_str()); |
|
|
|
|
} |
|
|
|
|
return std::move(filters_->server_trailing_metadata_); |
|
|
|
|
} |
|
|
|
|
// Otherwise we need to process it through all the filters.
|
|
|
|
|
return executor_.Start(&filters_->stack_->data_.server_trailing_metadata, |
|
|
|
|
std::move(filters_->server_trailing_metadata_), |
|
|
|
|
filters_->call_data_); |
|
|
|
@ -1465,7 +1709,7 @@ class CallFilters { |
|
|
|
|
filters_detail::InfallibleOperationExecutor<ServerMetadataHandle> executor_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
void CancelDueToFailedPipeOperation(); |
|
|
|
|
void CancelDueToFailedPipeOperation(SourceLocation but_where = {}); |
|
|
|
|
|
|
|
|
|
RefCountedPtr<Stack> stack_; |
|
|
|
|
|
|
|
|
@ -1475,6 +1719,7 @@ class CallFilters { |
|
|
|
|
filters_detail::PipeState client_to_server_message_state_; |
|
|
|
|
filters_detail::PipeState server_to_client_message_state_; |
|
|
|
|
IntraActivityWaiter server_trailing_metadata_waiter_; |
|
|
|
|
Latch<bool> cancelled_; |
|
|
|
|
|
|
|
|
|
void* call_data_; |
|
|
|
|
ClientMetadataHandle client_initial_metadata_; |
|
|
|
@ -1516,7 +1761,7 @@ inline auto CallFilters::PushServerInitialMetadata(ServerMetadataHandle md) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
inline auto CallFilters::PullServerInitialMetadata() { |
|
|
|
|
return ServerInitialMetadataPromises::Pull{this}; |
|
|
|
|
return ServerInitialMetadataPromises::PullMaybe{this}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
inline auto CallFilters::PushClientToServerMessage(MessageHandle message) { |
|
|
|
@ -1526,7 +1771,7 @@ inline auto CallFilters::PushClientToServerMessage(MessageHandle message) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
inline auto CallFilters::PullClientToServerMessage() { |
|
|
|
|
return ClientToServerMessagePromises::Pull{this}; |
|
|
|
|
return ClientToServerMessagePromises::PullMessage{this}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
inline auto CallFilters::PushServerToClientMessage(MessageHandle message) { |
|
|
|
@ -1536,13 +1781,19 @@ inline auto CallFilters::PushServerToClientMessage(MessageHandle message) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
inline auto CallFilters::PullServerToClientMessage() { |
|
|
|
|
return ServerToClientMessagePromises::Pull{this}; |
|
|
|
|
return ServerToClientMessagePromises::PullMessage{this}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
inline auto CallFilters::PullServerTrailingMetadata() { |
|
|
|
|
return PullServerTrailingMetadataPromise(this); |
|
|
|
|
return Map(PullServerTrailingMetadataPromise(this), |
|
|
|
|
[this](ServerMetadataHandle h) { |
|
|
|
|
cancelled_.Set(h->get(GrpcCallWasCancelled()).value_or(false)); |
|
|
|
|
return h; |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
inline auto CallFilters::WasCancelled() { return cancelled_.Wait(); } |
|
|
|
|
|
|
|
|
|
} // namespace grpc_core
|
|
|
|
|
|
|
|
|
|
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H
|
|
|
|
|