|
|
|
@ -28,6 +28,7 @@ |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
#include <grpc/support/string_util.h> |
|
|
|
|
#include <grpc/support/time.h> |
|
|
|
|
#include <grpc/support/tls.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/debug/stats.h" |
|
|
|
|
#include "src/core/lib/iomgr/pollset.h" |
|
|
|
@ -48,6 +49,14 @@ grpc_tracer_flag grpc_trace_cq_refcount = |
|
|
|
|
GRPC_TRACER_INITIALIZER(false, "cq_refcount"); |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
// Specifies a cq thread local cache.
|
|
|
|
|
// The first event that occurs on a thread
|
|
|
|
|
// with a cq cache will go into that cache, and
|
|
|
|
|
// will only be returned on the thread that initialized the cache.
|
|
|
|
|
// NOTE: Only one event will ever be cached.
|
|
|
|
|
GPR_TLS_DECL(g_cached_event); |
|
|
|
|
GPR_TLS_DECL(g_cached_cq); |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
grpc_pollset_worker **worker; |
|
|
|
|
void *tag; |
|
|
|
@ -345,6 +354,46 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = |
|
|
|
|
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq, |
|
|
|
|
grpc_error *error); |
|
|
|
|
|
|
|
|
|
void grpc_cq_global_init() { |
|
|
|
|
gpr_tls_init(&g_cached_event); |
|
|
|
|
gpr_tls_init(&g_cached_cq); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq) { |
|
|
|
|
if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == nullptr) { |
|
|
|
|
gpr_tls_set(&g_cached_event, (intptr_t)0); |
|
|
|
|
gpr_tls_set(&g_cached_cq, (intptr_t)cq); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq, |
|
|
|
|
void **tag, int *ok) { |
|
|
|
|
grpc_cq_completion *storage = |
|
|
|
|
(grpc_cq_completion *)gpr_tls_get(&g_cached_event); |
|
|
|
|
int ret = 0; |
|
|
|
|
if (storage != NULL && |
|
|
|
|
(grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq) { |
|
|
|
|
*tag = storage->tag; |
|
|
|
|
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
|
|
|
|
storage->done(&exec_ctx, storage->done_arg, storage); |
|
|
|
|
*ok = (storage->next & (uintptr_t)(1)) == 1; |
|
|
|
|
ret = 1; |
|
|
|
|
cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); |
|
|
|
|
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { |
|
|
|
|
GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); |
|
|
|
|
gpr_mu_lock(cq->mu); |
|
|
|
|
cq_finish_shutdown_next(&exec_ctx, cq); |
|
|
|
|
gpr_mu_unlock(cq->mu); |
|
|
|
|
GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down"); |
|
|
|
|
} |
|
|
|
|
grpc_exec_ctx_finish(&exec_ctx); |
|
|
|
|
} |
|
|
|
|
gpr_tls_set(&g_cached_event, (intptr_t)0); |
|
|
|
|
gpr_tls_set(&g_cached_cq, (intptr_t)0); |
|
|
|
|
|
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cq_event_queue_init(grpc_cq_event_queue *q) { |
|
|
|
|
gpr_mpscq_init(&q->queue); |
|
|
|
|
q->queue_lock = GPR_SPINLOCK_INITIALIZER; |
|
|
|
@ -617,7 +666,6 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, |
|
|
|
|
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); |
|
|
|
|
int is_success = (error == GRPC_ERROR_NONE); |
|
|
|
|
|
|
|
|
@ -628,44 +676,50 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
cq_check_tag(cq, tag, true); /* Used in debug builds only */ |
|
|
|
|
|
|
|
|
|
/* Add the completion to the queue */ |
|
|
|
|
bool is_first = cq_event_queue_push(&cqd->queue, storage); |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); |
|
|
|
|
|
|
|
|
|
/* Since we do not hold the cq lock here, it is important to do an 'acquire'
|
|
|
|
|
load here (instead of a 'no_barrier' load) to match with the release store |
|
|
|
|
(done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next |
|
|
|
|
*/ |
|
|
|
|
bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; |
|
|
|
|
|
|
|
|
|
if (!will_definitely_shutdown) { |
|
|
|
|
/* Only kick if this is the first item queued */ |
|
|
|
|
if (is_first) { |
|
|
|
|
gpr_mu_lock(cq->mu); |
|
|
|
|
grpc_error *kick_error = |
|
|
|
|
cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); |
|
|
|
|
gpr_mu_unlock(cq->mu); |
|
|
|
|
if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq && |
|
|
|
|
(grpc_cq_completion *)gpr_tls_get(&g_cached_event) == nullptr) { |
|
|
|
|
gpr_tls_set(&g_cached_event, (intptr_t)storage); |
|
|
|
|
} else { |
|
|
|
|
/* Add the completion to the queue */ |
|
|
|
|
bool is_first = cq_event_queue_push(&cqd->queue, storage); |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); |
|
|
|
|
|
|
|
|
|
/* Since we do not hold the cq lock here, it is important to do an 'acquire'
|
|
|
|
|
load here (instead of a 'no_barrier' load) to match with the release |
|
|
|
|
store |
|
|
|
|
(done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next |
|
|
|
|
*/ |
|
|
|
|
bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; |
|
|
|
|
|
|
|
|
|
if (!will_definitely_shutdown) { |
|
|
|
|
/* Only kick if this is the first item queued */ |
|
|
|
|
if (is_first) { |
|
|
|
|
gpr_mu_lock(cq->mu); |
|
|
|
|
grpc_error *kick_error = |
|
|
|
|
cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); |
|
|
|
|
gpr_mu_unlock(cq->mu); |
|
|
|
|
|
|
|
|
|
if (kick_error != GRPC_ERROR_NONE) { |
|
|
|
|
const char *msg = grpc_error_string(kick_error); |
|
|
|
|
gpr_log(GPR_ERROR, "Kick failed: %s", msg); |
|
|
|
|
GRPC_ERROR_UNREF(kick_error); |
|
|
|
|
if (kick_error != GRPC_ERROR_NONE) { |
|
|
|
|
const char *msg = grpc_error_string(kick_error); |
|
|
|
|
gpr_log(GPR_ERROR, "Kick failed: %s", msg); |
|
|
|
|
GRPC_ERROR_UNREF(kick_error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { |
|
|
|
|
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { |
|
|
|
|
GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); |
|
|
|
|
gpr_mu_lock(cq->mu); |
|
|
|
|
cq_finish_shutdown_next(exec_ctx, cq); |
|
|
|
|
gpr_mu_unlock(cq->mu); |
|
|
|
|
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); |
|
|
|
|
gpr_atm_rel_store(&cqd->pending_events, 0); |
|
|
|
|
gpr_mu_lock(cq->mu); |
|
|
|
|
cq_finish_shutdown_next(exec_ctx, cq); |
|
|
|
|
gpr_mu_unlock(cq->mu); |
|
|
|
|
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); |
|
|
|
|
gpr_atm_rel_store(&cqd->pending_events, 0); |
|
|
|
|
gpr_mu_lock(cq->mu); |
|
|
|
|
cq_finish_shutdown_next(exec_ctx, cq); |
|
|
|
|
gpr_mu_unlock(cq->mu); |
|
|
|
|
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_TIMER_END("cq_end_op_for_next", 0); |
|
|
|
|