@@ -139,42 +139,43 @@ struct grpc_fd {
|
|
|
|
Polling strategies that do not need to alter their behavior depending on the |
|
|
|
|
fd's current interest (such as epoll) do not need to call this function. |
|
|
|
|
MUST NOT be called with a pollset lock taken */ |
|
|
|
|
uint32_t grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, |
|
|
|
|
grpc_pollset_worker *worker, uint32_t read_mask, |
|
|
|
|
uint32_t write_mask, grpc_fd_watcher *rec); |
|
|
|
|
/* Complete polling previously started with grpc_fd_begin_poll
|
|
|
|
|
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, |
|
|
|
|
grpc_pollset_worker *worker, uint32_t read_mask, |
|
|
|
|
uint32_t write_mask, grpc_fd_watcher *rec); |
|
|
|
|
/* Complete polling previously started with fd_begin_poll
|
|
|
|
|
MUST NOT be called with a pollset lock taken |
|
|
|
|
if got_read or got_write are 1, also does the become_{readable,writable} as |
|
|
|
|
appropriate. */ |
|
|
|
|
void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, |
|
|
|
|
int got_read, int got_write); |
|
|
|
|
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, |
|
|
|
|
int got_read, int got_write); |
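/* Illustrative usage sketch (not part of this change): how a poller brackets a
   poll() call with fd_begin_poll/fd_end_poll.  It condenses the
   basic_pollset_maybe_work_and_unlock path further down; the helper name and
   locals are invented for the example, the pollset lock is assumed to have
   been dropped already (see the comment above), and the real code goes through
   the overridable poll hook (see grpc_poll_function_type below) and the
   POLLIN_CHECK/POLLOUT_CHECK masks rather than plain poll()/POLLIN/POLLOUT. */
static void example_poll_one_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker *worker, grpc_fd *fd,
                                int timeout_ms) {
  grpc_fd_watcher watcher;
  struct pollfd pfd;
  pfd.fd = fd->fd;
  pfd.revents = 0;
  /* Ask the fd which events it still needs and register ourselves as its
     watcher so interest changes can kick this pollset. */
  pfd.events =
      (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &watcher);
  if (pfd.events != 0) {
    int r = poll(&pfd, 1, timeout_ms);
    /* Report what happened; fd_end_poll performs the become_readable /
       become_writable notifications for us. */
    fd_end_poll(exec_ctx, &watcher, r > 0 && (pfd.revents & POLLIN) != 0,
                r > 0 && (pfd.revents & POLLOUT) != 0);
  } else {
    /* fd is shutting down or already covered: still balance the begin. */
    fd_end_poll(exec_ctx, &watcher, 0, 0);
  }
}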
|
|
|
|
|
|
|
|
|
/* Return 1 if this fd is orphaned, 0 otherwise */ |
|
|
|
|
int grpc_fd_is_orphaned(grpc_fd *fd); |
|
|
|
|
static bool fd_is_orphaned(grpc_fd *fd); |
|
|
|
|
|
|
|
|
|
/* Notification from the poller to an fd that it has become readable or
|
|
|
|
|
writable. |
|
|
|
|
If allow_synchronous_callback is 1, allow running the fd callback inline |
|
|
|
|
in this callstack, otherwise register an asynchronous callback and return */ |
|
|
|
|
void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); |
|
|
|
|
void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); |
|
|
|
|
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); |
|
|
|
|
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); |
|
|
|
|
|
|
|
|
|
/* Reference counting for fds */ |
|
|
|
|
/*#define GRPC_FD_REF_COUNT_DEBUG*/ |
|
|
|
|
#ifdef GRPC_FD_REF_COUNT_DEBUG |
|
|
|
|
void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); |
|
|
|
|
void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line); |
|
|
|
|
#define GRPC_FD_REF(fd, reason) grpc_fd_ref(fd, reason, __FILE__, __LINE__) |
|
|
|
|
#define GRPC_FD_UNREF(fd, reason) grpc_fd_unref(fd, reason, __FILE__, __LINE__) |
|
|
|
|
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); |
|
|
|
|
static void fd_unref(grpc_fd *fd, const char *reason, const char *file, |
|
|
|
|
int line); |
|
|
|
|
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__) |
|
|
|
|
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__) |
|
|
|
|
#else |
|
|
|
|
void grpc_fd_ref(grpc_fd *fd); |
|
|
|
|
void grpc_fd_unref(grpc_fd *fd); |
|
|
|
|
#define GRPC_FD_REF(fd, reason) grpc_fd_ref(fd) |
|
|
|
|
#define GRPC_FD_UNREF(fd, reason) grpc_fd_unref(fd) |
|
|
|
|
static void fd_ref(grpc_fd *fd); |
|
|
|
|
static void fd_unref(grpc_fd *fd); |
|
|
|
|
#define GRPC_FD_REF(fd, reason) fd_ref(fd) |
|
|
|
|
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd) |
|
|
|
|
#endif |
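/* Illustrative usage sketch (not part of this change): all reference counting
   goes through the GRPC_FD_REF/GRPC_FD_UNREF macros with a short reason tag,
   exactly like the "basicpoll" and "pollset_set" uses later in this file.
   With GRPC_FD_REF_COUNT_DEBUG defined the reason plus __FILE__/__LINE__ reach
   fd_ref/fd_unref; in release builds they compile away.  The helper below is a
   made-up example of the pairing discipline. */
static void example_retain_fd_for_background_work(grpc_fd *fd) {
  GRPC_FD_REF(fd, "example_background_work");
  /* ... hand the fd to asynchronous work; when that work completes ... */
  GRPC_FD_UNREF(fd, "example_background_work");
}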
|
|
|
|
|
|
|
|
|
void grpc_fd_global_init(void); |
|
|
|
|
void grpc_fd_global_shutdown(void); |
|
|
|
|
static void fd_global_init(void); |
|
|
|
|
static void fd_global_shutdown(void); |
|
|
|
|
|
|
|
|
|
#define CLOSURE_NOT_READY ((grpc_closure *)0) |
|
|
|
|
#define CLOSURE_READY ((grpc_closure *)1) |
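/* Note on the two sentinels above (an inference from their names and from
   notify_on_locked/set_ready below, not something this change states): the
   fd's read_closure/write_closure fields appear to hold one of three values --
   CLOSURE_NOT_READY (no event yet, nobody waiting), CLOSURE_READY (the event
   arrived before anyone asked), or a real grpc_closure parked until the event
   fires.  fd_notify_on_read/write and fd_become_readable/writable then just
   move the field between those states under fd->mu. */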
|
|
|
@@ -231,13 +232,11 @@ struct grpc_pollset_vtable {
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/* Add an fd to a pollset */ |
|
|
|
|
void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
struct grpc_fd *fd); |
|
|
|
|
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
struct grpc_fd *fd); |
|
|
|
|
|
|
|
|
|
/* Returns the fd to listen on for kicks */ |
|
|
|
|
int grpc_kick_read_fd(grpc_pollset *p); |
|
|
|
|
/* Call after polling has been kicked to leave the kicked state */ |
|
|
|
|
void grpc_kick_drain(grpc_pollset *p); |
|
|
|
|
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *pollset_set, grpc_fd *fd); |
|
|
|
|
|
|
|
|
|
/* Convert a timespec to milliseconds:
|
|
|
|
|
- very small or negative poll times are clamped to zero to do a |
|
|
|
@@ -246,35 +245,31 @@ void grpc_kick_drain(grpc_pollset *p);
|
|
|
|
- longer than a millisecond polls are rounded up to the next nearest |
|
|
|
|
millisecond to avoid spinning |
|
|
|
|
- infinite timeouts are converted to -1 */ |
|
|
|
|
int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, |
|
|
|
|
gpr_timespec now); |
|
|
|
|
static int poll_deadline_to_millis_timeout(gpr_timespec deadline, |
|
|
|
|
gpr_timespec now); |
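/* Illustrative usage sketch (not part of this change): converting a deadline
   into the millisecond timeout that poll()/epoll_wait() expect, per the rules
   documented above.  The helper name is invented for the example. */
static int example_timeout_for_poll(gpr_timespec deadline) {
  gpr_timespec now = gpr_now(deadline.clock_type);
  /* 0 => non-blocking poll (deadline already passed or imminent),
     -1 => block forever (infinite deadline),
     otherwise the remaining time rounded up to whole milliseconds. */
  return poll_deadline_to_millis_timeout(deadline, now);
}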
|
|
|
|
|
|
|
|
|
/* Allow kick to wakeup the currently polling worker */ |
|
|
|
|
#define GRPC_POLLSET_CAN_KICK_SELF 1 |
|
|
|
|
/* Force the wakee to repoll when awoken */ |
|
|
|
|
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 |
|
|
|
|
/* As per grpc_pollset_kick, with an extended set of flags (defined above)
|
|
|
|
|
/* As per pollset_kick, with an extended set of flags (defined above)
|
|
|
|
|
-- mostly for fd_posix's use. */ |
|
|
|
|
void grpc_pollset_kick_ext(grpc_pollset *p, |
|
|
|
|
grpc_pollset_worker *specific_worker, |
|
|
|
|
uint32_t flags); |
|
|
|
|
static void pollset_kick_ext(grpc_pollset *p, |
|
|
|
|
grpc_pollset_worker *specific_worker, |
|
|
|
|
uint32_t flags); |
|
|
|
|
|
|
|
|
|
/* turn a pollset into a multipoller: platform specific */ |
|
|
|
|
typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset *pollset, |
|
|
|
|
struct grpc_fd **fds, |
|
|
|
|
size_t fd_count); |
|
|
|
|
extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller; |
|
|
|
|
typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset *pollset, |
|
|
|
|
struct grpc_fd **fds, |
|
|
|
|
size_t fd_count); |
|
|
|
|
static platform_become_multipoller_type platform_become_multipoller; |
|
|
|
|
|
|
|
|
|
void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset *pollset, struct grpc_fd **fds, |
|
|
|
|
size_t fd_count); |
|
|
|
|
|
|
|
|
|
/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must
|
|
|
|
|
/* Return 1 if the pollset has active threads in pollset_work (pollset must
|
|
|
|
|
* be locked) */ |
|
|
|
|
int grpc_pollset_has_workers(grpc_pollset *pollset); |
|
|
|
|
static int pollset_has_workers(grpc_pollset *pollset); |
|
|
|
|
|
|
|
|
|
void grpc_remove_fd_from_all_epoll_sets(int fd); |
|
|
|
|
static void remove_fd_from_all_epoll_sets(int fd); |
|
|
|
|
|
|
|
|
|
/* override to allow tests to hook poll() usage */ |
|
|
|
|
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); |
|
|
|
@@ -401,9 +396,9 @@ static void unref_by(grpc_fd *fd, int n) {
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); } |
|
|
|
|
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); } |
|
|
|
|
|
|
|
|
|
void grpc_fd_global_shutdown(void) { |
|
|
|
|
static void fd_global_shutdown(void) { |
|
|
|
|
gpr_mu_lock(&fd_freelist_mu); |
|
|
|
|
gpr_mu_unlock(&fd_freelist_mu); |
|
|
|
|
while (fd_freelist != NULL) { |
|
|
|
@@ -414,7 +409,7 @@ void grpc_fd_global_shutdown(void) {
|
|
|
|
gpr_mu_destroy(&fd_freelist_mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_fd *grpc_fd_create(int fd, const char *name) { |
|
|
|
|
static grpc_fd *fd_create(int fd, const char *name) { |
|
|
|
|
grpc_fd *r = alloc_fd(fd); |
|
|
|
|
char *name2; |
|
|
|
|
gpr_asprintf(&name2, "%s fd=%d", name, fd); |
|
|
|
@@ -426,15 +421,15 @@ grpc_fd *grpc_fd_create(int fd, const char *name) {
|
|
|
|
return r; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int grpc_fd_is_orphaned(grpc_fd *fd) { |
|
|
|
|
static bool fd_is_orphaned(grpc_fd *fd) { |
|
|
|
|
return (gpr_atm_acq_load(&fd->refst) & 1) == 0; |
|
|
|
|
} |
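/* Illustrative note (not part of this change): fd->refst packs the liveness
   flag and the reference count into one atomic word -- bit 0 is set while the
   fd has not been orphaned, and external references are added in units of two
   so that taking or dropping a ref never disturbs that bit (see the
   "increment refcount by two to avoid changing the orphan bit" comment below).
   A hypothetical helper that decodes the word: */
static int example_fd_ref_count(grpc_fd *fd) {
  gpr_atm v = gpr_atm_acq_load(&fd->refst);
  /* (v & 1) == 0 is exactly what fd_is_orphaned() reports above. */
  return (int)(v >> 1);
}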
|
|
|
|
|
|
|
|
|
static void pollset_kick_locked(grpc_fd_watcher *watcher) { |
|
|
|
|
gpr_mu_lock(watcher->pollset->mu); |
|
|
|
|
GPR_ASSERT(watcher->worker); |
|
|
|
|
grpc_pollset_kick_ext(watcher->pollset, watcher->worker, |
|
|
|
|
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); |
|
|
|
|
pollset_kick_ext(watcher->pollset, watcher->worker, |
|
|
|
|
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); |
|
|
|
|
gpr_mu_unlock(watcher->pollset->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@@ -472,12 +467,12 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
|
|
|
|
if (!fd->released) { |
|
|
|
|
close(fd->fd); |
|
|
|
|
} else { |
|
|
|
|
grpc_remove_fd_from_all_epoll_sets(fd->fd); |
|
|
|
|
remove_fd_from_all_epoll_sets(fd->fd); |
|
|
|
|
} |
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int grpc_fd_wrapped_fd(grpc_fd *fd) { |
|
|
|
|
static int fd_wrapped_fd(grpc_fd *fd) { |
|
|
|
|
if (fd->released || fd->closed) { |
|
|
|
|
return -1; |
|
|
|
|
} else { |
|
|
|
@@ -485,8 +480,9 @@ int grpc_fd_wrapped_fd(grpc_fd *fd) {
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, |
|
|
|
|
int *release_fd, const char *reason) { |
|
|
|
|
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure *on_done, int *release_fd, |
|
|
|
|
const char *reason) { |
|
|
|
|
fd->on_done_closure = on_done; |
|
|
|
|
fd->released = release_fd != NULL; |
|
|
|
|
if (!fd->released) { |
|
|
|
@@ -507,18 +503,19 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
|
|
|
|
|
|
|
|
|
/* increment refcount by two to avoid changing the orphan bit */ |
|
|
|
|
#ifdef GRPC_FD_REF_COUNT_DEBUG |
|
|
|
|
void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line) { |
|
|
|
|
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, |
|
|
|
|
int line) { |
|
|
|
|
ref_by(fd, 2, reason, file, line); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, |
|
|
|
|
int line) { |
|
|
|
|
static void fd_unref(grpc_fd *fd, const char *reason, const char *file, |
|
|
|
|
int line) { |
|
|
|
|
unref_by(fd, 2, reason, file, line); |
|
|
|
|
} |
|
|
|
|
#else |
|
|
|
|
void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } |
|
|
|
|
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); } |
|
|
|
|
|
|
|
|
|
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } |
|
|
|
|
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
@@ -566,7 +563,7 @@ static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
|
|
|
|
gpr_mu_unlock(&fd->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
gpr_mu_lock(&fd->mu); |
|
|
|
|
GPR_ASSERT(!fd->shutdown); |
|
|
|
|
fd->shutdown = 1; |
|
|
|
@@ -575,23 +572,23 @@ void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
|
|
|
|
gpr_mu_unlock(&fd->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
gpr_mu_lock(&fd->mu); |
|
|
|
|
notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); |
|
|
|
|
gpr_mu_unlock(&fd->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
gpr_mu_lock(&fd->mu); |
|
|
|
|
notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); |
|
|
|
|
gpr_mu_unlock(&fd->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
uint32_t grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, |
|
|
|
|
grpc_pollset_worker *worker, uint32_t read_mask, |
|
|
|
|
uint32_t write_mask, grpc_fd_watcher *watcher) { |
|
|
|
|
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, |
|
|
|
|
grpc_pollset_worker *worker, uint32_t read_mask, |
|
|
|
|
uint32_t write_mask, grpc_fd_watcher *watcher) { |
|
|
|
|
uint32_t mask = 0; |
|
|
|
|
grpc_closure *cur; |
|
|
|
|
int requested; |
|
|
|
@@ -640,8 +637,8 @@ uint32_t grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
|
|
|
|
return mask; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, |
|
|
|
|
int got_read, int got_write) { |
|
|
|
|
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, |
|
|
|
|
int got_read, int got_write) { |
|
|
|
|
int was_polling = 0; |
|
|
|
|
int kick = 0; |
|
|
|
|
grpc_fd *fd = watcher->fd; |
|
|
|
@@ -686,7 +683,7 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
|
|
|
|
if (kick) { |
|
|
|
|
maybe_wake_one_watcher_locked(fd); |
|
|
|
|
} |
|
|
|
|
if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { |
|
|
|
|
if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { |
|
|
|
|
close_fd_locked(exec_ctx, fd); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&fd->mu); |
|
|
|
@@ -694,11 +691,11 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
|
|
|
|
GRPC_FD_UNREF(fd, "poll"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
set_ready(exec_ctx, fd, &fd->read_closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
set_ready(exec_ctx, fd, &fd->write_closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@@ -724,12 +721,12 @@ static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
|
|
|
|
worker->next->prev = worker->prev; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int grpc_pollset_has_workers(grpc_pollset *p) { |
|
|
|
|
static int pollset_has_workers(grpc_pollset *p) { |
|
|
|
|
return p->root_worker.next != &p->root_worker; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { |
|
|
|
|
if (grpc_pollset_has_workers(p)) { |
|
|
|
|
if (pollset_has_workers(p)) { |
|
|
|
|
grpc_pollset_worker *w = p->root_worker.next; |
|
|
|
|
remove_worker(p, w); |
|
|
|
|
return w; |
|
|
|
@@ -750,17 +747,15 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
|
|
|
|
worker->prev->next = worker->next->prev = worker; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } |
|
|
|
|
|
|
|
|
|
void grpc_pollset_kick_ext(grpc_pollset *p, |
|
|
|
|
grpc_pollset_worker *specific_worker, |
|
|
|
|
uint32_t flags) { |
|
|
|
|
GPR_TIMER_BEGIN("grpc_pollset_kick_ext", 0); |
|
|
|
|
static void pollset_kick_ext(grpc_pollset *p, |
|
|
|
|
grpc_pollset_worker *specific_worker, |
|
|
|
|
uint32_t flags) { |
|
|
|
|
GPR_TIMER_BEGIN("pollset_kick_ext", 0); |
|
|
|
|
|
|
|
|
|
/* pollset->mu already held */ |
|
|
|
|
if (specific_worker != NULL) { |
|
|
|
|
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { |
|
|
|
|
GPR_TIMER_BEGIN("grpc_pollset_kick_ext.broadcast", 0); |
|
|
|
|
GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0); |
|
|
|
|
GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); |
|
|
|
|
for (specific_worker = p->root_worker.next; |
|
|
|
|
specific_worker != &p->root_worker; |
|
|
|
@@ -768,7 +763,7 @@ void grpc_pollset_kick_ext(grpc_pollset *p,
|
|
|
|
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); |
|
|
|
|
} |
|
|
|
|
p->kicked_without_pollers = 1; |
|
|
|
|
GPR_TIMER_END("grpc_pollset_kick_ext.broadcast", 0); |
|
|
|
|
GPR_TIMER_END("pollset_kick_ext.broadcast", 0); |
|
|
|
|
} else if (gpr_tls_get(&g_current_thread_worker) != |
|
|
|
|
(intptr_t)specific_worker) { |
|
|
|
|
GPR_TIMER_MARK("different_thread_worker", 0); |
|
|
|
@@ -812,36 +807,37 @@ void grpc_pollset_kick_ext(grpc_pollset *p,
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_TIMER_END("grpc_pollset_kick_ext", 0); |
|
|
|
|
GPR_TIMER_END("pollset_kick_ext", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { |
|
|
|
|
grpc_pollset_kick_ext(p, specific_worker, 0); |
|
|
|
|
static void pollset_kick(grpc_pollset *p, |
|
|
|
|
grpc_pollset_worker *specific_worker) { |
|
|
|
|
pollset_kick_ext(p, specific_worker, 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* global state management */ |
|
|
|
|
|
|
|
|
|
void grpc_pollset_global_init(void) { |
|
|
|
|
static void pollset_global_init(void) { |
|
|
|
|
gpr_tls_init(&g_current_thread_poller); |
|
|
|
|
gpr_tls_init(&g_current_thread_worker); |
|
|
|
|
grpc_wakeup_fd_global_init(); |
|
|
|
|
grpc_wakeup_fd_init(&grpc_global_wakeup_fd); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_global_shutdown(void) { |
|
|
|
|
static void pollset_global_shutdown(void) { |
|
|
|
|
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); |
|
|
|
|
gpr_tls_destroy(&g_current_thread_poller); |
|
|
|
|
gpr_tls_destroy(&g_current_thread_worker); |
|
|
|
|
grpc_wakeup_fd_global_destroy(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } |
|
|
|
|
static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } |
|
|
|
|
|
|
|
|
|
/* main interface */ |
|
|
|
|
|
|
|
|
|
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); |
|
|
|
|
|
|
|
|
|
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu) { |
|
|
|
|
static void pollset_init(grpc_pollset *pollset, gpr_mu *mu) { |
|
|
|
|
pollset->mu = mu; |
|
|
|
|
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; |
|
|
|
|
pollset->in_flight_cbs = 0; |
|
|
|
@@ -854,9 +850,9 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu) {
|
|
|
|
become_basic_pollset(pollset, NULL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_destroy(grpc_pollset *pollset) { |
|
|
|
|
static void pollset_destroy(grpc_pollset *pollset) { |
|
|
|
|
GPR_ASSERT(pollset->in_flight_cbs == 0); |
|
|
|
|
GPR_ASSERT(!grpc_pollset_has_workers(pollset)); |
|
|
|
|
GPR_ASSERT(!pollset_has_workers(pollset)); |
|
|
|
|
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); |
|
|
|
|
pollset->vtable->destroy(pollset); |
|
|
|
|
while (pollset->local_wakeup_cache) { |
|
|
|
@@ -867,10 +863,10 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_reset(grpc_pollset *pollset) { |
|
|
|
|
static void pollset_reset(grpc_pollset *pollset) { |
|
|
|
|
GPR_ASSERT(pollset->shutting_down); |
|
|
|
|
GPR_ASSERT(pollset->in_flight_cbs == 0); |
|
|
|
|
GPR_ASSERT(!grpc_pollset_has_workers(pollset)); |
|
|
|
|
GPR_ASSERT(!pollset_has_workers(pollset)); |
|
|
|
|
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); |
|
|
|
|
pollset->vtable->destroy(pollset); |
|
|
|
|
pollset->shutting_down = 0; |
|
|
|
@@ -879,8 +875,8 @@ void grpc_pollset_reset(grpc_pollset *pollset) {
|
|
|
|
become_basic_pollset(pollset, NULL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_fd *fd) { |
|
|
|
|
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_fd *fd) { |
|
|
|
|
gpr_mu_lock(pollset->mu); |
|
|
|
|
pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); |
|
|
|
|
/* the following (enabled only in debug) will reacquire and then release
|
|
|
|
@@ -899,9 +895,9 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_pollset_worker **worker_hdl, gpr_timespec now, |
|
|
|
|
gpr_timespec deadline) { |
|
|
|
|
static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_pollset_worker **worker_hdl, gpr_timespec now, |
|
|
|
|
gpr_timespec deadline) { |
|
|
|
|
grpc_pollset_worker worker; |
|
|
|
|
*worker_hdl = &worker; |
|
|
|
|
|
|
|
|
@@ -910,7 +906,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
int locked = 1; |
|
|
|
|
int queued_work = 0; |
|
|
|
|
int keep_polling = 0; |
|
|
|
|
GPR_TIMER_BEGIN("grpc_pollset_work", 0); |
|
|
|
|
GPR_TIMER_BEGIN("pollset_work", 0); |
|
|
|
|
/* this must happen before we (potentially) drop pollset->mu */ |
|
|
|
|
worker.next = worker.prev = NULL; |
|
|
|
|
worker.reevaluate_polling_on_wakeup = 0; |
|
|
|
@@ -924,20 +920,20 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
worker.kicked_specifically = 0; |
|
|
|
|
/* If there's work waiting for the pollset to be idle, and the
|
|
|
|
|
pollset is idle, then do that work */ |
|
|
|
|
if (!grpc_pollset_has_workers(pollset) && |
|
|
|
|
if (!pollset_has_workers(pollset) && |
|
|
|
|
!grpc_closure_list_empty(pollset->idle_jobs)) { |
|
|
|
|
GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0); |
|
|
|
|
GPR_TIMER_MARK("pollset_work.idle_jobs", 0); |
|
|
|
|
grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
|
/* If we're shutting down then we don't execute any extended work */ |
|
|
|
|
if (pollset->shutting_down) { |
|
|
|
|
GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0); |
|
|
|
|
GPR_TIMER_MARK("pollset_work.shutting_down", 0); |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
|
/* Give do_promote priority so we don't starve it out */ |
|
|
|
|
if (pollset->in_flight_cbs) { |
|
|
|
|
GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0); |
|
|
|
|
GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0); |
|
|
|
|
gpr_mu_unlock(pollset->mu); |
|
|
|
|
locked = 0; |
|
|
|
|
goto done; |
|
|
|
@@ -962,7 +958,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
locked = 0; |
|
|
|
|
gpr_tls_set(&g_current_thread_poller, 0); |
|
|
|
|
} else { |
|
|
|
|
GPR_TIMER_MARK("grpc_pollset_work.kicked_without_pollers", 0); |
|
|
|
|
GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0); |
|
|
|
|
pollset->kicked_without_pollers = 0; |
|
|
|
|
} |
|
|
|
|
/* Finished execution - start cleaning up.
|
|
|
|
@@ -975,7 +971,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
gpr_mu_lock(pollset->mu); |
|
|
|
|
locked = 1; |
|
|
|
|
} |
|
|
|
|
/* If we're forced to re-evaluate polling (via grpc_pollset_kick with
|
|
|
|
|
/* If we're forced to re-evaluate polling (via pollset_kick with
|
|
|
|
|
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force |
|
|
|
|
a loop */ |
|
|
|
|
if (worker.reevaluate_polling_on_wakeup) { |
|
|
|
@@ -998,8 +994,8 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
pollset->local_wakeup_cache = worker.wakeup_fd; |
|
|
|
|
/* check shutdown conditions */ |
|
|
|
|
if (pollset->shutting_down) { |
|
|
|
|
if (grpc_pollset_has_workers(pollset)) { |
|
|
|
|
grpc_pollset_kick(pollset, NULL); |
|
|
|
|
if (pollset_has_workers(pollset)) { |
|
|
|
|
pollset_kick(pollset, NULL); |
|
|
|
|
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { |
|
|
|
|
pollset->called_shutdown = 1; |
|
|
|
|
gpr_mu_unlock(pollset->mu); |
|
|
|
@@ -1007,7 +1003,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
grpc_exec_ctx_flush(exec_ctx); |
|
|
|
|
/* Continuing to access pollset here is safe -- it is the caller's
|
|
|
|
|
* responsibility to not destroy when it has outstanding calls to |
|
|
|
|
* grpc_pollset_work. |
|
|
|
|
* pollset_work. |
|
|
|
|
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ |
|
|
|
|
gpr_mu_lock(pollset->mu); |
|
|
|
|
} else if (!grpc_closure_list_empty(pollset->idle_jobs)) { |
|
|
|
@@ -1018,27 +1014,27 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
*worker_hdl = NULL; |
|
|
|
|
GPR_TIMER_END("grpc_pollset_work", 0); |
|
|
|
|
GPR_TIMER_END("pollset_work", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
GPR_ASSERT(!pollset->shutting_down); |
|
|
|
|
pollset->shutting_down = 1; |
|
|
|
|
pollset->shutdown_done = closure; |
|
|
|
|
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); |
|
|
|
|
if (!grpc_pollset_has_workers(pollset)) { |
|
|
|
|
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); |
|
|
|
|
if (!pollset_has_workers(pollset)) { |
|
|
|
|
grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); |
|
|
|
|
} |
|
|
|
|
if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && |
|
|
|
|
!grpc_pollset_has_workers(pollset)) { |
|
|
|
|
!pollset_has_workers(pollset)) { |
|
|
|
|
pollset->called_shutdown = 1; |
|
|
|
|
finish_shutdown(exec_ctx, pollset); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, |
|
|
|
|
gpr_timespec now) { |
|
|
|
|
static int poll_deadline_to_millis_timeout(gpr_timespec deadline, |
|
|
|
|
gpr_timespec now) { |
|
|
|
|
gpr_timespec timeout; |
|
|
|
|
static const int64_t max_spin_polling_us = 10; |
|
|
|
|
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { |
|
|
|
@@ -1084,7 +1080,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
|
|
|
|
|
|
|
|
|
gpr_mu_lock(pollset->mu); |
|
|
|
|
/* First we need to ensure that nobody is polling concurrently */ |
|
|
|
|
GPR_ASSERT(!grpc_pollset_has_workers(pollset)); |
|
|
|
|
GPR_ASSERT(!pollset_has_workers(pollset)); |
|
|
|
|
|
|
|
|
|
gpr_free(up_args); |
|
|
|
|
/* At this point the pollset may no longer be a unary poller. In that case
|
|
|
|
@@ -1099,7 +1095,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
|
|
|
|
pollset->called_shutdown = 1; |
|
|
|
|
finish_shutdown(exec_ctx, pollset); |
|
|
|
|
} |
|
|
|
|
} else if (grpc_fd_is_orphaned(fd)) { |
|
|
|
|
} else if (fd_is_orphaned(fd)) { |
|
|
|
|
/* Don't try to add it to anything, we'll drop our ref on it below */ |
|
|
|
|
} else if (pollset->vtable != original_vtable) { |
|
|
|
|
pollset->vtable->add_fd(exec_ctx, pollset, fd, 0); |
|
|
|
@@ -1108,9 +1104,8 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
|
|
|
|
fds[0] = pollset->data.ptr; |
|
|
|
|
fds[1] = fd; |
|
|
|
|
|
|
|
|
|
if (fds[0] && !grpc_fd_is_orphaned(fds[0])) { |
|
|
|
|
grpc_platform_become_multipoller(exec_ctx, pollset, fds, |
|
|
|
|
GPR_ARRAY_SIZE(fds)); |
|
|
|
|
if (fds[0] && !fd_is_orphaned(fds[0])) { |
|
|
|
|
platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds)); |
|
|
|
|
GRPC_FD_UNREF(fds[0], "basicpoll"); |
|
|
|
|
} else { |
|
|
|
|
/* old fd is orphaned and we haven't cleaned it up until now, so remain a
|
|
|
|
@@ -1135,7 +1130,7 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
GPR_ASSERT(fd); |
|
|
|
|
if (fd == pollset->data.ptr) goto exit; |
|
|
|
|
|
|
|
|
|
if (!grpc_pollset_has_workers(pollset)) { |
|
|
|
|
if (!pollset_has_workers(pollset)) { |
|
|
|
|
/* Fast path -- no in flight cbs */ |
|
|
|
|
/* TODO(klempner): Comment this out and fix any test failures or establish
|
|
|
|
|
* they are due to timing issues */ |
|
|
|
@@ -1146,9 +1141,8 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
if (fds[0] == NULL) { |
|
|
|
|
pollset->data.ptr = fd; |
|
|
|
|
GRPC_FD_REF(fd, "basicpoll"); |
|
|
|
|
} else if (!grpc_fd_is_orphaned(fds[0])) { |
|
|
|
|
grpc_platform_become_multipoller(exec_ctx, pollset, fds, |
|
|
|
|
GPR_ARRAY_SIZE(fds)); |
|
|
|
|
} else if (!fd_is_orphaned(fds[0])) { |
|
|
|
|
platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds)); |
|
|
|
|
GRPC_FD_UNREF(fds[0], "basicpoll"); |
|
|
|
|
} else { |
|
|
|
|
/* old fd is orphaned and we haven't cleaned it up until now, so remain a
|
|
|
|
@@ -1172,7 +1166,7 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
up_args->promotion_closure.cb_arg = up_args; |
|
|
|
|
|
|
|
|
|
grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); |
|
|
|
|
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); |
|
|
|
|
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); |
|
|
|
|
|
|
|
|
|
exit: |
|
|
|
|
if (and_unlock_pollset) { |
|
|
|
@@ -1196,11 +1190,11 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
|
|
|
|
nfds_t nfds; |
|
|
|
|
|
|
|
|
|
fd = pollset->data.ptr; |
|
|
|
|
if (fd && grpc_fd_is_orphaned(fd)) { |
|
|
|
|
if (fd && fd_is_orphaned(fd)) { |
|
|
|
|
GRPC_FD_UNREF(fd, "basicpoll"); |
|
|
|
|
fd = pollset->data.ptr = NULL; |
|
|
|
|
} |
|
|
|
|
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); |
|
|
|
|
timeout = poll_deadline_to_millis_timeout(deadline, now); |
|
|
|
|
pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); |
|
|
|
|
pfd[0].events = POLLIN; |
|
|
|
|
pfd[0].revents = 0; |
|
|
|
@@ -1213,8 +1207,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
|
|
|
|
pfd[2].revents = 0; |
|
|
|
|
GRPC_FD_REF(fd, "basicpoll_begin"); |
|
|
|
|
gpr_mu_unlock(pollset->mu); |
|
|
|
|
pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN, |
|
|
|
|
POLLOUT, &fd_watcher); |
|
|
|
|
pfd[2].events = |
|
|
|
|
(short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &fd_watcher); |
|
|
|
|
if (pfd[2].events != 0) { |
|
|
|
|
nfds++; |
|
|
|
|
} |
|
|
|
@@ -1237,11 +1231,11 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
|
|
|
|
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); |
|
|
|
|
} |
|
|
|
|
if (fd) { |
|
|
|
|
grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); |
|
|
|
|
fd_end_poll(exec_ctx, &fd_watcher, 0, 0); |
|
|
|
|
} |
|
|
|
|
} else if (r == 0) { |
|
|
|
|
if (fd) { |
|
|
|
|
grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); |
|
|
|
|
fd_end_poll(exec_ctx, &fd_watcher, 0, 0); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (pfd[0].revents & POLLIN_CHECK) { |
|
|
|
@@ -1251,10 +1245,10 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
|
|
|
|
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); |
|
|
|
|
} |
|
|
|
|
if (nfds > 2) { |
|
|
|
|
grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, |
|
|
|
|
pfd[2].revents & POLLOUT_CHECK); |
|
|
|
|
fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, |
|
|
|
|
pfd[2].revents & POLLOUT_CHECK); |
|
|
|
|
} else if (fd) { |
|
|
|
|
grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); |
|
|
|
|
fd_end_poll(exec_ctx, &fd_watcher, 0, 0); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@@ -1286,6 +1280,8 @@ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
|
|
|
|
* pollset_multipoller_with_poll_posix.c |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
#ifndef GPR_LINUX_MULTIPOLL_WITH_EPOLL |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
/* all polled fds */ |
|
|
|
|
size_t fd_count; |
|
|
|
@@ -1335,7 +1331,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
|
|
|
|
struct pollfd *pfds; |
|
|
|
|
|
|
|
|
|
h = pollset->data.ptr; |
|
|
|
|
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); |
|
|
|
|
timeout = poll_deadline_to_millis_timeout(deadline, now); |
|
|
|
|
/* TODO(ctiller): perform just one malloc here if we exceed the inline case */ |
|
|
|
|
pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2)); |
|
|
|
|
watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2)); |
|
|
|
@@ -1348,7 +1344,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
|
|
|
|
pfds[1].events = POLLIN; |
|
|
|
|
pfds[1].revents = 0; |
|
|
|
|
for (i = 0; i < h->fd_count; i++) { |
|
|
|
|
int remove = grpc_fd_is_orphaned(h->fds[i]); |
|
|
|
|
int remove = fd_is_orphaned(h->fds[i]); |
|
|
|
|
for (j = 0; !remove && j < h->del_count; j++) { |
|
|
|
|
if (h->fds[i] == h->dels[j]) remove = 1; |
|
|
|
|
} |
|
|
|
@@ -1370,8 +1366,8 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
|
|
|
|
gpr_mu_unlock(pollset->mu); |
|
|
|
|
|
|
|
|
|
for (i = 2; i < pfd_count; i++) { |
|
|
|
|
pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker, |
|
|
|
|
POLLIN, POLLOUT, &watchers[i]); |
|
|
|
|
pfds[i].events = (short)fd_begin_poll(watchers[i].fd, pollset, worker, |
|
|
|
|
POLLIN, POLLOUT, &watchers[i]); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
|
|
|
|
@@ -1385,11 +1381,11 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
|
|
|
|
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); |
|
|
|
|
} |
|
|
|
|
for (i = 2; i < pfd_count; i++) { |
|
|
|
|
grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); |
|
|
|
|
fd_end_poll(exec_ctx, &watchers[i], 0, 0); |
|
|
|
|
} |
|
|
|
|
} else if (r == 0) { |
|
|
|
|
for (i = 2; i < pfd_count; i++) { |
|
|
|
|
grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); |
|
|
|
|
fd_end_poll(exec_ctx, &watchers[i], 0, 0); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (pfds[0].revents & POLLIN_CHECK) { |
|
|
|
@@ -1400,11 +1396,11 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
|
|
|
|
} |
|
|
|
|
for (i = 2; i < pfd_count; i++) { |
|
|
|
|
if (watchers[i].fd == NULL) { |
|
|
|
|
grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); |
|
|
|
|
fd_end_poll(exec_ctx, &watchers[i], 0, 0); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, |
|
|
|
|
pfds[i].revents & POLLOUT_CHECK); |
|
|
|
|
fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, |
|
|
|
|
pfds[i].revents & POLLOUT_CHECK); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@@ -1439,9 +1435,9 @@ static const grpc_pollset_vtable multipoll_with_poll_pollset = {
|
|
|
|
multipoll_with_poll_pollset_finish_shutdown, |
|
|
|
|
multipoll_with_poll_pollset_destroy}; |
|
|
|
|
|
|
|
|
|
void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset *pollset, grpc_fd **fds, |
|
|
|
|
size_t nfds) { |
|
|
|
|
static void poll_become_multipoller(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset *pollset, grpc_fd **fds, |
|
|
|
|
size_t nfds) { |
|
|
|
|
size_t i; |
|
|
|
|
poll_hdr *h = gpr_malloc(sizeof(poll_hdr)); |
|
|
|
|
pollset->vtable = &multipoll_with_poll_pollset; |
|
|
|
@@ -1458,6 +1454,8 @@ void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#endif /* !GPR_LINUX_MULTIPOLL_WITH_EPOLL */ |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* pollset_multipoller_with_epoll_posix.c |
|
|
|
|
*/ |
|
|
|
@@ -1518,7 +1516,7 @@ static void remove_epoll_fd_from_global_list(int epoll_fd) {
|
|
|
|
gpr_mu_unlock(&epoll_fd_list_mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_remove_fd_from_all_epoll_sets(int fd) { |
|
|
|
|
static void remove_fd_from_all_epoll_sets(int fd) { |
|
|
|
|
int err; |
|
|
|
|
gpr_once_init(&init_epoll_fd_list_mu, init_mu); |
|
|
|
|
gpr_mu_lock(&epoll_fd_list_mu); |
|
|
|
@@ -1554,7 +1552,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
|
|
|
|
/* We pretend to be polling whilst adding an fd to keep the fd from being
|
|
|
|
|
closed during the add. This may result in a spurious wakeup being assigned |
|
|
|
|
to this pollset whilst adding, but that should be benign. */ |
|
|
|
|
GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0); |
|
|
|
|
GPR_ASSERT(fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0); |
|
|
|
|
if (watcher.fd != NULL) { |
|
|
|
|
ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); |
|
|
|
|
ev.data.ptr = fd; |
|
|
|
@ -1567,14 +1565,14 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
grpc_fd_end_poll(exec_ctx, &watcher, 0, 0); |
|
|
|
|
fd_end_poll(exec_ctx, &watcher, 0, 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
bool iomgr_status) { |
|
|
|
|
delayed_add *da = arg; |
|
|
|
|
|
|
|
|
|
if (!grpc_fd_is_orphaned(da->fd)) { |
|
|
|
|
if (!fd_is_orphaned(da->fd)) { |
|
|
|
|
finally_add_fd(exec_ctx, da->pollset, da->fd); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@@ -1633,7 +1631,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(pollset->mu); |
|
|
|
|
|
|
|
|
|
timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); |
|
|
|
|
timeout_ms = poll_deadline_to_millis_timeout(deadline, now); |
|
|
|
|
|
|
|
|
|
pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); |
|
|
|
|
pfds[0].events = POLLIN; |
|
|
|
@ -1681,10 +1679,10 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( |
|
|
|
|
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); |
|
|
|
|
} else { |
|
|
|
|
if (read_ev || cancel) { |
|
|
|
|
grpc_fd_become_readable(exec_ctx, fd); |
|
|
|
|
fd_become_readable(exec_ctx, fd); |
|
|
|
|
} |
|
|
|
|
if (write_ev || cancel) { |
|
|
|
|
grpc_fd_become_writable(exec_ctx, fd); |
|
|
|
|
fd_become_writable(exec_ctx, fd); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@@ -1743,12 +1741,9 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_platform_become_multipoller_type grpc_platform_become_multipoller = |
|
|
|
|
epoll_become_multipoller; |
|
|
|
|
|
|
|
|
|
#else /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ |
|
|
|
|
|
|
|
|
|
void grpc_remove_fd_from_all_epoll_sets(int fd) {} |
|
|
|
|
static void remove_fd_from_all_epoll_sets(int fd) {} |
|
|
|
|
|
|
|
|
|
#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ |
|
|
|
|
|
|
|
|
@ -1756,14 +1751,14 @@ void grpc_remove_fd_from_all_epoll_sets(int fd) {} |
|
|
|
|
* pollset_set_posix.c |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
grpc_pollset_set *grpc_pollset_set_create(void) { |
|
|
|
|
static grpc_pollset_set *pollset_set_create(void) { |
|
|
|
|
grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); |
|
|
|
|
memset(pollset_set, 0, sizeof(*pollset_set)); |
|
|
|
|
gpr_mu_init(&pollset_set->mu); |
|
|
|
|
return pollset_set; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { |
|
|
|
|
static void pollset_set_destroy(grpc_pollset_set *pollset_set) { |
|
|
|
|
size_t i; |
|
|
|
|
gpr_mu_destroy(&pollset_set->mu); |
|
|
|
|
for (i = 0; i < pollset_set->fd_count; i++) { |
|
|
|
@@ -1775,9 +1770,9 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
|
|
|
|
gpr_free(pollset_set); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *pollset_set, |
|
|
|
|
grpc_pollset *pollset) { |
|
|
|
|
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *pollset_set, |
|
|
|
|
grpc_pollset *pollset) { |
|
|
|
|
size_t i, j; |
|
|
|
|
gpr_mu_lock(&pollset_set->mu); |
|
|
|
|
if (pollset_set->pollset_count == pollset_set->pollset_capacity) { |
|
|
|
@@ -1789,10 +1784,10 @@ void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
|
|
|
|
} |
|
|
|
|
pollset_set->pollsets[pollset_set->pollset_count++] = pollset; |
|
|
|
|
for (i = 0, j = 0; i < pollset_set->fd_count; i++) { |
|
|
|
|
if (grpc_fd_is_orphaned(pollset_set->fds[i])) { |
|
|
|
|
if (fd_is_orphaned(pollset_set->fds[i])) { |
|
|
|
|
GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); |
|
|
|
|
} else { |
|
|
|
|
grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); |
|
|
|
|
pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); |
|
|
|
|
pollset_set->fds[j++] = pollset_set->fds[i]; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1800,9 +1795,9 @@ void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, |
|
|
|
|
gpr_mu_unlock(&pollset_set->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *pollset_set, |
|
|
|
|
grpc_pollset *pollset) { |
|
|
|
|
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *pollset_set, |
|
|
|
|
grpc_pollset *pollset) { |
|
|
|
|
size_t i; |
|
|
|
|
gpr_mu_lock(&pollset_set->mu); |
|
|
|
|
for (i = 0; i < pollset_set->pollset_count; i++) { |
|
|
|
@@ -1816,9 +1811,9 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
|
|
|
|
gpr_mu_unlock(&pollset_set->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *bag, |
|
|
|
|
grpc_pollset_set *item) { |
|
|
|
|
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *bag, |
|
|
|
|
grpc_pollset_set *item) { |
|
|
|
|
size_t i, j; |
|
|
|
|
gpr_mu_lock(&bag->mu); |
|
|
|
|
if (bag->pollset_set_count == bag->pollset_set_capacity) { |
|
|
|
@ -1829,10 +1824,10 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
bag->pollset_sets[bag->pollset_set_count++] = item; |
|
|
|
|
for (i = 0, j = 0; i < bag->fd_count; i++) { |
|
|
|
|
if (grpc_fd_is_orphaned(bag->fds[i])) { |
|
|
|
|
if (fd_is_orphaned(bag->fds[i])) { |
|
|
|
|
GRPC_FD_UNREF(bag->fds[i], "pollset_set"); |
|
|
|
|
} else { |
|
|
|
|
grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]); |
|
|
|
|
pollset_set_add_fd(exec_ctx, item, bag->fds[i]); |
|
|
|
|
bag->fds[j++] = bag->fds[i]; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1840,9 +1835,9 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, |
|
|
|
|
gpr_mu_unlock(&bag->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *bag, |
|
|
|
|
grpc_pollset_set *item) { |
|
|
|
|
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *bag, |
|
|
|
|
grpc_pollset_set *item) { |
|
|
|
|
size_t i; |
|
|
|
|
gpr_mu_lock(&bag->mu); |
|
|
|
|
for (i = 0; i < bag->pollset_set_count; i++) { |
|
|
|
@@ -1856,8 +1851,8 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
|
|
|
|
gpr_mu_unlock(&bag->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *pollset_set, grpc_fd *fd) { |
|
|
|
|
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *pollset_set, grpc_fd *fd) { |
|
|
|
|
size_t i; |
|
|
|
|
gpr_mu_lock(&pollset_set->mu); |
|
|
|
|
if (pollset_set->fd_count == pollset_set->fd_capacity) { |
|
|
|
@@ -1868,16 +1863,16 @@ void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
|
|
|
|
GRPC_FD_REF(fd, "pollset_set"); |
|
|
|
|
pollset_set->fds[pollset_set->fd_count++] = fd; |
|
|
|
|
for (i = 0; i < pollset_set->pollset_count; i++) { |
|
|
|
|
grpc_pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); |
|
|
|
|
pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); |
|
|
|
|
} |
|
|
|
|
for (i = 0; i < pollset_set->pollset_set_count; i++) { |
|
|
|
|
grpc_pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); |
|
|
|
|
pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&pollset_set->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *pollset_set, grpc_fd *fd) { |
|
|
|
|
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_pollset_set *pollset_set, grpc_fd *fd) { |
|
|
|
|
size_t i; |
|
|
|
|
gpr_mu_lock(&pollset_set->mu); |
|
|
|
|
for (i = 0; i < pollset_set->fd_count; i++) { |
|
|
|
@ -1890,9 +1885,61 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for (i = 0; i < pollset_set->pollset_set_count; i++) { |
|
|
|
|
grpc_pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); |
|
|
|
|
pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&pollset_set->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* event engine binding |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
static void shutdown_engine(void) { |
|
|
|
|
fd_global_shutdown(); |
|
|
|
|
pollset_global_shutdown(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static const grpc_event_engine_vtable vtable = { |
|
|
|
|
.pollset_size = sizeof(grpc_pollset), |
|
|
|
|
|
|
|
|
|
.fd_create = fd_create, |
|
|
|
|
.fd_wrapped_fd = fd_wrapped_fd, |
|
|
|
|
.fd_orphan = fd_orphan, |
|
|
|
|
.fd_shutdown = fd_shutdown, |
|
|
|
|
.fd_notify_on_read = fd_notify_on_read, |
|
|
|
|
.fd_notify_on_write = fd_notify_on_write, |
|
|
|
|
|
|
|
|
|
.pollset_init = pollset_init, |
|
|
|
|
.pollset_shutdown = pollset_shutdown, |
|
|
|
|
.pollset_reset = pollset_reset, |
|
|
|
|
.pollset_destroy = pollset_destroy, |
|
|
|
|
.pollset_work = pollset_work, |
|
|
|
|
.pollset_kick = pollset_kick, |
|
|
|
|
.pollset_add_fd = pollset_add_fd, |
|
|
|
|
|
|
|
|
|
.pollset_set_create = pollset_set_create, |
|
|
|
|
.pollset_set_destroy = pollset_set_destroy, |
|
|
|
|
.pollset_set_add_pollset = pollset_set_add_pollset, |
|
|
|
|
.pollset_set_del_pollset = pollset_set_del_pollset, |
|
|
|
|
.pollset_set_add_pollset_set = pollset_set_add_pollset_set, |
|
|
|
|
.pollset_set_del_pollset_set = pollset_set_del_pollset_set, |
|
|
|
|
.pollset_set_add_fd = pollset_set_add_fd, |
|
|
|
|
.pollset_set_del_fd = pollset_set_del_fd, |
|
|
|
|
|
|
|
|
|
.kick_poller = kick_poller, |
|
|
|
|
|
|
|
|
|
.shutdown_engine = shutdown_engine, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) { |
|
|
|
|
#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL |
|
|
|
|
platform_become_multipoller = epoll_become_multipoller; |
|
|
|
|
#else |
|
|
|
|
platform_become_multipoller = poll_become_multipoller; |
|
|
|
|
#endif |
|
|
|
|
fd_global_init(); |
|
|
|
|
pollset_global_init(); |
|
|
|
|
return &vtable; |
|
|
|
|
} |
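/* Illustrative usage sketch (not part of this change): how a caller consumes
   the engine constructed above.  The dispatcher variable and wrapper are
   invented names for the example -- typically this forwarding would live in a
   shared dispatch layer such as ev_posix -- but the vtable fields used are the
   ones initialized in `vtable` above. */
static const grpc_event_engine_vtable *g_example_engine;

static void example_engine_bootstrap(void) {
  g_example_engine = grpc_init_poll_and_epoll_posix();
}

static grpc_fd *example_engine_fd_create(int fd, const char *name) {
  /* every public iomgr entry point just forwards through the vtable */
  return g_example_engine->fd_create(fd, name);
}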
|
|
|
|
|
|
|
|
|
#endif |