|
|
|
@ -51,6 +51,7 @@ |
|
|
|
|
#include <assert.h> |
|
|
|
|
#include <errno.h> |
|
|
|
|
#include <poll.h> |
|
|
|
|
#include <stdbool.h> |
|
|
|
|
#include <string.h> |
|
|
|
|
#include <sys/socket.h> |
|
|
|
|
#include <unistd.h> |
|
|
|
@ -88,9 +89,9 @@ struct grpc_fd { |
|
|
|
|
gpr_atm refst; |
|
|
|
|
|
|
|
|
|
gpr_mu mu; |
|
|
|
|
int shutdown; |
|
|
|
|
int closed; |
|
|
|
|
int released; |
|
|
|
|
bool shutdown; |
|
|
|
|
bool closed; |
|
|
|
|
bool released; |
|
|
|
|
|
|
|
|
|
/* The watcher list.
|
|
|
|
|
|
|
|
|
@ -186,8 +187,8 @@ typedef struct grpc_cached_wakeup_fd { |
|
|
|
|
|
|
|
|
|
struct grpc_pollset_worker { |
|
|
|
|
grpc_cached_wakeup_fd *wakeup_fd; |
|
|
|
|
int reevaluate_polling_on_wakeup; |
|
|
|
|
int kicked_specifically; |
|
|
|
|
bool reevaluate_polling_on_wakeup; |
|
|
|
|
bool kicked_specifically; |
|
|
|
|
struct grpc_pollset_worker *next; |
|
|
|
|
struct grpc_pollset_worker *prev; |
|
|
|
|
}; |
|
|
|
@ -201,9 +202,9 @@ struct grpc_pollset { |
|
|
|
|
gpr_mu mu; |
|
|
|
|
grpc_pollset_worker root_worker; |
|
|
|
|
int in_flight_cbs; |
|
|
|
|
int shutting_down; |
|
|
|
|
int called_shutdown; |
|
|
|
|
int kicked_without_pollers; |
|
|
|
|
bool shutting_down; |
|
|
|
|
bool called_shutdown; |
|
|
|
|
bool kicked_without_pollers; |
|
|
|
|
grpc_closure *shutdown_done; |
|
|
|
|
grpc_closure_list idle_jobs; |
|
|
|
|
union { |
|
|
|
@ -332,7 +333,7 @@ static grpc_fd *alloc_fd(int fd) { |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&r->mu); |
|
|
|
|
gpr_atm_rel_store(&r->refst, 1); |
|
|
|
|
r->shutdown = 0; |
|
|
|
|
r->shutdown = false; |
|
|
|
|
r->read_closure = CLOSURE_NOT_READY; |
|
|
|
|
r->write_closure = CLOSURE_NOT_READY; |
|
|
|
|
r->fd = fd; |
|
|
|
@ -341,8 +342,8 @@ static grpc_fd *alloc_fd(int fd) { |
|
|
|
|
r->freelist_next = NULL; |
|
|
|
|
r->read_watcher = r->write_watcher = NULL; |
|
|
|
|
r->on_done_closure = NULL; |
|
|
|
|
r->closed = 0; |
|
|
|
|
r->released = 0; |
|
|
|
|
r->closed = false; |
|
|
|
|
r->released = false; |
|
|
|
|
gpr_mu_unlock(&r->mu); |
|
|
|
|
return r; |
|
|
|
|
} |
|
|
|
@ -455,7 +456,7 @@ static int has_watchers(grpc_fd *fd) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
fd->closed = 1; |
|
|
|
|
fd->closed = true; |
|
|
|
|
if (!fd->released) { |
|
|
|
|
close(fd->fd); |
|
|
|
|
} else { |
|
|
|
@ -538,28 +539,28 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* returns 1 if state becomes not ready */ |
|
|
|
|
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure **st) { |
|
|
|
|
/* returns true if state becomes not ready */ |
|
|
|
|
static bool set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure **st) { |
|
|
|
|
if (*st == CLOSURE_READY) { |
|
|
|
|
/* duplicate ready ==> ignore */ |
|
|
|
|
return 0; |
|
|
|
|
return false; |
|
|
|
|
} else if (*st == CLOSURE_NOT_READY) { |
|
|
|
|
/* not ready, and not waiting ==> flag ready */ |
|
|
|
|
*st = CLOSURE_READY; |
|
|
|
|
return 0; |
|
|
|
|
return false; |
|
|
|
|
} else { |
|
|
|
|
/* waiting ==> queue closure */ |
|
|
|
|
grpc_exec_ctx_push(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); |
|
|
|
|
*st = CLOSURE_NOT_READY; |
|
|
|
|
return 1; |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
gpr_mu_lock(&fd->mu); |
|
|
|
|
GPR_ASSERT(!fd->shutdown); |
|
|
|
|
fd->shutdown = 1; |
|
|
|
|
fd->shutdown = true; |
|
|
|
|
set_ready_locked(exec_ctx, fd, &fd->read_closure); |
|
|
|
|
set_ready_locked(exec_ctx, fd, &fd->write_closure); |
|
|
|
|
gpr_mu_unlock(&fd->mu); |
|
|
|
@ -632,8 +633,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, |
|
|
|
|
|
|
|
|
|
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, |
|
|
|
|
int got_read, int got_write) { |
|
|
|
|
int was_polling = 0; |
|
|
|
|
int kick = 0; |
|
|
|
|
bool was_polling = false; |
|
|
|
|
bool kick = false; |
|
|
|
|
grpc_fd *fd = watcher->fd; |
|
|
|
|
|
|
|
|
|
if (fd == NULL) { |
|
|
|
@ -644,17 +645,17 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, |
|
|
|
|
|
|
|
|
|
if (watcher == fd->read_watcher) { |
|
|
|
|
/* remove read watcher, kick if we still need a read */ |
|
|
|
|
was_polling = 1; |
|
|
|
|
was_polling = true; |
|
|
|
|
if (!got_read) { |
|
|
|
|
kick = 1; |
|
|
|
|
kick = true; |
|
|
|
|
} |
|
|
|
|
fd->read_watcher = NULL; |
|
|
|
|
} |
|
|
|
|
if (watcher == fd->write_watcher) { |
|
|
|
|
/* remove write watcher, kick if we still need a write */ |
|
|
|
|
was_polling = 1; |
|
|
|
|
was_polling = true; |
|
|
|
|
if (!got_write) { |
|
|
|
|
kick = 1; |
|
|
|
|
kick = true; |
|
|
|
|
} |
|
|
|
|
fd->write_watcher = NULL; |
|
|
|
|
} |
|
|
|
@ -665,12 +666,12 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, |
|
|
|
|
} |
|
|
|
|
if (got_read) { |
|
|
|
|
if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) { |
|
|
|
|
kick = 1; |
|
|
|
|
kick = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (got_write) { |
|
|
|
|
if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) { |
|
|
|
|
kick = 1; |
|
|
|
|
kick = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (kick) { |
|
|
|
@ -753,23 +754,23 @@ static grpc_error *pollset_kick_ext(grpc_pollset *p, |
|
|
|
|
kick_append_error( |
|
|
|
|
&error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); |
|
|
|
|
} |
|
|
|
|
p->kicked_without_pollers = 1; |
|
|
|
|
p->kicked_without_pollers = true; |
|
|
|
|
GPR_TIMER_END("pollset_kick_ext.broadcast", 0); |
|
|
|
|
} else if (gpr_tls_get(&g_current_thread_worker) != |
|
|
|
|
(intptr_t)specific_worker) { |
|
|
|
|
GPR_TIMER_MARK("different_thread_worker", 0); |
|
|
|
|
if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { |
|
|
|
|
specific_worker->reevaluate_polling_on_wakeup = 1; |
|
|
|
|
specific_worker->reevaluate_polling_on_wakeup = true; |
|
|
|
|
} |
|
|
|
|
specific_worker->kicked_specifically = 1; |
|
|
|
|
specific_worker->kicked_specifically = true; |
|
|
|
|
kick_append_error(&error, |
|
|
|
|
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); |
|
|
|
|
} else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { |
|
|
|
|
GPR_TIMER_MARK("kick_yoself", 0); |
|
|
|
|
if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { |
|
|
|
|
specific_worker->reevaluate_polling_on_wakeup = 1; |
|
|
|
|
specific_worker->reevaluate_polling_on_wakeup = true; |
|
|
|
|
} |
|
|
|
|
specific_worker->kicked_specifically = 1; |
|
|
|
|
specific_worker->kicked_specifically = true; |
|
|
|
|
kick_append_error(&error, |
|
|
|
|
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); |
|
|
|
|
} |
|
|
|
@ -797,7 +798,7 @@ static grpc_error *pollset_kick_ext(grpc_pollset *p, |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
GPR_TIMER_MARK("kicked_no_pollers", 0); |
|
|
|
|
p->kicked_without_pollers = 1; |
|
|
|
|
p->kicked_without_pollers = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -839,12 +840,11 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { |
|
|
|
|
*mu = &pollset->mu; |
|
|
|
|
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; |
|
|
|
|
pollset->in_flight_cbs = 0; |
|
|
|
|
pollset->shutting_down = 0; |
|
|
|
|
pollset->called_shutdown = 0; |
|
|
|
|
pollset->kicked_without_pollers = 0; |
|
|
|
|
pollset->shutting_down = false; |
|
|
|
|
pollset->called_shutdown = false; |
|
|
|
|
pollset->kicked_without_pollers = false; |
|
|
|
|
pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; |
|
|
|
|
pollset->local_wakeup_cache = NULL; |
|
|
|
|
pollset->kicked_without_pollers = 0; |
|
|
|
|
become_basic_pollset(pollset, NULL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -868,9 +868,9 @@ static void pollset_reset(grpc_pollset *pollset) { |
|
|
|
|
GPR_ASSERT(!pollset_has_workers(pollset)); |
|
|
|
|
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); |
|
|
|
|
pollset->vtable->destroy(pollset); |
|
|
|
|
pollset->shutting_down = 0; |
|
|
|
|
pollset->called_shutdown = 0; |
|
|
|
|
pollset->kicked_without_pollers = 0; |
|
|
|
|
pollset->shutting_down = false; |
|
|
|
|
pollset->called_shutdown = false; |
|
|
|
|
pollset->kicked_without_pollers = false; |
|
|
|
|
become_basic_pollset(pollset, NULL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -909,7 +909,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
GPR_TIMER_BEGIN("pollset_work", 0); |
|
|
|
|
/* this must happen before we (potentially) drop pollset->mu */ |
|
|
|
|
worker.next = worker.prev = NULL; |
|
|
|
|
worker.reevaluate_polling_on_wakeup = 0; |
|
|
|
|
worker.reevaluate_polling_on_wakeup = false; |
|
|
|
|
if (pollset->local_wakeup_cache != NULL) { |
|
|
|
|
worker.wakeup_fd = pollset->local_wakeup_cache; |
|
|
|
|
pollset->local_wakeup_cache = worker.wakeup_fd->next; |
|
|
|
@ -920,7 +920,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
return error; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
worker.kicked_specifically = 0; |
|
|
|
|
worker.kicked_specifically = false; |
|
|
|
|
/* If there's work waiting for the pollset to be idle, and the
|
|
|
|
|
pollset is idle, then do that work */ |
|
|
|
|
if (!pollset_has_workers(pollset) && |
|
|
|
@ -962,7 +962,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
gpr_tls_set(&g_current_thread_poller, 0); |
|
|
|
|
} else { |
|
|
|
|
GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0); |
|
|
|
|
pollset->kicked_without_pollers = 0; |
|
|
|
|
pollset->kicked_without_pollers = false; |
|
|
|
|
} |
|
|
|
|
/* Finished execution - start cleaning up.
|
|
|
|
|
Note that we may arrive here from outside the enclosing while() loop. |
|
|
|
@ -978,8 +978,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force |
|
|
|
|
a loop */ |
|
|
|
|
if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) { |
|
|
|
|
worker.reevaluate_polling_on_wakeup = 0; |
|
|
|
|
pollset->kicked_without_pollers = 0; |
|
|
|
|
worker.reevaluate_polling_on_wakeup = false; |
|
|
|
|
pollset->kicked_without_pollers = false; |
|
|
|
|
if (queued_work || worker.kicked_specifically) { |
|
|
|
|
/* If there's queued work on the list, then set the deadline to be
|
|
|
|
|
immediate so we get back out of the polling loop quickly */ |
|
|
|
@ -1000,7 +1000,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
if (pollset_has_workers(pollset)) { |
|
|
|
|
pollset_kick(pollset, NULL); |
|
|
|
|
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { |
|
|
|
|
pollset->called_shutdown = 1; |
|
|
|
|
pollset->called_shutdown = true; |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
finish_shutdown(exec_ctx, pollset); |
|
|
|
|
grpc_exec_ctx_flush(exec_ctx); |
|
|
|
@ -1024,7 +1024,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
GPR_ASSERT(!pollset->shutting_down); |
|
|
|
|
pollset->shutting_down = 1; |
|
|
|
|
pollset->shutting_down = true; |
|
|
|
|
pollset->shutdown_done = closure; |
|
|
|
|
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); |
|
|
|
|
if (!pollset_has_workers(pollset)) { |
|
|
|
@ -1032,7 +1032,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
} |
|
|
|
|
if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && |
|
|
|
|
!pollset_has_workers(pollset)) { |
|
|
|
|
pollset->called_shutdown = 1; |
|
|
|
|
pollset->called_shutdown = true; |
|
|
|
|
finish_shutdown(exec_ctx, pollset); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1096,7 +1096,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, |
|
|
|
|
if (pollset->shutting_down) { |
|
|
|
|
/* We don't care about this pollset anymore. */ |
|
|
|
|
if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { |
|
|
|
|
pollset->called_shutdown = 1; |
|
|
|
|
pollset->called_shutdown = true; |
|
|
|
|
finish_shutdown(exec_ctx, pollset); |
|
|
|
|
} |
|
|
|
|
} else if (fd_is_orphaned(fd)) { |
|
|
|
@ -1622,7 +1622,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
if (da->pollset->shutting_down) { |
|
|
|
|
/* We don't care about this pollset anymore. */ |
|
|
|
|
if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { |
|
|
|
|
da->pollset->called_shutdown = 1; |
|
|
|
|
da->pollset->called_shutdown = true; |
|
|
|
|
grpc_exec_ctx_push(exec_ctx, da->pollset->shutdown_done, GRPC_ERROR_NONE, |
|
|
|
|
NULL); |
|
|
|
|
} |
|
|
|
|