[call-v3] Expand suite of passing tests: enable cancel_after_client_done (#37326)

Closes #37326

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37326 from ctiller:terrible-tommy 846fa76522
PiperOrigin-RevId: 659607451
pull/37400/head
Craig Tiller 6 months ago committed by Copybara-Service
parent 781fcf9d10
commit 1514a1cf89
  1. 41
      src/core/ext/transport/chaotic_good/client_transport.cc
  2. 3
      src/core/ext/transport/chaotic_good/frame.h
  3. 58
      src/core/ext/transport/chaotic_good/server_transport.cc
  4. 5
      src/core/ext/transport/chaotic_good/server_transport.h
  5. 6
      src/core/lib/surface/call.cc
  6. 36
      src/core/lib/transport/call_spine.h
  7. 2
      test/core/end2end/tests/cancel_after_client_done.cc

@ -254,7 +254,11 @@ uint32_t ChaoticGoodClientTransport::MakeStream(CallHandler call_handler) {
const uint32_t stream_id = next_stream_id_++;
stream_map_.emplace(stream_id, call_handler);
lock.Release();
call_handler.OnDone([this, stream_id]() {
call_handler.OnDone([this, stream_id](bool cancelled) {
if (cancelled) {
outgoing_frames_.MakeSender().UnbufferedImmediateSend(
CancelFrame{stream_id});
}
MutexLock lock(&mu_);
stream_map_.erase(stream_id);
});
@ -317,24 +321,23 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
"outbound_loop", [self = RefAsSubclass<ChaoticGoodClientTransport>(),
call_handler]() mutable {
const uint32_t stream_id = self->MakeStream(call_handler);
return Map(self->CallOutboundLoop(stream_id, call_handler),
[stream_id, sender = self->outgoing_frames_.MakeSender()](
absl::Status result) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Call " << stream_id
<< " finished with " << result.ToString();
if (!result.ok()) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel";
CancelFrame frame;
frame.stream_id = stream_id;
if (!sender.UnbufferedImmediateSend(std::move(frame))) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel failed";
}
}
return result;
});
return Map(
self->CallOutboundLoop(stream_id, call_handler),
[stream_id, sender = self->outgoing_frames_.MakeSender()](
absl::Status result) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Call " << stream_id << " finished with "
<< result.ToString();
if (!result.ok()) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel";
if (!sender.UnbufferedImmediateSend(CancelFrame{stream_id})) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel failed";
}
}
return result;
});
});
}

@ -156,6 +156,9 @@ struct ServerFragmentFrame final : public FrameInterface {
};
struct CancelFrame final : public FrameInterface {
CancelFrame() = default;
explicit CancelFrame(uint32_t stream_id) : stream_id(stream_id) {}
absl::Status Deserialize(HPackParser* parser, const FrameHeader& header,
absl::BitGenRef bitsrc, Arena* arena,
BufferPair buffers, FrameLimits limits) override;

@ -72,8 +72,7 @@ auto ChaoticGoodServerTransport::TransportWriteLoop(
}
auto ChaoticGoodServerTransport::PushFragmentIntoCall(
CallInitiator call_initiator, ClientFragmentFrame frame,
uint32_t stream_id) {
CallInitiator call_initiator, ClientFragmentFrame frame) {
DCHECK(frame.headers == nullptr);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: PushFragmentIntoCall: frame=" << frame.ToString();
@ -84,17 +83,15 @@ auto ChaoticGoodServerTransport::PushFragmentIntoCall(
std::move(frame.message->message));
},
[]() -> StatusFlag { return Success{}; }),
[this, call_initiator, end_of_stream = frame.end_of_stream,
stream_id](StatusFlag status) mutable -> StatusFlag {
[call_initiator, end_of_stream = frame.end_of_stream](
StatusFlag status) mutable -> StatusFlag {
if (!status.ok() && GRPC_TRACE_FLAG_ENABLED(chaotic_good)) {
LOG(INFO) << "CHAOTIC_GOOD: Failed PushFragmentIntoCall";
}
if (end_of_stream || !status.ok()) {
call_initiator.FinishSends();
// We have received end_of_stream. It is now safe to remove
// the call from the stream map.
MutexLock lock(&mu_);
stream_map_.erase(stream_id);
// Note that we cannot remove from the stream map yet, as we
// may yet receive a cancellation.
}
return Success{};
});
@ -102,17 +99,16 @@ auto ChaoticGoodServerTransport::PushFragmentIntoCall(
auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
absl::optional<CallInitiator> call_initiator, absl::Status error,
ClientFragmentFrame frame, uint32_t stream_id) {
ClientFragmentFrame frame) {
return If(
call_initiator.has_value() && error.ok(),
[this, &call_initiator, &frame, &stream_id]() {
[this, &call_initiator, &frame]() {
return Map(
call_initiator->SpawnWaitable(
"push-fragment",
[call_initiator, frame = std::move(frame), stream_id,
this]() mutable {
return call_initiator->CancelIfFails(PushFragmentIntoCall(
*call_initiator, std::move(frame), stream_id));
[call_initiator, frame = std::move(frame), this]() mutable {
return call_initiator->CancelIfFails(
PushFragmentIntoCall(*call_initiator, std::move(frame)));
}),
[](StatusFlag status) { return StatusCast<absl::Status>(status); });
},
@ -255,8 +251,7 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
}
}
return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status),
std::move(fragment_frame),
frame_header.stream_id);
std::move(fragment_frame));
}
auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall(
@ -271,8 +266,7 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall(
frame_header, std::move(buffers), arena, fragment_frame,
FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status),
std::move(fragment_frame),
frame_header.stream_id);
std::move(fragment_frame));
}
auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
@ -305,6 +299,10 @@ auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
[this, &frame_header]() {
absl::optional<CallInitiator> call_initiator =
ExtractStream(frame_header.stream_id);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "Cancel stream " << frame_header.stream_id
<< (call_initiator.has_value() ? " (active)"
: " (not found)");
return If(
call_initiator.has_value(),
[&call_initiator]() {
@ -410,6 +408,8 @@ void ChaoticGoodServerTransport::AbortWithError() {
absl::optional<CallInitiator> ChaoticGoodServerTransport::LookupStream(
uint32_t stream_id) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " LookupStream " << stream_id;
MutexLock lock(&mu_);
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) return absl::nullopt;
@ -418,6 +418,8 @@ absl::optional<CallInitiator> ChaoticGoodServerTransport::LookupStream(
absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream(
uint32_t stream_id) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " ExtractStream " << stream_id;
MutexLock lock(&mu_);
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) return absl::nullopt;
@ -428,6 +430,8 @@ absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream(
absl::Status ChaoticGoodServerTransport::NewStream(
uint32_t stream_id, CallInitiator call_initiator) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " NewStream " << stream_id;
MutexLock lock(&mu_);
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
@ -437,10 +441,20 @@ absl::Status ChaoticGoodServerTransport::NewStream(
return absl::InternalError("Stream id is not increasing");
}
stream_map_.emplace(stream_id, call_initiator);
call_initiator.OnDone([this, stream_id]() {
MutexLock lock(&mu_);
stream_map_.erase(stream_id);
});
call_initiator.OnDone(
[self = RefAsSubclass<ChaoticGoodServerTransport>(), stream_id](bool) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << self.get() << " OnDone " << stream_id;
absl::optional<CallInitiator> call_initiator =
self->ExtractStream(stream_id);
if (call_initiator.has_value()) {
auto c = std::move(*call_initiator);
c.SpawnInfallible("cancel", [c]() mutable {
c.Cancel();
return Empty{};
});
}
});
return absl::OkStatus();
}

@ -131,10 +131,9 @@ class ChaoticGoodServerTransport final : public ServerTransport {
FrameHeader frame_header, BufferPair buffers,
ChaoticGoodTransport& transport);
auto MaybePushFragmentIntoCall(absl::optional<CallInitiator> call_initiator,
absl::Status error, ClientFragmentFrame frame,
uint32_t stream_id);
absl::Status error, ClientFragmentFrame frame);
auto PushFragmentIntoCall(CallInitiator call_initiator,
ClientFragmentFrame frame, uint32_t stream_id);
ClientFragmentFrame frame);
RefCountedPtr<UnstartedCallDestination> call_destination_;
const RefCountedPtr<CallArenaAllocator> call_arena_allocator_;

@ -337,9 +337,9 @@ void Call::HandleCompressionAlgorithmDisabled(
void Call::UpdateDeadline(Timestamp deadline) {
ReleasableMutexLock lock(&deadline_mu_);
if (GRPC_TRACE_FLAG_ENABLED(call)) {
VLOG(2) << "[call " << this
<< "] UpdateDeadline from=" << deadline_.ToString()
<< " to=" << deadline.ToString();
LOG(INFO) << "[call " << this
<< "] UpdateDeadline from=" << deadline_.ToString()
<< " to=" << deadline.ToString();
}
if (deadline >= deadline_) return;
if (deadline < Timestamp::Now()) {

@ -51,23 +51,24 @@ class CallSpine final : public Party {
std::move(client_initial_metadata), std::move(arena)));
}
~CallSpine() override {}
~CallSpine() override { CallOnDone(true); }
CallFilters& call_filters() { return call_filters_; }
// Add a callback to be called when server trailing metadata is received.
void OnDone(absl::AnyInvocable<void()> fn) {
void OnDone(absl::AnyInvocable<void(bool)> fn) {
if (on_done_ == nullptr) {
on_done_ = std::move(fn);
return;
}
on_done_ = [first = std::move(fn), next = std::move(on_done_)]() mutable {
first();
next();
on_done_ = [first = std::move(fn),
next = std::move(on_done_)](bool cancelled) mutable {
first(cancelled);
next(cancelled);
};
}
void CallOnDone() {
if (on_done_ != nullptr) std::exchange(on_done_, nullptr)();
void CallOnDone(bool cancelled) {
if (on_done_ != nullptr) std::exchange(on_done_, nullptr)(cancelled);
}
auto PullServerInitialMetadata() {
@ -75,7 +76,12 @@ class CallSpine final : public Party {
}
auto PullServerTrailingMetadata() {
return call_filters().PullServerTrailingMetadata();
return Map(
call_filters().PullServerTrailingMetadata(),
[this](ServerMetadataHandle result) {
CallOnDone(result->get(GrpcCallWasCancelled()).value_or(false));
return result;
});
}
auto PushClientToServerMessage(MessageHandle message) {
@ -190,7 +196,7 @@ class CallSpine final : public Party {
// Call filters/pipes part of the spine
CallFilters call_filters_;
absl::AnyInvocable<void()> on_done_{nullptr};
absl::AnyInvocable<void(bool)> on_done_{nullptr};
};
class CallInitiator {
@ -227,7 +233,9 @@ class CallInitiator {
spine_->PushServerTrailingMetadata(std::move(status));
}
void OnDone(absl::AnyInvocable<void()> fn) { spine_->OnDone(std::move(fn)); }
void OnDone(absl::AnyInvocable<void(bool)> fn) {
spine_->OnDone(std::move(fn));
}
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
@ -274,7 +282,9 @@ class CallHandler {
spine_->PushServerTrailingMetadata(std::move(status));
}
void OnDone(absl::AnyInvocable<void()> fn) { spine_->OnDone(std::move(fn)); }
void OnDone(absl::AnyInvocable<void(bool)> fn) {
spine_->OnDone(std::move(fn));
}
template <typename Promise>
auto CancelIfFails(Promise promise) {
@ -327,7 +337,9 @@ class UnstartedCallHandler {
spine_->PushServerTrailingMetadata(std::move(status));
}
void OnDone(absl::AnyInvocable<void()> fn) { spine_->OnDone(std::move(fn)); }
void OnDone(absl::AnyInvocable<void(bool)> fn) {
spine_->OnDone(std::move(fn));
}
template <typename Promise>
auto CancelIfFails(Promise promise) {

@ -67,12 +67,10 @@ void CancelAfterClientDone(
}
CORE_END2END_TEST(CoreEnd2endTest, CancelAfterClientDone) {
SKIP_IF_V3();
CancelAfterClientDone(*this, std::make_unique<CancelCancellationMode>());
}
CORE_END2END_TEST(CoreDeadlineTest, DeadlineAfterClientDone) {
SKIP_IF_V3();
CancelAfterClientDone(*this, std::make_unique<DeadlineCancellationMode>());
}

Loading…
Cancel
Save