Remove refcnt from fd

pull/10970/head
Sree Kuchibhotla 8 years ago
parent 50f85f726b
commit 8ed56f5a4b
  1. 112
      src/core/lib/iomgr/ev_epoll_thread_pool_linux.c

@ -72,6 +72,11 @@ static int grpc_polling_trace = 0; /* Disabled by default */
gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
}
/* The alarm system needs to be able to wakeup 'some poller' sometimes
* (specifically when a new alarm needs to be triggered earlier than the next
* alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
* case occurs. */
/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
* sure to wake up one polling thread (which can wake up other threads if
* needed) */
@ -87,15 +92,9 @@ struct grpc_fd {
struct polling_island *pi;
int fd;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
bits 1-n : refcount
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
/* The fd is either closed or we relinquished control of it. In either
cases, this indicates that the 'fd' on this structure is no longer
valid */
/* The fd is either closed or we relinquished control of it. In either cases,
this indicates that the 'fd' on this structure is no longer valid */
bool orphaned;
gpr_atm read_closure;
@ -182,9 +181,7 @@ struct grpc_pollset {
/*******************************************************************************
* Pollset-set Declarations
*/
struct grpc_pollset_set {
void *no_op;
};
struct grpc_pollset_set {};
/*****************************************************************************
* Dedicated polling threads and pollsets - Declarations
@ -521,57 +518,31 @@ static void polling_island_global_shutdown() {
* becomes a spurious read notification on a reused fd.
*/
/* The alarm system needs to be able to wakeup 'some poller' sometimes
* (specifically when a new alarm needs to be triggered earlier than the next
* alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
* case occurs. */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
int line) {
gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
(void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
static grpc_fd *get_fd_from_freelist() {
grpc_fd *new_fd = NULL;
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
int line) {
gpr_atm old;
gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
(void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
gpr_atm old;
#endif
old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
/* Add the fd to the freelist */
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
grpc_lfev_destroy(&fd->read_closure);
grpc_lfev_destroy(&fd->write_closure);
gpr_mu_unlock(&fd_freelist_mu);
} else {
GPR_ASSERT(old > n);
gpr_mu_lock(&fd_freelist_mu);
if (fd_freelist != NULL) {
new_fd = fd_freelist;
fd_freelist = fd_freelist->freelist_next;
}
gpr_mu_unlock(&fd_freelist_mu);
return new_fd;
}
static void add_fd_to_freelist(grpc_fd *fd) {
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
grpc_lfev_destroy(&fd->read_closure);
grpc_lfev_destroy(&fd->write_closure);
gpr_mu_unlock(&fd_freelist_mu);
}
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
@ -589,15 +560,7 @@ static void fd_global_shutdown(void) {
}
static grpc_fd *fd_create(int fd, const char *name) {
grpc_fd *new_fd = NULL;
gpr_mu_lock(&fd_freelist_mu);
if (fd_freelist != NULL) {
new_fd = fd_freelist;
fd_freelist = fd_freelist->freelist_next;
}
gpr_mu_unlock(&fd_freelist_mu);
grpc_fd *new_fd = get_fd_from_freelist();
if (new_fd == NULL) {
new_fd = gpr_malloc(sizeof(grpc_fd));
gpr_mu_init(&new_fd->mu);
@ -609,7 +572,6 @@ static grpc_fd *fd_create(int fd, const char *name) {
gpr_mu_lock(&new_fd->mu);
new_fd->pi = NULL;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
new_fd->orphaned = false;
grpc_lfev_init(&new_fd->read_closure);
@ -623,9 +585,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
char *fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
gpr_free(fd_name);
/* Associate the fd with one of the dedicated pi */
@ -665,14 +625,9 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
fd->orphaned = true;
/* Remove the active status but keep referenced. We want this grpc_fd struct
to be alive (and not added to freelist) until the end of this function */
REF_BY(fd, 1, reason);
/* Remove the fd from the polling island */
if (fd->pi != NULL) {
polling_island *pi = fd->pi;
polling_island_remove_fd(pi, fd, is_fd_closed, &error);
polling_island_remove_fd(fd->pi, fd, is_fd_closed, &error);
unref_pi = fd->pi;
fd->pi = NULL;
}
@ -680,7 +635,10 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
/* We are done with this fd. Release it (i.e add back to freelist) */
add_fd_to_freelist(fd);
if (unref_pi != NULL) {
/* Unref stale polling island here, outside the fd lock above.
The polling island owns a workqueue which owns an fd, and unreffing

Loading…
Cancel
Save