From 9dd94ea1fa24aee24722867e21059e5735270c44 Mon Sep 17 00:00:00 2001 From: Soheil Hassas Yeganeh Date: Thu, 14 Feb 2019 16:50:20 -0500 Subject: [PATCH] Use std::atomic for CQ data. There was a sub-optimality in the CAS operation. vjpai@ and I decided to move to std::atomic. This commit basically moves CQ data to C++ structures, and makes grpc_cq_event_queue a proper c++ class called CQEventQueue. --- src/core/lib/gprpp/atomic.h | 12 +- src/core/lib/surface/completion_queue.cc | 280 +++++++++++------------ 2 files changed, 139 insertions(+), 153 deletions(-) diff --git a/src/core/lib/gprpp/atomic.h b/src/core/lib/gprpp/atomic.h index 622df1b7889..5bc14d15ea0 100644 --- a/src/core/lib/gprpp/atomic.h +++ b/src/core/lib/gprpp/atomic.h @@ -49,8 +49,9 @@ class Atomic { bool CompareExchangeWeak(T* expected, T desired, MemoryOrder success, MemoryOrder failure) { - return GPR_ATM_INC_CAS_THEN( - storage_.compare_exchange_weak(*expected, desired, success, failure)); + return GPR_ATM_INC_CAS_THEN(storage_.compare_exchange_weak( + *expected, desired, static_cast(success), + static_cast(failure))); } bool CompareExchangeStrong(T* expected, T desired, MemoryOrder success, @@ -74,7 +75,7 @@ class Atomic { // Atomically increment a counter only if the counter value is not zero. // Returns true if increment took place; false if counter is zero. - bool IncrementIfNonzero(MemoryOrder load_order = MemoryOrder::ACQ_REL) { + bool IncrementIfNonzero(MemoryOrder load_order = MemoryOrder::ACQUIRE) { T count = storage_.load(static_cast(load_order)); do { // If zero, we are done (without an increment). If not, we must do a CAS @@ -83,9 +84,8 @@ class Atomic { if (count == 0) { return false; } - } while (!storage_.AtomicCompareExchangeWeak( - &count, count + 1, static_cast(MemoryOrder::ACQ_REL), - static_cast(load_order))); + } while (!CompareExchangeWeak(&count, count + 1, MemoryOrder::ACQ_REL, + load_order)); return true; } diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index bfd8445f70e..7d679204bac 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -33,6 +33,7 @@ #include "src/core/lib/gpr/spinlock.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/tls.h" +#include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -44,6 +45,8 @@ grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure"); grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags"); grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount"); +namespace { + // Specifies a cq thread local cache. // The first event that occurs on a thread // with a cq cache will go into that cache, and @@ -84,24 +87,22 @@ typedef struct { grpc_closure* shutdown; } non_polling_poller; -static size_t non_polling_poller_size(void) { - return sizeof(non_polling_poller); -} +size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); } -static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) { +void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) { non_polling_poller* npp = reinterpret_cast(pollset); gpr_mu_init(&npp->mu); *mu = &npp->mu; } -static void non_polling_poller_destroy(grpc_pollset* pollset) { +void non_polling_poller_destroy(grpc_pollset* pollset) { non_polling_poller* npp = reinterpret_cast(pollset); gpr_mu_destroy(&npp->mu); } -static grpc_error* non_polling_poller_work(grpc_pollset* pollset, - grpc_pollset_worker** worker, - grpc_millis deadline) { +grpc_error* non_polling_poller_work(grpc_pollset* pollset, + grpc_pollset_worker** worker, + grpc_millis deadline) { non_polling_poller* npp = reinterpret_cast(pollset); if (npp->shutdown) return GRPC_ERROR_NONE; if (npp->kicked_without_poller) { @@ -141,8 +142,8 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset, return GRPC_ERROR_NONE; } -static grpc_error* non_polling_poller_kick( - grpc_pollset* pollset, grpc_pollset_worker* specific_worker) { +grpc_error* non_polling_poller_kick(grpc_pollset* pollset, + grpc_pollset_worker* specific_worker) { non_polling_poller* p = reinterpret_cast(pollset); if (specific_worker == nullptr) specific_worker = reinterpret_cast(p->root); @@ -159,8 +160,7 @@ static grpc_error* non_polling_poller_kick( return GRPC_ERROR_NONE; } -static void non_polling_poller_shutdown(grpc_pollset* pollset, - grpc_closure* closure) { +void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) { non_polling_poller* p = reinterpret_cast(pollset); GPR_ASSERT(closure != nullptr); p->shutdown = closure; @@ -175,7 +175,7 @@ static void non_polling_poller_shutdown(grpc_pollset* pollset, } } -static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { +const cq_poller_vtable g_poller_vtable_by_poller_type[] = { /* GRPC_CQ_DEFAULT_POLLING */ {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick, grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy}, @@ -188,7 +188,9 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { non_polling_poller_shutdown, non_polling_poller_destroy}, }; -typedef struct cq_vtable { +} // namespace + +struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; void (*init)(void* data, @@ -203,80 +205,116 @@ typedef struct cq_vtable { void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); -} cq_vtable; +}; + +namespace { /* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue * (a lockfree multiproducer single consumer queue). It uses a queue_lock * to support multiple consumers. * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */ -typedef struct grpc_cq_event_queue { +class CqEventQueue { + public: + CqEventQueue() { gpr_mpscq_init(&queue_); } + ~CqEventQueue() { gpr_mpscq_destroy(&queue_); } + + /* Note: The counter is not incremented/decremented atomically with push/pop. + * The count is only eventually consistent */ + intptr_t num_items() const { + return num_queue_items_.Load(grpc_core::MemoryOrder::RELAXED); + } + + bool Push(grpc_cq_completion* c); + grpc_cq_completion* Pop(); + + private: /* Spinlock to serialize consumers i.e pop() operations */ - gpr_spinlock queue_lock; + gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER; - gpr_mpscq queue; + gpr_mpscq queue_; /* A lazy counter of number of items in the queue. This is NOT atomically incremented/decremented along with push/pop operations and hence is only eventually consistent */ - gpr_atm num_queue_items; -} grpc_cq_event_queue; + grpc_core::Atomic num_queue_items_{0}; +}; + +struct cq_next_data { + ~cq_next_data() { GPR_ASSERT(queue.num_items() == 0); } -typedef struct cq_next_data { /** Completed events for completion-queues of type GRPC_CQ_NEXT */ - grpc_cq_event_queue queue; + CqEventQueue queue; /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ - gpr_atm things_queued_ever; + grpc_core::Atomic things_queued_ever{0}; - /* Number of outstanding events (+1 if not shut down) */ - gpr_atm pending_events; + /** Number of outstanding events (+1 if not shut down) + Initial count is dropped by grpc_completion_queue_shutdown */ + grpc_core::Atomic pending_events{1}; /** 0 initially. 1 once we initiated shutdown */ - bool shutdown_called; -} cq_next_data; + bool shutdown_called = false; +}; + +struct cq_pluck_data { + cq_pluck_data() { + completed_tail = &completed_head; + completed_head.next = reinterpret_cast(completed_tail); + } + + ~cq_pluck_data() { + GPR_ASSERT(completed_head.next == + reinterpret_cast(&completed_head)); + } -typedef struct cq_pluck_data { /** Completed events for completion-queues of type GRPC_CQ_PLUCK */ grpc_cq_completion completed_head; grpc_cq_completion* completed_tail; - /** Number of pending events (+1 if we're not shutdown) */ - gpr_atm pending_events; + /** Number of pending events (+1 if we're not shutdown). + Initial count is dropped by grpc_completion_queue_shutdown. */ + grpc_core::Atomic pending_events{1}; /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ - gpr_atm things_queued_ever; + grpc_core::Atomic things_queued_ever{0}; /** 0 initially. 1 once we completed shutting */ /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if * (pending_events == 0). So consider removing this in future and use * pending_events */ - gpr_atm shutdown; + grpc_core::Atomic shutdown{false}; /** 0 initially. 1 once we initiated shutdown */ - bool shutdown_called; + bool shutdown_called = false; - int num_pluckers; + int num_pluckers = 0; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; -} cq_pluck_data; +}; -typedef struct cq_callback_data { +struct cq_callback_data { + cq_callback_data( + grpc_experimental_completion_queue_functor* shutdown_callback) + : shutdown_callback(shutdown_callback) {} /** No actual completed events queue, unlike other types */ - /** Number of pending events (+1 if we're not shutdown) */ - gpr_atm pending_events; + /** Number of pending events (+1 if we're not shutdown). + Initial count is dropped by grpc_completion_queue_shutdown. */ + grpc_core::Atomic pending_events{1}; /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ - gpr_atm things_queued_ever; + grpc_core::Atomic things_queued_ever{0}; /** 0 initially. 1 once we initiated shutdown */ - bool shutdown_called; + bool shutdown_called = false; /** A callback that gets invoked when the CQ completes shutdown */ grpc_experimental_completion_queue_functor* shutdown_callback; -} cq_callback_data; +}; + +} // namespace /* Completion queue structure */ struct grpc_completion_queue { @@ -408,7 +446,7 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq, storage->done(storage->done_arg, storage); ret = 1; cq_next_data* cqd = static_cast DATA_FROM_CQ(cq); - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(cq); @@ -422,31 +460,21 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq, return ret; } -static void cq_event_queue_init(grpc_cq_event_queue* q) { - gpr_mpscq_init(&q->queue); - q->queue_lock = GPR_SPINLOCK_INITIALIZER; - gpr_atm_no_barrier_store(&q->num_queue_items, 0); +bool CqEventQueue::Push(grpc_cq_completion* c) { + gpr_mpscq_push(&queue_, reinterpret_cast(c)); + return num_queue_items_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED) == 0; } -static void cq_event_queue_destroy(grpc_cq_event_queue* q) { - gpr_mpscq_destroy(&q->queue); -} - -static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) { - gpr_mpscq_push(&q->queue, reinterpret_cast(c)); - return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0; -} - -static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) { +grpc_cq_completion* CqEventQueue::Pop() { grpc_cq_completion* c = nullptr; - if (gpr_spinlock_trylock(&q->queue_lock)) { + if (gpr_spinlock_trylock(&queue_lock_)) { GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(); bool is_empty = false; c = reinterpret_cast( - gpr_mpscq_pop_and_check_end(&q->queue, &is_empty)); - gpr_spinlock_unlock(&q->queue_lock); + gpr_mpscq_pop_and_check_end(&queue_, &is_empty)); + gpr_spinlock_unlock(&queue_lock_); if (c == nullptr && !is_empty) { GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(); @@ -456,18 +484,12 @@ static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) { } if (c) { - gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); + num_queue_items_.FetchSub(1, grpc_core::MemoryOrder::RELAXED); } return c; } -/* Note: The counter is not incremented/decremented atomically with push/pop. - * The count is only eventually consistent */ -static long cq_event_queue_num_items(grpc_cq_event_queue* q) { - return static_cast(gpr_atm_no_barrier_load(&q->num_queue_items)); -} - grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, grpc_experimental_completion_queue_functor* shutdown_callback) { @@ -507,49 +529,33 @@ grpc_completion_queue* grpc_completion_queue_create_internal( static void cq_init_next( void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { - cq_next_data* cqd = static_cast(data); - /* Initial count is dropped by grpc_completion_queue_shutdown */ - gpr_atm_no_barrier_store(&cqd->pending_events, 1); - cqd->shutdown_called = false; - gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); - cq_event_queue_init(&cqd->queue); + new (data) cq_next_data(); } static void cq_destroy_next(void* data) { cq_next_data* cqd = static_cast(data); - GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0); - cq_event_queue_destroy(&cqd->queue); + cqd->~cq_next_data(); } static void cq_init_pluck( void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { - cq_pluck_data* cqd = static_cast(data); - /* Initial count is dropped by grpc_completion_queue_shutdown */ - gpr_atm_no_barrier_store(&cqd->pending_events, 1); - cqd->completed_tail = &cqd->completed_head; - cqd->completed_head.next = (uintptr_t)cqd->completed_tail; - gpr_atm_no_barrier_store(&cqd->shutdown, 0); - cqd->shutdown_called = false; - cqd->num_pluckers = 0; - gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); + new (data) cq_pluck_data(); } static void cq_destroy_pluck(void* data) { cq_pluck_data* cqd = static_cast(data); - GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); + cqd->~cq_pluck_data(); } static void cq_init_callback( void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { - cq_callback_data* cqd = static_cast(data); - /* Initial count is dropped by grpc_completion_queue_shutdown */ - gpr_atm_no_barrier_store(&cqd->pending_events, 1); - cqd->shutdown_called = false; - gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); - cqd->shutdown_callback = shutdown_callback; + new (data) cq_callback_data(shutdown_callback); } -static void cq_destroy_callback(void* data) {} +static void cq_destroy_callback(void* data) { + cq_callback_data* cqd = static_cast(data); + cqd->~cq_callback_data(); +} grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) { return cq->vtable->cq_completion_type; @@ -632,37 +638,19 @@ static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) { static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {} #endif -/* Atomically increments a counter only if the counter is not zero. Returns - * true if the increment was successful; false if the counter is zero */ -static bool atm_inc_if_nonzero(gpr_atm* counter) { - while (true) { - gpr_atm count = gpr_atm_acq_load(counter); - /* If zero, we are done. If not, we must to a CAS (instead of an atomic - * increment) to maintain the contract: do not increment the counter if it - * is zero. */ - if (count == 0) { - return false; - } else if (gpr_atm_full_cas(counter, count, count + 1)) { - break; - } - } - - return true; -} - static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) { cq_next_data* cqd = static_cast DATA_FROM_CQ(cq); - return atm_inc_if_nonzero(&cqd->pending_events); + return cqd->pending_events.IncrementIfNonzero(); } static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) { cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); - return atm_inc_if_nonzero(&cqd->pending_events); + return cqd->pending_events.IncrementIfNonzero(); } static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) { cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); - return atm_inc_if_nonzero(&cqd->pending_events); + return cqd->pending_events.IncrementIfNonzero(); } bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { @@ -716,17 +704,14 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, gpr_tls_set(&g_cached_event, (intptr_t)storage); } else { /* Add the completion to the queue */ - bool is_first = cq_event_queue_push(&cqd->queue, storage); - gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - + bool is_first = cqd->queue.Push(storage); + cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED); /* Since we do not hold the cq lock here, it is important to do an 'acquire' load here (instead of a 'no_barrier' load) to match with the release store - (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next + (done via pending_events.FetchSub(1, ACQ_REL)) in cq_shutdown_next */ - bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; - - if (!will_definitely_shutdown) { + if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 1) { /* Only kick if this is the first item queued */ if (is_first) { gpr_mu_lock(cq->mu); @@ -740,7 +725,8 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, GRPC_ERROR_UNREF(kick_error); } } - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == + 1) { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(cq); @@ -749,7 +735,7 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, } } else { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); - gpr_atm_rel_store(&cqd->pending_events, 0); + cqd->pending_events.Store(0, grpc_core::MemoryOrder::RELEASE); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(cq); gpr_mu_unlock(cq->mu); @@ -795,12 +781,12 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, cq_check_tag(cq, tag, false); /* Used in debug builds only */ /* Add to the list of completions */ - gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED); cqd->completed_tail->next = ((uintptr_t)storage) | (1u & cqd->completed_tail->next); cqd->completed_tail = storage; - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { cq_finish_shutdown_pluck(cq); gpr_mu_unlock(cq->mu); } else { @@ -856,8 +842,8 @@ static void cq_end_op_for_callback( cq_check_tag(cq, tag, true); /* Used in debug builds only */ - gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED); + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { cq_finish_shutdown_callback(cq); } @@ -893,20 +879,20 @@ class ExecCtxNext : public grpc_core::ExecCtx { cq_next_data* cqd = static_cast DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == nullptr); - gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); + intptr_t current_last_seen_things_queued_ever = + cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); + cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED); /* Pop a cq_completion from the queue. Returns NULL if the queue is empty * might return NULL in some cases even if the queue is not empty; but * that * is ok and doesn't affect correctness. Might effect the tail latencies a * bit) */ - a->stolen_completion = cq_event_queue_pop(&cqd->queue); + a->stolen_completion = cqd->queue.Pop(); if (a->stolen_completion != nullptr) { return true; } @@ -965,7 +951,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline); cq_is_finished_arg is_finished_arg = { - gpr_atm_no_barrier_load(&cqd->things_queued_ever), + cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED), cq, deadline_millis, nullptr, @@ -985,7 +971,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, break; } - grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue); + grpc_cq_completion* c = cqd->queue.Pop(); if (c != nullptr) { ret.type = GRPC_OP_COMPLETE; @@ -999,16 +985,16 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, so that the thread comes back quickly from poll to make a second attempt at popping. Not doing this can potentially deadlock this thread forever (if the deadline is infinity) */ - if (cq_event_queue_num_items(&cqd->queue) > 0) { + if (cqd->queue.num_items() > 0) { iteration_deadline = 0; } } - if (gpr_atm_acq_load(&cqd->pending_events) == 0) { + if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) == 0) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ - if (cq_event_queue_num_items(&cqd->queue) > 0) { + if (cqd->queue.num_items() > 0) { /* Go to the beginning of the loop. No point doing a poll because (cq->shutdown == true) is only possible when there is no pending work (i.e cq->pending_events == 0) and any outstanding completion @@ -1049,8 +1035,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, is_finished_arg.first_loop = false; } - if (cq_event_queue_num_items(&cqd->queue) > 0 && - gpr_atm_acq_load(&cqd->pending_events) > 0) { + if (cqd->queue.num_items() > 0 && + cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) > 0) { gpr_mu_lock(cq->mu); cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr); gpr_mu_unlock(cq->mu); @@ -1074,7 +1060,7 @@ static void cq_finish_shutdown_next(grpc_completion_queue* cq) { cq_next_data* cqd = static_cast DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); - GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); + GPR_ASSERT(cqd->pending_events.Load(grpc_core::MemoryOrder::RELAXED) == 0); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); } @@ -1096,10 +1082,10 @@ static void cq_shutdown_next(grpc_completion_queue* cq) { return; } cqd->shutdown_called = true; - /* Doing a full_fetch_add (i.e acq/release) here to match with + /* Doing acq/release FetchSub here to match with * cq_begin_op_for_next and cq_end_op_for_next functions which read/write * on this counter without necessarily holding a lock on cq */ - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { cq_finish_shutdown_next(cq); } gpr_mu_unlock(cq->mu); @@ -1148,12 +1134,12 @@ class ExecCtxPluck : public grpc_core::ExecCtx { GPR_ASSERT(a->stolen_completion == nullptr); gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); + cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { gpr_mu_lock(cq->mu); a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); + cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED); grpc_cq_completion* c; grpc_cq_completion* prev = &cqd->completed_head; while ((c = (grpc_cq_completion*)(prev->next & @@ -1209,7 +1195,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_mu_lock(cq->mu); grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline); cq_is_finished_arg is_finished_arg = { - gpr_atm_no_barrier_load(&cqd->things_queued_ever), + cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED), cq, deadline_millis, nullptr, @@ -1246,7 +1232,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, } prev = c; } - if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + if (cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED)) { gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; @@ -1309,8 +1295,8 @@ static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) { cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); - GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); - gpr_atm_no_barrier_store(&cqd->shutdown, 1); + GPR_ASSERT(!cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED)); + cqd->shutdown.Store(1, grpc_core::MemoryOrder::RELAXED); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); } @@ -1334,7 +1320,7 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) { return; } cqd->shutdown_called = true; - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { cq_finish_shutdown_pluck(cq); } gpr_mu_unlock(cq->mu); @@ -1368,7 +1354,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { return; } cqd->shutdown_called = true; - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { gpr_mu_unlock(cq->mu); cq_finish_shutdown_callback(cq); } else {