@ -19,7 +19,9 @@
# ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
# define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
# include <atomic>
# include <functional>
# include <type_traits>
# include <grpcpp/impl/codegen/call.h>
# include <grpcpp/impl/codegen/call_op_set.h>
@ -32,19 +34,33 @@
namespace grpc {
// forward declarations
// Declare base class of all reactors as internal
namespace internal {
template < class ServiceType , class RequestType , class ResponseType >
class CallbackUnaryHandler ;
class ServerReactor {
public :
virtual ~ ServerReactor ( ) = default ;
virtual void OnDone ( ) { }
virtual void OnCancel ( ) { }
} ;
} // namespace internal
namespace experimental {
// Forward declarations
template < class Request , class Response >
class ServerReadReactor ;
template < class Request , class Response >
class ServerWriteReactor ;
template < class Request , class Response >
class ServerBidiReactor ;
// For unary RPCs, the exposed controller class is only an interface
// and the actual implementation is an internal class.
class ServerCallbackRpcController {
public :
virtual ~ ServerCallbackRpcController ( ) { }
virtual ~ ServerCallbackRpcController ( ) = default ;
// The method handler must call this function when it is done so that
// the library knows to free its resources
@ -55,18 +71,193 @@ class ServerCallbackRpcController {
virtual void SendInitialMetadata ( std : : function < void ( bool ) > ) = 0 ;
} ;
// NOTE: The actual streaming object classes are provided
// as API only to support mocking. There are no implementations of
// these class interfaces in the API.
template < class Request >
class ServerCallbackReader {
public :
virtual ~ ServerCallbackReader ( ) { }
virtual void Finish ( Status s ) = 0 ;
virtual void SendInitialMetadata ( ) = 0 ;
virtual void Read ( Request * msg ) = 0 ;
protected :
template < class Response >
void BindReactor ( ServerReadReactor < Request , Response > * reactor ) {
reactor - > BindReader ( this ) ;
}
} ;
template < class Response >
class ServerCallbackWriter {
public :
virtual ~ ServerCallbackWriter ( ) { }
virtual void Finish ( Status s ) = 0 ;
virtual void SendInitialMetadata ( ) = 0 ;
virtual void Write ( const Response * msg , WriteOptions options ) = 0 ;
virtual void WriteAndFinish ( const Response * msg , WriteOptions options ,
Status s ) {
// Default implementation that can/should be overridden
Write ( msg , std : : move ( options ) ) ;
Finish ( std : : move ( s ) ) ;
} ;
protected :
template < class Request >
void BindReactor ( ServerWriteReactor < Request , Response > * reactor ) {
reactor - > BindWriter ( this ) ;
}
} ;
template < class Request , class Response >
class ServerCallbackReaderWriter {
public :
virtual ~ ServerCallbackReaderWriter ( ) { }
virtual void Finish ( Status s ) = 0 ;
virtual void SendInitialMetadata ( ) = 0 ;
virtual void Read ( Request * msg ) = 0 ;
virtual void Write ( const Response * msg , WriteOptions options ) = 0 ;
virtual void WriteAndFinish ( const Response * msg , WriteOptions options ,
Status s ) {
// Default implementation that can/should be overridden
Write ( msg , std : : move ( options ) ) ;
Finish ( std : : move ( s ) ) ;
} ;
protected :
void BindReactor ( ServerBidiReactor < Request , Response > * reactor ) {
reactor - > BindStream ( this ) ;
}
} ;
// The following classes are reactors that are to be implemented
// by the user, returned as the result of the method handler for
// a callback method, and activated by the call to OnStarted
template < class Request , class Response >
class ServerBidiReactor : public internal : : ServerReactor {
public :
~ ServerBidiReactor ( ) = default ;
virtual void OnStarted ( ServerContext * ) { }
virtual void OnSendInitialMetadataDone ( bool ok ) { }
virtual void OnReadDone ( bool ok ) { }
virtual void OnWriteDone ( bool ok ) { }
void StartSendInitialMetadata ( ) { stream_ - > SendInitialMetadata ( ) ; }
void StartRead ( Request * msg ) { stream_ - > Read ( msg ) ; }
void StartWrite ( const Response * msg ) { StartWrite ( msg , WriteOptions ( ) ) ; }
void StartWrite ( const Response * msg , WriteOptions options ) {
stream_ - > Write ( msg , std : : move ( options ) ) ;
}
void StartWriteAndFinish ( const Response * msg , WriteOptions options ,
Status s ) {
stream_ - > WriteAndFinish ( msg , std : : move ( options ) , std : : move ( s ) ) ;
}
void StartWriteLast ( const Response * msg , WriteOptions options ) {
StartWrite ( msg , std : : move ( options . set_last_message ( ) ) ) ;
}
void Finish ( Status s ) { stream_ - > Finish ( std : : move ( s ) ) ; }
private :
friend class ServerCallbackReaderWriter < Request , Response > ;
void BindStream ( ServerCallbackReaderWriter < Request , Response > * stream ) {
stream_ = stream ;
}
ServerCallbackReaderWriter < Request , Response > * stream_ ;
} ;
template < class Request , class Response >
class ServerReadReactor : public internal : : ServerReactor {
public :
~ ServerReadReactor ( ) = default ;
virtual void OnStarted ( ServerContext * , Response * resp ) { }
virtual void OnSendInitialMetadataDone ( bool ok ) { }
virtual void OnReadDone ( bool ok ) { }
void StartSendInitialMetadata ( ) { reader_ - > SendInitialMetadata ( ) ; }
void StartRead ( Request * msg ) { reader_ - > Read ( msg ) ; }
void Finish ( Status s ) { reader_ - > Finish ( std : : move ( s ) ) ; }
private :
friend class ServerCallbackReader < Request > ;
void BindReader ( ServerCallbackReader < Request > * reader ) { reader_ = reader ; }
ServerCallbackReader < Request > * reader_ ;
} ;
template < class Request , class Response >
class ServerWriteReactor : public internal : : ServerReactor {
public :
~ ServerWriteReactor ( ) = default ;
virtual void OnStarted ( ServerContext * , const Request * req ) { }
virtual void OnSendInitialMetadataDone ( bool ok ) { }
virtual void OnWriteDone ( bool ok ) { }
void StartSendInitialMetadata ( ) { writer_ - > SendInitialMetadata ( ) ; }
void StartWrite ( const Response * msg ) { StartWrite ( msg , WriteOptions ( ) ) ; }
void StartWrite ( const Response * msg , WriteOptions options ) {
writer_ - > Write ( msg , std : : move ( options ) ) ;
}
void StartWriteAndFinish ( const Response * msg , WriteOptions options ,
Status s ) {
writer_ - > WriteAndFinish ( msg , std : : move ( options ) , std : : move ( s ) ) ;
}
void StartWriteLast ( const Response * msg , WriteOptions options ) {
StartWrite ( msg , std : : move ( options . set_last_message ( ) ) ) ;
}
void Finish ( Status s ) { writer_ - > Finish ( std : : move ( s ) ) ; }
private :
friend class ServerCallbackWriter < Response > ;
void BindWriter ( ServerCallbackWriter < Response > * writer ) { writer_ = writer ; }
ServerCallbackWriter < Response > * writer_ ;
} ;
} // namespace experimental
namespace internal {
template < class ServiceType , class RequestType , class ResponseType >
template < class Request , class Response >
class UnimplementedReadReactor
: public experimental : : ServerReadReactor < Request , Response > {
public :
void OnDone ( ) override { delete this ; }
void OnStarted ( ServerContext * , Response * ) override {
this - > Finish ( Status ( StatusCode : : UNIMPLEMENTED , " " ) ) ;
}
} ;
template < class Request , class Response >
class UnimplementedWriteReactor
: public experimental : : ServerWriteReactor < Request , Response > {
public :
void OnDone ( ) override { delete this ; }
void OnStarted ( ServerContext * , const Request * ) override {
this - > Finish ( Status ( StatusCode : : UNIMPLEMENTED , " " ) ) ;
}
} ;
template < class Request , class Response >
class UnimplementedBidiReactor
: public experimental : : ServerBidiReactor < Request , Response > {
public :
void OnDone ( ) override { delete this ; }
void OnStarted ( ServerContext * ) override {
this - > Finish ( Status ( StatusCode : : UNIMPLEMENTED , " " ) ) ;
}
} ;
template < class RequestType , class ResponseType >
class CallbackUnaryHandler : public MethodHandler {
public :
CallbackUnaryHandler (
std : : function < void ( ServerContext * , const RequestType * , ResponseType * ,
experimental : : ServerCallbackRpcController * ) >
func ,
ServiceType * service )
func )
: func_ ( func ) { }
void RunHandler ( const HandlerParameter & param ) final {
// Arena allocate a controller structure (that includes request/response)
@ -81,9 +272,8 @@ class CallbackUnaryHandler : public MethodHandler {
if ( status . ok ( ) ) {
// Call the actual function handler and expect the user to call finish
CatchingCallback ( std : : move ( func_ ) , param . server_context ,
controller - > request ( ) , controller - > response ( ) ,
controller ) ;
CatchingCallback ( func_ , param . server_context , controller - > request ( ) ,
controller - > response ( ) , controller ) ;
} else {
// if deserialization failed, we need to fail the call
controller - > Finish ( status ) ;
@ -117,79 +307,579 @@ class CallbackUnaryHandler : public MethodHandler {
: public experimental : : ServerCallbackRpcController {
public :
void Finish ( Status s ) override {
finish_tag_ . Set (
call_ . call ( ) ,
[ this ] ( bool ) {
grpc_call * call = call_ . call ( ) ;
auto call_requester = std : : move ( call_requester_ ) ;
this - > ~ ServerCallbackRpcControllerImpl ( ) ; // explicitly call
// destructor
g_core_codegen_interface - > grpc_call_unref ( call ) ;
call_requester ( ) ;
} ,
& finish_buf_ ) ;
finish_tag_ . Set ( call_ . call ( ) , [ this ] ( bool ) { MaybeDone ( ) ; } ,
& finish_ops_ ) ;
if ( ! ctx_ - > sent_initial_metadata_ ) {
finish_buf _ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
finish_ops_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
finish_buf _ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
finish_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
}
// The response is dropped if the status is not OK.
if ( s . ok ( ) ) {
finish_buf _ . ServerSendStatus ( & ctx_ - > trailing_metadata_ ,
finish_buf _ . SendMessage ( resp_ ) ) ;
finish_ops _ . ServerSendStatus ( & ctx_ - > trailing_metadata_ ,
finish_ops _ . SendMessage ( resp_ ) ) ;
} else {
finish_buf _ . ServerSendStatus ( & ctx_ - > trailing_metadata_ , s ) ;
finish_ops _ . ServerSendStatus ( & ctx_ - > trailing_metadata_ , s ) ;
}
finish_buf _ . set_core_cq_tag ( & finish_tag_ ) ;
call_ . PerformOps ( & finish_buf _ ) ;
finish_ops _ . set_core_cq_tag ( & finish_tag_ ) ;
call_ . PerformOps ( & finish_ops _ ) ;
}
void SendInitialMetadata ( std : : function < void ( bool ) > f ) override {
GPR_CODEGEN_ASSERT ( ! ctx_ - > sent_initial_metadata_ ) ;
meta_tag_ . Set ( call_ . call ( ) , std : : move ( f ) , & meta_buf_ ) ;
meta_buf_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
callbacks_outstanding_ + + ;
// TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
// and if performance of this operation matters
meta_tag_ . Set ( call_ . call ( ) ,
[ this , f ] ( bool ok ) {
f ( ok ) ;
MaybeDone ( ) ;
} ,
& meta_ops_ ) ;
meta_ops_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
meta_buf_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
meta_ops _ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
meta_buf_ . set_core_cq_tag ( & meta_tag_ ) ;
call_ . PerformOps ( & meta_buf_ ) ;
meta_ops _ . set_core_cq_tag ( & meta_tag_ ) ;
call_ . PerformOps ( & meta_ops _ ) ;
}
private :
template < class SrvType , class ReqType , class RespType >
friend class CallbackUnaryHandler ;
friend class CallbackUnaryHandler < RequestType , ResponseType > ;
ServerCallbackRpcControllerImpl ( ServerContext * ctx , Call * call ,
RequestType * req ,
const RequestType * req ,
std : : function < void ( ) > call_requester )
: ctx_ ( ctx ) ,
call_ ( * call ) ,
req_ ( req ) ,
call_requester_ ( std : : move ( call_requester ) ) { }
call_requester_ ( std : : move ( call_requester ) ) {
ctx_ - > BeginCompletionOp ( call , [ this ] ( bool ) { MaybeDone ( ) ; } , nullptr ) ;
}
~ ServerCallbackRpcControllerImpl ( ) { req_ - > ~ RequestType ( ) ; }
RequestType * request ( ) { return req_ ; }
const RequestType * request ( ) { return req_ ; }
ResponseType * response ( ) { return & resp_ ; }
CallOpSet < CallOpSendInitialMetadata > meta_buf_ ;
void MaybeDone ( ) {
if ( - - callbacks_outstanding_ = = 0 ) {
grpc_call * call = call_ . call ( ) ;
auto call_requester = std : : move ( call_requester_ ) ;
this - > ~ ServerCallbackRpcControllerImpl ( ) ; // explicitly call destructor
g_core_codegen_interface - > grpc_call_unref ( call ) ;
call_requester ( ) ;
}
}
CallOpSet < CallOpSendInitialMetadata > meta_ops_ ;
CallbackWithSuccessTag meta_tag_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage ,
CallOpServerSendStatus >
finish_buf_ ;
finish_ops _ ;
CallbackWithSuccessTag finish_tag_ ;
ServerContext * ctx_ ;
Call call_ ;
RequestType * req_ ;
const RequestType * req_ ;
ResponseType resp_ ;
std : : function < void ( ) > call_requester_ ;
std : : atomic_int callbacks_outstanding_ {
2 } ; // reserve for Finish and CompletionOp
} ;
} ;
template < class RequestType , class ResponseType >
class CallbackClientStreamingHandler : public MethodHandler {
public :
CallbackClientStreamingHandler (
std : : function <
experimental : : ServerReadReactor < RequestType , ResponseType > * ( ) >
func )
: func_ ( std : : move ( func ) ) { }
void RunHandler ( const HandlerParameter & param ) final {
// Arena allocate a reader structure (that includes response)
g_core_codegen_interface - > grpc_call_ref ( param . call - > call ( ) ) ;
experimental : : ServerReadReactor < RequestType , ResponseType > * reactor =
param . status . ok ( )
? CatchingReactorCreator <
experimental : : ServerReadReactor < RequestType , ResponseType > > (
func_ )
: nullptr ;
if ( reactor = = nullptr ) {
// if deserialization or reactor creator failed, we need to fail the call
reactor = new UnimplementedReadReactor < RequestType , ResponseType > ;
}
auto * reader = new ( g_core_codegen_interface - > grpc_call_arena_alloc (
param . call - > call ( ) , sizeof ( ServerCallbackReaderImpl ) ) )
ServerCallbackReaderImpl ( param . server_context , param . call ,
std : : move ( param . call_requester ) , reactor ) ;
reader - > BindReactor ( reactor ) ;
reactor - > OnStarted ( param . server_context , reader - > response ( ) ) ;
reader - > MaybeDone ( ) ;
}
private :
std : : function < experimental : : ServerReadReactor < RequestType , ResponseType > * ( ) >
func_ ;
class ServerCallbackReaderImpl
: public experimental : : ServerCallbackReader < RequestType > {
public :
void Finish ( Status s ) override {
finish_tag_ . Set ( call_ . call ( ) , [ this ] ( bool ) { MaybeDone ( ) ; } ,
& finish_ops_ ) ;
if ( ! ctx_ - > sent_initial_metadata_ ) {
finish_ops_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
finish_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
}
// The response is dropped if the status is not OK.
if ( s . ok ( ) ) {
finish_ops_ . ServerSendStatus ( & ctx_ - > trailing_metadata_ ,
finish_ops_ . SendMessage ( resp_ ) ) ;
} else {
finish_ops_ . ServerSendStatus ( & ctx_ - > trailing_metadata_ , s ) ;
}
finish_ops_ . set_core_cq_tag ( & finish_tag_ ) ;
call_ . PerformOps ( & finish_ops_ ) ;
}
void SendInitialMetadata ( ) override {
GPR_CODEGEN_ASSERT ( ! ctx_ - > sent_initial_metadata_ ) ;
callbacks_outstanding_ + + ;
meta_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnSendInitialMetadataDone ( ok ) ;
MaybeDone ( ) ;
} ,
& meta_ops_ ) ;
meta_ops_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
meta_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
meta_ops_ . set_core_cq_tag ( & meta_tag_ ) ;
call_ . PerformOps ( & meta_ops_ ) ;
}
void Read ( RequestType * req ) override {
callbacks_outstanding_ + + ;
read_ops_ . RecvMessage ( req ) ;
call_ . PerformOps ( & read_ops_ ) ;
}
private :
friend class CallbackClientStreamingHandler < RequestType , ResponseType > ;
ServerCallbackReaderImpl (
ServerContext * ctx , Call * call , std : : function < void ( ) > call_requester ,
experimental : : ServerReadReactor < RequestType , ResponseType > * reactor )
: ctx_ ( ctx ) ,
call_ ( * call ) ,
call_requester_ ( std : : move ( call_requester ) ) ,
reactor_ ( reactor ) {
ctx_ - > BeginCompletionOp ( call , [ this ] ( bool ) { MaybeDone ( ) ; } , reactor ) ;
read_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnReadDone ( ok ) ;
MaybeDone ( ) ;
} ,
& read_ops_ ) ;
read_ops_ . set_core_cq_tag ( & read_tag_ ) ;
}
~ ServerCallbackReaderImpl ( ) { }
ResponseType * response ( ) { return & resp_ ; }
void MaybeDone ( ) {
if ( - - callbacks_outstanding_ = = 0 ) {
reactor_ - > OnDone ( ) ;
grpc_call * call = call_ . call ( ) ;
auto call_requester = std : : move ( call_requester_ ) ;
this - > ~ ServerCallbackReaderImpl ( ) ; // explicitly call destructor
g_core_codegen_interface - > grpc_call_unref ( call ) ;
call_requester ( ) ;
}
}
CallOpSet < CallOpSendInitialMetadata > meta_ops_ ;
CallbackWithSuccessTag meta_tag_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage ,
CallOpServerSendStatus >
finish_ops_ ;
CallbackWithSuccessTag finish_tag_ ;
CallOpSet < CallOpRecvMessage < RequestType > > read_ops_ ;
CallbackWithSuccessTag read_tag_ ;
ServerContext * ctx_ ;
Call call_ ;
ResponseType resp_ ;
std : : function < void ( ) > call_requester_ ;
experimental : : ServerReadReactor < RequestType , ResponseType > * reactor_ ;
std : : atomic_int callbacks_outstanding_ {
3 } ; // reserve for OnStarted, Finish, and CompletionOp
} ;
} ;
template < class RequestType , class ResponseType >
class CallbackServerStreamingHandler : public MethodHandler {
public :
CallbackServerStreamingHandler (
std : : function <
experimental : : ServerWriteReactor < RequestType , ResponseType > * ( ) >
func )
: func_ ( std : : move ( func ) ) { }
void RunHandler ( const HandlerParameter & param ) final {
// Arena allocate a writer structure
g_core_codegen_interface - > grpc_call_ref ( param . call - > call ( ) ) ;
experimental : : ServerWriteReactor < RequestType , ResponseType > * reactor =
param . status . ok ( )
? CatchingReactorCreator <
experimental : : ServerWriteReactor < RequestType , ResponseType > > (
func_ )
: nullptr ;
if ( reactor = = nullptr ) {
// if deserialization or reactor creator failed, we need to fail the call
reactor = new UnimplementedWriteReactor < RequestType , ResponseType > ;
}
auto * writer = new ( g_core_codegen_interface - > grpc_call_arena_alloc (
param . call - > call ( ) , sizeof ( ServerCallbackWriterImpl ) ) )
ServerCallbackWriterImpl ( param . server_context , param . call ,
static_cast < RequestType * > ( param . request ) ,
std : : move ( param . call_requester ) , reactor ) ;
writer - > BindReactor ( reactor ) ;
reactor - > OnStarted ( param . server_context , writer - > request ( ) ) ;
writer - > MaybeDone ( ) ;
}
void * Deserialize ( grpc_call * call , grpc_byte_buffer * req ,
Status * status ) final {
ByteBuffer buf ;
buf . set_buffer ( req ) ;
auto * request = new ( g_core_codegen_interface - > grpc_call_arena_alloc (
call , sizeof ( RequestType ) ) ) RequestType ( ) ;
* status = SerializationTraits < RequestType > : : Deserialize ( & buf , request ) ;
buf . Release ( ) ;
if ( status - > ok ( ) ) {
return request ;
}
request - > ~ RequestType ( ) ;
return nullptr ;
}
private :
std : : function < experimental : : ServerWriteReactor < RequestType , ResponseType > * ( ) >
func_ ;
class ServerCallbackWriterImpl
: public experimental : : ServerCallbackWriter < ResponseType > {
public :
void Finish ( Status s ) override {
finish_tag_ . Set ( call_ . call ( ) , [ this ] ( bool ) { MaybeDone ( ) ; } ,
& finish_ops_ ) ;
finish_ops_ . set_core_cq_tag ( & finish_tag_ ) ;
if ( ! ctx_ - > sent_initial_metadata_ ) {
finish_ops_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
finish_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
}
finish_ops_ . ServerSendStatus ( & ctx_ - > trailing_metadata_ , s ) ;
call_ . PerformOps ( & finish_ops_ ) ;
}
void SendInitialMetadata ( ) override {
GPR_CODEGEN_ASSERT ( ! ctx_ - > sent_initial_metadata_ ) ;
callbacks_outstanding_ + + ;
meta_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnSendInitialMetadataDone ( ok ) ;
MaybeDone ( ) ;
} ,
& meta_ops_ ) ;
meta_ops_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
meta_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
meta_ops_ . set_core_cq_tag ( & meta_tag_ ) ;
call_ . PerformOps ( & meta_ops_ ) ;
}
void Write ( const ResponseType * resp , WriteOptions options ) override {
callbacks_outstanding_ + + ;
if ( options . is_last_message ( ) ) {
options . set_buffer_hint ( ) ;
}
if ( ! ctx_ - > sent_initial_metadata_ ) {
write_ops_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
write_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
}
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT ( write_ops_ . SendMessage ( * resp , options ) . ok ( ) ) ;
call_ . PerformOps ( & write_ops_ ) ;
}
void WriteAndFinish ( const ResponseType * resp , WriteOptions options ,
Status s ) override {
// This combines the write into the finish callback
// Don't send any message if the status is bad
if ( s . ok ( ) ) {
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT ( finish_ops_ . SendMessage ( * resp , options ) . ok ( ) ) ;
}
Finish ( std : : move ( s ) ) ;
}
private :
friend class CallbackServerStreamingHandler < RequestType , ResponseType > ;
ServerCallbackWriterImpl (
ServerContext * ctx , Call * call , const RequestType * req ,
std : : function < void ( ) > call_requester ,
experimental : : ServerWriteReactor < RequestType , ResponseType > * reactor )
: ctx_ ( ctx ) ,
call_ ( * call ) ,
req_ ( req ) ,
call_requester_ ( std : : move ( call_requester ) ) ,
reactor_ ( reactor ) {
ctx_ - > BeginCompletionOp ( call , [ this ] ( bool ) { MaybeDone ( ) ; } , reactor ) ;
write_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnWriteDone ( ok ) ;
MaybeDone ( ) ;
} ,
& write_ops_ ) ;
write_ops_ . set_core_cq_tag ( & write_tag_ ) ;
}
~ ServerCallbackWriterImpl ( ) { req_ - > ~ RequestType ( ) ; }
const RequestType * request ( ) { return req_ ; }
void MaybeDone ( ) {
if ( - - callbacks_outstanding_ = = 0 ) {
reactor_ - > OnDone ( ) ;
grpc_call * call = call_ . call ( ) ;
auto call_requester = std : : move ( call_requester_ ) ;
this - > ~ ServerCallbackWriterImpl ( ) ; // explicitly call destructor
g_core_codegen_interface - > grpc_call_unref ( call ) ;
call_requester ( ) ;
}
}
CallOpSet < CallOpSendInitialMetadata > meta_ops_ ;
CallbackWithSuccessTag meta_tag_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage ,
CallOpServerSendStatus >
finish_ops_ ;
CallbackWithSuccessTag finish_tag_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage > write_ops_ ;
CallbackWithSuccessTag write_tag_ ;
ServerContext * ctx_ ;
Call call_ ;
const RequestType * req_ ;
std : : function < void ( ) > call_requester_ ;
experimental : : ServerWriteReactor < RequestType , ResponseType > * reactor_ ;
std : : atomic_int callbacks_outstanding_ {
3 } ; // reserve for OnStarted, Finish, and CompletionOp
} ;
} ;
template < class RequestType , class ResponseType >
class CallbackBidiHandler : public MethodHandler {
public :
CallbackBidiHandler (
std : : function <
experimental : : ServerBidiReactor < RequestType , ResponseType > * ( ) >
func )
: func_ ( std : : move ( func ) ) { }
void RunHandler ( const HandlerParameter & param ) final {
g_core_codegen_interface - > grpc_call_ref ( param . call - > call ( ) ) ;
experimental : : ServerBidiReactor < RequestType , ResponseType > * reactor =
param . status . ok ( )
? CatchingReactorCreator <
experimental : : ServerBidiReactor < RequestType , ResponseType > > (
func_ )
: nullptr ;
if ( reactor = = nullptr ) {
// if deserialization or reactor creator failed, we need to fail the call
reactor = new UnimplementedBidiReactor < RequestType , ResponseType > ;
}
auto * stream = new ( g_core_codegen_interface - > grpc_call_arena_alloc (
param . call - > call ( ) , sizeof ( ServerCallbackReaderWriterImpl ) ) )
ServerCallbackReaderWriterImpl ( param . server_context , param . call ,
std : : move ( param . call_requester ) ,
reactor ) ;
stream - > BindReactor ( reactor ) ;
reactor - > OnStarted ( param . server_context ) ;
stream - > MaybeDone ( ) ;
}
private :
std : : function < experimental : : ServerBidiReactor < RequestType , ResponseType > * ( ) >
func_ ;
class ServerCallbackReaderWriterImpl
: public experimental : : ServerCallbackReaderWriter < RequestType ,
ResponseType > {
public :
void Finish ( Status s ) override {
finish_tag_ . Set ( call_ . call ( ) , [ this ] ( bool ) { MaybeDone ( ) ; } ,
& finish_ops_ ) ;
finish_ops_ . set_core_cq_tag ( & finish_tag_ ) ;
if ( ! ctx_ - > sent_initial_metadata_ ) {
finish_ops_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
finish_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
}
finish_ops_ . ServerSendStatus ( & ctx_ - > trailing_metadata_ , s ) ;
call_ . PerformOps ( & finish_ops_ ) ;
}
void SendInitialMetadata ( ) override {
GPR_CODEGEN_ASSERT ( ! ctx_ - > sent_initial_metadata_ ) ;
callbacks_outstanding_ + + ;
meta_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnSendInitialMetadataDone ( ok ) ;
MaybeDone ( ) ;
} ,
& meta_ops_ ) ;
meta_ops_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
meta_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
meta_ops_ . set_core_cq_tag ( & meta_tag_ ) ;
call_ . PerformOps ( & meta_ops_ ) ;
}
void Write ( const ResponseType * resp , WriteOptions options ) override {
callbacks_outstanding_ + + ;
if ( options . is_last_message ( ) ) {
options . set_buffer_hint ( ) ;
}
if ( ! ctx_ - > sent_initial_metadata_ ) {
write_ops_ . SendInitialMetadata ( & ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
write_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
}
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT ( write_ops_ . SendMessage ( * resp , options ) . ok ( ) ) ;
call_ . PerformOps ( & write_ops_ ) ;
}
void WriteAndFinish ( const ResponseType * resp , WriteOptions options ,
Status s ) override {
// Don't send any message if the status is bad
if ( s . ok ( ) ) {
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT ( finish_ops_ . SendMessage ( * resp , options ) . ok ( ) ) ;
}
Finish ( std : : move ( s ) ) ;
}
void Read ( RequestType * req ) override {
callbacks_outstanding_ + + ;
read_ops_ . RecvMessage ( req ) ;
call_ . PerformOps ( & read_ops_ ) ;
}
private :
friend class CallbackBidiHandler < RequestType , ResponseType > ;
ServerCallbackReaderWriterImpl (
ServerContext * ctx , Call * call , std : : function < void ( ) > call_requester ,
experimental : : ServerBidiReactor < RequestType , ResponseType > * reactor )
: ctx_ ( ctx ) ,
call_ ( * call ) ,
call_requester_ ( std : : move ( call_requester ) ) ,
reactor_ ( reactor ) {
ctx_ - > BeginCompletionOp ( call , [ this ] ( bool ) { MaybeDone ( ) ; } , reactor ) ;
write_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnWriteDone ( ok ) ;
MaybeDone ( ) ;
} ,
& write_ops_ ) ;
write_ops_ . set_core_cq_tag ( & write_tag_ ) ;
read_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnReadDone ( ok ) ;
MaybeDone ( ) ;
} ,
& read_ops_ ) ;
read_ops_ . set_core_cq_tag ( & read_tag_ ) ;
}
~ ServerCallbackReaderWriterImpl ( ) { }
void MaybeDone ( ) {
if ( - - callbacks_outstanding_ = = 0 ) {
reactor_ - > OnDone ( ) ;
grpc_call * call = call_ . call ( ) ;
auto call_requester = std : : move ( call_requester_ ) ;
this - > ~ ServerCallbackReaderWriterImpl ( ) ; // explicitly call destructor
g_core_codegen_interface - > grpc_call_unref ( call ) ;
call_requester ( ) ;
}
}
CallOpSet < CallOpSendInitialMetadata > meta_ops_ ;
CallbackWithSuccessTag meta_tag_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage ,
CallOpServerSendStatus >
finish_ops_ ;
CallbackWithSuccessTag finish_tag_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage > write_ops_ ;
CallbackWithSuccessTag write_tag_ ;
CallOpSet < CallOpRecvMessage < RequestType > > read_ops_ ;
CallbackWithSuccessTag read_tag_ ;
ServerContext * ctx_ ;
Call call_ ;
std : : function < void ( ) > call_requester_ ;
experimental : : ServerBidiReactor < RequestType , ResponseType > * reactor_ ;
std : : atomic_int callbacks_outstanding_ {
3 } ; // reserve for OnStarted, Finish, and CompletionOp
} ;
} ;