@ -36,8 +36,6 @@ |
#include "absl/container/inlined_vector.h" |
#include "absl/meta/type_traits.h" |
#include "absl/status/status.h" |
#include "absl/strings/string_view.h" |
#include "absl/types/optional.h" |
#include <grpc/event_engine/event_engine.h> |
#include <grpc/impl/codegen/grpc_types.h> |
@ -60,10 +58,8 @@ |
#include "src/core/lib/promise/arena_promise.h" |
#include "src/core/lib/promise/context.h" |
#include "src/core/lib/promise/latch.h" |
#include "src/core/lib/promise/pipe.h" |
#include "src/core/lib/promise/poll.h" |
#include "src/core/lib/resource_quota/arena.h" |
#include "src/core/lib/slice/slice_buffer.h" |
#include "src/core/lib/transport/error_utils.h" |
#include "src/core/lib/transport/metadata_batch.h" |
#include "src/core/lib/transport/transport.h" |
@ -134,8 +130,6 @@ enum class FilterEndpoint { |
// Flags for MakePromiseBasedFilter.
static constexpr uint8_t kFilterExaminesServerInitialMetadata = 1; |
static constexpr uint8_t kFilterIsLast = 2; |
static constexpr uint8_t kFilterExaminesOutboundMessages = 4; |
static constexpr uint8_t kFilterExaminesInboundMessages = 8; |
namespace promise_filter_detail { |
@ -269,159 +263,6 @@ class BaseCallData : public Activity, private Wakeable { |
return p.release(); |
} |
// State machine for sending messages: handles intercepting send_message ops
// and forwarding them through pipes to the promise, then getting the result
// down the stack.
// Split into its own class so that we don't spend the memory instantiating
// these members for filters that don't need to intercept sent messages.
class SendMessage { |
public: |
explicit SendMessage(BaseCallData* base) |
: base_(base), pipe_(base->arena()) {} |
PipeReceiver<MessageHandle>* outgoing_pipe() { return &pipe_.receiver; } |
// Start a send_message op.
void StartOp(CapturedBatch batch); |
// Publish the outbound pipe to the filter.
// This happens when the promise requests to call the next filter: until
// this occurs messages can't be sent as we don't know the pipe that the
// promise expects to send on.
void GotPipe(PipeReceiver<MessageHandle>* receiver); |
// Called from client/server polling to do the send message part of the
// work.
void WakeInsideCombiner(Flusher* flusher); |
// Call is completed, we have trailing metadata. Close things out.
void Done(const ServerMetadata& metadata); |
// Return true if we have a batch captured (for debug logs)
bool HaveCapturedBatch() const { return batch_.is_captured(); } |
// Return true if we're not actively sending a message.
bool IsIdle() const; |
private: |
enum class State : uint8_t { |
// Starting state: no batch started, no outgoing pipe configured.
kInitial, |
// We have an outgoing pipe, but no batch started.
// (this is the steady state).
kIdle, |
// We have a batch started, but no outgoing pipe configured.
// Stall until we have one.
kGotBatchNoPipe, |
// We have a batch, and an outgoing pipe. On the next poll we'll push the
// message into the pipe to the promise.
kGotBatch, |
// We've pushed a message into the promise, and we're now waiting for it
// to pop out the other end so we can forward it down the stack.
kPushedToPipe, |
// We've forwarded a message down the stack, and now we're waiting for
// completion.
kForwardedBatch, |
// We've got the completion callback, we'll close things out during poll
// and then forward completion callbacks up and transition back to idle.
kBatchCompleted, |
// We're done.
kCancelled, |
}; |
static const char* StateString(State); |
void OnComplete(absl::Status status); |
BaseCallData* const base_; |
State state_ = State::kInitial; |
Pipe<MessageHandle> pipe_; |
PipeReceiver<MessageHandle>* receiver_ = nullptr; |
absl::optional<PipeSender<MessageHandle>::PushType> push_; |
absl::optional<PipeReceiver<MessageHandle>::NextType> next_; |
absl::optional<NextResult<MessageHandle>> next_result_; |
CapturedBatch batch_; |
grpc_closure* intercepted_on_complete_; |
grpc_closure on_complete_ = |
MakeMemberClosure<SendMessage, &SendMessage::OnComplete>(this); |
absl::Status completed_status_; |
}; |
// State machine for receiving messages: handles intercepting recv_message
// ops, forwarding them down the stack, and then publishing the result via
// pipes to the promise (and ultimately calling the right callbacks for the
// batch when our promise has completed processing of them).
// Split into its own class so that we don't spend the memory instantiating
// these members for filters that don't need to intercept sent messages.
class ReceiveMessage { |
public: |
explicit ReceiveMessage(BaseCallData* base) |
: base_(base), pipe_(base->arena()) {} |
PipeSender<MessageHandle>* incoming_pipe() { return &pipe_.sender; } |
// Start a recv_message op.
void StartOp(CapturedBatch& batch); |
// Publish the inbound pipe to the filter.
// This happens when the promise requests to call the next filter: until
// this occurs messages can't be received as we don't know the pipe that the
// promise expects to forward them with.
void GotPipe(PipeSender<MessageHandle>* sender); |
// Called from client/server polling to do the receive message part of the
// work.
void WakeInsideCombiner(Flusher* flusher); |
// Call is completed, we have trailing metadata. Close things out.
void Done(const ServerMetadata& metadata, Flusher* flusher); |
private: |
enum class State : uint8_t { |
// Starting state: no batch started, no incoming pipe configured.
kInitial, |
// We have an incoming pipe, but no batch started.
// (this is the steady state).
kIdle, |
// We received a batch and forwarded it on, but have not got an incoming
// pipe configured.
kForwardedBatchNoPipe, |
// We received a batch and forwarded it on.
kForwardedBatch, |
// We got the completion for the recv_message, but we don't yet have a
// pipe configured. Stall until this changes.
kBatchCompletedNoPipe, |
// We got the completion for the recv_message, and we have a pipe
// configured: next poll will push the message into the pipe for the
// filter to process.
kBatchCompleted, |
// We've pushed a message into the promise, and we're now waiting for it
// to pop out the other end so we can forward it up the stack.
kPushedToPipe, |
// We've got a message out of the pipe, now we need to wait for processing
// to completely quiesce in the promise prior to forwarding the completion
// up the stack.
kPulledFromPipe, |
// We're done.
kCancelled, |
// Call got terminated whilst we had forwarded a recv_message down the
// stack: we need to keep track of that until we get the completion so
// that we do the right thing in OnComplete.
kCancelledWhilstForwarding, |
// Call got terminated whilst we had a recv_message batch completed, and
// we've now received the completion.
// On the next poll we'll close things out and forward on completions,
// then transition to cancelled.
kBatchCompletedButCancelled, |
}; |
static const char* StateString(State); |
void OnComplete(absl::Status status); |
BaseCallData* const base_; |
Pipe<MessageHandle> pipe_; |
PipeSender<MessageHandle>* sender_; |
State state_ = State::kInitial; |
uint32_t scratch_flags_; |
absl::optional<SliceBuffer>* intercepted_slice_buffer_; |
uint32_t* intercepted_flags_; |
absl::optional<PipeSender<MessageHandle>::PushType> push_; |
absl::optional<PipeReceiver<MessageHandle>::NextType> next_; |
absl::Status completed_status_; |
grpc_closure* intercepted_on_complete_; |
grpc_closure on_complete_ = |
MakeMemberClosure<ReceiveMessage, &ReceiveMessage::OnComplete>(this); |
}; |
Arena* arena() { return arena_; } |
grpc_call_element* elem() const { return elem_; } |
CallCombiner* call_combiner() const { return call_combiner_; } |
@ -430,26 +271,12 @@ class BaseCallData : public Activity, private Wakeable { |
Latch<ServerMetadata*>* server_initial_metadata_latch() const { |
return server_initial_metadata_latch_; |
} |
PipeReceiver<MessageHandle>* outgoing_messages_pipe() const { |
return send_message_ == nullptr ? nullptr : send_message_->outgoing_pipe(); |
} |
PipeSender<MessageHandle>* incoming_messages_pipe() const { |
return receive_message_ == nullptr ? nullptr |
: receive_message_->incoming_pipe(); |
} |
SendMessage* send_message() const { return send_message_; } |
ReceiveMessage* receive_message() const { return receive_message_; } |
bool is_last() const { |
return grpc_call_stack_element(call_stack_, call_stack_->count - 1) == |
elem_; |
} |
virtual void WakeInsideCombiner(Flusher* flusher) = 0; |
virtual absl::string_view ClientOrServerString() const = 0; |
std::string LogTag() const; |
private: |
// Wakeable implementation.
void Wakeup() final; |
@ -465,9 +292,7 @@ class BaseCallData : public Activity, private Wakeable { |
CallFinalization finalization_; |
grpc_call_context_element* const context_; |
std::atomic<grpc_polling_entity*> pollent_{nullptr}; |
Latch<ServerMetadata*>* const server_initial_metadata_latch_; |
SendMessage* const send_message_; |
ReceiveMessage* const receive_message_; |
Latch<ServerMetadata*>* server_initial_metadata_latch_ = nullptr; |
grpc_event_engine::experimental::EventEngine* event_engine_; |
}; |
@ -516,15 +341,11 @@ class ClientCallData : public BaseCallData { |
kCancelled |
}; |
static const char* StateString(SendInitialState); |
static const char* StateString(RecvTrailingState); |
std::string DebugString() const; |
struct RecvInitialMetadata; |
class PollContext; |
// Handle cancellation.
void Cancel(grpc_error_handle error, Flusher* flusher); |
void Cancel(grpc_error_handle error); |
// Begin running the promise - which will ultimately take some initial
// metadata and return some trailing metadata.
void StartPromise(Flusher* flusher); |
@ -550,20 +371,15 @@ class ClientCallData : public BaseCallData { |
void SetStatusFromError(grpc_metadata_batch* metadata, |
grpc_error_handle error); |
// Wakeup and poll the promise if appropriate.
void WakeInsideCombiner(Flusher* flusher) override; |
void WakeInsideCombiner(Flusher* flusher); |
void OnWakeup() override; |
absl::string_view ClientOrServerString() const override { return "CLI"; } |
// Contained promise
ArenaPromise<ServerMetadataHandle> promise_; |
// Queued batch containing at least a send_initial_metadata op.
CapturedBatch send_initial_metadata_batch_; |
// Pointer to where trailing metadata will be stored.
grpc_metadata_batch* recv_trailing_metadata_ = nullptr; |
// Trailing metadata as returned by the promise, if we hadn't received
// trailing metadata from below yet (so we can substitute it in).
ServerMetadataHandle cancelling_metadata_; |
// State tracking recv initial metadata for filters that care about it.
RecvInitialMetadata* recv_initial_metadata_ = nullptr; |
// Closure to call when we're done with the trailing metadata.
@ -591,9 +407,6 @@ class ServerCallData : public BaseCallData { |
// Handle one grpc_transport_stream_op_batch
void StartBatch(grpc_transport_stream_op_batch* batch) override; |
protected: |
absl::string_view ClientOrServerString() const override { return "SVR"; } |
private: |
// At what stage is our handling of recv initial metadata?
enum class RecvInitialState { |
@ -612,10 +425,6 @@ class ServerCallData : public BaseCallData { |
enum class SendTrailingState { |
// Start state: no op seen
kInitial, |
// We saw the op, but it was with a send message op (or one was in progress)
// - so we'll wait for that to complete before processing the trailing
// metadata.
kQueuedBehindSendMessage, |
// We saw the op, and are waiting for the promise to complete
// to forward it.
kQueued, |
@ -625,15 +434,11 @@ class ServerCallData : public BaseCallData { |
kCancelled |
}; |
static const char* StateString(RecvInitialState state); |
static const char* StateString(SendTrailingState state); |
std::string DebugString() const; |
class PollContext; |
struct SendInitialMetadata; |
// Shut things down when the call completes.
void Completed(grpc_error_handle error, Flusher* flusher); |
// Handle cancellation.
void Cancel(grpc_error_handle error, Flusher* flusher); |
// Construct a promise that will "call" the next filter.
// Effectively:
// - put the modified initial metadata into the batch being sent up.
@ -646,29 +451,20 @@ class ServerCallData : public BaseCallData { |
static void RecvInitialMetadataReadyCallback(void* arg, |
grpc_error_handle error); |
void RecvInitialMetadataReady(grpc_error_handle error); |
static void RecvTrailingMetadataReadyCallback(void* arg, |
grpc_error_handle error); |
void RecvTrailingMetadataReady(grpc_error_handle error); |
// Wakeup and poll the promise if appropriate.
void WakeInsideCombiner(Flusher* flusher) override; |
void WakeInsideCombiner(Flusher* flusher); |
void OnWakeup() override; |
// Contained promise
ArenaPromise<ServerMetadataHandle> promise_; |
// Pointer to where initial metadata will be stored.
grpc_metadata_batch* recv_initial_metadata_ = nullptr; |
// Pointer to where trailing metadata will be stored.
grpc_metadata_batch* recv_trailing_metadata_ = nullptr; |
// State for sending initial metadata.
SendInitialMetadata* send_initial_metadata_ = nullptr; |
// Closure to call when we're done with the initial metadata.
// Closure to call when we're done with the trailing metadata.
grpc_closure* original_recv_initial_metadata_ready_ = nullptr; |
// Our closure pointing to RecvInitialMetadataReadyCallback.
grpc_closure recv_initial_metadata_ready_; |
// Closure to call when we're done with the trailing metadata.
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; |
// Our closure pointing to RecvTrailingMetadataReadyCallback.
grpc_closure recv_trailing_metadata_ready_; |
// Error received during cancellation.
grpc_error_handle cancelled_error_; |
// Trailing metadata batch