Significantly refactor the polling island locking and refcounting code

pull/6803/head
Sree Kuchibhotla 9 years ago
parent 2e12db9c31
commit 2f8ade0b9d
  1. 456
      src/core/lib/iomgr/ev_epoll_linux.c

@ -140,18 +140,40 @@ static void fd_global_shutdown(void);
#define CLOSURE_READY ((grpc_closure *)1) #define CLOSURE_READY ((grpc_closure *)1)
/******************************************************************************* /*******************************************************************************
* Polling-island Declarations * Polling island Declarations
*/ */
/* TODO: sree: Consider making ref_cnt and merged_to to gpr_atm - This would
* significantly reduce the number of mutex acquisition calls. */ // #define GRPC_PI_REF_COUNT_DEBUG
/* Ref-count helpers for polling islands. Each macro adds or removes exactly
   one ref. The second argument is a short reason string; it is only consumed
   when GRPC_PI_REF_COUNT_DEBUG is defined, in which case the debug variants
   also receive the call site's file and line. */
#ifdef GRPC_PI_REF_COUNT_DEBUG
#define PI_ADD_REF(pi, reason) \
  pi_add_ref_dbg((pi), 1, (reason), __FILE__, __LINE__)
#define PI_UNREF(pi, reason) pi_unref_dbg((pi), 1, (reason), __FILE__, __LINE__)
#else /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
#define PI_ADD_REF(pi, reason) pi_add_ref((pi), 1)
#define PI_UNREF(pi, reason) pi_unref((pi), 1)
#endif /* GRPC_PI_REF_COUNT_DEBUG */
typedef struct polling_island { typedef struct polling_island {
gpr_mu mu; gpr_mu mu;
int ref_cnt; /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
the refcount.
/* Points to the polling_island this merged into. Once the ref count becomes zero, this structure is destroyed which means
* If merged_to is not NULL, all the remaining fields (except mu and ref_cnt) we should ensure that there is never a scenario where a PI_ADD_REF() is
* are invalid and must be ignored */ racing with a PI_UNREF() that just made the ref_count zero. */
struct polling_island *merged_to; gpr_atm ref_count;
/* Pointer to the polling_island this merged into.
* merged_to value is only set once in polling_island's lifetime (and that too
* only if the island is merged with another island). Because of this, we can
* use gpr_atm type here so that we can do atomic access on this and reduce
* lock contention on 'mu' mutex.
*
* Note that if this field is not NULL (i.e not 0), all the remaining fields
* (except mu and ref_count) are invalid and must be ignored. */
gpr_atm merged_to;
/* The fd of the underlying epoll set */ /* The fd of the underlying epoll set */
int epoll_fd; int epoll_fd;
@ -236,6 +258,8 @@ static grpc_wakeup_fd polling_island_wakeup_fd;
static gpr_mu g_pi_freelist_mu; static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL; static polling_island *g_pi_freelist = NULL;
/* Forward declaration; the definition appears later in this file. A full
   prototype (instead of an empty parameter list, which in C leaves the
   parameters unspecified) lets the compiler type-check the call made from
   pi_unref(). */
static void polling_island_delete(struct polling_island *pi);
#ifdef GRPC_TSAN #ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and /* Currently TSAN may incorrectly flag data races between epoll_ctl and
epoll_wait for any grpc_fd structs that are added to the epoll set via epoll_wait for any grpc_fd structs that are added to the epoll set via
@ -247,6 +271,51 @@ static polling_island *g_pi_freelist = NULL;
gpr_atm g_epoll_sync; gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */ #endif /* defined(GRPC_TSAN) */
#ifdef GRPC_PI_REF_COUNT_DEBUG
long pi_add_ref(polling_island *pi, int ref_cnt);
long pi_unref(polling_island *pi, int ref_cnt);
/* Debug-only wrapper around pi_add_ref(): adds 'ref_cnt' refs to 'pi' and logs
   the old and new ref counts together with the caller-supplied 'reason' and
   the call site ('file', 'line').

   'reason' and 'file' are only read (they are passed straight to gpr_log), and
   the PI_ADD_REF() macro feeds them a string literal and __FILE__, so they are
   taken as pointers to const. */
void pi_add_ref_dbg(polling_island *pi, int ref_cnt, const char *reason,
                    const char *file, int line) {
  long old_cnt = pi_add_ref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt + ref_cnt), reason, file, line);
}
/* Debug-only wrapper around pi_unref(): removes 'ref_cnt' refs from 'pi' and
   logs the old and new ref counts together with the caller-supplied 'reason'
   and the call site ('file', 'line').

   Only the pointer value of 'pi' is logged after the unref; 'pi' itself is
   not dereferenced here, since pi_unref() may have deleted the island.

   'reason' and 'file' are only read (they are passed straight to gpr_log), and
   the PI_UNREF() macro feeds them a string literal and __FILE__, so they are
   taken as pointers to const. */
void pi_unref_dbg(polling_island *pi, int ref_cnt, const char *reason,
                  const char *file, int line) {
  long old_cnt = pi_unref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - ref_cnt), reason, file, line);
}
#endif
/* Atomically adds 'ref_cnt' to pi->ref_count (no memory barrier is requested)
   and returns the value the counter held before the addition. */
long pi_add_ref(polling_island *pi, int ref_cnt) {
  const long previous = gpr_atm_no_barrier_fetch_add(&pi->ref_count, ref_cnt);
  return previous;
}
/* Removes 'ref_cnt' refs from 'pi' and returns the ref count held before the
   decrement.

   If the ref count went to zero, the polling island is deleted. Note that
   this need not be done under a lock: once the ref count goes to zero, we are
   guaranteed that no one else holds a reference to the polling island (and
   that there is no racing pi_add_ref() call either).

   Also, if we are deleting the polling island and the merged_to field is
   non-empty, we remove a ref to the merged_to polling island. */
long pi_unref(polling_island *pi, int ref_cnt) {
  /* The decrement must be a full (acquire + release) barrier, not a relaxed
     operation: the thread that observes the count reaching zero goes on to
     read and tear down the island, so every write made by other threads
     before they dropped their refs has to be visible to it. */
  long old_cnt = gpr_atm_full_fetch_add(&pi->ref_count, -ref_cnt);

  if (old_cnt == ref_cnt) {
    /* Read merged_to BEFORE deleting: polling_island_delete() resets it. */
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != NULL) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  }

  return old_cnt;
}
/* The caller is expected to hold pi->mu lock before calling this function */ /* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds, static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
size_t fd_count, bool add_fd_refs) { size_t fd_count, bool add_fd_refs) {
@ -355,8 +424,7 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
} }
} }
static polling_island *polling_island_create(grpc_fd *initial_fd, static polling_island *polling_island_create(grpc_fd *initial_fd) {
int initial_ref_cnt) {
polling_island *pi = NULL; polling_island *pi = NULL;
/* Try to get one from the polling island freelist */ /* Try to get one from the polling island freelist */
@ -377,6 +445,9 @@ static polling_island *polling_island_create(grpc_fd *initial_fd,
pi->fds = NULL; pi->fds = NULL;
} }
gpr_atm_no_barrier_store(&pi->ref_count, 0);
gpr_atm_no_barrier_store(&pi->merged_to, NULL);
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) { if (pi->epoll_fd < 0) {
@ -387,14 +458,12 @@ static polling_island *polling_island_create(grpc_fd *initial_fd,
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd); polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);
pi->ref_cnt = initial_ref_cnt;
pi->merged_to = NULL;
pi->next_free = NULL; pi->next_free = NULL;
if (initial_fd != NULL) { if (initial_fd != NULL) {
/* It is not really needed to get the pi->mu lock here. If this is a newly /* Lock the polling island here just in case we got this structure from the
created polling island (or one that we got from the freelist), no one freelist and the polling island lock was not released yet (by the code
else would be holding a lock to it anyway */ that adds the polling island to the freelist) */
gpr_mu_lock(&pi->mu); gpr_mu_lock(&pi->mu);
polling_island_add_fds_locked(pi, &initial_fd, 1, true); polling_island_add_fds_locked(pi, &initial_fd, 1, true);
gpr_mu_unlock(&pi->mu); gpr_mu_unlock(&pi->mu);
@ -404,140 +473,136 @@ static polling_island *polling_island_create(grpc_fd *initial_fd,
} }
static void polling_island_delete(polling_island *pi) { static void polling_island_delete(polling_island *pi) {
GPR_ASSERT(pi->ref_cnt == 0);
GPR_ASSERT(pi->fd_cnt == 0); GPR_ASSERT(pi->fd_cnt == 0);
gpr_atm_rel_store(&pi->merged_to, NULL);
close(pi->epoll_fd); close(pi->epoll_fd);
pi->epoll_fd = -1; pi->epoll_fd = -1;
pi->merged_to = NULL;
gpr_mu_lock(&g_pi_freelist_mu); gpr_mu_lock(&g_pi_freelist_mu);
pi->next_free = g_pi_freelist; pi->next_free = g_pi_freelist;
g_pi_freelist = pi; g_pi_freelist = pi;
gpr_mu_unlock(&g_pi_freelist_mu); gpr_mu_unlock(&g_pi_freelist_mu);
} }
void polling_island_unref_and_unlock(polling_island *pi, int unref_by) { /* Gets the lock on the *latest* polling island i.e the last polling island in
pi->ref_cnt -= unref_by; the linked list (linked by 'merged_to' link). Call gpr_mu_unlock on the
int ref_cnt = pi->ref_cnt; returned polling island's mu.
GPR_ASSERT(ref_cnt >= 0); Usage: To lock/unlock polling island "pi", do the following:
polling_island *pi_latest = polling_island_lock(pi);
gpr_mu_unlock(&pi->mu); ...
... critical section ..
if (ref_cnt == 0) { ...
polling_island_delete(pi); gpr_mu_unlock(&pi_latest->mu); //NOTE: use pi_latest->mu. NOT pi->mu */
polling_island *polling_island_lock(polling_island *pi) {
polling_island *next = NULL;
while (true) {
next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
if (next == NULL) {
/* pi is the last node in the linked list. Get the lock and check again
(under the pi->mu lock) that pi is still the last node (because a merge
may have happened after the (next == NULL) check above and before
getting the pi->mu lock).
If pi is the last node, we are done. If not, unlock and continue
traversing the list */
gpr_mu_lock(&pi->mu);
next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
if (next == NULL) {
break;
} }
gpr_mu_unlock(&pi->mu);
} }
polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
int add_ref_by) {
polling_island *next = NULL;
gpr_mu_lock(&pi->mu);
while (pi->merged_to != NULL) {
next = pi->merged_to;
polling_island_unref_and_unlock(pi, unref_by);
pi = next; pi = next;
gpr_mu_lock(&pi->mu);
} }
pi->ref_cnt += add_ref_by;
return pi; return pi;
} }
void polling_island_pair_update_and_lock(polling_island **p, /* Gets the lock on the *latest* polling islands pointed by *p and *q.
polling_island **q) { This function is needed because calling the following block of code to obtain
locks on polling islands (*p and *q) is prone to deadlocks.
{
polling_island_lock(*p);
polling_island_lock(*q);
}
Usage/example:
polling_island *p1;
polling_island *p2;
..
polling_island_lock_pair(&p1, &p2);
..
.. Critical section with both p1 and p2 locked
..
// Release locks
// **IMPORTANT**: Make sure you check p1 == p2 AFTER the function
// polling_island_lock_pair() was called and if so, release the lock only
// once. Note: Even if p1 != p2 before calling polling_island_lock_pair(),
// they might be after the function returns:
if (p1 == p2) {
gpr_mu_unlock(&p1->mu);
} else {
gpr_mu_unlock(&p1->mu);
gpr_mu_unlock(&p2->mu);
}
*/
void polling_island_lock_pair(polling_island **p, polling_island **q) {
polling_island *pi_1 = *p; polling_island *pi_1 = *p;
polling_island *pi_2 = *q; polling_island *pi_2 = *q;
polling_island *temp = NULL; polling_island *next_1 = NULL;
bool pi_1_locked = false; polling_island *next_2 = NULL;
bool pi_2_locked = false;
int num_swaps = 0; /* The algorithm is simple:
- Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
/* Loop until either pi_1 == pi_2 or until we acquired locks on both pi_1 keep updating pi_1 and pi_2)
and pi_2 */ - Then obtain locks on the islands by following a lock order rule of
while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) { locking polling_island with lower address first
/* The following assertions are true at this point: Special case: Before obtaining the locks, check if pi_1 and pi_2 are
- pi_1 != pi_2 (else, the while loop would have exited) pointing to the same island. If that is the case, we can just call
- pi_1 MAY be locked polling_island_lock()
- pi_2 is NOT locked */ - After obtaining both the locks, double check that the polling islands
are still the last polling islands in their respective linked lists
/* To maintain lock order consistency, always lock polling_island node with (this is because there might have been polling island merges before
lower address first. we got the lock)
First, make sure pi_1 < pi_2 before proceeding any further. If it turns - If the polling islands are the last islands, we are done. If not,
out that pi_1 > pi_2, unlock pi_1 if locked (because pi_2 is not locked release the locks and continue the process from the first step */
at this point and having pi_1 locked would violate the lock order) and while (true) {
swap pi_1 and pi_2 so that pi_1 becomes less than pi_2 */ next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
if (pi_1 > pi_2) { while (next_1 != NULL) {
if (pi_1_locked) { pi_1 = next_1;
gpr_mu_unlock(&pi_1->mu); next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
pi_1_locked = false; }
next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
while (next_2 != NULL) {
pi_2 = next_2;
next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
} }
GPR_SWAP(polling_island *, pi_1, pi_2); if (pi_1 == pi_2) {
num_swaps++; pi_1 = pi_2 = polling_island_lock(pi_1);
break;
} }
/* The following assertions are true at this point: if (pi_1 < pi_2) {
- pi_1 != pi_2
- pi_1 < pi_2 (address of pi_1 is less than that of pi_2)
- pi_1 MAYBE locked
- pi_2 is NOT locked */
/* Lock pi_1 (if pi_1 is pointing to the terminal node in the list) */
if (!pi_1_locked) {
gpr_mu_lock(&pi_1->mu); gpr_mu_lock(&pi_1->mu);
pi_1_locked = true;
/* If pi_1 is not terminal node (i.e pi_1->merged_to != NULL), we are not
done locking this polling_island yet. Release the lock on this node and
advance pi_1 to the next node in the list; and go to the beginning of
the loop (we can't proceed to locking pi_2 unless we locked pi_1 first)
*/
if (pi_1->merged_to != NULL) {
temp = pi_1->merged_to;
polling_island_unref_and_unlock(pi_1, 1);
pi_1 = temp;
pi_1_locked = false;
continue;
}
}
/* The following assertions are true at this point:
- pi_1 is locked
- pi_2 is unlocked
- pi_1 != pi_2 */
gpr_mu_lock(&pi_2->mu); gpr_mu_lock(&pi_2->mu);
pi_2_locked = true; } else {
gpr_mu_lock(&pi_2->mu);
/* If pi_2 is not terminal node, we are not done locking this polling_island gpr_mu_lock(&pi_1->mu);
yet. Release the lock and update pi_2 to the next node in the list */
if (pi_2->merged_to != NULL) {
temp = pi_2->merged_to;
polling_island_unref_and_unlock(pi_2, 1);
pi_2 = temp;
pi_2_locked = false;
}
} }
/* At this point, either pi_1 == pi_2 AND/OR we got both locks */ next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
if (pi_1 == pi_2) { next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
/* We may or may not have gotten the lock. If we didn't, walk the rest of if (next_1 == NULL && next_2 == NULL) {
the polling_island list and get the lock */ break;
GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
if (!pi_1_locked) {
pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
}
} else {
GPR_ASSERT(pi_1_locked && pi_2_locked);
/* If we swapped pi_1 and pi_2 odd number of times, do one more swap so that
pi_1 and pi_2 point to the same polling_island lists they started off
with at the beginning of this function (i.e *p and *q respectively) */
if (num_swaps % 2 > 0) {
GPR_SWAP(polling_island *, pi_1, pi_2);
} }
gpr_mu_unlock(&pi_1->mu);
gpr_mu_unlock(&pi_2->mu);
} }
*p = pi_1; *p = pi_1;
@ -546,7 +611,7 @@ void polling_island_pair_update_and_lock(polling_island **p,
polling_island *polling_island_merge(polling_island *p, polling_island *q) { polling_island *polling_island_merge(polling_island *p, polling_island *q) {
/* Get locks on both the polling islands */ /* Get locks on both the polling islands */
polling_island_pair_update_and_lock(&p, &q); polling_island_lock_pair(&p, &q);
if (p == q) { if (p == q) {
/* Nothing needs to be done here */ /* Nothing needs to be done here */
@ -568,15 +633,14 @@ polling_island *polling_island_merge(polling_island *p, polling_island *q) {
/* Wakeup all the pollers (if any) on p so that they can pickup this change */ /* Wakeup all the pollers (if any) on p so that they can pickup this change */
polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd); polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);
p->merged_to = q; /* Add the 'merged_to' link from p --> q */
gpr_atm_rel_store(&p->merged_to, q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
/* - The merged polling island (i.e q) inherits all the ref counts of the gpr_mu_unlock(&p->mu);
island merging with it (i.e p) gpr_mu_unlock(&q->mu);
- The island p will lose a ref count */
q->ref_cnt += p->ref_cnt;
polling_island_unref_and_unlock(p, 1); /* Decrement refcount */
polling_island_unref_and_unlock(q, 0); /* Just Unlock. Don't decrement ref */
/* Return the merged polling island */
return q; return q;
} }
@ -667,6 +731,7 @@ static void unref_by(grpc_fd *fd, int n) {
fd->freelist_next = fd_freelist; fd->freelist_next = fd_freelist;
fd_freelist = fd; fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object); grpc_iomgr_unregister_object(&fd->iomgr_object);
gpr_mu_unlock(&fd_freelist_mu); gpr_mu_unlock(&fd_freelist_mu);
} else { } else {
GPR_ASSERT(old > n); GPR_ASSERT(old > n);
@ -785,16 +850,20 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
REF_BY(fd, 1, reason); REF_BY(fd, 1, reason);
/* Remove the fd from the polling island: /* Remove the fd from the polling island:
- Update the fd->polling_island to point to the latest polling island - Get a lock on the latest polling island (i.e the last island in the
- Remove the fd from the polling island. linked list pointed by fd->polling_island). This is the island that
- Remove a ref to the polling island and set fd->polling_island to NULL */ would actually contain the fd
- Remove the fd from the latest polling island
- Unlock the latest polling island
- Set fd->polling_island to NULL (but remove the ref on the polling island
before doing this.) */
gpr_mu_lock(&fd->pi_mu); gpr_mu_lock(&fd->pi_mu);
if (fd->polling_island != NULL) { if (fd->polling_island != NULL) {
fd->polling_island = polling_island *pi_latest = polling_island_lock(fd->polling_island);
polling_island_update_and_lock(fd->polling_island, 1, 0); polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed);
polling_island_remove_fd_locked(fd->polling_island, fd, is_fd_closed); gpr_mu_unlock(&pi_latest->mu);
polling_island_unref_and_unlock(fd->polling_island, 1); PI_UNREF(fd->polling_island, "fd_orphan");
fd->polling_island = NULL; fd->polling_island = NULL;
} }
gpr_mu_unlock(&fd->pi_mu); gpr_mu_unlock(&fd->pi_mu);
@ -1050,17 +1119,13 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->mu);
} }
/* Release the reference to pollset->polling_island and set it to NULL. static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
pollset->mu must be held */ gpr_mu_lock(&ps->pi_mu);
static void pollset_release_polling_island_locked(grpc_pollset *pollset) { if (ps->polling_island != NULL) {
gpr_mu_lock(&pollset->pi_mu); PI_UNREF(ps->polling_island, reason);
if (pollset->polling_island) {
pollset->polling_island =
polling_island_update_and_lock(pollset->polling_island, 1, 0);
polling_island_unref_and_unlock(pollset->polling_island, 1);
pollset->polling_island = NULL;
} }
gpr_mu_unlock(&pollset->pi_mu); ps->polling_island = NULL;
gpr_mu_unlock(&ps->pi_mu);
} }
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
@ -1069,8 +1134,9 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(!pollset_has_workers(pollset));
pollset->finish_shutdown_called = true; pollset->finish_shutdown_called = true;
pollset_release_polling_island_locked(pollset);
/* Release the ref and set pollset->polling_island to NULL */
pollset_release_polling_island(pollset, "ps_shutdown");
grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
} }
@ -1110,7 +1176,7 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset->finish_shutdown_called = false; pollset->finish_shutdown_called = false;
pollset->kicked_without_pollers = false; pollset->kicked_without_pollers = false;
pollset->shutdown_done = NULL; pollset->shutdown_done = NULL;
pollset_release_polling_island_locked(pollset); pollset_release_polling_island(pollset, "ps_reset");
} }
#define GRPC_EPOLL_MAX_EVENTS 1000 #define GRPC_EPOLL_MAX_EVENTS 1000
@ -1124,28 +1190,37 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0); GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
polling island pointed by pollset->polling_island. latest polling island pointed by pollset->polling_island.
Acquire the following locks: Acquire the following locks:
- pollset->mu (which we already have) - pollset->mu (which we already have)
- pollset->pi_mu - pollset->pi_mu
- pollset->polling_island->mu (call polling_island_update_and_lock())*/ - pollset->polling_island lock */
gpr_mu_lock(&pollset->pi_mu); gpr_mu_lock(&pollset->pi_mu);
pi = pollset->polling_island; if (pollset->polling_island == NULL) {
if (pi == NULL) { pollset->polling_island = polling_island_create(NULL);
pi = polling_island_create(NULL, 1); PI_ADD_REF(pollset->polling_island, "ps");
} }
/* In addition to locking the polling island, add a ref so that the island pi = polling_island_lock(pollset->polling_island);
does not get destroyed (which means the epoll_fd won't be closed) while
we are are doing an epoll_wait() on the epoll_fd */
pi = polling_island_update_and_lock(pi, 1, 1);
epoll_fd = pi->epoll_fd; epoll_fd = pi->epoll_fd;
/* Update the pollset->polling_island */ /* Update the pollset->polling_island since the island being pointed by
pollset->polling_island may not be the latest (i.e pi) */
if (pollset->polling_island != pi) {
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */
PI_ADD_REF(pi, "ps");
PI_UNREF(pollset->polling_island, "ps");
pollset->polling_island = pi; pollset->polling_island = pi;
}
/* Add an extra ref so that the island does not get destroyed (which means
the epoll_fd won't be closed) while we are doing an epoll_wait() on the
epoll_fd */
PI_ADD_REF(pi, "ps_work");
polling_island_unref_and_unlock(pollset->polling_island, 0); /* Keep the ref*/ gpr_mu_unlock(&pi->mu);
gpr_mu_unlock(&pollset->pi_mu); gpr_mu_unlock(&pollset->pi_mu);
gpr_mu_unlock(&pollset->mu); gpr_mu_unlock(&pollset->mu);
@ -1193,14 +1268,12 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(pi != NULL); GPR_ASSERT(pi != NULL);
/* Before leaving, release the extra ref we added to the polling island */ /* Before leaving, release the extra ref we added to the polling island. It
/* It is important to note that at this point 'pi' may not be the same as is important to use "pi" here (i.e our old copy of pollset->polling_island
* pollset->polling_island. This is because pollset->polling_island pointer that we got before releasing the polling island lock). This is because
* gets updated whenever the underlying polling island is merged with another pollset->polling_island pointer might get updated in other parts of the
* island and while we are doing epoll_wait() above, the polling island may code when there is an island merge while we are doing epoll_wait() above */
* have been merged */ PI_UNREF(pi, "ps_work");
pi = polling_island_update_and_lock(pi, 1, 0); /* No new ref added */
polling_island_unref_and_unlock(pi, 1);
GPR_TIMER_END("pollset_work_and_unlock", 0); GPR_TIMER_END("pollset_work_and_unlock", 0);
} }
@ -1297,20 +1370,34 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (fd->polling_island == pollset->polling_island) { if (fd->polling_island == pollset->polling_island) {
pi_new = fd->polling_island; pi_new = fd->polling_island;
if (pi_new == NULL) { if (pi_new == NULL) {
pi_new = polling_island_create(fd, 2); pi_new = polling_island_create(fd);
} }
} else if (fd->polling_island == NULL) { } else if (fd->polling_island == NULL) {
pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1); pi_new = polling_island_lock(pollset->polling_island);
polling_island_add_fds_locked(pollset->polling_island, &fd, 1, true); polling_island_add_fds_locked(pi_new, &fd, 1, true);
gpr_mu_unlock(&pi_new->mu); gpr_mu_unlock(&pi_new->mu);
} else if (pollset->polling_island == NULL) { } else if (pollset->polling_island == NULL) {
pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1); pi_new = polling_island_lock(fd->polling_island);
gpr_mu_unlock(&pi_new->mu); gpr_mu_unlock(&pi_new->mu);
} else { } else {
pi_new = polling_island_merge(fd->polling_island, pollset->polling_island); pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
} }
fd->polling_island = pollset->polling_island = pi_new; if (fd->polling_island != pi_new) {
PI_ADD_REF(pi_new, "fd");
if (fd->polling_island != NULL) {
PI_UNREF(fd->polling_island, "fd");
}
fd->polling_island = pi_new;
}
if (pollset->polling_island != pi_new) {
PI_ADD_REF(pi_new, "ps");
if (pollset->polling_island != NULL) {
PI_UNREF(pollset->polling_island, "ps");
}
pollset->polling_island = pi_new;
}
gpr_mu_unlock(&fd->pi_mu); gpr_mu_unlock(&fd->pi_mu);
gpr_mu_unlock(&pollset->pi_mu); gpr_mu_unlock(&pollset->pi_mu);
@ -1481,28 +1568,19 @@ void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
return pi; return pi;
} }
static polling_island *get_polling_island(polling_island *p) { bool grpc_are_polling_islands_equal(void *p, void *q) {
if (p == NULL) { polling_island *p1 = p;
return NULL; polling_island *p2 = q;
}
polling_island *next;
gpr_mu_lock(&p->mu);
while (p->merged_to != NULL) {
next = p->merged_to;
gpr_mu_unlock(&p->mu);
p = next;
gpr_mu_lock(&p->mu);
}
gpr_mu_unlock(&p->mu);
return p; polling_island_lock_pair(&p1, &p2);
if (p1 == p2) {
gpr_mu_unlock(&p1->mu);
} else {
gpr_mu_unlock(&p1->mu);
gpr_mu_unlock(&p2->mu);
} }
bool grpc_are_polling_islands_equal(void *p, void *q) { return p1 == p2;
p = get_polling_island(p);
q = get_polling_island(q);
return p == q;
} }
/******************************************************************************* /*******************************************************************************

Loading…
Cancel
Save