@ -101,6 +101,39 @@ class AsyncWriterInterface {
/// \param[in] msg The message to be written.
/// \param[in] tag The tag identifying the operation.
virtual void Write ( const W & msg , void * tag ) = 0 ;
/// Request the writing of \a msg using WriteOptions \a options with
/// identifying tag \a tag.
///
/// Only one write may be outstanding at any given time. This means that
/// after calling Write, one must wait to receive \a tag from the completion
/// queue BEFORE calling Write again.
/// WriteOptions \a options is used to set the write options of this message.
/// This is thread-safe with respect to \a Read
///
/// \param[in] msg The message to be written.
/// \param[in] options The WriteOptions to be used to write this message.
/// \param[in] tag The tag identifying the operation.
virtual void Write ( const W & msg , WriteOptions options , void * tag ) = 0 ;
/// Request the writing of \a msg and coalesce it with the writing
/// of trailing metadata, using WriteOptions \a options with identifying tag
/// \a tag.
///
/// For client, WriteLast is equivalent of performing Write and WritesDone in
/// a single step.
/// For server, WriteLast buffers the \a msg. The writing of \a msg is held
/// until Finish is called, where \a msg and trailing metadata are coalesced
/// and write is initiated. Note that WriteLast can only buffer \a msg up to
/// the flow control window size. If \a msg size is larger than the window
/// size, it will be sent on wire without buffering.
///
/// \param[in] msg The message to be written.
/// \param[in] options The WriteOptions to be used to write this message.
/// \param[in] tag The tag identifying the operation.
void WriteLast ( const W & msg , WriteOptions options , void * tag ) {
Write ( msg , options . set_last_message ( ) , tag ) ;
}
} ;
template < class R >
@ -183,11 +216,17 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
: context_ ( context ) , call_ ( channel - > CreateCall ( method , context , cq ) ) {
finish_ops_ . RecvMessage ( response ) ;
finish_ops_ . AllowNoMessage ( ) ;
init_ops_ . set_output_tag ( tag ) ;
init_ops_ . SendInitialMetadata ( context - > send_initial_metadata_ ,
context - > initial_metadata_flags ( ) ) ;
call_ . PerformOps ( & init_ops_ ) ;
// if corked bit is set in context, we buffer up the initial metadata to
// coalesce with later message to be sent. No op is performed.
if ( context_ - > initial_metadata_corked_ ) {
write_ops_ . SendInitialMetadata ( context - > send_initial_metadata_ ,
context - > initial_metadata_flags ( ) ) ;
} else {
init_ops_ . set_output_tag ( tag ) ;
init_ops_ . SendInitialMetadata ( context - > send_initial_metadata_ ,
context - > initial_metadata_flags ( ) ) ;
call_ . PerformOps ( & init_ops_ ) ;
}
}
void ReadInitialMetadata ( void * tag ) override {
@ -205,6 +244,17 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
call_ . PerformOps ( & write_ops_ ) ;
}
void Write ( const W & msg , WriteOptions options , void * tag ) {
write_ops_ . set_output_tag ( tag ) ;
if ( options . is_last_message ( ) ) {
options . set_buffer_hint ( ) ;
write_ops_ . ClientSendClose ( ) ;
}
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT ( write_ops_ . SendMessage ( msg , options ) . ok ( ) ) ;
call_ . PerformOps ( & write_ops_ ) ;
}
void WritesDone ( void * tag ) override {
writes_done_ops_ . set_output_tag ( tag ) ;
writes_done_ops_ . ClientSendClose ( ) ;
@ -225,7 +275,8 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
Call call_ ;
CallOpSet < CallOpSendInitialMetadata > init_ops_ ;
CallOpSet < CallOpRecvInitialMetadata > meta_ops_ ;
CallOpSet < CallOpSendMessage > write_ops_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage , CallOpClientSendClose >
write_ops_ ;
CallOpSet < CallOpClientSendClose > writes_done_ops_ ;
CallOpSet < CallOpRecvInitialMetadata , CallOpGenericRecvMessage ,
CallOpClientRecvStatus >
@ -253,10 +304,17 @@ class ClientAsyncReaderWriter final
const RpcMethod & method , ClientContext * context ,
void * tag )
: context_ ( context ) , call_ ( channel - > CreateCall ( method , context , cq ) ) {
init_ops_ . set_output_tag ( tag ) ;
init_ops_ . SendInitialMetadata ( context - > send_initial_metadata_ ,
context - > initial_metadata_flags ( ) ) ;
call_ . PerformOps ( & init_ops_ ) ;
if ( context_ - > initial_metadata_corked_ ) {
// if corked bit is set in context, we buffer up the initial metadata to
// coalesce with later message to be sent. No op is performed.
write_ops_ . SendInitialMetadata ( context - > send_initial_metadata_ ,
context - > initial_metadata_flags ( ) ) ;
} else {
init_ops_ . set_output_tag ( tag ) ;
init_ops_ . SendInitialMetadata ( context - > send_initial_metadata_ ,
context - > initial_metadata_flags ( ) ) ;
call_ . PerformOps ( & init_ops_ ) ;
}
}
void ReadInitialMetadata ( void * tag ) override {
@ -283,10 +341,21 @@ class ClientAsyncReaderWriter final
call_ . PerformOps ( & write_ops_ ) ;
}
void Write ( const W & msg , WriteOptions options , void * tag ) {
write_ops_ . set_output_tag ( tag ) ;
if ( options . is_last_message ( ) ) {
options . set_buffer_hint ( ) ;
write_ops_ . ClientSendClose ( ) ;
}
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT ( write_ops_ . SendMessage ( msg , options ) . ok ( ) ) ;
call_ . PerformOps ( & write_ops_ ) ;
}
void WritesDone ( void * tag ) override {
writes_done_ops_ . set_output_tag ( tag ) ;
writes_done_ops_ . ClientSendClose ( ) ;
call_ . PerformOps ( & writes_done_ops_ ) ;
write_ops_ . set_output_tag ( tag ) ;
write_ops_ . ClientSendClose ( ) ;
call_ . PerformOps ( & write_ops_ ) ;
}
void Finish ( Status * status , void * tag ) override {
@ -304,8 +373,8 @@ class ClientAsyncReaderWriter final
CallOpSet < CallOpSendInitialMetadata > init_ops_ ;
CallOpSet < CallOpRecvInitialMetadata > meta_ops_ ;
CallOpSet < CallOpRecvInitialMetadata , CallOpRecvMessage < R > > read_ops_ ;
CallOpSet < CallOpSendMessage > write_ops_ ;
CallOpSet < CallOpClientSendClose > writes_don e_ops_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage , CallOpClientSendClose >
write_ops_ ;
CallOpSet < CallOpRecvInitialMetadata , CallOpClientRecvStatus > finish_ops_ ;
} ;
@ -395,6 +464,20 @@ class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface,
public AsyncWriterInterface < W > {
public :
virtual void Finish ( const Status & status , void * tag ) = 0 ;
/// Request the writing of \a msg and coalesce it with trailing metadata which
/// contains \a status, using WriteOptions options with identifying tag \a
/// tag.
///
/// WriteAndFinish is equivalent of performing WriteLast and Finish in a
/// single step.
///
/// \param[in] msg The message to be written.
/// \param[in] options The WriteOptions to be used to write this message.
/// \param[in] status The Status that server returns to client.
/// \param[in] tag The tag identifying the operation.
virtual void WriteAndFinish ( const W & msg , WriteOptions options ,
const Status & status , void * tag ) = 0 ;
} ;
template < class W >
@ -431,6 +514,42 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
call_ . PerformOps ( & write_ops_ ) ;
}
void Write ( const W & msg , WriteOptions options , void * tag ) override {
write_ops_ . set_output_tag ( tag ) ;
if ( options . is_last_message ( ) ) {
options . set_buffer_hint ( ) ;
}
if ( ! ctx_ - > sent_initial_metadata_ ) {
write_ops_ . SendInitialMetadata ( ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
write_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
}
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT ( write_ops_ . SendMessage ( msg , options ) . ok ( ) ) ;
call_ . PerformOps ( & write_ops_ ) ;
}
void WriteAndFinish ( const W & msg , WriteOptions options , const Status & status ,
void * tag ) override {
write_ops_ . set_output_tag ( tag ) ;
if ( ! ctx_ - > sent_initial_metadata_ ) {
write_ops_ . SendInitialMetadata ( ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
write_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
}
options . set_buffer_hint ( ) ;
GPR_CODEGEN_ASSERT ( write_ops_ . SendMessage ( msg , options ) . ok ( ) ) ;
write_ops_ . ServerSendStatus ( ctx_ - > trailing_metadata_ , status ) ;
call_ . PerformOps ( & write_ops_ ) ;
}
void Finish ( const Status & status , void * tag ) override {
finish_ops_ . set_output_tag ( tag ) ;
if ( ! ctx_ - > sent_initial_metadata_ ) {
@ -451,7 +570,9 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
Call call_ ;
ServerContext * ctx_ ;
CallOpSet < CallOpSendInitialMetadata > meta_ops_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage > write_ops_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage ,
CallOpServerSendStatus >
write_ops_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpServerSendStatus > finish_ops_ ;
} ;
@ -462,6 +583,20 @@ class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface,
public AsyncReaderInterface < R > {
public :
virtual void Finish ( const Status & status , void * tag ) = 0 ;
/// Request the writing of \a msg and coalesce it with trailing metadata which
/// contains \a status, using WriteOptions options with identifying tag \a
/// tag.
///
/// WriteAndFinish is equivalent of performing WriteLast and Finish in a
/// single step.
///
/// \param[in] msg The message to be written.
/// \param[in] options The WriteOptions to be used to write this message.
/// \param[in] status The Status that server returns to client.
/// \param[in] tag The tag identifying the operation.
virtual void WriteAndFinish ( const W & msg , WriteOptions options ,
const Status & status , void * tag ) = 0 ;
} ;
template < class W , class R >
@ -505,6 +640,40 @@ class ServerAsyncReaderWriter final
call_ . PerformOps ( & write_ops_ ) ;
}
void Write ( const W & msg , WriteOptions options , void * tag ) override {
write_ops_ . set_output_tag ( tag ) ;
if ( options . is_last_message ( ) ) {
options . set_buffer_hint ( ) ;
}
if ( ! ctx_ - > sent_initial_metadata_ ) {
write_ops_ . SendInitialMetadata ( ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
write_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
}
GPR_CODEGEN_ASSERT ( write_ops_ . SendMessage ( msg , options ) . ok ( ) ) ;
call_ . PerformOps ( & write_ops_ ) ;
}
void WriteAndFinish ( const W & msg , WriteOptions options , const Status & status ,
void * tag ) override {
write_ops_ . set_output_tag ( tag ) ;
if ( ! ctx_ - > sent_initial_metadata_ ) {
write_ops_ . SendInitialMetadata ( ctx_ - > initial_metadata_ ,
ctx_ - > initial_metadata_flags ( ) ) ;
if ( ctx_ - > compression_level_set ( ) ) {
write_ops_ . set_compression_level ( ctx_ - > compression_level ( ) ) ;
}
ctx_ - > sent_initial_metadata_ = true ;
}
options . set_buffer_hint ( ) ;
GPR_CODEGEN_ASSERT ( write_ops_ . SendMessage ( msg , options ) . ok ( ) ) ;
write_ops_ . ServerSendStatus ( ctx_ - > trailing_metadata_ , status ) ;
call_ . PerformOps ( & write_ops_ ) ;
}
void Finish ( const Status & status , void * tag ) override {
finish_ops_ . set_output_tag ( tag ) ;
if ( ! ctx_ - > sent_initial_metadata_ ) {
@ -528,7 +697,9 @@ class ServerAsyncReaderWriter final
ServerContext * ctx_ ;
CallOpSet < CallOpSendInitialMetadata > meta_ops_ ;
CallOpSet < CallOpRecvMessage < R > > read_ops_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage > write_ops_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpSendMessage ,
CallOpServerSendStatus >
write_ops_ ;
CallOpSet < CallOpSendInitialMetadata , CallOpServerSendStatus > finish_ops_ ;
} ;