@ -23,6 +23,8 @@
# include "src/core/lib/gprpp/ref_counted.h"
# include "src/core/lib/gprpp/ref_counted_ptr.h"
# include "src/core/lib/promise/latch.h"
# include "src/core/lib/promise/map.h"
# include "src/core/lib/promise/promise.h"
# include "src/core/lib/promise/status_flag.h"
# include "src/core/lib/transport/call_final_info.h"
@ -771,6 +773,55 @@ struct AddOpImpl<
}
} ;
// PROMISE_RETURNING(absl::StatusOr<$VALUE_HANDLE>)
// $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*)
template < typename FilterType , typename T , typename R ,
R ( FilterType : : Call : : * impl ) ( T , FilterType * ) >
struct AddOpImpl < FilterType , T , R ( FilterType : : Call : : * ) ( T , FilterType * ) , impl ,
absl : : enable_if_t < std : : is_same < absl : : StatusOr < T > ,
PromiseResult < R > > : : value > > {
static void Add ( FilterType * channel_data , size_t call_offset ,
Layout < FallibleOperator < T > > & to ) {
class Promise {
public :
Promise ( T value , typename FilterType : : Call * call_data ,
FilterType * channel_data )
: impl_ ( ( call_data - > * impl ) ( std : : move ( value ) , channel_data ) ) { }
Poll < ResultOr < T > > PollOnce ( ) {
auto p = impl_ ( ) ;
auto * r = p . value_if_ready ( ) ;
if ( r = = nullptr ) return Pending { } ;
this - > ~ Promise ( ) ;
if ( r - > ok ( ) ) return ResultOr < T > { std : : move ( * * r ) , nullptr } ;
return ResultOr < T > { nullptr , ServerMetadataFromStatus ( r - > status ( ) ) } ;
}
private :
GPR_NO_UNIQUE_ADDRESS R impl_ ;
} ;
to . Add ( sizeof ( Promise ) , alignof ( Promise ) ,
FallibleOperator < T > {
channel_data ,
call_offset ,
[ ] ( void * promise_data , void * call_data , void * channel_data ,
T value ) - > Poll < ResultOr < T > > {
auto * promise = new ( promise_data )
Promise ( std : : move ( value ) ,
static_cast < typename FilterType : : Call * > ( call_data ) ,
static_cast < FilterType * > ( channel_data ) ) ;
return promise - > PollOnce ( ) ;
} ,
[ ] ( void * promise_data ) {
return static_cast < Promise * > ( promise_data ) - > PollOnce ( ) ;
} ,
[ ] ( void * promise_data ) {
static_cast < Promise * > ( promise_data ) - > ~ Promise ( ) ;
} ,
} ) ;
}
} ;
struct ChannelDataDestructor {
void ( * destroy ) ( void * channel_data ) ;
void * channel_data ;
@ -783,7 +834,7 @@ struct ChannelDataDestructor {
// in-flight calls.
struct StackData {
// Overall size and alignment of the call data for this stack.
size_t call_data_alignment = 0 ;
size_t call_data_alignment = 1 ;
size_t call_data_size = 0 ;
// A complete list of filters for this call, so that we can construct the
// call data for each filter.
@ -1104,14 +1155,25 @@ class PipeState {
void DropPush ( ) ;
// Poll for push completion: occurs after the corresponding Pull()
Poll < StatusFlag > PollPush ( ) ;
Poll < StatusFlag > PollPull ( ) ;
// Poll for pull completion; returns Failure{} if closed with error,
// true if a value is available, or false if the pipe was closed without
// error.
Poll < ValueOrFailure < bool > > PollPull ( ) ;
// A pulled value has been consumed: we can unblock the push
void AckPull ( ) ;
// A previously started pull operation has completed
void DropPull ( ) ;
// Close sending
void CloseSending ( ) ;
// Close sending with error
void CloseWithError ( ) ;
// Poll for closedness - if true, closed with error
Poll < bool > PollClosed ( ) ;
bool holds_error ( ) const { return state_ = = ValueState : : kError ; }
std : : string DebugString ( ) const ;
private :
enum class ValueState : uint8_t {
// Nothing sending nor receiving
@ -1248,6 +1310,44 @@ class CallFilters {
filters_detail : : StackData data_ ;
} ;
class NextMessage {
public :
NextMessage ( ) : has_value_ ( false ) , cancelled_ ( false ) { }
explicit NextMessage ( MessageHandle value )
: has_value_ ( true ) , value_ ( std : : move ( value ) ) { }
explicit NextMessage ( bool cancelled )
: has_value_ ( false ) , cancelled_ ( cancelled ) { }
NextMessage ( const NextMessage & ) = delete ;
NextMessage & operator = ( const NextMessage & ) = delete ;
NextMessage ( NextMessage & & other ) noexcept = default ;
NextMessage & operator = ( NextMessage & & other ) = default ;
using value_type = MessageHandle ;
void reset ( ) {
has_value_ = false ;
cancelled_ = false ;
value_ . reset ( ) ;
}
bool has_value ( ) const { return has_value_ ; }
const MessageHandle & value ( ) const {
GPR_DEBUG_ASSERT ( has_value_ ) ;
return value_ ;
}
MessageHandle & value ( ) {
GPR_DEBUG_ASSERT ( has_value_ ) ;
return value_ ;
}
const MessageHandle & operator * ( ) const { return value ( ) ; }
MessageHandle & operator * ( ) { return value ( ) ; }
bool cancelled ( ) const { return ! has_value_ & & cancelled_ ; }
private :
bool has_value_ ;
bool cancelled_ ;
MessageHandle value_ ;
} ;
explicit CallFilters ( ClientMetadataHandle client_initial_metadata ) ;
~ CallFilters ( ) ;
@ -1258,25 +1358,59 @@ class CallFilters {
void SetStack ( RefCountedPtr < Stack > stack ) ;
// Access client initial metadata before it's processed
ClientMetadata * unprocessed_client_initial_metadata ( ) {
return client_initial_metadata_ . get ( ) ;
}
// Client: Fetch client initial metadata
// Returns a promise that resolves to ValueOrFailure<ClientMetadataHandle>
GRPC_MUST_USE_RESULT auto PullClientInitialMetadata ( ) ;
// Server: Indicate that no server initial metadata will be sent
void NoServerInitialMetadata ( ) {
server_initial_metadata_state_ . CloseSending ( ) ;
}
// Server: Push server initial metadata
// Returns a promise that resolves to a StatusFlag indicating success
GRPC_MUST_USE_RESULT auto PushServerInitialMetadata ( ServerMetadataHandle md ) ;
// Client: Fetch server initial metadata
// Returns a promise that resolves to ValueOrFailure<ServerMetadataHandle>
GRPC_MUST_USE_RESULT auto PullServerInitialMetadata ( ) ;
// Client: Push client to server message
// Returns a promise that resolves to a StatusFlag indicating success
GRPC_MUST_USE_RESULT auto PushClientToServerMessage ( MessageHandle message ) ;
// Client: Indicate that no more messages will be sent
void FinishClientToServerSends ( ) {
client_to_server_message_state_ . CloseSending ( ) ;
}
// Server: Fetch client to server message
// Returns a promise that resolves to ValueOrFailure<MessageHandle>
GRPC_MUST_USE_RESULT auto PullClientToServerMessage ( ) ;
// Server: Push server to client message
// Returns a promise that resolves to a StatusFlag indicating success
GRPC_MUST_USE_RESULT auto PushServerToClientMessage ( MessageHandle message ) ;
// Server: Fetch server to client message
// Returns a promise that resolves to ValueOrFailure<MessageHandle>
GRPC_MUST_USE_RESULT auto PullServerToClientMessage ( ) ;
void PushServerTrailingMetadata ( ServerMetadataHandle md ) {
GPR_ASSERT ( md ! = nullptr ) ;
if ( server_trailing_metadata_ ! = nullptr ) return ;
server_trailing_metadata_ = std : : move ( md ) ;
server_trailing_metadata_waiter_ . Wake ( ) ;
}
// Server: Indicate end of response
// Closes the request entirely - no messages can be sent/received
// If no server initial metadata has been sent, implies
// NoServerInitialMetadata() called.
void PushServerTrailingMetadata ( ServerMetadataHandle md ) ;
// Client: Fetch server trailing metadata
// Returns a promise that resolves to ServerMetadataHandle
GRPC_MUST_USE_RESULT auto PullServerTrailingMetadata ( ) ;
// Server: Wait for server trailing metadata to have been sent
// Returns a promise that resolves to a StatusFlag indicating whether the
// request was cancelled or not -- failure to send trailing metadata is
// considered a cancellation, as is actual cancellation -- but not application
// errors.
GRPC_MUST_USE_RESULT auto WasCancelled ( ) ;
// Client & server: fill in final_info with the final status of the call.
void Finalize ( const grpc_call_final_info * final_info ) ;
std : : string DebugString ( ) const ;
private :
template < filters_detail : : PipeState ( CallFilters : : * state_ptr ) ,
void * ( CallFilters : : * push_ptr ) , typename T ,
@ -1315,6 +1449,10 @@ class CallFilters {
T TakeValue ( ) { return std : : move ( value_ ) ; }
absl : : string_view DebugString ( ) const {
return value_ ! = nullptr ? " (not pulled) " : " " ;
}
private :
filters_detail : : PipeState & state ( ) { return filters_ - > * state_ptr ; }
void * & push_slot ( ) { return filters_ - > * push_ptr ; }
@ -1323,24 +1461,36 @@ class CallFilters {
T value_ ;
} ;
class Pull {
static std : : string DebugString ( absl : : string_view name ,
const CallFilters * filters ) {
auto * push = static_cast < Push * > ( filters - > * push_ptr ) ;
return absl : : StrCat ( name , " : " , ( filters - > * state_ptr ) . DebugString ( ) ,
push = = nullptr ? " " : push - > DebugString ( ) ) ;
}
class PullMaybe {
public :
explicit Pull ( CallFilters * filters ) : filters_ ( filters ) { }
~ Pull ( ) {
explicit PullMaybe ( CallFilters * filters ) : filters_ ( filters ) { }
~ PullMaybe ( ) {
if ( filters_ ! = nullptr ) {
state ( ) . DropPull ( ) ;
}
}
Pull ( const Pull & ) = delete ;
Pull & operator = ( const Pull & ) = delete ;
Pull ( Pull & & other ) noexcept
PullMaybe ( const PullMaybe & ) = delete ;
PullMaybe & operator = ( const PullMaybe & ) = delete ;
PullMaybe ( PullMaybe & & other ) noexcept
: filters_ ( std : : exchange ( other . filters_ , nullptr ) ) ,
executor_ ( std : : move ( other . executor_ ) ) { }
Pull & operator = ( Pull & & ) = delete ;
PullMaybe & operator = ( PullMaybe & & ) = delete ;
Poll < ValueOrFailure < T > > operator ( ) ( ) {
Poll < ValueOrFailure < absl : : optional < T > > > operator ( ) ( ) {
if ( executor_ . IsRunning ( ) ) {
auto c = state ( ) . PollClosed ( ) ;
if ( c . ready ( ) & & c . value ( ) ) {
filters_ - > CancelDueToFailedPipeOperation ( ) ;
return Failure { } ;
}
return FinishOperationExecutor ( executor_ . Step ( filters_ - > call_data_ ) ) ;
}
auto p = state ( ) . PollPull ( ) ;
@ -1350,6 +1500,7 @@ class CallFilters {
filters_ - > CancelDueToFailedPipeOperation ( ) ;
return Failure { } ;
}
if ( ! * * r ) return absl : : nullopt ;
return FinishOperationExecutor ( executor_ . Start (
layout ( ) , push ( ) - > TakeValue ( ) , filters_ - > call_data_ ) ) ;
}
@ -1362,7 +1513,7 @@ class CallFilters {
return & ( filters_ - > stack_ - > data_ . * layout_ptr ) ;
}
Poll < ValueOrFailure < T > > FinishOperationExecutor (
Poll < ValueOrFailure < absl : : optional < T > > > FinishOperationExecutor (
Poll < filters_detail : : ResultOr < T > > p ) {
auto * r = p . value_if_ready ( ) ;
if ( r = = nullptr ) return Pending { } ;
@ -1376,6 +1527,66 @@ class CallFilters {
CallFilters * filters_ ;
filters_detail : : OperationExecutor < T > executor_ ;
} ;
class PullMessage {
public :
explicit PullMessage ( CallFilters * filters ) : filters_ ( filters ) { }
~ PullMessage ( ) {
if ( filters_ ! = nullptr ) {
state ( ) . DropPull ( ) ;
}
}
PullMessage ( const PullMessage & ) = delete ;
PullMessage & operator = ( const PullMessage & ) = delete ;
PullMessage ( PullMessage & & other ) noexcept
: filters_ ( std : : exchange ( other . filters_ , nullptr ) ) ,
executor_ ( std : : move ( other . executor_ ) ) { }
PullMessage & operator = ( PullMessage & & ) = delete ;
Poll < NextMessage > operator ( ) ( ) {
if ( executor_ . IsRunning ( ) ) {
auto c = state ( ) . PollClosed ( ) ;
if ( c . ready ( ) & & c . value ( ) ) {
filters_ - > CancelDueToFailedPipeOperation ( ) ;
return NextMessage ( true ) ;
}
return FinishOperationExecutor ( executor_ . Step ( filters_ - > call_data_ ) ) ;
}
auto p = state ( ) . PollPull ( ) ;
auto * r = p . value_if_ready ( ) ;
if ( r = = nullptr ) return Pending { } ;
if ( ! r - > ok ( ) ) {
filters_ - > CancelDueToFailedPipeOperation ( ) ;
return NextMessage ( true ) ;
}
if ( ! * * r ) return NextMessage ( false ) ;
return FinishOperationExecutor ( executor_ . Start (
layout ( ) , push ( ) - > TakeValue ( ) , filters_ - > call_data_ ) ) ;
}
private :
filters_detail : : PipeState & state ( ) { return filters_ - > * state_ptr ; }
Push * push ( ) { return static_cast < Push * > ( filters_ - > * push_ptr ) ; }
const filters_detail : : Layout < filters_detail : : FallibleOperator < T > > *
layout ( ) {
return & ( filters_ - > stack_ - > data_ . * layout_ptr ) ;
}
Poll < NextMessage > FinishOperationExecutor (
Poll < filters_detail : : ResultOr < T > > p ) {
auto * r = p . value_if_ready ( ) ;
if ( r = = nullptr ) return Pending { } ;
GPR_DEBUG_ASSERT ( ! executor_ . IsRunning ( ) ) ;
state ( ) . AckPull ( ) ;
if ( r - > ok ! = nullptr ) return NextMessage ( std : : move ( r - > ok ) ) ;
filters_ - > PushServerTrailingMetadata ( std : : move ( r - > error ) ) ;
return NextMessage ( true ) ;
}
CallFilters * filters_ ;
filters_detail : : OperationExecutor < T > executor_ ;
} ;
} ;
class PullClientInitialMetadataPromise {
@ -1400,7 +1611,12 @@ class CallFilters {
}
auto p = state ( ) . PollPull ( ) ;
auto * r = p . value_if_ready ( ) ;
gpr_log ( GPR_INFO , " %s " , r = = nullptr ? " PENDING " : r - > ToString ( ) . c_str ( ) ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_trace_promise_primitives ) ) {
gpr_log ( GPR_INFO , " %s " ,
r = = nullptr
? " PENDING "
: ( r - > ok ( ) ? ( r - > value ( ) ? " TRUE " : " FALSE " ) : " FAILURE " ) ) ;
}
if ( r = = nullptr ) return Pending { } ;
if ( ! r - > ok ( ) ) {
filters_ - > CancelDueToFailedPipeOperation ( ) ;
@ -1450,11 +1666,39 @@ class CallFilters {
Poll < ServerMetadataHandle > operator ( ) ( ) {
if ( executor_ . IsRunning ( ) ) {
return executor_ . Step ( filters_ - > call_data_ ) ;
auto r = executor_ . Step ( filters_ - > call_data_ ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_trace_promise_primitives ) ) {
if ( r . pending ( ) ) {
gpr_log ( GPR_INFO ,
" %s PullServerTrailingMetadata[%p]: Pending(but executing) " ,
GetContext < Activity > ( ) - > DebugTag ( ) . c_str ( ) , filters_ ) ;
} else {
gpr_log ( GPR_INFO , " %s PullServerTrailingMetadata[%p]: Ready: %s " ,
GetContext < Activity > ( ) - > DebugTag ( ) . c_str ( ) , filters_ ,
r . value ( ) - > DebugString ( ) . c_str ( ) ) ;
}
}
return r ;
}
if ( filters_ - > server_trailing_metadata_ = = nullptr ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_trace_promise_primitives ) ) {
gpr_log ( GPR_INFO ,
" %s PullServerTrailingMetadata[%p]: Pending(not pushed) " ,
GetContext < Activity > ( ) - > DebugTag ( ) . c_str ( ) , filters_ ) ;
}
return filters_ - > server_trailing_metadata_waiter_ . pending ( ) ;
}
// If no stack has been set, we can just return the result of the call
if ( filters_ - > stack_ = = nullptr ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_trace_promise_primitives ) ) {
gpr_log ( GPR_INFO ,
" %s PullServerTrailingMetadata[%p]: Ready(no-stack): %s " ,
GetContext < Activity > ( ) - > DebugTag ( ) . c_str ( ) , filters_ ,
filters_ - > server_trailing_metadata_ - > DebugString ( ) . c_str ( ) ) ;
}
return std : : move ( filters_ - > server_trailing_metadata_ ) ;
}
// Otherwise we need to process it through all the filters.
return executor_ . Start ( & filters_ - > stack_ - > data_ . server_trailing_metadata ,
std : : move ( filters_ - > server_trailing_metadata_ ) ,
filters_ - > call_data_ ) ;
@ -1465,7 +1709,7 @@ class CallFilters {
filters_detail : : InfallibleOperationExecutor < ServerMetadataHandle > executor_ ;
} ;
void CancelDueToFailedPipeOperation ( ) ;
void CancelDueToFailedPipeOperation ( SourceLocation but_where = { } ) ;
RefCountedPtr < Stack > stack_ ;
@ -1475,6 +1719,7 @@ class CallFilters {
filters_detail : : PipeState client_to_server_message_state_ ;
filters_detail : : PipeState server_to_client_message_state_ ;
IntraActivityWaiter server_trailing_metadata_waiter_ ;
Latch < bool > cancelled_ ;
void * call_data_ ;
ClientMetadataHandle client_initial_metadata_ ;
@ -1516,7 +1761,7 @@ inline auto CallFilters::PushServerInitialMetadata(ServerMetadataHandle md) {
}
inline auto CallFilters : : PullServerInitialMetadata ( ) {
return ServerInitialMetadataPromises : : Pull { this } ;
return ServerInitialMetadataPromises : : PullMaybe { this } ;
}
inline auto CallFilters : : PushClientToServerMessage ( MessageHandle message ) {
@ -1526,7 +1771,7 @@ inline auto CallFilters::PushClientToServerMessage(MessageHandle message) {
}
inline auto CallFilters : : PullClientToServerMessage ( ) {
return ClientToServerMessagePromises : : Pull { this } ;
return ClientToServerMessagePromises : : PullMessage { this } ;
}
inline auto CallFilters : : PushServerToClientMessage ( MessageHandle message ) {
@ -1536,13 +1781,19 @@ inline auto CallFilters::PushServerToClientMessage(MessageHandle message) {
}
inline auto CallFilters : : PullServerToClientMessage ( ) {
return ServerToClientMessagePromises : : Pull { this } ;
return ServerToClientMessagePromises : : PullMessage { this } ;
}
inline auto CallFilters : : PullServerTrailingMetadata ( ) {
return PullServerTrailingMetadataPromise ( this ) ;
return Map ( PullServerTrailingMetadataPromise ( this ) ,
[ this ] ( ServerMetadataHandle h ) {
cancelled_ . Set ( h - > get ( GrpcCallWasCancelled ( ) ) . value_or ( false ) ) ;
return h ;
} ) ;
}
inline auto CallFilters : : WasCancelled ( ) { return cancelled_ . Wait ( ) ; }
} // namespace grpc_core
# endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H