diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 3cbfac39d26..c4ee2220437 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/spinlock.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -68,8 +69,8 @@ typedef struct { * Currently this is only used in completion queues whose completion_type is * GRPC_CQ_NEXT */ typedef struct grpc_cq_event_queue { - /* Mutex to serialize consumers i.e pop() operations */ - gpr_mu queue_mu; + /* spinlock to serialize consumers i.e pop() operations */ + gpr_spinlock queue_lock; gpr_mpscq queue; @@ -142,13 +143,12 @@ static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, static void cq_event_queue_init(grpc_cq_event_queue *q) { gpr_mpscq_init(&q->queue); - gpr_mu_init(&q->queue_mu); + q->queue_lock = GPR_SPINLOCK_INITIALIZER; gpr_atm_no_barrier_store(&q->num_queue_items, 0); } static void cq_event_queue_destroy(grpc_cq_event_queue *q) { gpr_mpscq_destroy(&q->queue); - gpr_mu_destroy(&q->queue_mu); } static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { @@ -157,9 +157,12 @@ static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { } static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { - gpr_mu_lock(&q->queue_mu); - grpc_cq_completion *c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue); - gpr_mu_unlock(&q->queue_mu); + grpc_cq_completion *c = NULL; + if (gpr_spinlock_trylock(&q->queue_lock)) { + c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue); + gpr_spinlock_unlock(&q->queue_lock); + } + if (c) { gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); } @@ -170,7 +173,7 @@ static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { /* Note: The counter is not incremented/decremented atomically with push/pop. * The count is only eventually consistent */ static long cq_event_queue_num_items(grpc_cq_event_queue *q) { - return (long) gpr_atm_no_barrier_load(&q->num_queue_items); + return (long)gpr_atm_no_barrier_load(&q->num_queue_items); } grpc_completion_queue *grpc_completion_queue_create_internal(