From d178524a748be300bc370a2ade797beda1c5b7bc Mon Sep 17 00:00:00 2001 From: David Klempner Date: Wed, 28 Jan 2015 17:00:21 -0800 Subject: [PATCH] Freelist grpc_fd objects This is necessary for efficient implementations where multiple threads simultaneously sit in epoll_wait and the like on the same pollset. --- src/core/iomgr/fd_posix.c | 84 ++++++++++++++++++++++++++++++------ src/core/iomgr/fd_posix.h | 4 ++ src/core/iomgr/iomgr.c | 2 +- src/core/iomgr/iomgr_posix.c | 11 ++++- 4 files changed, 84 insertions(+), 17 deletions(-) diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 9f70a26c643..b67c6cde709 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -47,12 +47,63 @@ enum descriptor_state { NOT_READY, READY, WAITING }; +/* We need to keep a freelist not because of any concerns about malloc performance + * but instead so that implementations with multiple threads in (for example) + * epoll_wait deal with the race between pollset removal and incoming poll + * notifications. + * + * The problem is that the poller ultimately holds a reference to this + * object, so it is very difficult to know when it is safe to free it, at least + * without some expensive synchronization. + * + * If we keep the object freelisted, in the worst case losing this race just + * becomes a spurious read notification on a reused fd. + */ +/* TODO(klempner): We could use some form of polling generation count to know + * when these are safe to free. 
*/ +/* TODO(klempner): Consider disabling freelisting if we don't have multiple + * threads in poll on the same fd */ +/* TODO(klempner): Batch these allocations to reduce fragmentation */ +static grpc_fd *fd_freelist = NULL; +static gpr_mu fd_freelist_mu; + +static void freelist_fd(grpc_fd *fd) { + gpr_free(fd->watchers); + gpr_mu_lock(&fd_freelist_mu); + fd->freelist_next = fd_freelist; + fd_freelist = fd; + gpr_mu_unlock(&fd_freelist_mu); +} + +static grpc_fd *alloc_fd(int fd) { + grpc_fd *r = NULL; + gpr_mu_lock(&fd_freelist_mu); + if (fd_freelist != NULL) { + r = fd_freelist; + fd_freelist = fd_freelist->freelist_next; + } + gpr_mu_unlock(&fd_freelist_mu); + if (r == NULL) { + r = gpr_malloc(sizeof(grpc_fd)); + gpr_mu_init(&r->set_state_mu); + gpr_mu_init(&r->watcher_mu); + } + gpr_atm_rel_store(&r->refst, 1); + gpr_atm_rel_store(&r->readst.state, NOT_READY); + gpr_atm_rel_store(&r->writest.state, NOT_READY); + gpr_atm_rel_store(&r->shutdown, 0); + r->fd = fd; + r->watchers = NULL; + r->watcher_count = 0; + r->watcher_capacity = 0; + r->freelist_next = NULL; + return r; +} + static void destroy(grpc_fd *fd) { - grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); gpr_mu_destroy(&fd->set_state_mu); - gpr_free(fd->watchers); + gpr_mu_destroy(&fd->watcher_mu); gpr_free(fd); - grpc_iomgr_unref(); } static void ref_by(grpc_fd *fd, int n) { @@ -61,25 +112,30 @@ static void ref_by(grpc_fd *fd, int n) { static void unref_by(grpc_fd *fd, int n) { if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) { + grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); + freelist_fd(fd); + grpc_iomgr_unref(); + } +} + +void grpc_fd_global_init(void) { + gpr_mu_init(&fd_freelist_mu); +} + +void grpc_fd_global_shutdown(void) { + while (fd_freelist != NULL) { + grpc_fd *fd = fd_freelist; + fd_freelist = fd_freelist->freelist_next; destroy(fd); } + gpr_mu_destroy(&fd_freelist_mu); } static void do_nothing(void *ignored, int success) {} grpc_fd *grpc_fd_create(int fd) { - 
grpc_fd *r = gpr_malloc(sizeof(grpc_fd)); + grpc_fd *r = alloc_fd(fd); grpc_iomgr_ref(); - gpr_atm_rel_store(&r->refst, 1); - gpr_atm_rel_store(&r->readst.state, NOT_READY); - gpr_atm_rel_store(&r->writest.state, NOT_READY); - gpr_mu_init(&r->set_state_mu); - gpr_mu_init(&r->watcher_mu); - gpr_atm_rel_store(&r->shutdown, 0); - r->fd = fd; - r->watchers = NULL; - r->watcher_count = 0; - r->watcher_capacity = 0; grpc_pollset_add_fd(grpc_backup_pollset(), r); return r; } diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 232de0c3e08..f42ae195790 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -69,6 +69,7 @@ typedef struct grpc_fd { grpc_iomgr_cb_func on_done; void *on_done_user_data; + struct grpc_fd *freelist_next; } grpc_fd; /* Create a wrapped file descriptor. @@ -135,4 +136,7 @@ void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback); void grpc_fd_ref(grpc_fd *fd); void grpc_fd_unref(grpc_fd *fd); +void grpc_fd_global_init(void); +void grpc_fd_global_shutdown(void); + #endif /* __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ */ diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 7f266ab235c..8989b491d58 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -98,7 +98,6 @@ void grpc_iomgr_shutdown(void) { gpr_timespec shutdown_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); - grpc_iomgr_platform_shutdown(); gpr_mu_lock(&g_mu); g_shutdown = 1; @@ -129,6 +128,7 @@ void grpc_iomgr_shutdown(void) { gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future); + grpc_iomgr_platform_shutdown(); grpc_alarm_list_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_cv); diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c index 61fec6bc532..9297f08e99a 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/iomgr_posix.c @@ -32,7 +32,14 @@ */ #include "src/core/iomgr/iomgr_posix.h" +#include "src/core/iomgr/fd_posix.h" -void 
grpc_iomgr_platform_init(void) { grpc_pollset_global_init(); } +void grpc_iomgr_platform_init(void) { + grpc_fd_global_init(); + grpc_pollset_global_init(); +} -void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } +void grpc_iomgr_platform_shutdown(void) { + grpc_pollset_global_shutdown(); + grpc_fd_global_shutdown(); +}