clang-format

pull/12789/head
Craig Tiller 7 years ago
parent 3d073c261e
commit 29a9c3af38
  1. 68
      src/core/lib/iomgr/ev_epollex_linux.c

@ -49,7 +49,8 @@
#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/spinlock.h" #include "src/core/lib/support/spinlock.h"
// debug aid: create workers on the heap (allows asan to spot use-after-destruction) // debug aid: create workers on the heap (allows asan to spot
// use-after-destruction)
#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1 #define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1
#ifndef NDEBUG #ifndef NDEBUG
@ -434,13 +435,13 @@ static grpc_error *pollable_create(pollable_type type, pollable **p) {
} }
struct epoll_event ev; struct epoll_event ev;
ev.events = (uint32_t)(EPOLLIN | EPOLLET); ev.events = (uint32_t)(EPOLLIN | EPOLLET);
ev.data.ptr = (void*)(1 | (intptr_t)&(*p)->wakeup); ev.data.ptr = (void *)(1 | (intptr_t) & (*p)->wakeup);
if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) { if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) {
err = GRPC_OS_ERROR(errno, "epoll_ctl"); err = GRPC_OS_ERROR(errno, "epoll_ctl");
close(epfd); close(epfd);
grpc_wakeup_fd_destroy(&(*p)->wakeup); grpc_wakeup_fd_destroy(&(*p)->wakeup);
gpr_free(*p); gpr_free(*p);
*p = NULL; *p = NULL;
return err; return err;
} }
@ -461,7 +462,8 @@ static pollable *pollable_ref(pollable *p) {
static pollable *pollable_ref(pollable *p, int line, const char *reason) { static pollable *pollable_ref(pollable *p, int line, const char *reason) {
if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) { if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) {
int r = (int)gpr_atm_no_barrier_load(&p->refs.count); int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, "POLLABLE:%p ref %d->%d %s", p, r, r+1, reason); gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
"POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason);
} }
#endif #endif
gpr_ref(&p->refs); gpr_ref(&p->refs);
@ -475,7 +477,8 @@ static void pollable_unref(pollable *p, int line, const char *reason) {
if (p == NULL) return; if (p == NULL) return;
if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) { if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) {
int r = (int)gpr_atm_no_barrier_load(&p->refs.count); int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, "POLLABLE:%p unref %d->%d %s", p, r, r-1, reason); gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
"POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason);
} }
#endif #endif
if (p != NULL && gpr_unref(&p->refs)) { if (p != NULL && gpr_unref(&p->refs)) {
@ -543,7 +546,7 @@ static grpc_error *pollset_kick_one(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) { grpc_pollset_worker *specific_worker) {
pollable *p = specific_worker->pollable_obj; pollable *p = specific_worker->pollable_obj;
GPR_ASSERT(specific_worker != NULL); GPR_ASSERT(specific_worker != NULL);
if (specific_worker->kicked) { if (specific_worker->kicked) {
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p);
@ -581,8 +584,10 @@ static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p", "PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p",
pollset, specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset), pollset, specific_worker,
(void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker); (void *)gpr_tls_get(&g_current_thread_pollset),
(void *)gpr_tls_get(&g_current_thread_worker),
pollset->root_worker);
} }
if (specific_worker == NULL) { if (specific_worker == NULL) {
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
@ -621,13 +626,13 @@ static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx,
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
const char *err_desc = "pollset_kick_all"; const char *err_desc = "pollset_kick_all";
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
grpc_pollset_worker *w = pollset->root_worker; grpc_pollset_worker *w = pollset->root_worker;
if (w!=NULL) { if (w != NULL) {
do { do {
append_error(&error, pollset_kick_one(exec_ctx, pollset, w), err_desc); append_error(&error, pollset_kick_one(exec_ctx, pollset, w), err_desc);
w = w->links[PWLINK_POLLSET].next; w = w->links[PWLINK_POLLSET].next;
} while (w != pollset->root_worker); } while (w != pollset->root_worker);
} }
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
return error; return error;
} }
@ -690,7 +695,7 @@ static grpc_error *fd_become_pollable(grpc_fd *fd, pollable **p) {
if (fd->pollable_obj == NULL) { if (fd->pollable_obj == NULL) {
if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj), if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj),
err_desc)) { err_desc)) {
fd->pollable_obj->owner_fd = fd; fd->pollable_obj->owner_fd = fd;
if (!append_error(&error, pollable_add_fd(fd->pollable_obj, fd), if (!append_error(&error, pollable_add_fd(fd->pollable_obj, fd),
err_desc)) { err_desc)) {
POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
@ -850,7 +855,8 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->initialized_cv = false; worker->initialized_cv = false;
worker->kicked = false; worker->kicked = false;
worker->pollset = pollset; worker->pollset = pollset;
worker->pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_worker"); worker->pollable_obj =
POLLABLE_REF(pollset->active_pollable, "pollset_worker");
worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET); worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET);
gpr_mu_lock(&worker->pollable_obj->mu); gpr_mu_lock(&worker->pollable_obj->mu);
if (!worker_insert(&worker->pollable_obj->root_worker, worker, if (!worker_insert(&worker->pollable_obj->root_worker, worker,
@ -930,7 +936,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64
".%09d deadline=%" PRId64 ".%09d kwp=%d pollable=%p", ".%09d deadline=%" PRId64 ".%09d kwp=%d pollable=%p",
pollset, worker_hdl, WORKER_PTR, now.tv_sec, now.tv_nsec, pollset, worker_hdl, WORKER_PTR, now.tv_sec, now.tv_nsec,
deadline.tv_sec, deadline.tv_nsec, pollset->kicked_without_poller, pollset->active_pollable); deadline.tv_sec, deadline.tv_nsec, pollset->kicked_without_poller,
pollset->active_pollable);
} }
static const char *err_desc = "pollset_work"; static const char *err_desc = "pollset_work";
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
@ -943,7 +950,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_ASSERT(!pollset->shutdown_closure); GPR_ASSERT(!pollset->shutdown_closure);
gpr_mu_unlock(&pollset->mu); gpr_mu_unlock(&pollset->mu);
if (pollset->event_cursor == pollset->event_count) { if (pollset->event_cursor == pollset->event_count) {
append_error(&error, pollset_epoll(exec_ctx, pollset, WORKER_PTR->pollable_obj, append_error(&error,
pollset_epoll(exec_ctx, pollset, WORKER_PTR->pollable_obj,
now, deadline), now, deadline),
err_desc); err_desc);
} }
@ -967,7 +975,8 @@ static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked(
static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd"; static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd";
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p add fd %p (%d); transition pollable from empty to fd", gpr_log(GPR_DEBUG,
"PS:%p add fd %p (%d); transition pollable from empty to fd",
pollset, fd, fd->fd); pollset, fd, fd->fd);
} }
append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
@ -982,9 +991,11 @@ static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked(
static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi"; static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi";
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
if (GRPC_TRACER_ON(grpc_polling_trace)) { if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, gpr_log(
GPR_DEBUG,
"PS:%p add fd %p (%d); transition pollable from fd %p to multipoller", "PS:%p add fd %p (%d); transition pollable from fd %p to multipoller",
pollset, and_add_fd, and_add_fd?and_add_fd->fd:-1, pollset->active_pollable->owner_fd); pollset, and_add_fd, and_add_fd ? and_add_fd->fd : -1,
pollset->active_pollable->owner_fd);
} }
append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
grpc_fd *initial_fd = pollset->active_pollable->owner_fd; grpc_fd *initial_fd = pollset->active_pollable->owner_fd;
@ -1007,7 +1018,8 @@ static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked(
static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset, grpc_fd *fd) { grpc_pollset *pollset, grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
pollable *po_at_start = POLLABLE_REF(pollset->active_pollable, "pollset_add_fd"); pollable *po_at_start =
POLLABLE_REF(pollset->active_pollable, "pollset_add_fd");
switch (pollset->active_pollable->type) { switch (pollset->active_pollable->type) {
case PO_EMPTY: case PO_EMPTY:
/* empty pollable --> single fd pollable */ /* empty pollable --> single fd pollable */
@ -1037,7 +1049,8 @@ static grpc_error *pollset_as_multipollable(grpc_exec_ctx *exec_ctx,
pollable **pollable_obj) { pollable **pollable_obj) {
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->mu);
pollable *po_at_start = POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable"); pollable *po_at_start =
POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable");
switch (pollset->active_pollable->type) { switch (pollset->active_pollable->type) {
case PO_EMPTY: case PO_EMPTY:
POLLABLE_UNREF(pollset->active_pollable, "pollset"); POLLABLE_UNREF(pollset->active_pollable, "pollset");
@ -1091,16 +1104,15 @@ static grpc_pollset_set *pollset_set_create(void) {
return pss; return pss;
} }
static void pollset_set_unref(grpc_exec_ctx *exec_ctx, static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) {
grpc_pollset_set *pss) {
if (pss == NULL) return; if (pss == NULL) return;
if (!gpr_unref(&pss->refs)) return; if (!gpr_unref(&pss->refs)) return;
pollset_set_unref(exec_ctx, pss->parent); pollset_set_unref(exec_ctx, pss->parent);
gpr_mu_destroy(&pss->mu); gpr_mu_destroy(&pss->mu);
for (size_t i=0; i<pss->pollset_count; i++) { for (size_t i = 0; i < pss->pollset_count; i++) {
POLLABLE_UNREF(pss->pollsets[i], "pollset_set"); POLLABLE_UNREF(pss->pollsets[i], "pollset_set");
} }
for (size_t i=0;i<pss->fd_count; i++) { for (size_t i = 0; i < pss->fd_count; i++) {
UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set"); UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set");
} }
gpr_free(pss->pollsets); gpr_free(pss->pollsets);
@ -1161,7 +1173,7 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
pollable *pollable_obj = NULL; pollable *pollable_obj = NULL;
if (!GRPC_LOG_IF_ERROR( if (!GRPC_LOG_IF_ERROR(
err_desc, pollset_as_multipollable(exec_ctx, ps, &pollable_obj))) { err_desc, pollset_as_multipollable(exec_ctx, ps, &pollable_obj))) {
GPR_ASSERT(pollable_obj==NULL); GPR_ASSERT(pollable_obj == NULL);
return; return;
} }
pss = pss_lock_adam(pss); pss = pss_lock_adam(pss);

Loading…
Cancel
Save