diff --git a/src/core/lib/channel/promise_based_filter.cc b/src/core/lib/channel/promise_based_filter.cc index 59dd4a02c23..ccb7d07438e 100644 --- a/src/core/lib/channel/promise_based_filter.cc +++ b/src/core/lib/channel/promise_based_filter.cc @@ -16,57 +16,23 @@ #include "src/core/lib/channel/promise_based_filter.h" -#include #include #include -#include #include "absl/base/attributes.h" -#include "absl/functional/function_ref.h" -#include "absl/strings/str_cat.h" -#include "absl/strings/str_join.h" #include "absl/types/variant.h" #include #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/slice/slice.h" -extern grpc_core::TraceFlag grpc_trace_channel; - namespace grpc_core { namespace promise_filter_detail { -namespace { -class FakeActivity final : public Activity { - public: - void Orphan() override {} - void ForceImmediateRepoll() override {} - Waker MakeOwningWaker() override { abort(); } - Waker MakeNonOwningWaker() override { abort(); } - void Run(absl::FunctionRef f) { - ScopedActivity activity(this); - f(); - } -}; - -absl::Status StatusFromMetadata(const ServerMetadata& md) { - auto status_code = md.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN); - if (status_code == GRPC_STATUS_OK) { - return absl::OkStatus(); - } - const auto* message = md.get_pointer(GrpcMessageMetadata()); - return grpc_error_set_int( - absl::Status(static_cast(status_code), - message == nullptr ? "" : message->as_string_view()), - StatusIntProperty::kRpcStatus, status_code); -} -} // namespace - /////////////////////////////////////////////////////////////////////////////// // BaseCallData @@ -78,33 +44,18 @@ BaseCallData::BaseCallData(grpc_call_element* elem, call_combiner_(args->call_combiner), deadline_(args->deadline), context_(args->context), - server_initial_metadata_latch_( - flags & kFilterExaminesServerInitialMetadata - ? arena_->New>() - : nullptr), - send_message_(flags & kFilterExaminesOutboundMessages - ? arena_->New(this) - : nullptr), - receive_message_(flags & kFilterExaminesInboundMessages - ? arena_->New(this) - : nullptr), event_engine_( static_cast(elem->channel_data) ->hack_until_per_channel_stack_event_engines_land_get_event_engine()) { + if (flags & kFilterExaminesServerInitialMetadata) { + server_initial_metadata_latch_ = arena_->New>(); + } } BaseCallData::~BaseCallData() { - FakeActivity().Run([this] { - if (send_message_ != nullptr) { - send_message_->~SendMessage(); - } - if (receive_message_ != nullptr) { - receive_message_->~ReceiveMessage(); - } - if (server_initial_metadata_latch_ != nullptr) { - server_initial_metadata_latch_->~Latch(); - } - }); + if (server_initial_metadata_latch_ != nullptr) { + server_initial_metadata_latch_->~Latch(); + } } // We don't form ActivityPtr's to this type, and consequently don't need @@ -132,12 +83,6 @@ void BaseCallData::Wakeup() { void BaseCallData::Drop() { GRPC_CALL_STACK_UNREF(call_stack_, "waker"); } -std::string BaseCallData::LogTag() const { - return absl::StrCat( - ClientOrServerString(), "[", elem_->filter->name, ":0x", - absl::Hex(reinterpret_cast(elem_), absl::kZeroPad8), "]"); -} - /////////////////////////////////////////////////////////////////////////////// // BaseCallData::CapturedBatch @@ -245,19 +190,11 @@ BaseCallData::Flusher::~Flusher() { auto* batch = static_cast(p); BaseCallData* call = static_cast(batch->handler_private.extra_arg); - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "FLUSHER:forward batch via closure: %s", - grpc_transport_stream_op_batch_string(batch).c_str()); - } grpc_call_next_op(call->elem(), batch); GRPC_CALL_STACK_UNREF(call->call_stack(), "flusher_batch"); }; for (size_t i = 1; i < release_.size(); i++) { auto* batch = release_[i]; - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "FLUSHER:queue batch to forward in closure: %s", - grpc_transport_stream_op_batch_string(release_[i]).c_str()); - } batch->handler_private.extra_arg = call_; GRPC_CLOSURE_INIT(&batch->handler_private.closure, call_next_op, batch, nullptr); @@ -266,479 +203,10 @@ BaseCallData::Flusher::~Flusher() { "flusher_batch"); } call_closures_.RunClosuresWithoutYielding(call_->call_combiner()); - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "FLUSHER:forward batch: %s", - grpc_transport_stream_op_batch_string(release_[0]).c_str()); - } grpc_call_next_op(call_->elem(), release_[0]); GRPC_CALL_STACK_UNREF(call_->call_stack(), "flusher"); } -/////////////////////////////////////////////////////////////////////////////// -// BaseCallData::SendMessage - -const char* BaseCallData::SendMessage::StateString(State state) { - switch (state) { - case State::kInitial: - return "INITIAL"; - case State::kIdle: - return "IDLE"; - case State::kGotBatchNoPipe: - return "GOT_BATCH_NO_PIPE"; - case State::kGotBatch: - return "GOT_BATCH"; - case State::kPushedToPipe: - return "PUSHED_TO_PIPE"; - case State::kForwardedBatch: - return "FORWARDED_BATCH"; - case State::kBatchCompleted: - return "BATCH_COMPLETED"; - case State::kCancelled: - return "CANCELLED"; - } - return "UNKNOWN"; -} - -void BaseCallData::SendMessage::StartOp(CapturedBatch batch) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s SendMessage.StartOp st=%s", base_->LogTag().c_str(), - StateString(state_)); - } - switch (state_) { - case State::kInitial: - state_ = State::kGotBatchNoPipe; - break; - case State::kIdle: - state_ = State::kGotBatch; - break; - case State::kGotBatch: - case State::kGotBatchNoPipe: - case State::kForwardedBatch: - case State::kBatchCompleted: - case State::kPushedToPipe: - abort(); - case State::kCancelled: - return; - } - batch_ = batch; - intercepted_on_complete_ = std::exchange(batch_->on_complete, &on_complete_); -} - -void BaseCallData::SendMessage::GotPipe(PipeReceiver* receiver) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s SendMessage.GotPipe st=%s", base_->LogTag().c_str(), - StateString(state_)); - } - GPR_ASSERT(receiver != nullptr); - switch (state_) { - case State::kInitial: - state_ = State::kIdle; - Activity::current()->ForceImmediateRepoll(); - break; - case State::kGotBatchNoPipe: - state_ = State::kGotBatch; - Activity::current()->ForceImmediateRepoll(); - break; - case State::kIdle: - case State::kGotBatch: - case State::kForwardedBatch: - case State::kBatchCompleted: - case State::kPushedToPipe: - abort(); - case State::kCancelled: - return; - } - receiver_ = receiver; -} - -bool BaseCallData::SendMessage::IsIdle() const { - switch (state_) { - case State::kInitial: - case State::kIdle: - case State::kForwardedBatch: - case State::kCancelled: - return true; - case State::kGotBatchNoPipe: - case State::kGotBatch: - case State::kBatchCompleted: - case State::kPushedToPipe: - return false; - } -} - -void BaseCallData::SendMessage::OnComplete(absl::Status status) { - Flusher flusher(base_); - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s SendMessage.OnComplete st=%s status=%s", - base_->LogTag().c_str(), StateString(state_), - status.ToString().c_str()); - } - switch (state_) { - case State::kInitial: - case State::kIdle: - case State::kGotBatchNoPipe: - case State::kPushedToPipe: - case State::kGotBatch: - case State::kBatchCompleted: - abort(); - break; - case State::kCancelled: - flusher.AddClosure(intercepted_on_complete_, status, - "forward after cancel"); - break; - case State::kForwardedBatch: - completed_status_ = status; - state_ = State::kBatchCompleted; - base_->WakeInsideCombiner(&flusher); - break; - } -} - -void BaseCallData::SendMessage::Done(const ServerMetadata& metadata) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s SendMessage.Done st=%s md=%s", - base_->LogTag().c_str(), StateString(state_), - metadata.DebugString().c_str()); - } - switch (state_) { - case State::kCancelled: - break; - case State::kInitial: - case State::kIdle: - case State::kForwardedBatch: - state_ = State::kCancelled; - break; - case State::kGotBatchNoPipe: - case State::kGotBatch: - case State::kBatchCompleted: - abort(); - break; - case State::kPushedToPipe: - push_.reset(); - next_.reset(); - state_ = State::kCancelled; - break; - } -} - -void BaseCallData::SendMessage::WakeInsideCombiner(Flusher* flusher) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s SendMessage.WakeInsideCombiner st=%s%s", - base_->LogTag().c_str(), StateString(state_), - state_ == State::kBatchCompleted - ? absl::StrCat(" status=", completed_status_.ToString()).c_str() - : ""); - } - switch (state_) { - case State::kInitial: - case State::kIdle: - case State::kGotBatchNoPipe: - case State::kForwardedBatch: - case State::kCancelled: - break; - case State::kGotBatch: { - state_ = State::kPushedToPipe; - auto message = GetContext()->MakePooled(); - message->payload()->Swap(batch_->payload->send_message.send_message); - message->mutable_flags() = batch_->payload->send_message.flags; - push_ = pipe_.sender.Push(std::move(message)); - next_ = receiver_->Next(); - } - ABSL_FALLTHROUGH_INTENDED; - case State::kPushedToPipe: { - GPR_ASSERT(push_.has_value()); - auto r_push = (*push_)(); - if (auto* p = absl::get_if(&r_push)) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, - "%s SendMessage.WakeInsideCombiner push complete, result=%s", - base_->LogTag().c_str(), *p ? "true" : "false"); - } - // We haven't pulled through yet, so this certainly shouldn't succeed. - GPR_ASSERT(!*p); - state_ = State::kCancelled; - batch_.CancelWith(absl::CancelledError(), flusher); - break; - } - GPR_ASSERT(next_.has_value()); - auto r_next = (*next_)(); - if (auto* p = absl::get_if>(&r_next)) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, - "%s SendMessage.WakeInsideCombiner next complete, " - "result.has_value=%s", - base_->LogTag().c_str(), p->has_value() ? "true" : "false"); - } - GPR_ASSERT(p->has_value()); - batch_->payload->send_message.send_message->Swap((**p)->payload()); - batch_->payload->send_message.flags = (**p)->flags(); - state_ = State::kForwardedBatch; - batch_.ResumeWith(flusher); - next_result_ = std::move(*p); - next_.reset(); - } - } break; - case State::kBatchCompleted: - next_result_.reset(); - // We've cleared out the NextResult on the pipe from promise to us, but - // there's also the pipe from us to the promise (so that the promise can - // intercept the sent messages). The push promise here is pushing into the - // latter pipe, and so we need to keep polling it until it's done, which - // depending on what happens inside the promise may take some time. - if (absl::holds_alternative((*push_)())) break; - if (completed_status_.ok()) { - state_ = State::kIdle; - Activity::current()->ForceImmediateRepoll(); - } else { - state_ = State::kCancelled; - } - push_.reset(); - flusher->AddClosure(intercepted_on_complete_, completed_status_, - "batch_completed"); - break; - } -} - -/////////////////////////////////////////////////////////////////////////////// -// BaseCallData::ReceiveMessage - -const char* BaseCallData::ReceiveMessage::StateString(State state) { - switch (state) { - case State::kInitial: - return "INITIAL"; - case State::kIdle: - return "IDLE"; - case State::kForwardedBatchNoPipe: - return "FORWARDED_BATCH_NO_PIPE"; - case State::kForwardedBatch: - return "FORWARDED_BATCH"; - case State::kBatchCompletedNoPipe: - return "BATCH_COMPLETED_NO_PIPE"; - case State::kBatchCompleted: - return "BATCH_COMPLETED"; - case State::kPushedToPipe: - return "PUSHED_TO_PIPE"; - case State::kPulledFromPipe: - return "PULLED_FROM_PIPE"; - case State::kCancelled: - return "CANCELLED"; - case State::kCancelledWhilstForwarding: - return "CANCELLED_WHILST_FORWARDING"; - case State::kBatchCompletedButCancelled: - return "BATCH_COMPLETED_BUT_CANCELLED"; - } - return "UNKNOWN"; -} - -void BaseCallData::ReceiveMessage::StartOp(CapturedBatch& batch) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ReceiveMessage.StartOp st=%s", - base_->LogTag().c_str(), StateString(state_)); - } - switch (state_) { - case State::kInitial: - state_ = State::kForwardedBatchNoPipe; - break; - case State::kIdle: - state_ = State::kForwardedBatch; - break; - case State::kCancelledWhilstForwarding: - case State::kBatchCompletedButCancelled: - case State::kForwardedBatch: - case State::kForwardedBatchNoPipe: - case State::kBatchCompleted: - case State::kBatchCompletedNoPipe: - case State::kPushedToPipe: - case State::kPulledFromPipe: - abort(); - case State::kCancelled: - return; - } - intercepted_slice_buffer_ = batch->payload->recv_message.recv_message; - intercepted_flags_ = batch->payload->recv_message.flags; - if (intercepted_flags_ == nullptr) { - intercepted_flags_ = &scratch_flags_; - *intercepted_flags_ = 0; - } - intercepted_on_complete_ = std::exchange( - batch->payload->recv_message.recv_message_ready, &on_complete_); -} - -void BaseCallData::ReceiveMessage::GotPipe(PipeSender* sender) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ReceiveMessage.GotPipe st=%s", - base_->LogTag().c_str(), StateString(state_)); - } - switch (state_) { - case State::kInitial: - state_ = State::kIdle; - break; - case State::kForwardedBatchNoPipe: - state_ = State::kForwardedBatch; - break; - case State::kBatchCompletedNoPipe: - state_ = State::kBatchCompleted; - Activity::current()->ForceImmediateRepoll(); - break; - case State::kIdle: - case State::kForwardedBatch: - case State::kBatchCompleted: - case State::kPushedToPipe: - case State::kPulledFromPipe: - case State::kCancelledWhilstForwarding: - case State::kBatchCompletedButCancelled: - abort(); - case State::kCancelled: - return; - } - sender_ = sender; -} - -void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ReceiveMessage.OnComplete st=%s status=%s", - base_->LogTag().c_str(), StateString(state_), - status.ToString().c_str()); - } - switch (state_) { - case State::kInitial: - case State::kIdle: - case State::kPushedToPipe: - case State::kPulledFromPipe: - case State::kBatchCompleted: - case State::kBatchCompletedNoPipe: - case State::kCancelled: - case State::kBatchCompletedButCancelled: - abort(); - case State::kForwardedBatchNoPipe: - state_ = State::kBatchCompletedNoPipe; - return; - case State::kForwardedBatch: - state_ = State::kBatchCompleted; - break; - case State::kCancelledWhilstForwarding: - state_ = State::kBatchCompletedButCancelled; - break; - } - completed_status_ = status; - Flusher flusher(base_); - ScopedContext ctx(base_); - base_->WakeInsideCombiner(&flusher); -} - -void BaseCallData::ReceiveMessage::Done(const ServerMetadata& metadata, - Flusher* flusher) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ReceiveMessage.Done st=%s md=%s", - base_->LogTag().c_str(), StateString(state_), - metadata.DebugString().c_str()); - } - switch (state_) { - case State::kInitial: - case State::kIdle: - state_ = State::kCancelled; - break; - case State::kForwardedBatch: - case State::kForwardedBatchNoPipe: - state_ = State::kCancelledWhilstForwarding; - break; - case State::kPulledFromPipe: - case State::kPushedToPipe: { - auto status_code = - metadata.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_OK); - GPR_ASSERT(status_code != GRPC_STATUS_OK); - push_.reset(); - next_.reset(); - flusher->AddClosure(intercepted_on_complete_, - StatusFromMetadata(metadata), "recv_message_done"); - state_ = State::kCancelled; - } break; - case State::kBatchCompleted: - case State::kBatchCompletedNoPipe: - case State::kBatchCompletedButCancelled: - abort(); - case State::kCancelledWhilstForwarding: - case State::kCancelled: - break; - } -} - -void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ReceiveMessage.WakeInsideCombiner st=%s", - base_->LogTag().c_str(), StateString(state_)); - } - switch (state_) { - case State::kInitial: - case State::kIdle: - case State::kForwardedBatchNoPipe: - case State::kForwardedBatch: - case State::kCancelled: - case State::kCancelledWhilstForwarding: - case State::kBatchCompletedNoPipe: - break; - case State::kBatchCompletedButCancelled: - sender_->Close(); - state_ = State::kCancelled; - flusher->AddClosure(std::exchange(intercepted_on_complete_, nullptr), - completed_status_, "recv_message"); - break; - case State::kBatchCompleted: - if (completed_status_.ok() && intercepted_slice_buffer_->has_value()) { - state_ = State::kPushedToPipe; - auto message = GetContext()->MakePooled(); - message->payload()->Swap(&**intercepted_slice_buffer_); - message->mutable_flags() = *intercepted_flags_; - push_ = sender_->Push(std::move(message)); - next_ = pipe_.receiver.Next(); - } else { - sender_->Close(); - state_ = State::kCancelled; - flusher->AddClosure(std::exchange(intercepted_on_complete_, nullptr), - completed_status_, "recv_message"); - break; - } - GPR_ASSERT(state_ == State::kPushedToPipe); - ABSL_FALLTHROUGH_INTENDED; - case State::kPushedToPipe: { - GPR_ASSERT(push_.has_value()); - auto r_push = (*push_)(); - if (auto* p = absl::get_if(&r_push)) { - // We haven't pulled through yet, so this certainly shouldn't succeed. - GPR_ASSERT(!*p); - state_ = State::kCancelled; - break; - } - GPR_ASSERT(next_.has_value()); - auto r_next = (*next_)(); - if (auto* p = absl::get_if>(&r_next)) { - next_.reset(); - if (p->has_value()) { - *intercepted_slice_buffer_ = std::move(*(**p)->payload()); - *intercepted_flags_ = (**p)->flags(); - state_ = State::kPulledFromPipe; - } else { - *intercepted_slice_buffer_ = absl::nullopt; - *intercepted_flags_ = 0; - state_ = State::kCancelled; - } - } - } - if (state_ != State::kPulledFromPipe) break; - ABSL_FALLTHROUGH_INTENDED; - case State::kPulledFromPipe: { - GPR_ASSERT(push_.has_value()); - if (!absl::holds_alternative((*push_)())) { - state_ = State::kIdle; - push_.reset(); - flusher->AddClosure(std::exchange(intercepted_on_complete_, nullptr), - absl::OkStatus(), "recv_message"); - } - break; - } - } -} - /////////////////////////////////////////////////////////////////////////////// // ClientCallData @@ -771,32 +239,6 @@ struct ClientCallData::RecvInitialMetadata final { grpc_closure on_ready; grpc_metadata_batch* metadata = nullptr; Latch* server_initial_metadata_publisher = nullptr; - - static const char* StateString(State state) { - switch (state) { - case kInitial: - return "INITIAL"; - case kGotLatch: - return "GOT_LATCH"; - case kRespondedToTrailingMetadataPriorToHook: - return "RESPONDED_TO_TRAILING_METADATA_PRIOR_TO_HOOK"; - case kHookedWaitingForLatch: - return "HOOKED_WAITING_FOR_LATCH"; - case kHookedAndGotLatch: - return "HOOKED_AND_GOT_LATCH"; - case kCompleteWaitingForLatch: - return "COMPLETE_WAITING_FOR_LATCH"; - case kCompleteAndGotLatch: - return "COMPLETE_AND_GOT_LATCH"; - case kCompleteAndSetLatch: - return "COMPLETE_AND_SET_LATCH"; - case kResponded: - return "RESPONDED"; - case kRespondedButNeedToSetLatch: - return "RESPONDED_BUT_NEED_TO_SET_LATCH"; - } - return "UNKNOWN"; - } }; class ClientCallData::PollContext { @@ -814,18 +256,8 @@ class ClientCallData::PollContext { PollContext& operator=(const PollContext&) = delete; void Run() { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ClientCallData.PollContext.Run %s", - self_->LogTag().c_str(), self_->DebugString().c_str()); - } GPR_ASSERT(have_scoped_activity_); repoll_ = false; - if (self_->send_message() != nullptr) { - self_->send_message()->WakeInsideCombiner(flusher_); - } - if (self_->receive_message() != nullptr) { - self_->receive_message()->WakeInsideCombiner(flusher_); - } if (self_->server_initial_metadata_latch() != nullptr) { switch (self_->recv_initial_metadata_->state) { case RecvInitialMetadata::kInitial: @@ -876,24 +308,14 @@ class ClientCallData::PollContext { case SendInitialState::kForwarded: { // Poll the promise once since we're waiting for it. Poll poll = self_->promise_(); - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ClientCallData.PollContext.Run: poll=%s", - self_->LogTag().c_str(), - PollToString(poll, [](const ServerMetadataHandle& h) { - return h->DebugString(); - }).c_str()); - } if (auto* r = absl::get_if(&poll)) { - auto md = std::move(*r); - if (self_->send_message() != nullptr) { - self_->send_message()->Done(*md); - } - if (self_->receive_message() != nullptr) { - self_->receive_message()->Done(*md, flusher_); - } + auto* md = UnwrapMetadata(std::move(*r)); + bool destroy_md = true; if (self_->recv_trailing_state_ == RecvTrailingState::kComplete) { - if (self_->recv_trailing_metadata_ != md.get()) { + if (self_->recv_trailing_metadata_ != md) { *self_->recv_trailing_metadata_ = std::move(*md); + } else { + destroy_md = false; } self_->recv_trailing_state_ = RecvTrailingState::kResponded; flusher_->AddClosure( @@ -930,8 +352,17 @@ class ClientCallData::PollContext { } } } else { - self_->cancelled_error_ = StatusFromMetadata(*md); - GPR_ASSERT(!self_->cancelled_error_.ok()); + GPR_ASSERT(*md->get_pointer(GrpcStatusMetadata()) != + GRPC_STATUS_OK); + grpc_error_handle error = grpc_error_set_int( + GRPC_ERROR_CREATE("early return from promise based filter"), + StatusIntProperty::kRpcStatus, + *md->get_pointer(GrpcStatusMetadata())); + if (auto* message = md->get_pointer(GrpcMessageMetadata())) { + error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage, + message->as_string_view()); + } + self_->cancelled_error_ = error; if (self_->recv_initial_metadata_ != nullptr) { switch (self_->recv_initial_metadata_->state) { case RecvInitialMetadata::kInitial: @@ -957,19 +388,18 @@ class ClientCallData::PollContext { std::exchange( self_->recv_initial_metadata_->original_on_ready, nullptr), - self_->cancelled_error_, + error, "wake_inside_combiner:recv_initial_metadata_ready"); } } if (self_->send_initial_state_ == SendInitialState::kQueued) { self_->send_initial_state_ = SendInitialState::kCancelled; - self_->send_initial_metadata_batch_.CancelWith( - self_->cancelled_error_, flusher_); + self_->send_initial_metadata_batch_.CancelWith(error, flusher_); } else { GPR_ASSERT( self_->recv_trailing_state_ == RecvTrailingState::kInitial || self_->recv_trailing_state_ == RecvTrailingState::kForwarded); - self_->call_combiner()->Cancel(self_->cancelled_error_); + self_->call_combiner()->Cancel(error); CapturedBatch b(grpc_make_transport_stream_op(GRPC_CLOSURE_CREATE( [](void* p, grpc_error_handle) { GRPC_CALL_COMBINER_STOP(static_cast(p), @@ -977,15 +407,17 @@ class ClientCallData::PollContext { }, self_->call_combiner(), nullptr))); b->cancel_stream = true; - b->payload->cancel_stream.cancel_error = self_->cancelled_error_; + b->payload->cancel_stream.cancel_error = error; b.ResumeWith(flusher_); } - self_->cancelling_metadata_ = std::move(md); self_->recv_trailing_state_ = RecvTrailingState::kCancelled; } - self_->promise_ = ArenaPromise(); + if (destroy_md) { + md->~grpc_metadata_batch(); + } scoped_activity_.Destroy(); have_scoped_activity_ = false; + self_->promise_ = ArenaPromise(); } } break; case SendInitialState::kInitial: @@ -1071,58 +503,6 @@ void ClientCallData::ForceImmediateRepoll() { poll_ctx_->Repoll(); } -const char* ClientCallData::StateString(SendInitialState state) { - switch (state) { - case SendInitialState::kInitial: - return "INITIAL"; - case SendInitialState::kQueued: - return "QUEUED"; - case SendInitialState::kForwarded: - return "FORWARDED"; - case SendInitialState::kCancelled: - return "CANCELLED"; - } - return "UNKNOWN"; -} - -const char* ClientCallData::StateString(RecvTrailingState state) { - switch (state) { - case RecvTrailingState::kInitial: - return "INITIAL"; - case RecvTrailingState::kQueued: - return "QUEUED"; - case RecvTrailingState::kComplete: - return "COMPLETE"; - case RecvTrailingState::kForwarded: - return "FORWARDED"; - case RecvTrailingState::kCancelled: - return "CANCELLED"; - case RecvTrailingState::kResponded: - return "RESPONDED"; - } - return "UNKNOWN"; -} - -std::string ClientCallData::DebugString() const { - std::vector captured; - if (send_initial_metadata_batch_.is_captured()) { - captured.push_back("send_initial_metadata"); - } - if (send_message() != nullptr && send_message()->HaveCapturedBatch()) { - captured.push_back("send_message"); - } - return absl::StrCat( - "has_promise=", promise_.has_value() ? "true" : "false", - " sent_initial_state=", StateString(send_initial_state_), - " recv_trailing_state=", StateString(recv_trailing_state_), " captured={", - absl::StrJoin(captured, ","), "}", - server_initial_metadata_latch() == nullptr - ? "" - : absl::StrCat(" recv_initial_metadata=", - RecvInitialMetadata::StateString( - recv_initial_metadata_->state))); -} - // Handle one grpc_transport_stream_op_batch void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) { // Fake out the activity based context. @@ -1130,11 +510,6 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) { CapturedBatch batch(b); Flusher flusher(this); - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s StartBatch %s", LogTag().c_str(), - DebugString().c_str()); - } - // If this is a cancel stream, cancel anything we have pending and propagate // the cancellation. if (batch->cancel_stream) { @@ -1142,9 +517,7 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) { !batch->send_trailing_metadata && !batch->send_message && !batch->recv_initial_metadata && !batch->recv_message && !batch->recv_trailing_metadata); - PollContext poll_ctx(this, &flusher); - Cancel(batch->payload->cancel_stream.cancel_error, &flusher); - poll_ctx.Run(); + Cancel(batch->payload->cancel_stream.cancel_error); if (is_last()) { batch.CompleteWith(&flusher); } else { @@ -1190,16 +563,6 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) { } } - bool wake = false; - if (send_message() != nullptr && batch->send_message) { - send_message()->StartOp(batch); - wake = true; - } - if (receive_message() != nullptr && batch->recv_message) { - receive_message()->StartOp(batch); - wake = true; - } - // send_initial_metadata: seeing this triggers the start of the promise part // of this filter. if (batch->send_initial_metadata) { @@ -1221,7 +584,6 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) { send_initial_metadata_batch_ = batch; // And kick start the promise. StartPromise(&flusher); - wake = false; } } else if (batch->recv_trailing_metadata) { // recv_trailing_metadata *without* send_initial_metadata: hook it so we @@ -1237,10 +599,6 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) { batch.CancelWith(cancelled_error_, &flusher); } - if (wake) { - PollContext(this, &flusher).Run(); - } - if (batch.is_captured()) { if (!is_last()) { batch.ResumeWith(&flusher); @@ -1251,11 +609,7 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) { } // Handle cancellation. -void ClientCallData::Cancel(grpc_error_handle error, Flusher* flusher) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s Cancel error=%s", LogTag().c_str(), - error.ToString().c_str()); - } +void ClientCallData::Cancel(grpc_error_handle error) { // Track the latest reason for cancellation. cancelled_error_ = error; // Stop running the promise. @@ -1267,7 +621,26 @@ void ClientCallData::Cancel(grpc_error_handle error, Flusher* flusher) { if (recv_trailing_state_ == RecvTrailingState::kQueued) { recv_trailing_state_ = RecvTrailingState::kCancelled; } - send_initial_metadata_batch_.CancelWith(error, flusher); + struct FailBatch : public grpc_closure { + CapturedBatch batch; + ClientCallData* call; + }; + auto fail = [](void* p, grpc_error_handle error) { + auto* f = static_cast(p); + { + Flusher flusher(f->call); + f->batch.CancelWith(error, &flusher); + GRPC_CALL_STACK_UNREF(f->call->call_stack(), "cancel pending batch"); + } + delete f; + }; + auto* b = new FailBatch(); + GRPC_CLOSURE_INIT(b, fail, b, nullptr); + b->batch = std::move(send_initial_metadata_batch_); + b->call = this; + GRPC_CALL_STACK_REF(call_stack(), "cancel pending batch"); + GRPC_CALL_COMBINER_START(call_combiner(), b, cancelled_error_, + "cancel pending batch"); } else { send_initial_state_ = SendInitialState::kCancelled; } @@ -1294,12 +667,6 @@ void ClientCallData::Cancel(grpc_error_handle error, Flusher* flusher) { break; } } - if (send_message() != nullptr) { - send_message()->Done(*ServerMetadataFromStatus(error)); - } - if (receive_message() != nullptr) { - receive_message()->Done(*ServerMetadataFromStatus(error), flusher); - } } // Begin running the promise - which will ultimately take some initial @@ -1313,8 +680,7 @@ void ClientCallData::StartPromise(Flusher* flusher) { promise_ = filter->MakeCallPromise( CallArgs{WrapMetadata(send_initial_metadata_batch_->payload ->send_initial_metadata.send_initial_metadata), - server_initial_metadata_latch(), outgoing_messages_pipe(), - incoming_messages_pipe()}, + server_initial_metadata_latch(), nullptr, nullptr}, [this](CallArgs call_args) { return MakeNextPromise(std::move(call_args)); }); @@ -1322,10 +688,6 @@ void ClientCallData::StartPromise(Flusher* flusher) { } void ClientCallData::RecvInitialMetadataReady(grpc_error_handle error) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ClientCallData.RecvInitialMetadataReady %s", - LogTag().c_str(), DebugString().c_str()); - } ScopedContext context(this); Flusher flusher(this); if (!error.ok()) { @@ -1398,10 +760,6 @@ void ClientCallData::HookRecvTrailingMetadata(CapturedBatch batch) { // - return a wrapper around PollTrailingMetadata as the promise. ArenaPromise ClientCallData::MakeNextPromise( CallArgs call_args) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ClientCallData.MakeNextPromise %s", LogTag().c_str(), - DebugString().c_str()); - } GPR_ASSERT(poll_ctx_ != nullptr); GPR_ASSERT(send_initial_state_ == SendInitialState::kQueued); send_initial_metadata_batch_->payload->send_initial_metadata @@ -1440,16 +798,6 @@ ArenaPromise ClientCallData::MakeNextPromise( } else { GPR_ASSERT(call_args.server_initial_metadata == nullptr); } - if (send_message() != nullptr) { - send_message()->GotPipe(call_args.outgoing_messages); - } else { - GPR_ASSERT(call_args.outgoing_messages == nullptr); - } - if (receive_message() != nullptr) { - receive_message()->GotPipe(call_args.incoming_messages); - } else { - GPR_ASSERT(call_args.incoming_messages == nullptr); - } return ArenaPromise( [this]() { return PollTrailingMetadata(); }); } @@ -1459,10 +807,6 @@ ArenaPromise ClientCallData::MakeNextPromise( // All polls: await receiving the trailing metadata, then return it to the // application. Poll ClientCallData::PollTrailingMetadata() { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ClientCallData.PollTrailingMetadata %s", - LogTag().c_str(), DebugString().c_str()); - } GPR_ASSERT(poll_ctx_ != nullptr); if (send_initial_state_ == SendInitialState::kQueued) { // First poll: pass the send_initial_metadata op down the stack. @@ -1509,18 +853,9 @@ void ClientCallData::RecvTrailingMetadataReadyCallback( void ClientCallData::RecvTrailingMetadataReady(grpc_error_handle error) { Flusher flusher(this); - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, - "%s ClientCallData.RecvTrailingMetadataReady error=%s md=%s", - LogTag().c_str(), error.ToString().c_str(), - recv_trailing_metadata_->DebugString().c_str()); - } // If we were cancelled prior to receiving this callback, we should simply // forward the callback up with the same error. if (recv_trailing_state_ == RecvTrailingState::kCancelled) { - if (cancelling_metadata_.get() != nullptr) { - *recv_trailing_metadata_ = std::move(*cancelling_metadata_); - } if (grpc_closure* call_closure = std::exchange(original_recv_trailing_metadata_ready_, nullptr)) { flusher.AddClosure(call_closure, error, "propagate failure"); @@ -1580,26 +915,6 @@ struct ServerCallData::SendInitialMetadata { State state = kInitial; CapturedBatch batch; Latch* server_initial_metadata_publisher = nullptr; - - static const char* StateString(State state) { - switch (state) { - case kInitial: - return "INITIAL"; - case kGotLatch: - return "GOT_LATCH"; - case kQueuedWaitingForLatch: - return "QUEUED_WAITING_FOR_LATCH"; - case kQueuedAndGotLatch: - return "QUEUED_AND_GOT_LATCH"; - case kQueuedAndSetLatch: - return "QUEUED_AND_SET_LATCH"; - case kForwarded: - return "FORWARDED"; - case kCancelled: - return "CANCELLED"; - } - return "UNKNOWN"; - } }; class ServerCallData::PollContext { @@ -1652,36 +967,6 @@ class ServerCallData::PollContext { bool have_scoped_activity_; }; -const char* ServerCallData::StateString(RecvInitialState state) { - switch (state) { - case RecvInitialState::kInitial: - return "INITIAL"; - case RecvInitialState::kForwarded: - return "FORWARDED"; - case RecvInitialState::kComplete: - return "COMPLETE"; - case RecvInitialState::kResponded: - return "RESPONDED"; - } - return "UNKNOWN"; -} - -const char* ServerCallData::StateString(SendTrailingState state) { - switch (state) { - case SendTrailingState::kInitial: - return "INITIAL"; - case SendTrailingState::kForwarded: - return "FORWARDED"; - case SendTrailingState::kQueuedBehindSendMessage: - return "QUEUED_BEHIND_SEND_MESSAGE"; - case SendTrailingState::kQueued: - return "QUEUED"; - case SendTrailingState::kCancelled: - return "CANCELLED"; - } - return "UNKNOWN"; -} - ServerCallData::ServerCallData(grpc_call_element* elem, const grpc_call_element_args* args, uint8_t flags) @@ -1692,18 +977,9 @@ ServerCallData::ServerCallData(grpc_call_element* elem, GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReadyCallback, this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, - RecvTrailingMetadataReadyCallback, this, - grpc_schedule_on_exec_ctx); } -ServerCallData::~ServerCallData() { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ~ServerCallData %s", LogTag().c_str(), - DebugString().c_str()); - } - GPR_ASSERT(poll_ctx_ == nullptr); -} +ServerCallData::~ServerCallData() { GPR_ASSERT(poll_ctx_ == nullptr); } // Activity implementation. void ServerCallData::ForceImmediateRepoll() { @@ -1719,11 +995,6 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) { Flusher flusher(this); bool wake = false; - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s StartBatch: %s", LogTag().c_str(), - DebugString().c_str()); - } - // If this is a cancel stream, cancel anything we have pending and // propagate the cancellation. if (batch->cancel_stream) { @@ -1731,8 +1002,7 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) { !batch->send_trailing_metadata && !batch->send_message && !batch->recv_initial_metadata && !batch->recv_message && !batch->recv_trailing_metadata); - PollContext poll_ctx(this, &flusher); - Completed(batch->payload->cancel_stream.cancel_error, &flusher); + Cancel(batch->payload->cancel_stream.cancel_error, &flusher); if (is_last()) { batch.CompleteWith(&flusher); } else { @@ -1759,16 +1029,6 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) { recv_initial_state_ = RecvInitialState::kForwarded; } - // Hook recv_trailing_metadata so we can see cancellation from the client. - if (batch->recv_trailing_metadata) { - recv_trailing_metadata_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata; - original_recv_trailing_metadata_ready_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &recv_trailing_metadata_ready_; - } - // send_initial_metadata if (send_initial_metadata_ != nullptr && batch->send_initial_metadata) { switch (send_initial_metadata_->state) { @@ -1780,9 +1040,7 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) { send_initial_metadata_->state = SendInitialMetadata::kQueuedAndGotLatch; break; case SendInitialMetadata::kCancelled: - batch.CancelWith( - cancelled_error_.ok() ? absl::CancelledError() : cancelled_error_, - &flusher); + batch.CancelWith(cancelled_error_, &flusher); break; case SendInitialMetadata::kQueuedAndGotLatch: case SendInitialMetadata::kQueuedWaitingForLatch: @@ -1794,36 +1052,20 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) { wake = true; } - if (send_message() != nullptr && batch->send_message) { - send_message()->StartOp(batch); - wake = true; - } - if (receive_message() != nullptr && batch->recv_message) { - receive_message()->StartOp(batch); - wake = true; - } - // send_trailing_metadata if (batch.is_captured() && batch->send_trailing_metadata) { switch (send_trailing_state_) { case SendTrailingState::kInitial: send_trailing_metadata_batch_ = batch; - if (send_message() != nullptr && !send_message()->IsIdle()) { - send_trailing_state_ = SendTrailingState::kQueuedBehindSendMessage; - } else { - send_trailing_state_ = SendTrailingState::kQueued; - wake = true; - } + send_trailing_state_ = SendTrailingState::kQueued; + wake = true; break; case SendTrailingState::kQueued: - case SendTrailingState::kQueuedBehindSendMessage: case SendTrailingState::kForwarded: abort(); // unreachable break; case SendTrailingState::kCancelled: - batch.CancelWith( - cancelled_error_.ok() ? absl::CancelledError() : cancelled_error_, - &flusher); + batch.CancelWith(cancelled_error_, &flusher); break; } } @@ -1833,7 +1075,7 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) { } // Handle cancellation. -void ServerCallData::Completed(grpc_error_handle error, Flusher* flusher) { +void ServerCallData::Cancel(grpc_error_handle error, Flusher* flusher) { // Track the latest reason for cancellation. cancelled_error_ = error; // Stop running the promise. @@ -1863,13 +1105,6 @@ void ServerCallData::Completed(grpc_error_handle error, Flusher* flusher) { std::exchange(original_recv_initial_metadata_ready_, nullptr)) { flusher->AddClosure(closure, error, "original_recv_initial_metadata"); } - ScopedContext ctx(this); - if (send_message() != nullptr) { - send_message()->Done(*ServerMetadataFromStatus(error)); - } - if (receive_message() != nullptr) { - receive_message()->Done(*ServerMetadataFromStatus(error), flusher); - } } // Construct a promise that will "call" the next filter. @@ -1907,16 +1142,6 @@ ArenaPromise ServerCallData::MakeNextPromise( } else { GPR_ASSERT(call_args.server_initial_metadata == nullptr); } - if (send_message() != nullptr) { - send_message()->GotPipe(call_args.outgoing_messages); - } else { - GPR_ASSERT(call_args.outgoing_messages == nullptr); - } - if (receive_message() != nullptr) { - receive_message()->GotPipe(call_args.incoming_messages); - } else { - GPR_ASSERT(call_args.incoming_messages == nullptr); - } return ArenaPromise( [this]() { return PollTrailingMetadata(); }); } @@ -1927,7 +1152,6 @@ ArenaPromise ServerCallData::MakeNextPromise( Poll ServerCallData::PollTrailingMetadata() { switch (send_trailing_state_) { case SendTrailingState::kInitial: - case SendTrailingState::kQueuedBehindSendMessage: return Pending{}; case SendTrailingState::kQueued: return WrapMetadata(send_trailing_metadata_batch_->payload @@ -1943,36 +1167,13 @@ Poll ServerCallData::PollTrailingMetadata() { GPR_UNREACHABLE_CODE(return Pending{}); } -void ServerCallData::RecvTrailingMetadataReadyCallback( - void* arg, grpc_error_handle error) { - static_cast(arg)->RecvTrailingMetadataReady( - std::move(error)); -} - -void ServerCallData::RecvTrailingMetadataReady(grpc_error_handle error) { - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s: RecvTrailingMetadataReady error=%s md=%s", - LogTag().c_str(), error.ToString().c_str(), - recv_trailing_metadata_->DebugString().c_str()); - } - Flusher flusher(this); - PollContext poll_ctx(this, &flusher); - Completed(error, &flusher); - flusher.AddClosure(original_recv_trailing_metadata_ready_, std::move(error), - "continue recv trailing"); -} - void ServerCallData::RecvInitialMetadataReadyCallback(void* arg, grpc_error_handle error) { - static_cast(arg)->RecvInitialMetadataReady(std::move(error)); + static_cast(arg)->RecvInitialMetadataReady(error); } void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) { Flusher flusher(this); - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s: RecvInitialMetadataReady %s", LogTag().c_str(), - error.ToString().c_str()); - } GPR_ASSERT(recv_initial_state_ == RecvInitialState::kForwarded); // If there was an error we just propagate that through if (!error.ok()) { @@ -1989,15 +1190,12 @@ void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) { ScopedContext context(this); // Construct the promise. ChannelFilter* filter = static_cast(elem()->channel_data); - FakeActivity().Run([this, filter] { - promise_ = filter->MakeCallPromise( - CallArgs{WrapMetadata(recv_initial_metadata_), - server_initial_metadata_latch(), outgoing_messages_pipe(), - incoming_messages_pipe()}, - [this](CallArgs call_args) { - return MakeNextPromise(std::move(call_args)); - }); - }); + promise_ = filter->MakeCallPromise( + CallArgs{WrapMetadata(recv_initial_metadata_), + server_initial_metadata_latch(), nullptr, nullptr}, + [this](CallArgs call_args) { + return MakeNextPromise(std::move(call_args)); + }); // Poll once. WakeInsideCombiner(&flusher); if (auto* closure = @@ -2007,34 +1205,9 @@ void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) { } } -std::string ServerCallData::DebugString() const { - std::vector captured; - if (send_message() != nullptr && send_message()->HaveCapturedBatch()) { - captured.push_back("send_message"); - } - if (send_trailing_metadata_batch_.is_captured()) { - captured.push_back("send_trailing_metadata"); - } - return absl::StrCat( - "have_promise=", promise_.has_value() ? "true" : "false", - " recv_initial_state=", StateString(recv_initial_state_), - " send_trailing_state=", StateString(send_trailing_state_), " captured={", - absl::StrJoin(captured, ","), "}", - send_initial_metadata_ == nullptr - ? "" - : absl::StrCat( - " send_initial_metadata=", - SendInitialMetadata::StateString(send_initial_metadata_->state)) - .c_str()); -} - // Wakeup and poll the promise if appropriate. void ServerCallData::WakeInsideCombiner(Flusher* flusher) { PollContext poll_ctx(this, flusher); - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s: WakeInsideCombiner %s", LogTag().c_str(), - DebugString().c_str()); - } if (send_initial_metadata_ != nullptr && send_initial_metadata_->state == SendInitialMetadata::kQueuedAndGotLatch) { @@ -2044,25 +1217,9 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) { .send_initial_metadata); } poll_ctx.ClearRepoll(); - if (send_message() != nullptr) { - send_message()->WakeInsideCombiner(flusher); - if (send_trailing_state_ == SendTrailingState::kQueuedBehindSendMessage && - send_message()->IsIdle()) { - send_trailing_state_ = SendTrailingState::kQueued; - } - } - if (receive_message() != nullptr) { - receive_message()->WakeInsideCombiner(flusher); - } if (promise_.has_value()) { Poll poll; poll = promise_(); - if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s: WakeInsideCombiner poll=%s", LogTag().c_str(), - PollToString(poll, [](const ServerMetadataHandle& h) { - return h->DebugString(); - }).c_str()); - } if (send_initial_metadata_ != nullptr && send_initial_metadata_->state == SendInitialMetadata::kQueuedAndSetLatch) { @@ -2082,14 +1239,7 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) { promise_ = ArenaPromise(); auto* md = UnwrapMetadata(std::move(*r)); bool destroy_md = true; - if (send_message() != nullptr) { - send_message()->Done(*md); - } - if (receive_message() != nullptr) { - receive_message()->Done(*md, flusher); - } switch (send_trailing_state_) { - case SendTrailingState::kQueuedBehindSendMessage: case SendTrailingState::kQueued: { if (send_trailing_metadata_batch_->payload->send_trailing_metadata .send_trailing_metadata != md) { @@ -2106,7 +1256,15 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) { break; case SendTrailingState::kInitial: { GPR_ASSERT(*md->get_pointer(GrpcStatusMetadata()) != GRPC_STATUS_OK); - Completed(StatusFromMetadata(*md), flusher); + grpc_error_handle error = grpc_error_set_int( + GRPC_ERROR_CREATE("early return from promise based filter"), + StatusIntProperty::kRpcStatus, + *md->get_pointer(GrpcStatusMetadata())); + if (auto* message = md->get_pointer(GrpcMessageMetadata())) { + error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage, + message->as_string_view()); + } + Cancel(error, flusher); } break; case SendTrailingState::kCancelled: // Nothing to do. diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index 84458b5271b..910849cac30 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -36,8 +36,6 @@ #include "absl/container/inlined_vector.h" #include "absl/meta/type_traits.h" #include "absl/status/status.h" -#include "absl/strings/string_view.h" -#include "absl/types/optional.h" #include #include @@ -60,10 +58,8 @@ #include "src/core/lib/promise/arena_promise.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/latch.h" -#include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/poll.h" #include "src/core/lib/resource_quota/arena.h" -#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/transport.h" @@ -134,8 +130,6 @@ enum class FilterEndpoint { // Flags for MakePromiseBasedFilter. static constexpr uint8_t kFilterExaminesServerInitialMetadata = 1; static constexpr uint8_t kFilterIsLast = 2; -static constexpr uint8_t kFilterExaminesOutboundMessages = 4; -static constexpr uint8_t kFilterExaminesInboundMessages = 8; namespace promise_filter_detail { @@ -269,159 +263,6 @@ class BaseCallData : public Activity, private Wakeable { return p.release(); } - // State machine for sending messages: handles intercepting send_message ops - // and forwarding them through pipes to the promise, then getting the result - // down the stack. - // Split into its own class so that we don't spend the memory instantiating - // these members for filters that don't need to intercept sent messages. - class SendMessage { - public: - explicit SendMessage(BaseCallData* base) - : base_(base), pipe_(base->arena()) {} - PipeReceiver* outgoing_pipe() { return &pipe_.receiver; } - - // Start a send_message op. - void StartOp(CapturedBatch batch); - // Publish the outbound pipe to the filter. - // This happens when the promise requests to call the next filter: until - // this occurs messages can't be sent as we don't know the pipe that the - // promise expects to send on. - void GotPipe(PipeReceiver* receiver); - // Called from client/server polling to do the send message part of the - // work. - void WakeInsideCombiner(Flusher* flusher); - // Call is completed, we have trailing metadata. Close things out. - void Done(const ServerMetadata& metadata); - // Return true if we have a batch captured (for debug logs) - bool HaveCapturedBatch() const { return batch_.is_captured(); } - // Return true if we're not actively sending a message. - bool IsIdle() const; - - private: - enum class State : uint8_t { - // Starting state: no batch started, no outgoing pipe configured. - kInitial, - // We have an outgoing pipe, but no batch started. - // (this is the steady state). - kIdle, - // We have a batch started, but no outgoing pipe configured. - // Stall until we have one. - kGotBatchNoPipe, - // We have a batch, and an outgoing pipe. On the next poll we'll push the - // message into the pipe to the promise. - kGotBatch, - // We've pushed a message into the promise, and we're now waiting for it - // to pop out the other end so we can forward it down the stack. - kPushedToPipe, - // We've forwarded a message down the stack, and now we're waiting for - // completion. - kForwardedBatch, - // We've got the completion callback, we'll close things out during poll - // and then forward completion callbacks up and transition back to idle. - kBatchCompleted, - // We're done. - kCancelled, - }; - static const char* StateString(State); - - void OnComplete(absl::Status status); - - BaseCallData* const base_; - State state_ = State::kInitial; - Pipe pipe_; - PipeReceiver* receiver_ = nullptr; - absl::optional::PushType> push_; - absl::optional::NextType> next_; - absl::optional> next_result_; - CapturedBatch batch_; - grpc_closure* intercepted_on_complete_; - grpc_closure on_complete_ = - MakeMemberClosure(this); - absl::Status completed_status_; - }; - - // State machine for receiving messages: handles intercepting recv_message - // ops, forwarding them down the stack, and then publishing the result via - // pipes to the promise (and ultimately calling the right callbacks for the - // batch when our promise has completed processing of them). - // Split into its own class so that we don't spend the memory instantiating - // these members for filters that don't need to intercept sent messages. - class ReceiveMessage { - public: - explicit ReceiveMessage(BaseCallData* base) - : base_(base), pipe_(base->arena()) {} - PipeSender* incoming_pipe() { return &pipe_.sender; } - - // Start a recv_message op. - void StartOp(CapturedBatch& batch); - // Publish the inbound pipe to the filter. - // This happens when the promise requests to call the next filter: until - // this occurs messages can't be received as we don't know the pipe that the - // promise expects to forward them with. - void GotPipe(PipeSender* sender); - // Called from client/server polling to do the receive message part of the - // work. - void WakeInsideCombiner(Flusher* flusher); - // Call is completed, we have trailing metadata. Close things out. - void Done(const ServerMetadata& metadata, Flusher* flusher); - - private: - enum class State : uint8_t { - // Starting state: no batch started, no incoming pipe configured. - kInitial, - // We have an incoming pipe, but no batch started. - // (this is the steady state). - kIdle, - // We received a batch and forwarded it on, but have not got an incoming - // pipe configured. - kForwardedBatchNoPipe, - // We received a batch and forwarded it on. - kForwardedBatch, - // We got the completion for the recv_message, but we don't yet have a - // pipe configured. Stall until this changes. - kBatchCompletedNoPipe, - // We got the completion for the recv_message, and we have a pipe - // configured: next poll will push the message into the pipe for the - // filter to process. - kBatchCompleted, - // We've pushed a message into the promise, and we're now waiting for it - // to pop out the other end so we can forward it up the stack. - kPushedToPipe, - // We've got a message out of the pipe, now we need to wait for processing - // to completely quiesce in the promise prior to forwarding the completion - // up the stack. - kPulledFromPipe, - // We're done. - kCancelled, - // Call got terminated whilst we had forwarded a recv_message down the - // stack: we need to keep track of that until we get the completion so - // that we do the right thing in OnComplete. - kCancelledWhilstForwarding, - // Call got terminated whilst we had a recv_message batch completed, and - // we've now received the completion. - // On the next poll we'll close things out and forward on completions, - // then transition to cancelled. - kBatchCompletedButCancelled, - }; - static const char* StateString(State); - - void OnComplete(absl::Status status); - - BaseCallData* const base_; - Pipe pipe_; - PipeSender* sender_; - State state_ = State::kInitial; - uint32_t scratch_flags_; - absl::optional* intercepted_slice_buffer_; - uint32_t* intercepted_flags_; - absl::optional::PushType> push_; - absl::optional::NextType> next_; - absl::Status completed_status_; - grpc_closure* intercepted_on_complete_; - grpc_closure on_complete_ = - MakeMemberClosure(this); - }; - Arena* arena() { return arena_; } grpc_call_element* elem() const { return elem_; } CallCombiner* call_combiner() const { return call_combiner_; } @@ -430,26 +271,12 @@ class BaseCallData : public Activity, private Wakeable { Latch* server_initial_metadata_latch() const { return server_initial_metadata_latch_; } - PipeReceiver* outgoing_messages_pipe() const { - return send_message_ == nullptr ? nullptr : send_message_->outgoing_pipe(); - } - PipeSender* incoming_messages_pipe() const { - return receive_message_ == nullptr ? nullptr - : receive_message_->incoming_pipe(); - } - SendMessage* send_message() const { return send_message_; } - ReceiveMessage* receive_message() const { return receive_message_; } bool is_last() const { return grpc_call_stack_element(call_stack_, call_stack_->count - 1) == elem_; } - virtual void WakeInsideCombiner(Flusher* flusher) = 0; - - virtual absl::string_view ClientOrServerString() const = 0; - std::string LogTag() const; - private: // Wakeable implementation. void Wakeup() final; @@ -465,9 +292,7 @@ class BaseCallData : public Activity, private Wakeable { CallFinalization finalization_; grpc_call_context_element* const context_; std::atomic pollent_{nullptr}; - Latch* const server_initial_metadata_latch_; - SendMessage* const send_message_; - ReceiveMessage* const receive_message_; + Latch* server_initial_metadata_latch_ = nullptr; grpc_event_engine::experimental::EventEngine* event_engine_; }; @@ -516,15 +341,11 @@ class ClientCallData : public BaseCallData { kCancelled }; - static const char* StateString(SendInitialState); - static const char* StateString(RecvTrailingState); - std::string DebugString() const; - struct RecvInitialMetadata; class PollContext; // Handle cancellation. - void Cancel(grpc_error_handle error, Flusher* flusher); + void Cancel(grpc_error_handle error); // Begin running the promise - which will ultimately take some initial // metadata and return some trailing metadata. void StartPromise(Flusher* flusher); @@ -550,20 +371,15 @@ class ClientCallData : public BaseCallData { void SetStatusFromError(grpc_metadata_batch* metadata, grpc_error_handle error); // Wakeup and poll the promise if appropriate. - void WakeInsideCombiner(Flusher* flusher) override; + void WakeInsideCombiner(Flusher* flusher); void OnWakeup() override; - absl::string_view ClientOrServerString() const override { return "CLI"; } - // Contained promise ArenaPromise promise_; // Queued batch containing at least a send_initial_metadata op. CapturedBatch send_initial_metadata_batch_; // Pointer to where trailing metadata will be stored. grpc_metadata_batch* recv_trailing_metadata_ = nullptr; - // Trailing metadata as returned by the promise, if we hadn't received - // trailing metadata from below yet (so we can substitute it in). - ServerMetadataHandle cancelling_metadata_; // State tracking recv initial metadata for filters that care about it. RecvInitialMetadata* recv_initial_metadata_ = nullptr; // Closure to call when we're done with the trailing metadata. @@ -591,9 +407,6 @@ class ServerCallData : public BaseCallData { // Handle one grpc_transport_stream_op_batch void StartBatch(grpc_transport_stream_op_batch* batch) override; - protected: - absl::string_view ClientOrServerString() const override { return "SVR"; } - private: // At what stage is our handling of recv initial metadata? enum class RecvInitialState { @@ -612,10 +425,6 @@ class ServerCallData : public BaseCallData { enum class SendTrailingState { // Start state: no op seen kInitial, - // We saw the op, but it was with a send message op (or one was in progress) - // - so we'll wait for that to complete before processing the trailing - // metadata. - kQueuedBehindSendMessage, // We saw the op, and are waiting for the promise to complete // to forward it. kQueued, @@ -625,15 +434,11 @@ class ServerCallData : public BaseCallData { kCancelled }; - static const char* StateString(RecvInitialState state); - static const char* StateString(SendTrailingState state); - std::string DebugString() const; - class PollContext; struct SendInitialMetadata; - // Shut things down when the call completes. - void Completed(grpc_error_handle error, Flusher* flusher); + // Handle cancellation. + void Cancel(grpc_error_handle error, Flusher* flusher); // Construct a promise that will "call" the next filter. // Effectively: // - put the modified initial metadata into the batch being sent up. @@ -646,29 +451,20 @@ class ServerCallData : public BaseCallData { static void RecvInitialMetadataReadyCallback(void* arg, grpc_error_handle error); void RecvInitialMetadataReady(grpc_error_handle error); - static void RecvTrailingMetadataReadyCallback(void* arg, - grpc_error_handle error); - void RecvTrailingMetadataReady(grpc_error_handle error); // Wakeup and poll the promise if appropriate. - void WakeInsideCombiner(Flusher* flusher) override; + void WakeInsideCombiner(Flusher* flusher); void OnWakeup() override; // Contained promise ArenaPromise promise_; // Pointer to where initial metadata will be stored. grpc_metadata_batch* recv_initial_metadata_ = nullptr; - // Pointer to where trailing metadata will be stored. - grpc_metadata_batch* recv_trailing_metadata_ = nullptr; // State for sending initial metadata. SendInitialMetadata* send_initial_metadata_ = nullptr; - // Closure to call when we're done with the initial metadata. + // Closure to call when we're done with the trailing metadata. grpc_closure* original_recv_initial_metadata_ready_ = nullptr; // Our closure pointing to RecvInitialMetadataReadyCallback. grpc_closure recv_initial_metadata_ready_; - // Closure to call when we're done with the trailing metadata. - grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; - // Our closure pointing to RecvTrailingMetadataReadyCallback. - grpc_closure recv_trailing_metadata_ready_; // Error received during cancellation. grpc_error_handle cancelled_error_; // Trailing metadata batch diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 3977ee771d7..f2d3d70f44f 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -103,7 +103,6 @@ class Message { Message& operator=(const Message&) = delete; uint32_t flags() const { return flags_; } - uint32_t& mutable_flags() { return flags_; } SliceBuffer* payload() { return &payload_; } const SliceBuffer* payload() const { return &payload_; }