diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index c4b8d60782f..6a1d83ce5d4 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -71,6 +71,7 @@ struct grpc_completion_queue { grpc_pollset pollset; /* 0 initially, 1 once we've begun shutting down */ int shutdown; + int shutdown_called; /* Head of a linked list of queued events (prev points to the last element) */ event *queue; /* Fixed size chained hash table of events for pluck() */ @@ -107,7 +108,6 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, grpc_event_finish_func on_finish, void *user_data) { event *ev = gpr_malloc(sizeof(event)); gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS; - GPR_ASSERT(!cc->shutdown); ev->base.type = type; ev->base.tag = tag; ev->base.call = call; @@ -150,6 +150,7 @@ static void end_op_locked(grpc_completion_queue *cc, #endif if (gpr_unref(&cc->refs)) { GPR_ASSERT(!cc->shutdown); + GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); } @@ -380,6 +381,10 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + cc->shutdown_called = 1; + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + if (gpr_unref(&cc->refs)) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown);