@ -20,6 +20,7 @@
# include <stddef.h>
# include <stdint.h>
# include <atomic>
# include <functional>
# include <memory>
# include <utility>
@ -37,6 +38,8 @@
# include "src/core/lib/gprpp/sync.h"
# include "src/core/lib/promise/activity.h"
# include "src/core/lib/promise/if.h"
# include "src/core/lib/promise/map.h"
# include "src/core/lib/promise/poll.h"
# include "src/core/lib/slice/slice.h"
# include "src/core/lib/slice/slice_buffer.h"
@ -50,10 +53,12 @@ class PromiseEndpoint {
std : : unique_ptr < grpc_event_engine : : experimental : : EventEngine : : Endpoint >
endpoint ,
SliceBuffer already_received ) ;
~ PromiseEndpoint ( ) ;
/// Prevent copying and moving of PromiseEndpoint.
~ PromiseEndpoint ( ) = default ;
/// Prevent copying of PromiseEndpoint; moving is fine .
PromiseEndpoint ( const PromiseEndpoint & ) = delete ;
PromiseEndpoint ( PromiseEndpoint & & ) = delete ;
PromiseEndpoint & operator = ( const PromiseEndpoint & ) = delete ;
PromiseEndpoint ( PromiseEndpoint & & ) = default ;
PromiseEndpoint & operator = ( PromiseEndpoint & & ) = default ;
// Returns a promise that resolves to a `absl::Status` indicating the result
// of the write operation.
@ -62,36 +67,37 @@ class PromiseEndpoint {
// `Write()` before the previous write finishes. Doing that results in
// undefined behavior.
auto Write ( SliceBuffer data ) {
{
MutexLock lock ( & write_mutex_ ) ;
// Assert previous write finishes.
GPR_ASSERT ( ! write_result_ . has_value ( ) ) ;
// TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
// available.
grpc_slice_buffer_swap ( write_buffer_ . c_slice_buffer ( ) ,
data . c_slice_buffer ( ) ) ;
}
// Assert previous write finishes.
GPR_ASSERT ( ! write_state_ - > complete . load ( std : : memory_order_relaxed ) ) ;
// TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
// available.
grpc_slice_buffer_swap ( write_state_ - > buffer . c_slice_buffer ( ) ,
data . c_slice_buffer ( ) ) ;
// If `Write()` returns true immediately, the callback will not be called.
// We still need to call our callback to pick up the result.
if ( endpoint_ - > Write ( std : : bind ( & PromiseEndpoint : : WriteCallback , this ,
std : : placeholders : : _1 ) ,
& write_buffer_ ,
nullptr /* uses default arguments */ ) ) {
WriteCallback ( absl : : OkStatus ( ) ) ;
}
return [ this ] ( ) - > Poll < absl : : Status > {
MutexLock lock ( & write_mutex_ ) ;
// If current write isn't finished return `Pending()`, else return write
// result.
if ( ! write_result_ . has_value ( ) ) {
write_waker_ = Activity : : current ( ) - > MakeNonOwningWaker ( ) ;
return Pending ( ) ;
} else {
const auto ret = * write_result_ ;
write_result_ . reset ( ) ;
return ret ;
}
} ;
write_state_ - > waker = Activity : : current ( ) - > MakeNonOwningWaker ( ) ;
const bool completed = endpoint_ - > Write (
[ write_state = write_state_ ] ( absl : : Status status ) {
write_state - > Complete ( std : : move ( status ) ) ;
} ,
& write_state_ - > buffer , nullptr /* uses default arguments */ ) ;
return If (
completed ,
[ this ] ( ) {
write_state_ - > waker = Waker ( ) ;
return [ ] ( ) { return absl : : OkStatus ( ) ; } ;
} ,
[ this ] ( ) {
return [ write_state = write_state_ ] ( ) - > Poll < absl : : Status > {
// If current write isn't finished return `Pending()`, else return
// write result.
if ( ! write_state - > complete . load ( std : : memory_order_acquire ) ) {
return Pending ( ) ;
}
write_state - > complete . store ( false , std : : memory_order_relaxed ) ;
return std : : move ( write_state - > result ) ;
} ;
} ) ;
}
// Returns a promise that resolves to `SliceBuffer` with
@ -101,47 +107,62 @@ class PromiseEndpoint {
// `Read()` before the previous read finishes. Doing that results in
// undefined behavior.
auto Read ( size_t num_bytes ) {
ReleasableMutexLock lock ( & read_mutex_ ) ;
// Assert previous read finishes.
GPR_ASSERT ( ! read_result_ . has_value ( ) ) ;
GPR_ASSERT ( ! read_state_ - > complete . load ( std : : memory_order_relaxed ) ) ;
// Should not have pending reads.
GPR_ASSERT ( pending_read_ buffer_ . Count ( ) = = 0u ) ;
if ( read_buffer_ . Length ( ) < num_bytes ) {
lock . Release ( ) ;
GPR_ASSERT ( read_state_ - > pending_buffer . Count ( ) = = 0u ) ;
bool complete = true ;
while ( read_state_ - > buffer . Length ( ) < num_bytes ) {
// Set read args with hinted bytes.
grpc_event_engine : : experimental : : EventEngine : : Endpoint : : ReadArgs
read_args = { static_cast < int64_t > ( num_bytes ) } ;
read_args = {
static_cast < int64_t > ( num_bytes - read_state_ - > buffer . Length ( ) ) } ;
// If `Read()` returns true immediately, the callback will not be
// called. We still need to call our callback to pick up the result and
// maybe do further reads.
if ( endpoint_ - > Read ( std : : bind ( & PromiseEndpoint : : ReadCallback , this ,
std : : placeholders : : _1 , num_bytes ) ,
& pending_read_buffer_ , & read_args ) ) {
ReadCallback ( absl : : OkStatus ( ) , num_bytes ) ;
}
} else {
read_result_ = absl : : OkStatus ( ) ;
}
return [ this , num_bytes ] ( ) - > Poll < absl : : StatusOr < SliceBuffer > > {
MutexLock lock ( & read_mutex_ ) ;
if ( ! read_result_ . has_value ( ) ) {
// If current read isn't finished, return `Pending()`.
read_waker_ = Activity : : current ( ) - > MakeNonOwningWaker ( ) ;
return Pending ( ) ;
} else if ( ! read_result_ - > ok ( ) ) {
// If read fails, return error.
const absl : : Status ret = * read_result_ ;
read_result_ . reset ( ) ;
return ret ;
// called.
read_state_ - > waker = Activity : : current ( ) - > MakeNonOwningWaker ( ) ;
if ( endpoint_ - > Read (
[ read_state = read_state_ , num_bytes ] ( absl : : Status status ) {
read_state - > Complete ( std : : move ( status ) , num_bytes ) ;
} ,
& read_state_ - > pending_buffer , & read_args ) ) {
read_state_ - > waker = Waker ( ) ;
read_state_ - > pending_buffer . MoveFirstNBytesIntoSliceBuffer (
read_state_ - > pending_buffer . Length ( ) , read_state_ - > buffer ) ;
GPR_DEBUG_ASSERT ( read_state_ - > pending_buffer . Count ( ) = = 0u ) ;
} else {
// If read succeeds, return `SliceBuffer` with `num_bytes` bytes.
SliceBuffer ret ;
grpc_slice_buffer_move_first ( read_buffer_ . c_slice_buffer ( ) , num_bytes ,
ret . c_slice_buffer ( ) ) ;
read_result_ . reset ( ) ;
return std : : move ( ret ) ;
complete = false ;
break ;
}
} ;
}
return If (
complete ,
[ this , num_bytes ] ( ) {
SliceBuffer ret ;
grpc_slice_buffer_move_first ( read_state_ - > buffer . c_slice_buffer ( ) ,
num_bytes , ret . c_slice_buffer ( ) ) ;
return [ ret = std : : move (
ret ) ] ( ) mutable - > Poll < absl : : StatusOr < SliceBuffer > > {
return std : : move ( ret ) ;
} ;
} ,
[ this , num_bytes ] ( ) {
return [ read_state = read_state_ ,
num_bytes ] ( ) - > Poll < absl : : StatusOr < SliceBuffer > > {
if ( ! read_state - > complete . load ( std : : memory_order_acquire ) ) {
return Pending ( ) ;
}
// If read succeeds, return `SliceBuffer` with `num_bytes` bytes.
if ( read_state - > result . ok ( ) ) {
SliceBuffer ret ;
grpc_slice_buffer_move_first ( read_state - > buffer . c_slice_buffer ( ) ,
num_bytes , ret . c_slice_buffer ( ) ) ;
read_state - > complete . store ( false , std : : memory_order_relaxed ) ;
return ret ;
}
read_state - > complete . store ( false , std : : memory_order_relaxed ) ;
return std : : move ( read_state - > result ) ;
} ;
} ) ;
}
// Returns a promise that resolves to `Slice` with at least
@ -151,93 +172,20 @@ class PromiseEndpoint {
// `ReadSlice()` before the previous read finishes. Doing that results in
// undefined behavior.
auto ReadSlice ( size_t num_bytes ) {
ReleasableMutexLock lock ( & read_mutex_ ) ;
// Assert previous read finishes.
GPR_ASSERT ( ! read_result_ . has_value ( ) ) ;
// Should not have pending reads.
GPR_ASSERT ( pending_read_buffer_ . Count ( ) = = 0u ) ;
if ( read_buffer_ . Length ( ) < num_bytes ) {
lock . Release ( ) ;
// Set read args with num_bytes as hint.
grpc_event_engine : : experimental : : EventEngine : : Endpoint : : ReadArgs
read_args = { static_cast < int64_t > ( num_bytes ) } ;
// If `Read()` returns true immediately, the callback will not be
// called. We still need to call our callback to pick up the result
// and maybe do further reads.
if ( endpoint_ - > Read ( std : : bind ( & PromiseEndpoint : : ReadCallback , this ,
std : : placeholders : : _1 , num_bytes ) ,
& pending_read_buffer_ , & read_args ) ) {
ReadCallback ( absl : : OkStatus ( ) , num_bytes ) ;
}
} else {
read_result_ = absl : : OkStatus ( ) ;
}
return [ this , num_bytes ] ( ) - > Poll < absl : : StatusOr < Slice > > {
MutexLock lock ( & read_mutex_ ) ;
if ( ! read_result_ . has_value ( ) ) {
// If current read isn't finished, return `Pending()`.
read_waker_ = Activity : : current ( ) - > MakeNonOwningWaker ( ) ;
return Pending ( ) ;
} else if ( ! read_result_ - > ok ( ) ) {
// If read fails, return error.
const auto ret = * read_result_ ;
read_result_ . reset ( ) ;
return ret ;
}
// If read succeeds, return `Slice` with `num_bytes`.
else if ( read_buffer_ . RefSlice ( 0 ) . size ( ) = = num_bytes ) {
read_result_ . reset ( ) ;
return Slice ( read_buffer_ . TakeFirst ( ) . TakeCSlice ( ) ) ;
} else {
// TODO(ladynana): avoid memcpy when read_buffer_.RefSlice(0).size() is
// different from `num_bytes`.
MutableSlice ret = MutableSlice : : CreateUninitialized ( num_bytes ) ;
read_buffer_ . MoveFirstNBytesIntoBuffer ( num_bytes , ret . data ( ) ) ;
read_result_ . reset ( ) ;
return Slice ( std : : move ( ret ) ) ;
}
} ;
return Map ( Read ( num_bytes ) ,
[ ] ( absl : : StatusOr < SliceBuffer > buffer ) - > absl : : StatusOr < Slice > {
if ( ! buffer . ok ( ) ) return buffer . status ( ) ;
return buffer - > JoinIntoSlice ( ) ;
} ) ;
}
// Returns a promise that resolves to a byte with type `uint8_t`.
auto ReadByte ( ) {
ReleasableMutexLock lock ( & read_mutex_ ) ;
// Assert previous read finishes.
GPR_ASSERT ( ! read_result_ . has_value ( ) ) ;
// Should not have pending reads.
GPR_ASSERT ( pending_read_buffer_ . Count ( ) = = 0u ) ;
if ( read_buffer_ . Length ( ) = = 0u ) {
lock . Release ( ) ;
// If `Read()` returns true immediately, the callback will not be called.
// We still need to call our callback to pick up the result and maybe do
// further reads.
if ( endpoint_ - > Read ( std : : bind ( & PromiseEndpoint : : ReadByteCallback , this ,
std : : placeholders : : _1 ) ,
& pending_read_buffer_ , nullptr ) ) {
ReadByteCallback ( absl : : OkStatus ( ) ) ;
}
} else {
read_result_ = absl : : OkStatus ( ) ;
}
return [ this ] ( ) - > Poll < absl : : StatusOr < uint8_t > > {
MutexLock lock ( & read_mutex_ ) ;
if ( ! read_result_ . has_value ( ) ) {
// If current read isn't finished, return `Pending()`.
read_waker_ = Activity : : current ( ) - > MakeNonOwningWaker ( ) ;
return Pending ( ) ;
} else if ( ! read_result_ - > ok ( ) ) {
// If read fails, return error.
const auto ret = * read_result_ ;
read_result_ . reset ( ) ;
return ret ;
} else {
// If read succeeds, return a byte with type `uint8_t`.
uint8_t ret = 0u ;
read_buffer_ . MoveFirstNBytesIntoBuffer ( 1 , & ret ) ;
read_result_ . reset ( ) ;
return ret ;
}
} ;
return Map ( ReadSlice ( 1 ) ,
[ ] ( absl : : StatusOr < Slice > slice ) - > absl : : StatusOr < uint8_t > {
if ( ! slice . ok ( ) ) return slice . status ( ) ;
return ( * slice ) [ 0 ] ;
} ) ;
}
const grpc_event_engine : : experimental : : EventEngine : : ResolvedAddress &
@ -246,49 +194,52 @@ class PromiseEndpoint {
GetLocalAddress ( ) const ;
private :
std : : unique _ptr< grpc_event_engine : : experimental : : EventEngine : : Endpoint >
std : : shared _ptr< grpc_event_engine : : experimental : : EventEngine : : Endpoint >
endpoint_ ;
// Data used for writes.
// TODO(ladynana): Remove this write_mutex_ and use `atomic<bool>
// write_complete_` as write guard.
Mutex write_mutex_ ;
// Write buffer used for `EventEngine::Endpoint::Write()` to ensure the
// memory behind the buffer is not lost.
grpc_event_engine : : experimental : : SliceBuffer write_buffer_ ;
// Used for store the result from `EventEngine::Endpoint::Write()`.
// `write_result_.has_value() == true` means the value has not been polled
// yet.
absl : : optional < absl : : Status > write_result_ ABSL_GUARDED_BY ( write_mutex_ ) ;
Waker write_waker_ ABSL_GUARDED_BY ( write_mutex_ ) ;
// Callback function used for `EventEngine::Endpoint::Write()`.
void WriteCallback ( absl : : Status status ) ;
// Data used for reads
// TODO(ladynana): Remove this read_mutex_ and use `atomic<bool>
// read_complete_` as read guard.
Mutex read_mutex_ ;
// Read buffer used for storing successful reads given by
// `EventEngine::Endpoint` but not yet requested by the caller.
grpc_event_engine : : experimental : : SliceBuffer read_buffer_ ;
// Buffer used to accept data from `EventEngine::Endpoint`.
// Every time after a successful read from `EventEngine::Endpoint`, the data
// in this buffer should be appended to `read_buffer_`.
grpc_event_engine : : experimental : : SliceBuffer pending_read_buffer_ ;
// Used for store the result from `EventEngine::Endpoint::Read()`.
// `read_result_.has_value() == true` means the value has not been polled
// yet.
absl : : optional < absl : : Status > read_result_ ABSL_GUARDED_BY ( read_mutex_ ) ;
Waker read_waker_ ABSL_GUARDED_BY ( read_mutex_ ) ;
struct ReadState : public RefCounted < ReadState > {
std : : atomic < bool > complete { false } ;
// Read buffer used for storing successful reads given by
// `EventEngine::Endpoint` but not yet requested by the caller.
grpc_event_engine : : experimental : : SliceBuffer buffer ;
// Buffer used to accept data from `EventEngine::Endpoint`.
// Every time after a successful read from `EventEngine::Endpoint`, the data
// in this buffer should be appended to `buffer`.
grpc_event_engine : : experimental : : SliceBuffer pending_buffer ;
// Used for store the result from `EventEngine::Endpoint::Read()`.
absl : : Status result ;
Waker waker ;
// Backing endpoint: we keep this on ReadState as reads will need to
// repeatedly read until the target size is hit, and we don't want to access
// the main object during this dance (indeed the main object may be
// deleted).
std : : weak_ptr < grpc_event_engine : : experimental : : EventEngine : : Endpoint >
endpoint ;
void Complete ( absl : : Status status , size_t num_bytes_requested ) ;
} ;
struct WriteState : public RefCounted < WriteState > {
std : : atomic < bool > complete { false } ;
// Write buffer used for `EventEngine::Endpoint::Write()` to ensure the
// memory behind the buffer is not lost.
grpc_event_engine : : experimental : : SliceBuffer buffer ;
// Used for store the result from `EventEngine::Endpoint::Write()`.
absl : : Status result ;
Waker waker ;
void Complete ( absl : : Status status ) {
result = std : : move ( status ) ;
auto w = std : : move ( waker ) ;
complete . store ( true , std : : memory_order_release ) ;
w . Wakeup ( ) ;
}
} ;
// Callback function used for `EventEngine::Endpoint::Read()` shared between
// `Read()` and `ReadSlice()`.
void ReadCallback ( absl : : Status status , size_t num_bytes_requested ) ;
// Callback function used for `EventEngine::Endpoint::Read()` in `ReadByte()`.
void ReadByteCallback ( absl : : Status status ) ;
RefCountedPtr < WriteState > write_state_ = MakeRefCounted < WriteState > ( ) ;
RefCountedPtr < ReadState > read_state_ = MakeRefCounted < ReadState > ( ) ;
} ;
} // namespace grpc_core
# endif // GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H
# endif // GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H