@ -67,6 +67,7 @@
# include "src/core/lib/promise/poll.h"
# include "src/core/lib/promise/promise.h"
# include "src/core/lib/promise/seq.h"
# include "src/core/lib/promise/try_join.h"
# include "src/core/lib/promise/try_seq.h"
# include "src/core/lib/slice/slice_buffer.h"
# include "src/core/lib/slice/slice_internal.h"
@ -75,6 +76,7 @@
# include "src/core/lib/surface/channel.h"
# include "src/core/lib/surface/channel_stack_type.h"
# include "src/core/lib/surface/completion_queue.h"
# include "src/core/lib/surface/wait_for_cq_end_op.h"
# include "src/core/lib/transport/connectivity_state.h"
# include "src/core/lib/transport/error_utils.h"
@ -82,57 +84,6 @@ namespace grpc_core {
TraceFlag grpc_server_channel_trace ( false , " server_channel " ) ;
//
// Server::RequestedCall
//
struct Server : : RequestedCall {
enum class Type { BATCH_CALL , REGISTERED_CALL } ;
RequestedCall ( void * tag_arg , grpc_completion_queue * call_cq ,
grpc_call * * call_arg , grpc_metadata_array * initial_md ,
grpc_call_details * details )
: type ( Type : : BATCH_CALL ) ,
tag ( tag_arg ) ,
cq_bound_to_call ( call_cq ) ,
call ( call_arg ) ,
initial_metadata ( initial_md ) {
data . batch . details = details ;
}
RequestedCall ( void * tag_arg , grpc_completion_queue * call_cq ,
grpc_call * * call_arg , grpc_metadata_array * initial_md ,
RegisteredMethod * rm , gpr_timespec * deadline ,
grpc_byte_buffer * * optional_payload )
: type ( Type : : REGISTERED_CALL ) ,
tag ( tag_arg ) ,
cq_bound_to_call ( call_cq ) ,
call ( call_arg ) ,
initial_metadata ( initial_md ) {
data . registered . method = rm ;
data . registered . deadline = deadline ;
data . registered . optional_payload = optional_payload ;
}
MultiProducerSingleConsumerQueue : : Node mpscq_node ;
const Type type ;
void * const tag ;
grpc_completion_queue * const cq_bound_to_call ;
grpc_call * * const call ;
grpc_cq_completion completion ;
grpc_metadata_array * const initial_metadata ;
union {
struct {
grpc_call_details * details ;
} batch ;
struct {
RegisteredMethod * method ;
gpr_timespec * deadline ;
grpc_byte_buffer * * optional_payload ;
} registered ;
} data ;
} ;
//
// Server::RegisteredMethod
//
@ -248,6 +199,87 @@ class Server::RequestMatcherInterface {
virtual Server * server ( ) const = 0 ;
} ;
//
// Server::RequestedCall
//
struct Server : : RequestedCall {
enum class Type { BATCH_CALL , REGISTERED_CALL } ;
RequestedCall ( void * tag_arg , grpc_completion_queue * call_cq ,
grpc_call * * call_arg , grpc_metadata_array * initial_md ,
grpc_call_details * details )
: type ( Type : : BATCH_CALL ) ,
tag ( tag_arg ) ,
cq_bound_to_call ( call_cq ) ,
call ( call_arg ) ,
initial_metadata ( initial_md ) {
data . batch . details = details ;
}
RequestedCall ( void * tag_arg , grpc_completion_queue * call_cq ,
grpc_call * * call_arg , grpc_metadata_array * initial_md ,
RegisteredMethod * rm , gpr_timespec * deadline ,
grpc_byte_buffer * * optional_payload )
: type ( Type : : REGISTERED_CALL ) ,
tag ( tag_arg ) ,
cq_bound_to_call ( call_cq ) ,
call ( call_arg ) ,
initial_metadata ( initial_md ) {
data . registered . method = rm ;
data . registered . deadline = deadline ;
data . registered . optional_payload = optional_payload ;
}
void Complete ( NextResult < MessageHandle > payload , ClientMetadata & md ) {
Timestamp deadline = GetContext < CallContext > ( ) - > deadline ( ) ;
switch ( type ) {
case RequestedCall : : Type : : BATCH_CALL :
GPR_ASSERT ( ! payload . has_value ( ) ) ;
data . batch . details - > host =
CSliceRef ( md . get_pointer ( HttpAuthorityMetadata ( ) ) - > c_slice ( ) ) ;
data . batch . details - > method =
CSliceRef ( md . Take ( HttpPathMetadata ( ) ) - > c_slice ( ) ) ;
data . batch . details - > deadline =
deadline . as_timespec ( GPR_CLOCK_MONOTONIC ) ;
break ;
case RequestedCall : : Type : : REGISTERED_CALL :
md . Remove ( HttpPathMetadata ( ) ) ;
* data . registered . deadline = deadline . as_timespec ( GPR_CLOCK_MONOTONIC ) ;
if ( data . registered . optional_payload ! = nullptr ) {
if ( payload . has_value ( ) ) {
auto * sb = payload . value ( ) - > payload ( ) - > c_slice_buffer ( ) ;
* data . registered . optional_payload =
grpc_raw_byte_buffer_create ( sb - > slices , sb - > count ) ;
} else {
* data . registered . optional_payload = nullptr ;
}
}
break ;
default :
GPR_UNREACHABLE_CODE ( abort ( ) ) ;
}
}
MultiProducerSingleConsumerQueue : : Node mpscq_node ;
const Type type ;
void * const tag ;
grpc_completion_queue * const cq_bound_to_call ;
grpc_call * * const call ;
grpc_cq_completion completion ;
grpc_metadata_array * const initial_metadata ;
union {
struct {
grpc_call_details * details ;
} batch ;
struct {
RegisteredMethod * method ;
gpr_timespec * deadline ;
grpc_byte_buffer * * optional_payload ;
} registered ;
} data ;
} ;
// The RealRequestMatcher is an implementation of RequestMatcherInterface that
// actually uses all the features of RequestMatcherInterface: expecting the
// application to explicitly request RPCs and then matching those to incoming
@ -757,7 +789,9 @@ class ChannelBroadcaster {
const grpc_channel_filter Server : : kServerTopFilter = {
Server : : CallData : : StartTransportStreamOpBatch ,
Server : : ChannelData : : MakeCallPromise ,
/* init_call: */ nullptr ,
[ ] ( grpc_channel_element * , CallSpineInterface * ) {
// TODO(ctiller): remove the server filter when call-v3 is finalized
} ,
grpc_channel_next_op ,
sizeof ( Server : : CallData ) ,
Server : : CallData : : InitCallElement ,
@ -1275,16 +1309,31 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
}
// Start accept_stream transport op.
grpc_transport_op * op = grpc_make_transport_op ( nullptr ) ;
int accept_stream_types = 0 ;
if ( transport - > filter_stack_transport ( ) ! = nullptr ) {
+ + accept_stream_types ;
op - > set_accept_stream = true ;
op - > set_accept_stream_fn = AcceptStream ;
if ( IsRegisteredMethodLookupInTransportEnabled ( ) ) {
op - > set_registered_method_matcher_fn = [ ] ( void * arg ,
ClientMetadata * metadata ) {
static_cast < ChannelData * > ( arg ) - > SetRegisteredMethodOnMetadata ( * metadata ) ;
static_cast < ChannelData * > ( arg ) - > SetRegisteredMethodOnMetadata (
* metadata ) ;
} ;
}
// op->set_registered_method_matcher_fn = Registered
op - > set_accept_stream_user_data = this ;
}
if ( transport - > server_transport ( ) ! = nullptr ) {
+ + accept_stream_types ;
transport - > server_transport ( ) - > SetAcceptFunction (
[ this ] ( ClientMetadata & metadata ) {
SetRegisteredMethodOnMetadata ( metadata ) ;
auto call = MakeServerCall ( server_ . get ( ) , channel_ . get ( ) ) ;
InitCall ( call ) ;
return CallInitiator ( std : : move ( call ) ) ;
} ) ;
}
GPR_ASSERT ( accept_stream_types = = 1 ) ;
op - > start_connectivity_watch = MakeOrphanable < ConnectivityWatcher > ( this ) ;
if ( server_ - > ShutdownCalled ( ) ) {
op - > disconnect_with_error = GRPC_ERROR_CREATE ( " Server shutdown " ) ;
@ -1368,17 +1417,89 @@ auto CancelledDueToServerShutdown() {
}
} // namespace
void Server : : ChannelData : : InitCall ( RefCountedPtr < CallSpineInterface > call ) {
call - > SpawnGuarded ( " request_matcher " , [ this , call ] ( ) {
return TrySeq (
// Wait for initial metadata to pass through all filters
Map ( call - > client_initial_metadata ( ) . receiver . Next ( ) ,
[ ] ( NextResult < ClientMetadataHandle > md )
- > absl : : StatusOr < ClientMetadataHandle > {
if ( ! md . has_value ( ) ) {
return absl : : InternalError ( " Missing metadata " ) ;
}
if ( ! md . value ( ) - > get_pointer ( HttpPathMetadata ( ) ) ) {
return absl : : InternalError ( " Missing :path header " ) ;
}
if ( ! md . value ( ) - > get_pointer ( HttpAuthorityMetadata ( ) ) ) {
return absl : : InternalError ( " Missing :authority header " ) ;
}
return std : : move ( * md ) ;
} ) ,
// Match request with requested call
[ this , call ] ( ClientMetadataHandle md ) {
auto * registered_method = static_cast < RegisteredMethod * > (
md - > get ( GrpcRegisteredMethod ( ) ) . value_or ( nullptr ) ) ;
RequestMatcherInterface * rm ;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE ;
if ( registered_method = = nullptr ) {
rm = server_ - > unregistered_request_matcher_ . get ( ) ;
} else {
rm = registered_method - > matcher . get ( ) ;
}
auto maybe_read_first_message = If (
payload_handling = = GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER ,
[ call ] ( ) {
return call - > client_to_server_messages ( ) . receiver . Next ( ) ;
} ,
[ ] ( ) - > NextResult < MessageHandle > {
return NextResult < MessageHandle > ( ) ;
} ) ;
return TryJoin < absl : : StatusOr > (
Map ( std : : move ( maybe_read_first_message ) ,
[ ] ( NextResult < MessageHandle > n ) {
return ValueOrFailure < NextResult < MessageHandle > > {
std : : move ( n ) } ;
} ) ,
rm - > MatchRequest ( cq_idx ( ) ) , [ md = std : : move ( md ) ] ( ) mutable {
return ValueOrFailure < ClientMetadataHandle > ( std : : move ( md ) ) ;
} ) ;
} ,
// Publish call to cq
[ ] ( std : : tuple < NextResult < MessageHandle > ,
RequestMatcherInterface : : MatchResult ,
ClientMetadataHandle >
r ) {
RequestMatcherInterface : : MatchResult & mr = std : : get < 1 > ( r ) ;
auto md = std : : move ( std : : get < 2 > ( r ) ) ;
auto * rc = mr . TakeCall ( ) ;
rc - > Complete ( std : : move ( std : : get < 0 > ( r ) ) , * md ) ;
auto * call_context = GetContext < CallContext > ( ) ;
* rc - > call = call_context - > c_call ( ) ;
grpc_call_set_completion_queue ( call_context - > c_call ( ) ,
rc - > cq_bound_to_call ) ;
call_context - > server_call_context ( ) - > PublishInitialMetadata (
std : : move ( md ) , rc - > initial_metadata ) ;
// TODO(ctiller): publish metadata
return Map ( WaitForCqEndOp ( false , rc - > tag , absl : : OkStatus ( ) , mr . cq ( ) ) ,
[ rc = std : : unique_ptr < RequestedCall > ( rc ) ] ( Empty ) {
return absl : : OkStatus ( ) ;
} ) ;
} ) ;
} ) ;
}
ArenaPromise < ServerMetadataHandle > Server : : ChannelData : : MakeCallPromise (
grpc_channel_element * elem , CallArgs call_args , NextPromiseFactory ) {
auto * chand = static_cast < Server : : ChannelData * > ( elem - > channel_data ) ;
auto * server = chand - > server_ . get ( ) ;
absl : : optional < Slice > path =
call_args . client_initial_metadata - > Take ( HttpPathMetadata ( ) ) ;
if ( server - > ShutdownCalled ( ) ) return CancelledDueToServerShutdown ( ) ;
auto cleanup_ref =
absl : : MakeCleanup ( [ server ] { server - > ShutdownUnrefOnRequest ( ) ; } ) ;
if ( ! server - > ShutdownRefOnRequest ( ) ) return CancelledDueToServerShutdown ( ) ;
if ( ! path . has_value ( ) ) {
auto path_ptr =
call_args . client_initial_metadata - > get_pointer ( HttpPathMetadata ( ) ) ;
if ( path_ptr = = nullptr ) {
return [ ] {
return ServerMetadataFromStatus (
absl : : InternalError ( " Missing :path header " ) ) ;
@ -1392,7 +1513,6 @@ ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
absl : : InternalError ( " Missing :authority header " ) ) ;
} ;
}
Timestamp deadline = GetContext < CallContext > ( ) - > deadline ( ) ;
// Find request matcher.
RequestMatcherInterface * matcher ;
RegisteredMethod * rm = nullptr ;
@ -1402,7 +1522,7 @@ ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
. value_or ( nullptr ) ) ;
} else {
rm = chand - > GetRegisteredMethod ( host_ptr - > as_string_view ( ) ,
path - > as_string_view ( ) ) ;
path_ptr - > as_string_view ( ) ) ;
}
ArenaPromise < absl : : StatusOr < NextResult < MessageHandle > > >
maybe_read_first_message ( [ ] { return NextResult < MessageHandle > ( ) ; } ) ;
@ -1439,8 +1559,7 @@ ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
return std : : make_pair ( std : : move ( * mr ) , std : : move ( payload ) ) ;
} ) ;
} ,
[ host_ptr , path = std : : move ( path ) , deadline ,
call_args =
[ call_args =
std : : move ( call_args ) ] ( std : : pair < RequestMatcherInterface : : MatchResult ,
NextResult < MessageHandle > >
r ) mutable {
@ -1448,35 +1567,13 @@ ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
auto & payload = r . second ;
auto * rc = mr . TakeCall ( ) ;
auto * cq_for_new_request = mr . cq ( ) ;
switch ( rc - > type ) {
case RequestedCall : : Type : : BATCH_CALL :
GPR_ASSERT ( ! payload . has_value ( ) ) ;
rc - > data . batch . details - > host = CSliceRef ( host_ptr - > c_slice ( ) ) ;
rc - > data . batch . details - > method = CSliceRef ( path - > c_slice ( ) ) ;
rc - > data . batch . details - > deadline =
deadline . as_timespec ( GPR_CLOCK_MONOTONIC ) ;
break ;
case RequestedCall : : Type : : REGISTERED_CALL :
* rc - > data . registered . deadline =
deadline . as_timespec ( GPR_CLOCK_MONOTONIC ) ;
if ( rc - > data . registered . optional_payload ! = nullptr ) {
if ( payload . has_value ( ) ) {
auto * sb = payload . value ( ) - > payload ( ) - > c_slice_buffer ( ) ;
* rc - > data . registered . optional_payload =
grpc_raw_byte_buffer_create ( sb - > slices , sb - > count ) ;
} else {
* rc - > data . registered . optional_payload = nullptr ;
}
}
break ;
default :
GPR_UNREACHABLE_CODE ( abort ( ) ) ;
}
return GetContext < CallContext > ( )
- > server_call_context ( )
- > MakeTopOfServerCallPromise (
auto * server_call_context =
GetContext < CallContext > ( ) - > server_call_context ( ) ;
rc - > Complete ( std : : move ( payload ) , * call_args . client_initial_metadata ) ;
server_call_context - > PublishInitialMetadata (
std : : move ( call_args . client_initial_metadata ) , rc - > initial_metadata ) ;
return server_call_context - > MakeTopOfServerCallPromise (
std : : move ( call_args ) , rc - > cq_bound_to_call ,
rc - > initial_metadata ,
[ rc , cq_for_new_request ] ( grpc_call * call ) {
* rc - > call = call ;
grpc_cq_end_op ( cq_for_new_request , rc - > tag , absl : : OkStatus ( ) ,