@ -23,6 +23,7 @@
# include <atomic>
# include <atomic>
# include <functional>
# include <functional>
# include <memory>
# include <memory>
# include <thread>
# include <utility>
# include <utility>
# include <grpc/support/log.h>
# include <grpc/support/log.h>
@ -47,6 +48,12 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable {
void DrainQueue ( ) ;
void DrainQueue ( ) ;
void Orphan ( ) override ;
void Orphan ( ) override ;
# ifndef NDEBUG
bool RunningInWorkSerializer ( ) const {
return std : : this_thread : : get_id ( ) = = current_thread_ ;
}
# endif
private :
private :
struct CallbackWrapper {
struct CallbackWrapper {
CallbackWrapper ( std : : function < void ( ) > cb , const DebugLocation & loc )
CallbackWrapper ( std : : function < void ( ) > cb , const DebugLocation & loc )
@ -86,6 +93,9 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable {
// orphaned.
// orphaned.
std : : atomic < uint64_t > refs_ { MakeRefPair ( 0 , 1 ) } ;
std : : atomic < uint64_t > refs_ { MakeRefPair ( 0 , 1 ) } ;
MultiProducerSingleConsumerQueue queue_ ;
MultiProducerSingleConsumerQueue queue_ ;
# ifndef NDEBUG
std : : thread : : id current_thread_ ;
# endif
} ;
} ;
void WorkSerializer : : WorkSerializerImpl : : Run ( std : : function < void ( ) > callback ,
void WorkSerializer : : WorkSerializerImpl : : Run ( std : : function < void ( ) > callback ,
@ -102,10 +112,17 @@ void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
GPR_DEBUG_ASSERT ( GetSize ( prev_ref_pair ) > 0 ) ;
GPR_DEBUG_ASSERT ( GetSize ( prev_ref_pair ) > 0 ) ;
if ( GetOwners ( prev_ref_pair ) = = 0 ) {
if ( GetOwners ( prev_ref_pair ) = = 0 ) {
// We took ownership of the WorkSerializer. Invoke callback and drain queue.
// We took ownership of the WorkSerializer. Invoke callback and drain queue.
# ifndef NDEBUG
current_thread_ = std : : this_thread : : get_id ( ) ;
# endif
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_work_serializer_trace ) ) {
if ( GRPC_TRACE_FLAG_ENABLED ( grpc_work_serializer_trace ) ) {
gpr_log ( GPR_INFO , " Executing immediately " ) ;
gpr_log ( GPR_INFO , " Executing immediately " ) ;
}
}
callback ( ) ;
callback ( ) ;
// Delete the callback while still holding the WorkSerializer, so
// that any refs being held by the callback via lambda captures will
// be destroyed inside the WorkSerializer.
callback = nullptr ;
DrainQueueOwned ( ) ;
DrainQueueOwned ( ) ;
} else {
} else {
// Another thread is holding the WorkSerializer, so decrement the ownership
// Another thread is holding the WorkSerializer, so decrement the ownership
@ -158,6 +175,9 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() {
const uint64_t prev_ref_pair =
const uint64_t prev_ref_pair =
refs_ . fetch_add ( MakeRefPair ( 1 , 1 ) , std : : memory_order_acq_rel ) ;
refs_ . fetch_add ( MakeRefPair ( 1 , 1 ) , std : : memory_order_acq_rel ) ;
if ( GetOwners ( prev_ref_pair ) = = 0 ) {
if ( GetOwners ( prev_ref_pair ) = = 0 ) {
# ifndef NDEBUG
current_thread_ = std : : this_thread : : get_id ( ) ;
# endif
// We took ownership of the WorkSerializer. Drain the queue.
// We took ownership of the WorkSerializer. Drain the queue.
DrainQueueOwned ( ) ;
DrainQueueOwned ( ) ;
} else {
} else {
@ -186,6 +206,12 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
}
}
if ( GetSize ( prev_ref_pair ) = = 2 ) {
if ( GetSize ( prev_ref_pair ) = = 2 ) {
// Queue drained. Give up ownership but only if queue remains empty.
// Queue drained. Give up ownership but only if queue remains empty.
# ifndef NDEBUG
// Reset current_thread_ before giving up ownership to avoid TSAN
// race. If we don't wind up giving up ownership, we'll set this
// again below before we pull the next callback out of the queue.
current_thread_ = std : : thread : : id ( ) ;
# endif
uint64_t expected = MakeRefPair ( 1 , 1 ) ;
uint64_t expected = MakeRefPair ( 1 , 1 ) ;
if ( refs_ . compare_exchange_strong ( expected , MakeRefPair ( 0 , 1 ) ,
if ( refs_ . compare_exchange_strong ( expected , MakeRefPair ( 0 , 1 ) ,
std : : memory_order_acq_rel ) ) {
std : : memory_order_acq_rel ) ) {
@ -200,6 +226,10 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
delete this ;
delete this ;
return ;
return ;
}
}
# ifndef NDEBUG
// Didn't wind up giving up ownership, so set current_thread_ again.
current_thread_ = std : : this_thread : : get_id ( ) ;
# endif
}
}
// There is at least one callback on the queue. Pop the callback from the
// There is at least one callback on the queue. Pop the callback from the
// queue and execute it.
// queue and execute it.
@ -244,4 +274,10 @@ void WorkSerializer::Schedule(std::function<void()> callback,
void WorkSerializer : : DrainQueue ( ) { impl_ - > DrainQueue ( ) ; }
void WorkSerializer : : DrainQueue ( ) { impl_ - > DrainQueue ( ) ; }
# ifndef NDEBUG
bool WorkSerializer : : RunningInWorkSerializer ( ) const {
return impl_ - > RunningInWorkSerializer ( ) ;
}
# endif
} // namespace grpc_core
} // namespace grpc_core