@ -28,7 +28,9 @@
# include "absl/log/check.h"
# include "src/core/lib/promise/activity.h"
# include "src/core/lib/promise/poll.h"
# include "src/core/lib/promise/status_flag.h"
# include "src/core/lib/promise/wait_set.h"
# include "src/core/util/dump_args.h"
# include "src/core/util/ref_counted.h"
# include "src/core/util/ref_counted_ptr.h"
# include "src/core/util/sync.h"
@ -54,18 +56,23 @@ class Center : public RefCounted<Center<T>> {
// - Returns true if new items were obtained, in which case they are contained
// in dest in the order they were added. Wakes up all pending senders since
// there will now be space to send.
// - If receives have been closed, returns false.
// - If no new items are available, returns
// false and sets up a waker to be awoken when more items are available.
// Pending and sets up a waker to be awoken when more items are available.
// TODO(ctiller): consider the problem of thundering herds here. There may be
// more senders than there are queue spots, and so the strategy of waking up
// all senders is ill-advised.
// That said, some senders may have been cancelled by the time we wake them,
// and so waking a subset could cause starvation.
bool PollReceiveBatch ( std : : vector < T > & dest ) {
Poll < bool > PollReceiveBatch ( std : : vector < T > & dest ) {
ReleasableMutexLock lock ( & mu_ ) ;
GRPC_TRACE_LOG ( promise_primitives , INFO )
< < " MPSC::PollReceiveBatch: "
< < GRPC_DUMP_ARGS ( this , batch_ , queue_ . size ( ) ) ;
if ( queue_ . empty ( ) ) {
if ( batch_ = = kClosedBatch ) return false ;
receive_waker_ = GetContext < Activity > ( ) - > MakeNonOwningWaker ( ) ;
return false ;
return Pending { } ;
}
dest . swap ( queue_ ) ;
queue_ . clear ( ) ;
@ -97,18 +104,24 @@ class Center : public RefCounted<Center<T>> {
// Poll until a particular batch number is received.
Poll < Empty > PollReceiveBatch ( uint64_t batch ) {
ReleasableMutexLock lock ( & mu_ ) ;
GRPC_TRACE_LOG ( promise_primitives , INFO )
< < " MPSC::PollReceiveBatch: " < < GRPC_DUMP_ARGS ( this , batch_ , batch ) ;
if ( batch_ > = batch ) return Empty { } ;
send_wakers_ . AddPending ( GetContext < Activity > ( ) - > MakeNonOwningWaker ( ) ) ;
return Pending { } ;
}
// Mark that the receiver is closed.
void ReceiverClosed ( ) {
void ReceiverClosed ( bool wake_receiver ) {
ReleasableMutexLock lock ( & mu_ ) ;
GRPC_TRACE_LOG ( promise_primitives , INFO )
< < " MPSC::ReceiverClosed: " < < GRPC_DUMP_ARGS ( this , batch_ ) ;
if ( batch_ = = kClosedBatch ) return ;
batch_ = kClosedBatch ;
auto wakeups = send_wakers_ . TakeWakeupSet ( ) ;
auto receive_waker = std : : move ( receive_waker_ ) ;
lock . Release ( ) ;
if ( wake_receiver ) receive_waker . Wakeup ( ) ;
wakeups . Wakeup ( ) ;
}
@ -188,10 +201,10 @@ class MpscReceiver {
: center_ ( MakeRefCounted < mpscpipe_detail : : Center < T > > (
std : : max ( static_cast < size_t > ( 1 ) , max_buffer_hint / 2 ) ) ) { }
~ MpscReceiver ( ) {
if ( center_ ! = nullptr ) center_ - > ReceiverClosed ( ) ;
if ( center_ ! = nullptr ) center_ - > ReceiverClosed ( false ) ;
}
void MarkClosed ( ) {
if ( center_ ! = nullptr ) center_ - > ReceiverClosed ( ) ;
if ( center_ ! = nullptr ) center_ - > ReceiverClosed ( true ) ;
}
MpscReceiver ( const MpscReceiver & ) = delete ;
MpscReceiver & operator = ( const MpscReceiver & ) = delete ;
@ -210,15 +223,19 @@ class MpscReceiver {
// Construct a new sender for this receiver.
MpscSender < T > MakeSender ( ) { return MpscSender < T > ( center_ ) ; }
// Return a promise that will resolve to the next item (and remove said item).
// Return a promise that will resolve to ValueOrFailure<T>.
// If receiving is closed, it will resolve to failure.
// Otherwise, resolves to the next item (and removes said item).
auto Next ( ) {
return [ this ] ( ) - > Poll < T > {
return [ this ] ( ) - > Poll < ValueOrFailure < T > > {
if ( buffer_it_ ! = buffer_ . end ( ) ) {
return Poll < T > ( std : : move ( * buffer_it_ + + ) ) ;
return Poll < ValueOrFailure < T > > ( std : : move ( * buffer_it_ + + ) ) ;
}
if ( center_ - > PollReceiveBatch ( buffer_ ) ) {
auto p = center_ - > PollReceiveBatch ( buffer_ ) ;
if ( bool * r = p . value_if_ready ( ) ) {
if ( ! * r ) return Failure { } ;
buffer_it_ = buffer_ . begin ( ) ;
return Poll < T > ( std : : move ( * buffer_it_ + + ) ) ;
return Poll < ValueOrFailure < T > > ( std : : move ( * buffer_it_ + + ) ) ;
}
return Pending { } ;
} ;