|
|
|
@ -36,17 +36,17 @@ namespace { |
|
|
|
|
internal::GrpcLibraryInitializer g_gli_initializer; |
|
|
|
|
|
|
|
|
|
gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT; |
|
|
|
|
grpc_core::ManualConstructor<grpc_core::Mutex> g_callback_alternative_mu; |
|
|
|
|
grpc_core::Mutex* g_callback_alternative_mu; |
|
|
|
|
|
|
|
|
|
// Implement a ref-counted callback CQ for global use in the alternative
|
|
|
|
|
// implementation so that its threads are only created once. Do this using
|
|
|
|
|
// explicit ref-counts and raw pointers rather than a shared-ptr since that
|
|
|
|
|
// has a non-trivial destructor and thus can't be used for global variables.
|
|
|
|
|
struct CallbackAlternativeCQ { |
|
|
|
|
int refs = 0; // GUARDED_BY(g_callback_alternative_mu);
|
|
|
|
|
CompletionQueue* cq; // GUARDED_BY(g_callback_alternative_mu);
|
|
|
|
|
std::vector<grpc_core::Thread>* |
|
|
|
|
nexting_threads; // GUARDED_BY(g_callback_alternative_mu);
|
|
|
|
|
int refs ABSL_GUARDED_BY(g_callback_alternative_mu) = 0; |
|
|
|
|
CompletionQueue* cq ABSL_GUARDED_BY(g_callback_alternative_mu); |
|
|
|
|
std::vector<grpc_core::Thread>* nexting_threads |
|
|
|
|
ABSL_GUARDED_BY(g_callback_alternative_mu); |
|
|
|
|
|
|
|
|
|
CompletionQueue* Ref() { |
|
|
|
|
grpc_core::MutexLock lock(&*g_callback_alternative_mu); |
|
|
|
@ -104,7 +104,7 @@ struct CallbackAlternativeCQ { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Unref() { |
|
|
|
|
grpc_core::MutexLock lock(&*g_callback_alternative_mu); |
|
|
|
|
grpc_core::MutexLock lock(g_callback_alternative_mu); |
|
|
|
|
refs--; |
|
|
|
|
if (refs == 0) { |
|
|
|
|
cq->Shutdown(); |
|
|
|
@ -191,12 +191,15 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { |
|
|
|
|
|
|
|
|
|
CompletionQueue* CompletionQueue::CallbackAlternativeCQ() { |
|
|
|
|
gpr_once_init(&g_once_init_callback_alternative, |
|
|
|
|
[] { g_callback_alternative_mu.Init(); }); |
|
|
|
|
[] { g_callback_alternative_mu = new grpc_core::Mutex(); }); |
|
|
|
|
return g_callback_alternative_cq.Ref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq) { |
|
|
|
|
void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq) |
|
|
|
|
ABSL_NO_THREAD_SAFETY_ANALYSIS { |
|
|
|
|
(void)cq; |
|
|
|
|
// This accesses g_callback_alternative_cq without acquiring the mutex
|
|
|
|
|
// but it's considered safe because it just reads the pointer address.
|
|
|
|
|
GPR_DEBUG_ASSERT(cq == g_callback_alternative_cq.cq); |
|
|
|
|
g_callback_alternative_cq.Unref(); |
|
|
|
|
} |
|
|
|
|