@ -345,15 +345,77 @@ inline auto RunCall(void (Derived::Call::*fn)(ClientMetadata& md,
}
inline void InterceptClientToServerMessage ( const NoInterceptor * , void * ,
CallArgs & ) { }
const CallArgs & ) { }
inline void InterceptClientToServerMessage ( const NoInterceptor * , void * , void * ,
CallSpineInterface * ) { }
inline void InterceptClientInitialMetadata ( const NoInterceptor * , void * , void * ,
CallSpineInterface * ) { }
template < typename Derived >
inline void InterceptClientInitialMetadata (
void ( Derived : : Call : : * fn ) ( ClientMetadata & md ) , typename Derived : : Call * call ,
Derived * , CallSpineInterface * call_spine ) {
GPR_DEBUG_ASSERT ( fn = = & Derived : : Call : : OnClientInitialMetadata ) ;
call_spine - > client_initial_metadata ( ) . receiver . InterceptAndMap (
[ call ] ( ClientMetadataHandle md ) {
call - > OnClientInitialMetadata ( * md ) ;
return md ;
} ) ;
}
template < typename Derived >
inline void InterceptClientInitialMetadata (
void ( Derived : : Call : : * fn ) ( ClientMetadata & md , Derived * channel ) ,
typename Derived : : Call * call , Derived * channel ,
CallSpineInterface * call_spine ) {
GPR_DEBUG_ASSERT ( fn = = & Derived : : Call : : OnClientInitialMetadata ) ;
call_spine - > client_initial_metadata ( ) . receiver . InterceptAndMap (
[ call , channel ] ( ClientMetadataHandle md ) {
call - > OnClientInitialMetadata ( * md , channel ) ;
return md ;
} ) ;
}
template < typename Derived >
inline void InterceptClientInitialMetadata (
ServerMetadataHandle ( Derived : : Call : : * fn ) ( ClientMetadata & md ) ,
typename Derived : : Call * call , Derived * , CallSpineInterface * call_spine ) {
GPR_DEBUG_ASSERT ( fn = = & Derived : : Call : : OnClientInitialMetadata ) ;
call_spine - > client_initial_metadata ( ) . receiver . InterceptAndMap (
[ call_spine ,
call ] ( ClientMetadataHandle md ) - > absl : : optional < ClientMetadataHandle > {
auto return_md = call - > OnClientInitialMetadata ( * md ) ;
if ( return_md = = nullptr ) return std : : move ( md ) ;
return call_spine - > Cancel ( std : : move ( return_md ) ) ;
} ) ;
}
template < typename Derived >
inline void InterceptClientInitialMetadata (
ServerMetadataHandle ( Derived : : Call : : * fn ) ( ClientMetadata & md ,
Derived * channel ) ,
typename Derived : : Call * call , Derived * channel ,
CallSpineInterface * call_spine ) {
GPR_DEBUG_ASSERT ( fn = = & Derived : : Call : : OnClientInitialMetadata ) ;
call_spine - > client_initial_metadata ( ) . receiver . InterceptAndMap (
[ call_spine , call , channel ] (
ClientMetadataHandle md ) - > absl : : optional < ClientMetadataHandle > {
auto return_md = call - > OnClientInitialMetadata ( * md , channel ) ;
if ( return_md = = nullptr ) return std : : move ( md ) ;
return call_spine - > Cancel ( std : : move ( return_md ) ) ;
} ) ;
}
template < typename CallArgs >
inline void InterceptServerInitialMetadata ( const NoInterceptor * , void * ,
CallArgs & ) { }
const CallArgs & ) { }
template < typename Derived >
inline void InterceptServerInitialMetadata (
void ( Derived : : Call : : * fn ) ( ServerMetadata & ) ,
FilterCallData < Derived > * call_data , CallArgs & call_args ) {
FilterCallData < Derived > * call_data , const CallArgs & call_args ) {
GPR_DEBUG_ASSERT ( fn = = & Derived : : Call : : OnServerInitialMetadata ) ;
call_args . server_initial_metadata - > InterceptAndMap (
[ call_data ] ( ServerMetadataHandle md ) {
@ -365,7 +427,7 @@ inline void InterceptServerInitialMetadata(
template < typename Derived >
inline void InterceptServerInitialMetadata (
absl : : Status ( Derived : : Call : : * fn ) ( ServerMetadata & ) ,
FilterCallData < Derived > * call_data , CallArgs & call_args ) {
FilterCallData < Derived > * call_data , const CallArgs & call_args ) {
GPR_DEBUG_ASSERT ( fn = = & Derived : : Call : : OnServerInitialMetadata ) ;
call_args . server_initial_metadata - > InterceptAndMap (
[ call_data ] (
@ -379,8 +441,69 @@ inline void InterceptServerInitialMetadata(
} ) ;
}
template < typename CallArgs >
inline void InterceptServerInitialMetadata ( const NoInterceptor * , void * , void * ,
CallSpineInterface * ) { }
template < typename Derived >
inline void InterceptServerInitialMetadata (
void ( Derived : : Call : : * fn ) ( ServerMetadata & ) , typename Derived : : Call * call ,
Derived * , CallSpineInterface * call_spine ) {
GPR_DEBUG_ASSERT ( fn = = & Derived : : Call : : OnServerInitialMetadata ) ;
call_spine - > server_initial_metadata ( ) . sender . InterceptAndMap (
[ call ] ( ServerMetadataHandle md ) {
call - > OnServerInitialMetadata ( * md ) ;
return md ;
} ) ;
}
template < typename Derived >
inline void InterceptServerInitialMetadata (
absl : : Status ( Derived : : Call : : * fn ) ( ServerMetadata & ) ,
typename Derived : : Call * call , Derived * , CallSpineInterface * call_spine ) {
GPR_DEBUG_ASSERT ( fn = = & Derived : : Call : : OnServerInitialMetadata ) ;
call_spine - > server_initial_metadata ( ) . sender . InterceptAndMap (
[ call , call_spine ] (
ServerMetadataHandle md ) - > absl : : optional < ServerMetadataHandle > {
auto status = call - > OnServerInitialMetadata ( * md ) ;
if ( status . ok ( ) ) return std : : move ( md ) ;
return call_spine - > Cancel ( ServerMetadataFromStatus ( status ) ) ;
} ) ;
}
inline void InterceptServerToClientMessage ( const NoInterceptor * , void * ,
CallArgs & ) { }
const CallArgs & ) { }
inline void InterceptServerToClientMessage ( const NoInterceptor * , void * , void * ,
CallSpineInterface * ) { }
inline void InterceptServerTrailingMetadata ( const NoInterceptor * , void * , void * ,
CallSpineInterface * ) { }
template < typename Derived >
inline void InterceptServerTrailingMetadata (
void ( Derived : : Call : : * fn ) ( ServerMetadata & ) , typename Derived : : Call * call ,
Derived * , CallSpineInterface * call_spine ) {
GPR_DEBUG_ASSERT ( fn = = & Derived : : Call : : OnServerTrailingMetadata ) ;
call_spine - > server_trailing_metadata ( ) . sender . InterceptAndMap (
[ call ] ( ServerMetadataHandle md ) {
call - > OnServerTrailingMetadata ( * md ) ;
return md ;
} ) ;
}
template < typename Derived >
inline void InterceptServerTrailingMetadata (
absl : : Status ( Derived : : Call : : * fn ) ( ServerMetadata & ) ,
typename Derived : : Call * call , Derived * , CallSpineInterface * call_spine ) {
GPR_DEBUG_ASSERT ( fn = = & Derived : : Call : : OnServerTrailingMetadata ) ;
call_spine - > server_trailing_metadata ( ) . sender . InterceptAndMap (
[ call ] ( ServerMetadataHandle md ) - > absl : : optional < ServerMetadataHandle > {
auto status = call - > OnServerTrailingMetadata ( * md ) ;
if ( status . ok ( ) ) return std : : move ( md ) ;
return ServerMetadataFromStatus ( status ) ;
} ) ;
}
template < typename Derived >
absl : : enable_if_t < std : : is_empty < FilterCallData < Derived > > : : value ,
@ -449,6 +572,29 @@ MakeFilterCall(Derived* derived) {
template < typename Derived >
class ImplementChannelFilter : public ChannelFilter {
public :
// Natively construct a v3 call.
void InitCall ( CallSpineInterface * call_spine ) {
auto * call = GetContext < Arena > ( ) - > ManagedNew < typename Derived : : Call > ( ) ;
promise_filter_detail : : InterceptClientInitialMetadata (
& Derived : : Call : : OnClientInitialMetadata , call ,
static_cast < Derived * > ( this ) , call_spine ) ;
promise_filter_detail : : InterceptClientToServerMessage (
& Derived : : Call : : OnClientToServerMessage , call ,
static_cast < Derived * > ( this ) , call_spine ) ;
promise_filter_detail : : InterceptServerInitialMetadata (
& Derived : : Call : : OnServerInitialMetadata , call ,
static_cast < Derived * > ( this ) , call_spine ) ;
promise_filter_detail : : InterceptServerToClientMessage (
& Derived : : Call : : OnServerToClientMessage , call ,
static_cast < Derived * > ( this ) , call_spine ) ;
promise_filter_detail : : InterceptServerTrailingMetadata (
& Derived : : Call : : OnServerTrailingMetadata , call ,
static_cast < Derived * > ( this ) , call_spine ) ;
}
// Polyfill for the original promise scheme.
// Allows writing v3 filters that work with v2 stacks.
// (and consequently also v1 stacks since we can polyfill back to that too).
ArenaPromise < ServerMetadataHandle > MakeCallPromise (
CallArgs call_args , NextPromiseFactory next_promise_factory ) final {
auto * call = promise_filter_detail : : MakeFilterCall < Derived > (
@ -1262,7 +1408,49 @@ struct ChannelFilterWithFlagsMethods {
// ChannelArgs channel_args, ChannelFilter::Args filter_args);
// };
template < typename F , FilterEndpoint kEndpoint , uint8_t kFlags = 0 >
absl : : enable_if_t < std : : is_base_of < ChannelFilter , F > : : value , grpc_channel_filter >
absl : : enable_if_t < std : : is_base_of < ChannelFilter , F > : : value & &
! std : : is_base_of < ImplementChannelFilter < F > , F > : : value ,
grpc_channel_filter >
MakePromiseBasedFilter ( const char * name ) {
using CallData = promise_filter_detail : : CallData < kEndpoint > ;
return grpc_channel_filter {
// start_transport_stream_op_batch
promise_filter_detail : : BaseCallDataMethods : : StartTransportStreamOpBatch ,
// make_call_promise
promise_filter_detail : : ChannelFilterMethods : : MakeCallPromise ,
nullptr ,
// start_transport_op
promise_filter_detail : : ChannelFilterMethods : : StartTransportOp ,
// sizeof_call_data
sizeof ( CallData ) ,
// init_call_elem
promise_filter_detail : : CallDataFilterWithFlagsMethods <
CallData , kFlags > : : InitCallElem ,
// set_pollset_or_pollset_set
promise_filter_detail : : BaseCallDataMethods : : SetPollsetOrPollsetSet ,
// destroy_call_elem
promise_filter_detail : : CallDataFilterWithFlagsMethods <
CallData , kFlags > : : DestroyCallElem ,
// sizeof_channel_data
sizeof ( F ) ,
// init_channel_elem
promise_filter_detail : : ChannelFilterWithFlagsMethods <
F , kFlags > : : InitChannelElem ,
// post_init_channel_elem
promise_filter_detail : : ChannelFilterMethods : : PostInitChannelElem ,
// destroy_channel_elem
promise_filter_detail : : ChannelFilterMethods : : DestroyChannelElem ,
// get_channel_info
promise_filter_detail : : ChannelFilterMethods : : GetChannelInfo ,
// name
name ,
} ;
}
template < typename F , FilterEndpoint kEndpoint , uint8_t kFlags = 0 >
absl : : enable_if_t < std : : is_base_of < ImplementChannelFilter < F > , F > : : value ,
grpc_channel_filter >
MakePromiseBasedFilter ( const char * name ) {
using CallData = promise_filter_detail : : CallData < kEndpoint > ;
@ -1271,6 +1459,9 @@ MakePromiseBasedFilter(const char* name) {
promise_filter_detail : : BaseCallDataMethods : : StartTransportStreamOpBatch ,
// make_call_promise
promise_filter_detail : : ChannelFilterMethods : : MakeCallPromise ,
[ ] ( grpc_channel_element * elem , CallSpineInterface * args ) {
static_cast < F * > ( elem - > channel_data ) - > InitCall ( args ) ;
} ,
// start_transport_op
promise_filter_detail : : ChannelFilterMethods : : StartTransportOp ,
// sizeof_call_data