[promises] Add an unbuffered/immediate send to mpsc for cancellations (#34208)

pull/34211/head
Craig Tiller 1 year ago committed by GitHub
parent 7ebcb6e564
commit a749d07acf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      src/core/lib/promise/mpsc.h
  2. 26
      test/core/promise/mpsc_test.cc

@ -91,6 +91,16 @@ class Center : public RefCounted<Center<T>> {
return Pending{};
}
bool ImmediateSend(T t) {
ReleasableMutexLock lock(&mu_);
if (receiver_closed_) return false;
queue_.push_back(std::move(t));
auto receive_waker = std::move(receive_waker_);
lock.Release();
receive_waker.Wakeup();
return true;
}
// Mark that the receiver is closed.
void ReceiverClosed() {
MutexLock lock(&mu_);
@ -127,6 +137,10 @@ class MpscSender {
return [this, t = std::move(t)]() mutable { return center_->PollSend(t); };
}
bool UnbufferedImmediateSend(T t) {
return center_->ImmediateSend(std::move(t));
}
private:
friend class MpscReceiver<T>;
explicit MpscSender(RefCountedPtr<mpscpipe_detail::Center<T>> center)

@ -147,6 +147,32 @@ TEST(MpscTest, ClosureIsVisibleToSenders) {
EXPECT_EQ(NowOrNever(sender.Send(MakePayload(1))), false);
}
TEST(MpscTest, ImmediateSendWorks) {
StrictMock<MockActivity> activity;
MpscReceiver<Payload> receiver(1);
MpscSender<Payload> sender = receiver.MakeSender();
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(1)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(2)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(3)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(4)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(5)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(6)), true);
EXPECT_EQ(sender.UnbufferedImmediateSend(MakePayload(7)), true);
activity.Activate();
EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(1));
EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(2));
EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(3));
EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(4));
EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(5));
EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(6));
EXPECT_EQ(NowOrNever(receiver.Next()), MakePayload(7));
auto receive2 = receiver.Next();
EXPECT_EQ(receive2(), Poll<Payload>(Pending{}));
activity.Deactivate();
}
} // namespace
} // namespace grpc_core

Loading…
Cancel
Save