Merge pull request #271 from dklempner/fd_freelist

Freelist grpc_fd objects
pull/281/head
Craig Tiller 10 years ago
commit 7d543e67b4
  1. 84
      src/core/iomgr/fd_posix.c
  2. 4
      src/core/iomgr/fd_posix.h
  3. 2
      src/core/iomgr/iomgr.c
  4. 11
      src/core/iomgr/iomgr_posix.c

@ -47,12 +47,63 @@
enum descriptor_state { NOT_READY, READY, WAITING };
/* We need to keep a freelist not because of any concerns of malloc performance
* but instead so that implementations with multiple threads in (for example)
* epoll_wait deal with the race between pollset removal and incoming poll
* notifications.
*
* The problem is that the poller ultimately holds a reference to this
* object, so it is very difficult to know when is safe to free it, at least
* without some expensive synchronization.
*
* If we keep the object freelisted, in the worst case losing this race just
* becomes a spurious read notification on a reused fd.
*/
/* TODO(klempner): We could use some form of polling generation count to know
* when these are safe to free. */
/* TODO(klempner): Consider disabling freelisting if we don't have multiple
* threads in poll on the same fd */
/* TODO(klempner): Batch these allocations to reduce fragmentation */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
static void freelist_fd(grpc_fd *fd) {
gpr_free(fd->watchers);
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
gpr_mu_unlock(&fd_freelist_mu);
}
static grpc_fd *alloc_fd(int fd) {
grpc_fd *r = NULL;
gpr_mu_lock(&fd_freelist_mu);
if (fd_freelist != NULL) {
r = fd_freelist;
fd_freelist = fd_freelist->freelist_next;
}
gpr_mu_unlock(&fd_freelist_mu);
if (r == NULL) {
r = gpr_malloc(sizeof(grpc_fd));
gpr_mu_init(&r->set_state_mu);
gpr_mu_init(&r->watcher_mu);
}
gpr_atm_rel_store(&r->refst, 1);
gpr_atm_rel_store(&r->readst.state, NOT_READY);
gpr_atm_rel_store(&r->writest.state, NOT_READY);
gpr_atm_rel_store(&r->shutdown, 0);
r->fd = fd;
r->watchers = NULL;
r->watcher_count = 0;
r->watcher_capacity = 0;
r->freelist_next = NULL;
return r;
}
static void destroy(grpc_fd *fd) {
grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
gpr_mu_destroy(&fd->set_state_mu);
gpr_free(fd->watchers);
gpr_mu_destroy(&fd->watcher_mu);
gpr_free(fd);
grpc_iomgr_unref();
}
static void ref_by(grpc_fd *fd, int n) {
@ -61,25 +112,30 @@ static void ref_by(grpc_fd *fd, int n) {
static void unref_by(grpc_fd *fd, int n) {
if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) {
grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
freelist_fd(fd);
grpc_iomgr_unref();
}
}
void grpc_fd_global_init(void) {
gpr_mu_init(&fd_freelist_mu);
}
void grpc_fd_global_shutdown(void) {
while (fd_freelist != NULL) {
grpc_fd *fd = fd_freelist;
fd_freelist = fd_freelist->freelist_next;
destroy(fd);
}
gpr_mu_destroy(&fd_freelist_mu);
}
static void do_nothing(void *ignored, int success) {}
grpc_fd *grpc_fd_create(int fd) {
grpc_fd *r = gpr_malloc(sizeof(grpc_fd));
grpc_fd *r = alloc_fd(fd);
grpc_iomgr_ref();
gpr_atm_rel_store(&r->refst, 1);
gpr_atm_rel_store(&r->readst.state, NOT_READY);
gpr_atm_rel_store(&r->writest.state, NOT_READY);
gpr_mu_init(&r->set_state_mu);
gpr_mu_init(&r->watcher_mu);
gpr_atm_rel_store(&r->shutdown, 0);
r->fd = fd;
r->watchers = NULL;
r->watcher_count = 0;
r->watcher_capacity = 0;
grpc_pollset_add_fd(grpc_backup_pollset(), r);
return r;
}

@ -69,6 +69,7 @@ typedef struct grpc_fd {
grpc_iomgr_cb_func on_done;
void *on_done_user_data;
struct grpc_fd *freelist_next;
} grpc_fd;
/* Create a wrapped file descriptor.
@ -135,4 +136,7 @@ void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback);
void grpc_fd_ref(grpc_fd *fd);
void grpc_fd_unref(grpc_fd *fd);
void grpc_fd_global_init(void);
void grpc_fd_global_shutdown(void);
#endif /* __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ */

@ -98,7 +98,6 @@ void grpc_iomgr_shutdown(void) {
gpr_timespec shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
grpc_iomgr_platform_shutdown();
gpr_mu_lock(&g_mu);
g_shutdown = 1;
@ -129,6 +128,7 @@ void grpc_iomgr_shutdown(void) {
gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future);
grpc_iomgr_platform_shutdown();
grpc_alarm_list_shutdown();
gpr_mu_destroy(&g_mu);
gpr_cv_destroy(&g_cv);

@ -32,7 +32,14 @@
*/
#include "src/core/iomgr/iomgr_posix.h"
#include "src/core/iomgr/fd_posix.h"
void grpc_iomgr_platform_init(void) { grpc_pollset_global_init(); }
void grpc_iomgr_platform_init(void) {
grpc_fd_global_init();
grpc_pollset_global_init();
}
void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
void grpc_iomgr_platform_shutdown(void) {
grpc_pollset_global_shutdown();
grpc_fd_global_shutdown();
}

Loading…
Cancel
Save