From a749d07acfcb1a59d7833c9fa113dd698cde7703 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 30 Aug 2023 14:05:31 -0700 Subject: [PATCH] [promises] Add an unbuffered/immediate send to mpsc for cancellations (#34208) --- src/core/lib/promise/mpsc.h | 14 ++++++++++++++ test/core/promise/mpsc_test.cc | 26 ++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/src/core/lib/promise/mpsc.h b/src/core/lib/promise/mpsc.h index 1607cf9bc6c..8525739068f 100644 --- a/src/core/lib/promise/mpsc.h +++ b/src/core/lib/promise/mpsc.h @@ -91,6 +91,16 @@ class Center : public RefCounted> { return Pending{}; } + bool ImmediateSend(T t) { + ReleasableMutexLock lock(&mu_); + if (receiver_closed_) return false; + queue_.push_back(std::move(t)); + auto receive_waker = std::move(receive_waker_); + lock.Release(); + receive_waker.Wakeup(); + return true; + } + // Mark that the receiver is closed. void ReceiverClosed() { MutexLock lock(&mu_); @@ -127,6 +137,10 @@ class MpscSender { return [this, t = std::move(t)]() mutable { return center_->PollSend(t); }; } + bool UnbufferedImmediateSend(T t) { + return center_->ImmediateSend(std::move(t)); + } + private: friend class MpscReceiver; explicit MpscSender(RefCountedPtr> center) diff --git a/test/core/promise/mpsc_test.cc b/test/core/promise/mpsc_test.cc index 24296ca77b4..38baa68cb60 100644 --- a/test/core/promise/mpsc_test.cc +++ b/test/core/promise/mpsc_test.cc @@ -147,6 +147,32 @@ TEST(MpscTest, ClosureIsVisibleToSenders) { EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), false); } +TEST(MpscTest, ImmediateSendWorks) { + StrictMock activity; + MpscReceiver receiver(1); + MpscSender sender = receiver.MakeSender(); + + EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(1)), true); + EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(2)), true); + EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(3)), true); + EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(4)), true); + EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(5)), true); + EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(6)), true); + EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(7)), true); + + activity.Activate(); + EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(1)); + EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(2)); + EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(3)); + EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(4)); + EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(5)); + EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(6)); + EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(7)); + auto receive2 = receiver.Next(); + EXPECT_EQ(receive2(), Poll(Pending{})); + activity.Deactivate(); +} + } // namespace } // namespace grpc_core