|
|
|
@ -201,7 +201,7 @@ struct grpc_pollset_worker { |
|
|
|
|
|
|
|
|
|
struct grpc_pollset { |
|
|
|
|
pollable pollable_obj; |
|
|
|
|
pollable *current_pollable; |
|
|
|
|
pollable *current_pollable_obj; |
|
|
|
|
int kick_alls_pending; |
|
|
|
|
bool kicked_without_poller; |
|
|
|
|
grpc_closure *shutdown_closure; |
|
|
|
@ -667,7 +667,7 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, |
|
|
|
|
/* p->po.mu must be held before calling this function */ |
|
|
|
|
static grpc_error *pollset_kick(grpc_pollset *pollset, |
|
|
|
|
grpc_pollset_worker *specific_worker) { |
|
|
|
|
pollable *p = pollset->current_pollable; |
|
|
|
|
pollable *p = pollset->current_pollable_obj; |
|
|
|
|
if (p != &pollset->pollable_obj) { |
|
|
|
|
gpr_mu_lock(&p->po.mu); |
|
|
|
|
} |
|
|
|
@ -680,7 +680,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, |
|
|
|
|
|
|
|
|
|
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { |
|
|
|
|
pollable_init(&pollset->pollable_obj, PO_POLLSET); |
|
|
|
|
pollset->current_pollable = &g_empty_pollable; |
|
|
|
|
pollset->current_pollable_obj = &g_empty_pollable; |
|
|
|
|
pollset->kicked_without_poller = false; |
|
|
|
|
pollset->shutdown_closure = NULL; |
|
|
|
|
pollset->root_worker = NULL; |
|
|
|
@ -795,8 +795,8 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, |
|
|
|
|
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */ |
|
|
|
|
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { |
|
|
|
|
pollable_destroy(&pollset->pollable_obj); |
|
|
|
|
if (pollset_is_pollable_fd(pollset, pollset->current_pollable)) { |
|
|
|
|
UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, |
|
|
|
|
if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) { |
|
|
|
|
UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2, |
|
|
|
|
"pollset_pollable"); |
|
|
|
|
} |
|
|
|
|
GRPC_LOG_IF_ERROR("pollset_process_events", |
|
|
|
@ -886,7 +886,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, |
|
|
|
|
worker->initialized_cv = false; |
|
|
|
|
worker->kicked = false; |
|
|
|
|
worker->pollset = pollset; |
|
|
|
|
worker->pollable_obj = pollset->current_pollable; |
|
|
|
|
worker->pollable_obj = pollset->current_pollable_obj; |
|
|
|
|
|
|
|
|
|
if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { |
|
|
|
|
REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll"); |
|
|
|
@ -934,7 +934,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return do_poll && pollset->shutdown_closure == NULL && |
|
|
|
|
pollset->current_pollable == worker->pollable_obj; |
|
|
|
|
pollset->current_pollable_obj == worker->pollable_obj; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
@ -976,8 +976,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
pollset->kicked_without_poller = false; |
|
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
|
if (pollset->current_pollable != &pollset->pollable_obj) { |
|
|
|
|
gpr_mu_lock(&pollset->current_pollable->po.mu); |
|
|
|
|
if (pollset->current_pollable_obj != &pollset->pollable_obj) { |
|
|
|
|
gpr_mu_lock(&pollset->current_pollable_obj->po.mu); |
|
|
|
|
} |
|
|
|
|
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { |
|
|
|
|
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); |
|
|
|
@ -1027,7 +1027,7 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
bool fd_locked) { |
|
|
|
|
static const char *err_desc = "pollset_add_fd"; |
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
|
if (pollset->current_pollable == &g_empty_pollable) { |
|
|
|
|
if (pollset->current_pollable_obj == &g_empty_pollable) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"PS:%p add fd %p; transition pollable from empty to fd", pollset, |
|
|
|
@ -1035,19 +1035,19 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
/* empty pollable --> single fd pollable */ |
|
|
|
|
pollset_kick_all(exec_ctx, pollset); |
|
|
|
|
pollset->current_pollable = &fd->pollable_obj; |
|
|
|
|
pollset->current_pollable_obj = &fd->pollable_obj; |
|
|
|
|
if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu); |
|
|
|
|
append_error(&error, fd_become_pollable_locked(fd), err_desc); |
|
|
|
|
if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu); |
|
|
|
|
REF_BY(fd, 2, "pollset_pollable"); |
|
|
|
|
} else if (pollset->current_pollable == &pollset->pollable_obj) { |
|
|
|
|
} else if (pollset->current_pollable_obj == &pollset->pollable_obj) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); |
|
|
|
|
} |
|
|
|
|
append_error(&error, pollable_add_fd(pollset->current_pollable, fd), |
|
|
|
|
append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd), |
|
|
|
|
err_desc); |
|
|
|
|
} else if (pollset->current_pollable != &fd->pollable_obj) { |
|
|
|
|
grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable; |
|
|
|
|
} else if (pollset->current_pollable_obj != &fd->pollable_obj) { |
|
|
|
|
grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_polling_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"PS:%p add fd %p; transition pollable from fd %p to multipoller", |
|
|
|
@ -1059,7 +1059,7 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read"); |
|
|
|
|
grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write"); |
|
|
|
|
pollset_kick_all(exec_ctx, pollset); |
|
|
|
|
pollset->current_pollable = &pollset->pollable_obj; |
|
|
|
|
pollset->current_pollable_obj = &pollset->pollable_obj; |
|
|
|
|
if (append_error(&error, pollable_materialize(&pollset->pollable_obj), |
|
|
|
|
err_desc)) { |
|
|
|
|
pollable_add_fd(&pollset->pollable_obj, had_fd); |
|
|
|
|