Reverting suffixes and changing field names instead

pull/12545/head
Yash Tibrewal 7 years ago
parent 79a89c9acc
commit b2a54ac4cb
  1. 54
      src/core/lib/iomgr/ev_epoll1_linux.c
  2. 186
      src/core/lib/iomgr/ev_epollex_linux.c
  3. 66
      src/core/lib/surface/call.c
  4. 2
      src/core/lib/surface/call.h
  5. 2
      src/core/lib/surface/channel.c

@ -130,9 +130,9 @@ static void fd_global_shutdown(void);
* Pollset Declarations
*/
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state_t;
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
static const char *kick_state_string(kick_state_t st) {
static const char *kick_state_string(kick_state st) {
switch (st) {
case UNKICKED:
return "UNKICKED";
@ -145,7 +145,7 @@ static const char *kick_state_string(kick_state_t st) {
}
struct grpc_pollset_worker {
kick_state_t kick_state;
kick_state state;
int kick_state_mutator; // which line of code last changed kick state
bool initialized_cv;
grpc_pollset_worker *next;
@ -154,9 +154,9 @@ struct grpc_pollset_worker {
grpc_closure_list schedule_on_end_work;
};
#define SET_KICK_STATE(worker, state) \
#define SET_KICK_STATE(worker, kick_state) \
do { \
(worker)->kick_state = (state); \
(worker)->state = (kick_state); \
(worker)->kick_state_mutator = __LINE__; \
} while (false)
@ -508,7 +508,7 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
switch (worker->kick_state) {
switch (worker->state) {
case KICKED:
break;
case UNKICKED:
@ -688,7 +688,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_mu_lock(&pollset->mu);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
pollset, worker, kick_state_string(worker->kick_state),
pollset, worker, kick_state_string(worker->state),
is_reassigning);
}
if (pollset->seen_inactive) {
@ -708,12 +708,12 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
at this point is if it were "kicked specifically". Since the worker has
not added itself to the pollset yet (by calling worker_insert()), it is
not visible in the "kick any" path yet */
if (worker->kick_state == UNKICKED) {
if (worker->state == UNKICKED) {
pollset->seen_inactive = false;
if (neighborhood->active_root == NULL) {
neighborhood->active_root = pollset->next = pollset->prev = pollset;
/* Make this the designated poller if there isn't one already */
if (worker->kick_state == UNKICKED &&
if (worker->state == UNKICKED &&
gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
SET_KICK_STATE(worker, DESIGNATED_POLLER);
}
@ -733,19 +733,19 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker_insert(pollset, worker);
pollset->begin_refs--;
if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) {
if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
while (worker->state == UNKICKED && !pollset->shutting_down) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
pollset, worker, kick_state_string(worker->kick_state),
pollset, worker, kick_state_string(worker->state),
pollset->shutting_down);
}
if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
worker->kick_state == UNKICKED) {
worker->state == UNKICKED) {
/* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
received a kick */
SET_KICK_STATE(worker, KICKED);
@ -758,7 +758,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_log(GPR_ERROR,
"PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
"kicked_without_poller: %d",
pollset, worker, kick_state_string(worker->kick_state),
pollset, worker, kick_state_string(worker->state),
pollset->shutting_down, pollset->kicked_without_poller);
}
@ -778,7 +778,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
}
GPR_TIMER_END("begin_worker", 0);
return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}
static bool check_neighborhood_for_available_poller(
@ -795,7 +795,7 @@ static bool check_neighborhood_for_available_poller(
grpc_pollset_worker *inspect_worker = inspect->root_worker;
if (inspect_worker != NULL) {
do {
switch (inspect_worker->kick_state) {
switch (inspect_worker->state) {
case UNKICKED:
if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
(gpr_atm)inspect_worker)) {
@ -858,7 +858,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure_list_move(&worker->schedule_on_end_work,
&exec_ctx->closure_list);
if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
if (worker->next != worker && worker->next->kick_state == UNKICKED) {
if (worker->next != worker && worker->next->state == UNKICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
}
@ -993,14 +993,14 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
gpr_strvec_add(&log, tmp);
if (pollset->root_worker != NULL) {
gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
kick_state_string(pollset->root_worker->kick_state),
kick_state_string(pollset->root_worker->state),
pollset->root_worker->next,
kick_state_string(pollset->root_worker->next->kick_state));
kick_state_string(pollset->root_worker->next->state));
gpr_strvec_add(&log, tmp);
}
if (specific_worker != NULL) {
gpr_asprintf(&tmp, " worker_kick_state=%s",
kick_state_string(specific_worker->kick_state));
kick_state_string(specific_worker->state));
gpr_strvec_add(&log, tmp);
}
tmp = gpr_strvec_flatten(&log, NULL);
@ -1020,13 +1020,13 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
goto done;
}
grpc_pollset_worker *next_worker = root_worker->next;
if (root_worker->kick_state == KICKED) {
if (root_worker->state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
goto done;
} else if (next_worker->kick_state == KICKED) {
} else if (next_worker->state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
}
@ -1043,7 +1043,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
SET_KICK_STATE(root_worker, KICKED);
ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
goto done;
} else if (next_worker->kick_state == UNKICKED) {
} else if (next_worker->state == UNKICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
}
@ -1051,8 +1051,8 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
SET_KICK_STATE(next_worker, KICKED);
gpr_cv_signal(&next_worker->cv);
goto done;
} else if (next_worker->kick_state == DESIGNATED_POLLER) {
if (root_worker->kick_state != DESIGNATED_POLLER) {
} else if (next_worker->state == DESIGNATED_POLLER) {
if (root_worker->state != DESIGNATED_POLLER) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(
GPR_ERROR,
@ -1074,7 +1074,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
goto done;
}
} else {
GPR_ASSERT(next_worker->kick_state == KICKED);
GPR_ASSERT(next_worker->state == KICKED);
SET_KICK_STATE(next_worker, KICKED);
goto done;
}
@ -1088,7 +1088,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
GPR_UNREACHABLE_CODE(goto done);
}
if (specific_worker->kick_state == KICKED) {
if (specific_worker->state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. specific worker already kicked");
}

@ -97,12 +97,12 @@ static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
* pollable Declarations
*/
typedef struct pollable_t {
typedef struct pollable {
polling_obj po;
int epfd;
grpc_wakeup_fd wakeup;
grpc_pollset_worker *root_worker;
} pollable_t;
} pollable;
static const char *polling_obj_type_string(polling_obj_type t) {
switch (t) {
@ -122,7 +122,7 @@ static const char *polling_obj_type_string(polling_obj_type t) {
return "<invalid>";
}
static char *pollable_desc(pollable_t *p) {
static char *pollable_desc(pollable *p) {
char *out;
gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
polling_obj_type_string(p->po.type), p->po.group, p->epfd,
@ -130,19 +130,19 @@ static char *pollable_desc(pollable_t *p) {
return out;
}
static pollable_t g_empty_pollable;
static pollable g_empty_pollable;
static void pollable_init(pollable_t *p, polling_obj_type type);
static void pollable_destroy(pollable_t *p);
static void pollable_init(pollable *p, polling_obj_type type);
static void pollable_destroy(pollable *p);
/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
static grpc_error *pollable_materialize(pollable_t *p);
static grpc_error *pollable_materialize(pollable *p);
/*******************************************************************************
* Fd Declarations
*/
struct grpc_fd {
pollable_t pollable;
pollable pollable_obj;
int fd;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
@ -193,15 +193,15 @@ struct grpc_pollset_worker {
pollset_worker_link links[POLLSET_WORKER_LINK_COUNT];
gpr_cv cv;
grpc_pollset *pollset;
pollable_t *pollable;
pollable *pollable_obj;
};
#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
struct grpc_pollset {
pollable_t pollable;
pollable_t *current_pollable;
pollable pollable_obj;
pollable *current_pollable;
int kick_alls_pending;
bool kicked_without_poller;
grpc_closure *shutdown_closure;
@ -282,7 +282,7 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_fd *fd = (grpc_fd *)arg;
/* Add the fd to the freelist */
grpc_iomgr_unregister_object(&fd->iomgr_object);
pollable_destroy(&fd->pollable);
pollable_destroy(&fd->pollable_obj);
gpr_mu_destroy(&fd->orphaned_mu);
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
@ -343,7 +343,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd));
}
pollable_init(&new_fd->pollable, PO_FD);
pollable_init(&new_fd->pollable_obj, PO_FD);
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
@ -385,7 +385,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
bool is_fd_closed = already_closed;
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&fd->pollable.po.mu);
gpr_mu_lock(&fd->pollable_obj.po.mu);
gpr_mu_lock(&fd->orphaned_mu);
fd->on_done_closure = on_done;
@ -411,7 +411,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->orphaned_mu);
gpr_mu_unlock(&fd->pollable.po.mu);
gpr_mu_unlock(&fd->pollable_obj.po.mu);
UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */
GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
@ -451,13 +451,13 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
* Pollable Definitions
*/
static void pollable_init(pollable_t *p, polling_obj_type type) {
static void pollable_init(pollable *p, polling_obj_type type) {
po_init(&p->po, type);
p->root_worker = NULL;
p->epfd = -1;
}
static void pollable_destroy(pollable_t *p) {
static void pollable_destroy(pollable *p) {
po_destroy(&p->po);
if (p->epfd != -1) {
close(p->epfd);
@ -466,7 +466,7 @@ static void pollable_destroy(pollable_t *p) {
}
/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
static grpc_error *pollable_materialize(pollable_t *p) {
static grpc_error *pollable_materialize(pollable *p) {
if (p->epfd == -1) {
int new_epfd = epoll_create1(EPOLL_CLOEXEC);
if (new_epfd < 0) {
@ -492,7 +492,7 @@ static grpc_error *pollable_materialize(pollable_t *p) {
}
/* pollable must be materialized */
static grpc_error *pollable_add_fd(pollable_t *p, grpc_fd *fd) {
static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollable_add_fd";
const int epfd = p->epfd;
@ -557,30 +557,33 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_unused) {
grpc_error *error = GRPC_ERROR_NONE;
grpc_pollset *pollset = (grpc_pollset *)arg;
gpr_mu_lock(&pollset->pollable.po.mu);
gpr_mu_lock(&pollset->pollable_obj.po.mu);
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
if (worker->pollable != &pollset->pollable) {
gpr_mu_lock(&worker->pollable->po.mu);
if (worker->pollable_obj != &pollset->pollable_obj) {
gpr_mu_lock(&worker->pollable_obj->po.mu);
}
if (worker->initialized_cv && worker != pollset->root_worker) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
pollset, worker, &pollset->pollable, worker->pollable);
pollset, worker, &pollset->pollable_obj,
worker->pollable_obj);
}
worker->kicked = true;
gpr_cv_signal(&worker->cv);
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
pollset, worker, &pollset->pollable, worker->pollable);
pollset, worker, &pollset->pollable_obj,
worker->pollable_obj);
}
append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup),
append_error(&error,
grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup),
"pollset_shutdown");
}
if (worker->pollable != &pollset->pollable) {
gpr_mu_unlock(&worker->pollable->po.mu);
if (worker->pollable_obj != &pollset->pollable_obj) {
gpr_mu_unlock(&worker->pollable_obj->po.mu);
}
worker = worker->links[PWL_POLLSET].next;
@ -588,7 +591,7 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
}
pollset->kick_alls_pending--;
pollset_maybe_finish_shutdown(exec_ctx, pollset);
gpr_mu_unlock(&pollset->pollable.po.mu);
gpr_mu_unlock(&pollset->pollable_obj.po.mu);
GRPC_LOG_IF_ERROR("kick_all", error);
}
@ -599,7 +602,7 @@ static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GRPC_ERROR_NONE);
}
static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable_t *p,
static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
grpc_pollset_worker *specific_worker) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
@ -664,24 +667,24 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable_t *p,
/* p->po.mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
pollable_t *p = pollset->current_pollable;
if (p != &pollset->pollable) {
pollable *p = pollset->current_pollable;
if (p != &pollset->pollable_obj) {
gpr_mu_lock(&p->po.mu);
}
grpc_error *error = pollset_kick_inner(pollset, p, specific_worker);
if (p != &pollset->pollable) {
if (p != &pollset->pollable_obj) {
gpr_mu_unlock(&p->po.mu);
}
return error;
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollable_init(&pollset->pollable, PO_POLLSET);
pollable_init(&pollset->pollable_obj, PO_POLLSET);
pollset->current_pollable = &g_empty_pollable;
pollset->kicked_without_poller = false;
pollset->shutdown_closure = NULL;
pollset->root_worker = NULL;
*mu = &pollset->pollable.po.mu;
*mu = &pollset->pollable_obj.po.mu;
}
/* Convert a timespec to milliseconds:
@ -729,8 +732,8 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
static grpc_error *fd_become_pollable_locked(grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "fd_become_pollable";
if (append_error(&error, pollable_materialize(&fd->pollable), err_desc)) {
append_error(&error, pollable_add_fd(&fd->pollable, fd), err_desc);
if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) {
append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc);
}
return error;
}
@ -744,8 +747,8 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable_t *p) {
return p != &g_empty_pollable && p != &pollset->pollable;
static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) {
return p != &g_empty_pollable && p != &pollset->pollable_obj;
}
static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
@ -791,7 +794,7 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
pollable_destroy(&pollset->pollable);
pollable_destroy(&pollset->pollable_obj);
if (pollset_is_pollable_fd(pollset, pollset->current_pollable)) {
UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2,
"pollset_pollable");
@ -801,7 +804,7 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
}
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollable_t *p, gpr_timespec now,
pollable *p, gpr_timespec now,
gpr_timespec deadline) {
int timeout = poll_deadline_to_millis_timeout(deadline, now);
@ -883,68 +886,69 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->initialized_cv = false;
worker->kicked = false;
worker->pollset = pollset;
worker->pollable = pollset->current_pollable;
worker->pollable_obj = pollset->current_pollable;
if (pollset_is_pollable_fd(pollset, worker->pollable)) {
REF_BY((grpc_fd *)worker->pollable, 2, "one_poll");
if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) {
REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll");
}
worker_insert(&pollset->root_worker, PWL_POLLSET, worker);
if (!worker_insert(&worker->pollable->root_worker, PWL_POLLABLE, worker)) {
if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE,
worker)) {
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
if (worker->pollable != &pollset->pollable) {
gpr_mu_unlock(&pollset->pollable.po.mu);
if (worker->pollable_obj != &pollset->pollable_obj) {
gpr_mu_unlock(&pollset->pollable_obj.po.mu);
}
if (GRPC_TRACER_ON(grpc_polling_trace) &&
worker->pollable->root_worker != worker) {
worker->pollable_obj->root_worker != worker) {
gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
worker->pollable, worker,
worker->pollable_obj, worker,
poll_deadline_to_millis_timeout(deadline, *now));
}
while (do_poll && worker->pollable->root_worker != worker) {
if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) {
while (do_poll && worker->pollable_obj->root_worker != worker) {
if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, deadline)) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
worker->pollable, worker);
worker->pollable_obj, worker);
}
do_poll = false;
} else if (worker->kicked) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable,
worker);
gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset,
worker->pollable_obj, worker);
}
do_poll = false;
} else if (GRPC_TRACER_ON(grpc_polling_trace) &&
worker->pollable->root_worker != worker) {
worker->pollable_obj->root_worker != worker) {
gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset,
worker->pollable, worker);
worker->pollable_obj, worker);
}
}
if (worker->pollable != &pollset->pollable) {
gpr_mu_unlock(&worker->pollable->po.mu);
gpr_mu_lock(&pollset->pollable.po.mu);
gpr_mu_lock(&worker->pollable->po.mu);
if (worker->pollable_obj != &pollset->pollable_obj) {
gpr_mu_unlock(&worker->pollable_obj->po.mu);
gpr_mu_lock(&pollset->pollable_obj.po.mu);
gpr_mu_lock(&worker->pollable_obj->po.mu);
}
*now = gpr_now(now->clock_type);
}
return do_poll && pollset->shutdown_closure == NULL &&
pollset->current_pollable == worker->pollable;
pollset->current_pollable == worker->pollable_obj;
}
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
if (NEW_ROOT ==
worker_remove(&worker->pollable->root_worker, PWL_POLLABLE, worker)) {
gpr_cv_signal(&worker->pollable->root_worker->cv);
worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) {
gpr_cv_signal(&worker->pollable_obj->root_worker->cv);
}
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
if (pollset_is_pollable_fd(pollset, worker->pollable)) {
UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable, 2, "one_poll");
if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) {
UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll");
}
if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) {
pollset_maybe_finish_shutdown(exec_ctx, pollset);
@ -972,41 +976,41 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->kicked_without_poller = false;
return GRPC_ERROR_NONE;
}
if (pollset->current_pollable != &pollset->pollable) {
if (pollset->current_pollable != &pollset->pollable_obj) {
gpr_mu_lock(&pollset->current_pollable->po.mu);
}
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutdown_closure);
append_error(&error, pollable_materialize(worker.pollable), err_desc);
if (worker.pollable != &pollset->pollable) {
gpr_mu_unlock(&worker.pollable->po.mu);
append_error(&error, pollable_materialize(worker.pollable_obj), err_desc);
if (worker.pollable_obj != &pollset->pollable_obj) {
gpr_mu_unlock(&worker.pollable_obj->po.mu);
}
gpr_mu_unlock(&pollset->pollable.po.mu);
gpr_mu_unlock(&pollset->pollable_obj.po.mu);
if (pollset->event_cursor == pollset->event_count) {
append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable,
append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj,
now, deadline),
err_desc);
}
append_error(&error, pollset_process_events(exec_ctx, pollset, false),
err_desc);
gpr_mu_lock(&pollset->pollable.po.mu);
if (worker.pollable != &pollset->pollable) {
gpr_mu_lock(&worker.pollable->po.mu);
gpr_mu_lock(&pollset->pollable_obj.po.mu);
if (worker.pollable_obj != &pollset->pollable_obj) {
gpr_mu_lock(&worker.pollable_obj->po.mu);
}
gpr_tls_set(&g_current_thread_pollset, 0);
gpr_tls_set(&g_current_thread_worker, 0);
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
end_worker(exec_ctx, pollset, &worker, worker_hdl);
if (worker.pollable != &pollset->pollable) {
gpr_mu_unlock(&worker.pollable->po.mu);
if (worker.pollable_obj != &pollset->pollable_obj) {
gpr_mu_unlock(&worker.pollable_obj->po.mu);
}
if (grpc_exec_ctx_has_work(exec_ctx)) {
gpr_mu_unlock(&pollset->pollable.po.mu);
gpr_mu_unlock(&pollset->pollable_obj.po.mu);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->pollable.po.mu);
gpr_mu_lock(&pollset->pollable_obj.po.mu);
}
return error;
}
@ -1029,20 +1033,20 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
"PS:%p add fd %p; transition pollable from empty to fd", pollset,
fd);
}
/* empty pollable --> single fd pollable_t */
/* empty pollable --> single fd pollable */
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &fd->pollable;
if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu);
pollset->current_pollable = &fd->pollable_obj;
if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu);
append_error(&error, fd_become_pollable_locked(fd), err_desc);
if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu);
if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu);
REF_BY(fd, 2, "pollset_pollable");
} else if (pollset->current_pollable == &pollset->pollable) {
} else if (pollset->current_pollable == &pollset->pollable_obj) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd);
}
append_error(&error, pollable_add_fd(pollset->current_pollable, fd),
err_desc);
} else if (pollset->current_pollable != &fd->pollable) {
} else if (pollset->current_pollable != &fd->pollable_obj) {
grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
@ -1055,11 +1059,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read");
grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write");
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &pollset->pollable;
if (append_error(&error, pollable_materialize(&pollset->pollable),
pollset->current_pollable = &pollset->pollable_obj;
if (append_error(&error, pollable_materialize(&pollset->pollable_obj),
err_desc)) {
pollable_add_fd(&pollset->pollable, had_fd);
pollable_add_fd(&pollset->pollable, fd);
pollable_add_fd(&pollset->pollable_obj, had_fd);
pollable_add_fd(&pollset->pollable_obj, fd);
}
GRPC_CLOSURE_SCHED(exec_ctx,
GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd,
@ -1071,9 +1075,9 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
gpr_mu_lock(&pollset->pollable.po.mu);
gpr_mu_lock(&pollset->pollable_obj.po.mu);
grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false);
gpr_mu_unlock(&pollset->pollable.po.mu);
gpr_mu_unlock(&pollset->pollable_obj.po.mu);
GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
@ -1095,7 +1099,7 @@ static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_fd *fd) {
po_join(exec_ctx, &pss->po, &fd->pollable.po);
po_join(exec_ctx, &pss->po, &fd->pollable_obj.po);
}
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
@ -1103,7 +1107,7 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss, grpc_pollset *ps) {
po_join(exec_ctx, &pss->po, &ps->pollable.po);
po_join(exec_ctx, &pss->po, &ps->pollable_obj.po);
}
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,

@ -135,7 +135,7 @@ typedef struct batch_control {
typedef struct {
gpr_mu child_list_mu;
grpc_call *first_child;
} parent_call_t;
} parent_call;
typedef struct {
grpc_call *parent;
@ -144,7 +144,7 @@ typedef struct {
parent->mu */
grpc_call *sibling_next;
grpc_call *sibling_prev;
} child_call_t;
} child_call;
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
@ -157,8 +157,8 @@ struct grpc_call {
grpc_polling_entity pollent;
grpc_channel *channel;
gpr_timespec start_time;
/* parent_call_t* */ gpr_atm parent_call_atm;
child_call_t *child_call;
/* parent_call* */ gpr_atm parent_call_atm;
child_call *child;
/* client or server call */
bool is_client;
@ -304,21 +304,21 @@ void *grpc_call_arena_alloc(grpc_call *call, size_t size) {
return gpr_arena_alloc(call->arena, size);
}
static parent_call_t *get_or_create_parent_call(grpc_call *call) {
parent_call_t *p = (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm);
static parent_call *get_or_create_parent_call(grpc_call *call) {
parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
if (p == NULL) {
p = (parent_call_t *)gpr_arena_alloc(call->arena, sizeof(*p));
p = (parent_call *)gpr_arena_alloc(call->arena, sizeof(*p));
gpr_mu_init(&p->child_list_mu);
if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) {
gpr_mu_destroy(&p->child_list_mu);
p = (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm);
p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
}
}
return p;
}
static parent_call_t *get_parent_call(grpc_call *call) {
return (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm);
static parent_call *get_parent_call(grpc_call *call) {
return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
}
grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
@ -377,24 +377,24 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
bool immediately_cancel = false;
if (args->parent_call != NULL) {
child_call_t *cc = call->child_call =
(child_call_t *)gpr_arena_alloc(arena, sizeof(child_call_t));
call->child_call->parent = args->parent_call;
if (args->parent != NULL) {
child_call *cc = call->child =
(child_call *)gpr_arena_alloc(arena, sizeof(child_call));
call->child->parent = args->parent;
GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
GRPC_CALL_INTERNAL_REF(args->parent, "child");
GPR_ASSERT(call->is_client);
GPR_ASSERT(!args->parent_call->is_client);
GPR_ASSERT(!args->parent->is_client);
parent_call_t *pc = get_or_create_parent_call(args->parent_call);
parent_call *pc = get_or_create_parent_call(args->parent);
gpr_mu_lock(&pc->child_list_mu);
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = gpr_time_min(
gpr_convert_clock_type(send_deadline,
args->parent_call->send_deadline.clock_type),
args->parent_call->send_deadline);
args->parent->send_deadline.clock_type),
args->parent->send_deadline);
}
/* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
* GRPC_PROPAGATE_STATS_CONTEXT */
@ -406,9 +406,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
"Census tracing propagation requested "
"without Census context propagation"));
}
grpc_call_context_set(
call, GRPC_CONTEXT_TRACING,
args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL);
grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
args->parent->context[GRPC_CONTEXT_TRACING].value,
NULL);
} else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Census context propagation requested "
@ -416,7 +416,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
}
if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
call->cancellation_is_inherited = 1;
if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) {
if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
immediately_cancel = true;
}
}
@ -426,9 +426,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
cc->sibling_next = cc->sibling_prev = call;
} else {
cc->sibling_next = pc->first_child;
cc->sibling_prev = pc->first_child->child_call->sibling_prev;
cc->sibling_next->child_call->sibling_prev =
cc->sibling_prev->child_call->sibling_next = call;
cc->sibling_prev = pc->first_child->child->sibling_prev;
cc->sibling_next->child->sibling_prev =
cc->sibling_prev->child->sibling_next = call;
}
gpr_mu_unlock(&pc->child_list_mu);
@ -533,7 +533,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
if (c->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
parent_call_t *pc = get_parent_call(c);
parent_call *pc = get_parent_call(c);
if (pc != NULL) {
gpr_mu_destroy(&pc->child_list_mu);
}
@ -570,14 +570,14 @@ void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); }
void grpc_call_unref(grpc_call *c) {
if (!gpr_unref(&c->ext_ref)) return;
child_call_t *cc = c->child_call;
child_call *cc = c->child;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_call_unref", 0);
GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
if (cc) {
parent_call_t *pc = get_parent_call(cc->parent);
parent_call *pc = get_parent_call(cc->parent);
gpr_mu_lock(&pc->child_list_mu);
if (c == pc->first_child) {
pc->first_child = cc->sibling_next;
@ -585,8 +585,8 @@ void grpc_call_unref(grpc_call *c) {
pc->first_child = NULL;
}
}
cc->sibling_prev->child_call->sibling_next = cc->sibling_next;
cc->sibling_next->child_call->sibling_prev = cc->sibling_prev;
cc->sibling_prev->child->sibling_next = cc->sibling_next;
cc->sibling_next->child->sibling_prev = cc->sibling_prev;
gpr_mu_unlock(&pc->child_list_mu);
GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child");
}
@ -1309,14 +1309,14 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call_t *pc = get_parent_call(call);
parent_call *pc = get_parent_call(call);
if (pc != NULL) {
grpc_call *child;
gpr_mu_lock(&pc->child_list_mu);
child = pc->first_child;
if (child != NULL) {
do {
next_child_call = child->child_call->sibling_next;
next_child_call = child->child->sibling_next;
if (child->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE,

@ -37,7 +37,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx,
typedef struct grpc_call_create_args {
grpc_channel *channel;
grpc_call *parent_call;
grpc_call *parent;
uint32_t propagation_mask;
grpc_completion_queue *cq;

@ -282,7 +282,7 @@ static grpc_call *grpc_channel_create_call_internal(
grpc_call_create_args args;
memset(&args, 0, sizeof(args));
args.channel = channel;
args.parent_call = parent_call;
args.parent = parent_call;
args.propagation_mask = propagation_mask;
args.cq = cq;
args.pollset_set_alternative = pollset_set_alternative;

Loading…
Cancel
Save