@ -461,51 +461,76 @@ class ClientCallbackReaderWriterImpl
// 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any read backlog
// 3. Any write backlog
// 4. Recv trailing metadata (unless corked)
// 4. Recv trailing metadata, on_completion callback
started_ = true ;
start_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnReadInitialMetadataDone ( ok ) ;
MaybeFinish ( ) ;
} ,
& start_ops_ , /*can_inline=*/ false ) ;
if ( ! start_corked_ ) {
start_ops_ . SendInitialMetadata ( & context_ - > send_initial_metadata_ ,
context_ - > initial_metadata_flags ( ) ) ;
}
start_ops_ . RecvInitialMetadata ( context_ ) ;
start_ops_ . set_core_cq_tag ( & start_tag_ ) ;
call_ . PerformOps ( & start_ops_ ) ;
{
grpc : : internal : : MutexLock lock ( & start_mu_ ) ;
if ( backlog_ . read_ops ) {
call_ . PerformOps ( & read_ops_ ) ;
}
if ( backlog_ . write_ops ) {
call_ . PerformOps ( & write_ops_ ) ;
}
if ( backlog_ . writes_done_ops ) {
call_ . PerformOps ( & writes_done_ops_ ) ;
}
call_ . PerformOps ( & finish_ops_ ) ;
// The last thing in this critical section is to set started_ so that it
// can be used lock-free as well.
started_ . store ( true , std : : memory_order_release ) ;
// Also set up the read and write tags so that they don't have to be set up
// each time
write_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnWriteDone ( ok ) ;
MaybeFinish ( ) ;
} ,
& write_ops_ , /*can_inline=*/ false ) ;
write_ops_ . set_core_cq_tag ( & write_tag_ ) ;
read_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnReadDone ( ok ) ;
MaybeFinish ( ) ;
} ,
& read_ops_ , /*can_inline=*/ false ) ;
read_ops_ . set_core_cq_tag ( & read_tag_ ) ;
if ( read_ops_at_start_ ) {
call_ . PerformOps ( & read_ops_ ) ;
}
if ( write_ops_at_start_ ) {
call_ . PerformOps ( & write_ops_ ) ;
}
if ( writes_done_ops_at_start_ ) {
call_ . PerformOps ( & writes_done_ops_ ) ;
}
// MaybeFinish outside the lock to make sure that destruction of this object
// doesn't take place while holding the lock (which would cause the lock to
// be released after destruction)
this - > MaybeFinish ( ) ;
finish_tag_ . Set ( call_ . call ( ) , [ this ] ( bool /*ok*/ ) { MaybeFinish ( ) ; } ,
& finish_ops_ , /*can_inline=*/ false ) ;
finish_ops_ . ClientRecvStatus ( context_ , & finish_status_ ) ;
finish_ops_ . set_core_cq_tag ( & finish_tag_ ) ;
call_ . PerformOps ( & finish_ops_ ) ;
}
void Read ( Response * msg ) override {
read_ops_ . RecvMessage ( msg ) ;
callbacks_outstanding_ . fetch_add ( 1 , std : : memory_order_relaxed ) ;
if ( GPR_UNLIKELY ( ! started_ . load ( std : : memory_order_acquire ) ) ) {
grpc : : internal : : MutexLock lock ( & start_mu_ ) ;
if ( GPR_LIKELY ( ! started_ . load ( std : : memory_order_relaxed ) ) ) {
backlog_ . read_ops = true ;
return ;
}
if ( started_ ) {
call_ . PerformOps ( & read_ops_ ) ;
} else {
read_ops_at_start_ = true ;
}
call_ . PerformOps ( & read_ops_ ) ;
}
void Write ( const Request * msg , : : grpc : : WriteOptions options ) override {
if ( start_corked_ ) {
write_ops_ . SendInitialMetadata ( & context_ - > send_initial_metadata_ ,
context_ - > initial_metadata_flags ( ) ) ;
start_corked_ = false ;
}
if ( options . is_last_message ( ) ) {
options . set_buffer_hint ( ) ;
write_ops_ . ClientSendClose ( ) ;
@ -513,22 +538,18 @@ class ClientCallbackReaderWriterImpl
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT ( write_ops_ . SendMessagePtr ( msg , options ) . ok ( ) ) ;
callbacks_outstanding_ . fetch_add ( 1 , std : : memory_order_relaxed ) ;
if ( GPR_UNLIKELY ( corked_write_needed_ ) ) {
write_ops_ . SendInitialMetadata ( & context_ - > send_initial_metadata_ ,
context_ - > initial_metadata_flags ( ) ) ;
corked_write_needed_ = false ;
}
if ( GPR_UNLIKELY ( ! started_ . load ( std : : memory_order_acquire ) ) ) {
grpc : : internal : : MutexLock lock ( & start_mu_ ) ;
if ( GPR_LIKELY ( ! started_ . load ( std : : memory_order_relaxed ) ) ) {
backlog_ . write_ops = true ;
return ;
}
if ( started_ ) {
call_ . PerformOps ( & write_ops_ ) ;
} else {
write_ops_at_start_ = true ;
}
call_ . PerformOps ( & write_ops_ ) ;
}
void WritesDone ( ) override {
if ( start_corked_ ) {
writes_done_ops_ . SendInitialMetadata ( & context_ - > send_initial_metadata_ ,
context_ - > initial_metadata_flags ( ) ) ;
start_corked_ = false ;
}
writes_done_ops_ . ClientSendClose ( ) ;
writes_done_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
@ -538,19 +559,11 @@ class ClientCallbackReaderWriterImpl
& writes_done_ops_ , /*can_inline=*/ false ) ;
writes_done_ops_ . set_core_cq_tag ( & writes_done_tag_ ) ;
callbacks_outstanding_ . fetch_add ( 1 , std : : memory_order_relaxed ) ;
if ( GPR_UNLIKELY ( corked_write_needed_ ) ) {
writes_done_ops_ . SendInitialMetadata ( & context_ - > send_initial_metadata_ ,
context_ - > initial_metadata_flags ( ) ) ;
corked_write_needed_ = fals e;
if ( started_ ) {
call_ . PerformOps ( & writes_done_ops_ ) ;
} else {
writes_done_ops_at_start_ = tru e;
}
if ( GPR_UNLIKELY ( ! started_ . load ( std : : memory_order_acquire ) ) ) {
grpc : : internal : : MutexLock lock ( & start_mu_ ) ;
if ( GPR_LIKELY ( ! started_ . load ( std : : memory_order_relaxed ) ) ) {
backlog_ . writes_done_ops = true ;
return ;
}
}
call_ . PerformOps ( & writes_done_ops_ ) ;
}
void AddHold ( int holds ) override {
@ -567,42 +580,8 @@ class ClientCallbackReaderWriterImpl
: context_ ( context ) ,
call_ ( call ) ,
reactor_ ( reactor ) ,
start_corked_ ( context_ - > initial_metadata_corked_ ) ,
corked_write_needed_ ( start_corked_ ) {
start_corked_ ( context_ - > initial_metadata_corked_ ) {
this - > BindReactor ( reactor ) ;
// Set up the unchanging parts of the start, read, and write tags and ops.
start_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnReadInitialMetadataDone ( ok ) ;
MaybeFinish ( ) ;
} ,
& start_ops_ , /*can_inline=*/ false ) ;
start_ops_ . RecvInitialMetadata ( context_ ) ;
start_ops_ . set_core_cq_tag ( & start_tag_ ) ;
write_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnWriteDone ( ok ) ;
MaybeFinish ( ) ;
} ,
& write_ops_ , /*can_inline=*/ false ) ;
write_ops_ . set_core_cq_tag ( & write_tag_ ) ;
read_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnReadDone ( ok ) ;
MaybeFinish ( ) ;
} ,
& read_ops_ , /*can_inline=*/ false ) ;
read_ops_ . set_core_cq_tag ( & read_tag_ ) ;
// Also set up the Finish tag and op set.
finish_tag_ . Set ( call_ . call ( ) , [ this ] ( bool /*ok*/ ) { MaybeFinish ( ) ; } ,
& finish_ops_ ,
/*can_inline=*/ false ) ;
finish_ops_ . ClientRecvStatus ( context_ , & finish_status_ ) ;
finish_ops_ . set_core_cq_tag ( & finish_tag_ ) ;
}
: : grpc_impl : : ClientContext * const context_ ;
@ -613,9 +592,7 @@ class ClientCallbackReaderWriterImpl
grpc : : internal : : CallOpRecvInitialMetadata >
start_ops_ ;
grpc : : internal : : CallbackWithSuccessTag start_tag_ ;
const bool start_corked_ ;
bool corked_write_needed_ ; // no lock needed since only accessed in
// Write/WritesDone which cannot be concurrent
bool start_corked_ ;
grpc : : internal : : CallOpSet < grpc : : internal : : CallOpClientRecvStatus > finish_ops_ ;
grpc : : internal : : CallbackWithSuccessTag finish_tag_ ;
@ -626,27 +603,22 @@ class ClientCallbackReaderWriterImpl
grpc : : internal : : CallOpClientSendClose >
write_ops_ ;
grpc : : internal : : CallbackWithSuccessTag write_tag_ ;
bool write_ops_at_start_ { false } ;
grpc : : internal : : CallOpSet < grpc : : internal : : CallOpSendInitialMetadata ,
grpc : : internal : : CallOpClientSendClose >
writes_done_ops_ ;
grpc : : internal : : CallbackWithSuccessTag writes_done_tag_ ;
bool writes_done_ops_at_start_ { false } ;
grpc : : internal : : CallOpSet < grpc : : internal : : CallOpRecvMessage < Response > >
read_ops_ ;
grpc : : internal : : CallbackWithSuccessTag read_tag_ ;
bool read_ops_at_start_ { false } ;
struct StartCallBacklog {
bool write_ops = false ;
bool writes_done_ops = false ;
bool read_ops = false ;
} ;
StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */ ;
// Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
std : : atomic < intptr_t > callbacks_outstanding_ { 3 } ;
std : : atomic_bool started_ { false } ;
grpc : : internal : : Mutex start_mu_ ;
// Minimum of 2 callbacks to pre-register for start and finish
std : : atomic < intptr_t > callbacks_outstanding_ { 2 } ;
bool started_ { false } ;
} ;
template < class Request , class Response >
@ -698,7 +670,8 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
// This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any backlog
// 3. Recv trailing metadata
// 3. Recv trailing metadata, on_completion callback
started_ = true ;
start_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
@ -720,13 +693,8 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
} ,
& read_ops_ , /*can_inline=*/ false ) ;
read_ops_ . set_core_cq_tag ( & read_tag_ ) ;
{
grpc : : internal : : MutexLock lock ( & start_mu_ ) ;
if ( backlog_ . read_ops ) {
call_ . PerformOps ( & read_ops_ ) ;
}
started_ . store ( true , std : : memory_order_release ) ;
if ( read_ops_at_start_ ) {
call_ . PerformOps ( & read_ops_ ) ;
}
finish_tag_ . Set ( call_ . call ( ) , [ this ] ( bool /*ok*/ ) { MaybeFinish ( ) ; } ,
@ -739,14 +707,11 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
void Read ( Response * msg ) override {
read_ops_ . RecvMessage ( msg ) ;
callbacks_outstanding_ . fetch_add ( 1 , std : : memory_order_relaxed ) ;
if ( GPR_UNLIKELY ( ! started_ . load ( std : : memory_order_acquire ) ) ) {
grpc : : internal : : MutexLock lock ( & start_mu_ ) ;
if ( GPR_LIKELY ( ! started_ . load ( std : : memory_order_relaxed ) ) ) {
backlog_ . read_ops = true ;
return ;
}
if ( started_ ) {
call_ . PerformOps ( & read_ops_ ) ;
} else {
read_ops_at_start_ = true ;
}
call_ . PerformOps ( & read_ops_ ) ;
}
void AddHold ( int holds ) override {
@ -787,16 +752,11 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
grpc : : internal : : CallOpSet < grpc : : internal : : CallOpRecvMessage < Response > >
read_ops_ ;
grpc : : internal : : CallbackWithSuccessTag read_tag_ ;
struct StartCallBacklog {
bool read_ops = false ;
} ;
StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */ ;
bool read_ops_at_start_ { false } ;
// Minimum of 2 callbacks to pre-register for start and finish
std : : atomic < intptr_t > callbacks_outstanding_ { 2 } ;
std : : atomic_bool started_ { false } ;
grpc : : internal : : Mutex start_mu_ ;
bool started_ { false } ;
} ;
template < class Response >
@ -849,60 +809,74 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
// This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any backlog
// 3. Recv trailing metadata
// 3. Recv trailing metadata, on_completion callback
started_ = true ;
start_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnReadInitialMetadataDone ( ok ) ;
MaybeFinish ( ) ;
} ,
& start_ops_ , /*can_inline=*/ false ) ;
if ( ! start_corked_ ) {
start_ops_ . SendInitialMetadata ( & context_ - > send_initial_metadata_ ,
context_ - > initial_metadata_flags ( ) ) ;
}
start_ops_ . RecvInitialMetadata ( context_ ) ;
start_ops_ . set_core_cq_tag ( & start_tag_ ) ;
call_ . PerformOps ( & start_ops_ ) ;
{
grpc : : internal : : MutexLock lock ( & start_mu_ ) ;
if ( backlog_ . write_ops ) {
call_ . PerformOps ( & write_ops_ ) ;
}
if ( backlog_ . writes_done_ops ) {
call_ . PerformOps ( & writes_done_ops_ ) ;
}
call_ . PerformOps ( & finish_ops_ ) ;
// The last thing in this critical section is to set started_ so that it
// can be used lock-free as well.
started_ . store ( true , std : : memory_order_release ) ;
// Also set up the read and write tags so that they don't have to be set up
// each time
write_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnWriteDone ( ok ) ;
MaybeFinish ( ) ;
} ,
& write_ops_ , /*can_inline=*/ false ) ;
write_ops_ . set_core_cq_tag ( & write_tag_ ) ;
if ( write_ops_at_start_ ) {
call_ . PerformOps ( & write_ops_ ) ;
}
// MaybeFinish outside the lock to make sure that destruction of this object
// doesn't take place while holding the lock (which would cause the lock to
// be released after destruction)
this - > MaybeFinish ( ) ;
if ( writes_done_ops_at_start_ ) {
call_ . PerformOps ( & writes_done_ops_ ) ;
}
finish_tag_ . Set ( call_ . call ( ) , [ this ] ( bool /*ok*/ ) { MaybeFinish ( ) ; } ,
& finish_ops_ , /*can_inline=*/ false ) ;
finish_ops_ . ClientRecvStatus ( context_ , & finish_status_ ) ;
finish_ops_ . set_core_cq_tag ( & finish_tag_ ) ;
call_ . PerformOps ( & finish_ops_ ) ;
}
void Write ( const Request * msg , : : grpc : : WriteOptions options ) override {
if ( GPR_UNLIKELY ( options . is_last_message ( ) ) ) {
if ( start_corked_ ) {
write_ops_ . SendInitialMetadata ( & context_ - > send_initial_metadata_ ,
context_ - > initial_metadata_flags ( ) ) ;
start_corked_ = false ;
}
if ( options . is_last_message ( ) ) {
options . set_buffer_hint ( ) ;
write_ops_ . ClientSendClose ( ) ;
}
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT ( write_ops_ . SendMessagePtr ( msg , options ) . ok ( ) ) ;
callbacks_outstanding_ . fetch_add ( 1 , std : : memory_order_relaxed ) ;
if ( GPR_UNLIKELY ( corked_write_needed_ ) ) {
write_ops_ . SendInitialMetadata ( & context_ - > send_initial_metadata_ ,
context_ - > initial_metadata_flags ( ) ) ;
corked_write_needed_ = false ;
if ( started_ ) {
call_ . PerformOps ( & write_ops_ ) ;
} else {
write_ops_at_start_ = true ;
}
if ( GPR_UNLIKELY ( ! started_ . load ( std : : memory_order_acquire ) ) ) {
grpc : : internal : : MutexLock lock ( & start_mu_ ) ;
if ( GPR_LIKELY ( ! started_ . load ( std : : memory_order_relaxed ) ) ) {
backlog_ . write_ops = true ;
return ;
}
}
call_ . PerformOps ( & write_ops_ ) ;
}
void WritesDone ( ) override {
if ( start_corked_ ) {
writes_done_ops_ . SendInitialMetadata ( & context_ - > send_initial_metadata_ ,
context_ - > initial_metadata_flags ( ) ) ;
start_corked_ = false ;
}
writes_done_ops_ . ClientSendClose ( ) ;
writes_done_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
@ -912,21 +886,11 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
& writes_done_ops_ , /*can_inline=*/ false ) ;
writes_done_ops_ . set_core_cq_tag ( & writes_done_tag_ ) ;
callbacks_outstanding_ . fetch_add ( 1 , std : : memory_order_relaxed ) ;
if ( GPR_UNLIKELY ( corked_write_needed_ ) ) {
writes_done_ops_ . SendInitialMetadata ( & context_ - > send_initial_metadata_ ,
context_ - > initial_metadata_flags ( ) ) ;
corked_write_needed_ = false ;
}
if ( GPR_UNLIKELY ( ! started_ . load ( std : : memory_order_acquire ) ) ) {
grpc : : internal : : MutexLock lock ( & start_mu_ ) ;
if ( GPR_LIKELY ( ! started_ . load ( std : : memory_order_relaxed ) ) ) {
backlog_ . writes_done_ops = true ;
return ;
}
if ( started_ ) {
call_ . PerformOps ( & writes_done_ops_ ) ;
} else {
writes_done_ops_at_start_ = true ;
}
call_ . PerformOps ( & writes_done_ops_ ) ;
}
void AddHold ( int holds ) override {
@ -945,36 +909,10 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
: context_ ( context ) ,
call_ ( call ) ,
reactor_ ( reactor ) ,
start_corked_ ( context_ - > initial_metadata_corked_ ) ,
corked_write_needed_ ( start_corked_ ) {
start_corked_ ( context_ - > initial_metadata_corked_ ) {
this - > BindReactor ( reactor ) ;
// Set up the unchanging parts of the start and write tags and ops.
start_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnReadInitialMetadataDone ( ok ) ;
MaybeFinish ( ) ;
} ,
& start_ops_ , /*can_inline=*/ false ) ;
start_ops_ . RecvInitialMetadata ( context_ ) ;
start_ops_ . set_core_cq_tag ( & start_tag_ ) ;
write_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
reactor_ - > OnWriteDone ( ok ) ;
MaybeFinish ( ) ;
} ,
& write_ops_ , /*can_inline=*/ false ) ;
write_ops_ . set_core_cq_tag ( & write_tag_ ) ;
// Also set up the Finish tag and op set.
finish_ops_ . RecvMessage ( response ) ;
finish_ops_ . AllowNoMessage ( ) ;
finish_tag_ . Set ( call_ . call ( ) , [ this ] ( bool /*ok*/ ) { MaybeFinish ( ) ; } ,
& finish_ops_ ,
/*can_inline=*/ false ) ;
finish_ops_ . ClientRecvStatus ( context_ , & finish_status_ ) ;
finish_ops_ . set_core_cq_tag ( & finish_tag_ ) ;
}
: : grpc_impl : : ClientContext * const context_ ;
@ -985,9 +923,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
grpc : : internal : : CallOpRecvInitialMetadata >
start_ops_ ;
grpc : : internal : : CallbackWithSuccessTag start_tag_ ;
const bool start_corked_ ;
bool corked_write_needed_ ; // no lock needed since only accessed in
// Write/WritesDone which cannot be concurrent
bool start_corked_ ;
grpc : : internal : : CallOpSet < grpc : : internal : : CallOpGenericRecvMessage ,
grpc : : internal : : CallOpClientRecvStatus >
@ -1000,22 +936,17 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
grpc : : internal : : CallOpClientSendClose >
write_ops_ ;
grpc : : internal : : CallbackWithSuccessTag write_tag_ ;
bool write_ops_at_start_ { false } ;
grpc : : internal : : CallOpSet < grpc : : internal : : CallOpSendInitialMetadata ,
grpc : : internal : : CallOpClientSendClose >
writes_done_ops_ ;
grpc : : internal : : CallbackWithSuccessTag writes_done_tag_ ;
bool writes_done_ops_at_start_ { false } ;
struct StartCallBacklog {
bool write_ops = false ;
bool writes_done_ops = false ;
} ;
StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */ ;
// Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
std : : atomic < intptr_t > callbacks_outstanding_ { 3 } ;
std : : atomic_bool started_ { false } ;
grpc : : internal : : Mutex start_mu_ ;
// Minimum of 2 callbacks to pre-register for start and finish
std : : atomic < intptr_t > callbacks_outstanding_ { 2 } ;
bool started_ { false } ;
} ;
template < class Request >
@ -1054,6 +985,7 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
// This call initiates two batches, each with a callback
// 1. Send initial metadata + write + writes done + recv initial metadata
// 2. Read message, recv trailing metadata
started_ = true ;
start_tag_ . Set ( call_ . call ( ) ,
[ this ] ( bool ok ) {
@ -1121,6 +1053,7 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
// This call will have 2 callbacks: start and finish
std : : atomic < intptr_t > callbacks_outstanding_ { 2 } ;
bool started_ { false } ;
} ;
class ClientCallbackUnaryFactory {