|
|
|
@ -207,12 +207,7 @@ struct grpc_pollset { |
|
|
|
|
bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */ |
|
|
|
|
grpc_closure *shutdown_done; /* Called after after shutdown is complete */ |
|
|
|
|
|
|
|
|
|
/* The polling island to which this pollset belongs to and the mutex
|
|
|
|
|
protecting the field */ |
|
|
|
|
/* TODO: sreek: This lock might actually be adding more overhead to the
|
|
|
|
|
critical path (i.e pollset_work() function). Consider removing this lock |
|
|
|
|
and just using the overall pollset lock */ |
|
|
|
|
gpr_mu pi_mu; |
|
|
|
|
/* The polling island to which this pollset belongs to */ |
|
|
|
|
struct polling_island *polling_island; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
@ -488,8 +483,21 @@ static void polling_island_delete(polling_island *pi) { |
|
|
|
|
gpr_mu_unlock(&g_pi_freelist_mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Attempts to gets the last polling island in the linked list (liked by the
|
|
|
|
|
* 'merged_to' field). Since this does not lock the polling island, there are no |
|
|
|
|
* guarantees that the island returned is the last island */ |
|
|
|
|
static polling_island *polling_island_maybe_get_latest(polling_island *pi) { |
|
|
|
|
polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); |
|
|
|
|
while (next != NULL) { |
|
|
|
|
pi = next; |
|
|
|
|
next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return pi; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Gets the lock on the *latest* polling island i.e the last polling island in
|
|
|
|
|
the linked list (linked by 'merged_to' link). Call gpr_mu_unlock on the |
|
|
|
|
the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the |
|
|
|
|
returned polling island's mu. |
|
|
|
|
Usage: To lock/unlock polling island "pi", do the following: |
|
|
|
|
polling_island *pi_latest = polling_island_lock(pi); |
|
|
|
@ -497,22 +505,25 @@ static void polling_island_delete(polling_island *pi) { |
|
|
|
|
... critical section .. |
|
|
|
|
... |
|
|
|
|
gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
|
|
|
|
|
polling_island *polling_island_lock(polling_island *pi) { |
|
|
|
|
static polling_island *polling_island_lock(polling_island *pi) { |
|
|
|
|
polling_island *next = NULL; |
|
|
|
|
|
|
|
|
|
while (true) { |
|
|
|
|
next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); |
|
|
|
|
if (next == NULL) { |
|
|
|
|
/* pi is the last node in the linked list. Get the lock and check again
|
|
|
|
|
(under the pi->mu lock) that pi is still the last node (because a merge |
|
|
|
|
may have happend after the (next == NULL) check above and before |
|
|
|
|
getting the pi->mu lock. |
|
|
|
|
If pi is the last node, we are done. If not, unlock and continue |
|
|
|
|
traversing the list */ |
|
|
|
|
/* Looks like 'pi' is the last node in the linked list but unless we check
|
|
|
|
|
this by holding the pi->mu lock, we cannot be sure (i.e without the |
|
|
|
|
pi->mu lock, we don't prevent island merges). |
|
|
|
|
To be absolutely sure, check once more by holding the pi->mu lock */ |
|
|
|
|
gpr_mu_lock(&pi->mu); |
|
|
|
|
next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); |
|
|
|
|
if (next == NULL) { |
|
|
|
|
/* pi is infact the last node and we have the pi->mu lock. we're done */ |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
|
|
|
|
|
* isn't the lock we are interested in. Continue traversing the list */ |
|
|
|
|
gpr_mu_unlock(&pi->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -526,11 +537,11 @@ polling_island *polling_island_lock(polling_island *pi) { |
|
|
|
|
This function is needed because calling the following block of code to obtain |
|
|
|
|
locks on polling islands (*p and *q) is prone to deadlocks. |
|
|
|
|
{ |
|
|
|
|
polling_island_lock(*p); |
|
|
|
|
polling_island_lock(*q); |
|
|
|
|
polling_island_lock(*p, true); |
|
|
|
|
polling_island_lock(*q, true); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Usage/exmaple: |
|
|
|
|
Usage/example: |
|
|
|
|
polling_island *p1; |
|
|
|
|
polling_island *p2; |
|
|
|
|
.. |
|
|
|
@ -551,7 +562,7 @@ polling_island *polling_island_lock(polling_island *pi) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
*/ |
|
|
|
|
void polling_island_lock_pair(polling_island **p, polling_island **q) { |
|
|
|
|
static void polling_island_lock_pair(polling_island **p, polling_island **q) { |
|
|
|
|
polling_island *pi_1 = *p; |
|
|
|
|
polling_island *pi_2 = *q; |
|
|
|
|
polling_island *next_1 = NULL; |
|
|
|
@ -611,7 +622,8 @@ void polling_island_lock_pair(polling_island **p, polling_island **q) { |
|
|
|
|
*q = pi_2; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
polling_island *polling_island_merge(polling_island *p, polling_island *q) { |
|
|
|
|
static polling_island *polling_island_merge(polling_island *p, |
|
|
|
|
polling_island *q) { |
|
|
|
|
/* Get locks on both the polling islands */ |
|
|
|
|
polling_island_lock_pair(&p, &q); |
|
|
|
|
|
|
|
|
@ -1124,7 +1136,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { |
|
|
|
|
pollset->finish_shutdown_called = false; |
|
|
|
|
pollset->shutdown_done = NULL; |
|
|
|
|
|
|
|
|
|
gpr_mu_init(&pollset->pi_mu); |
|
|
|
|
pollset->polling_island = NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1170,12 +1181,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pollset_release_polling_island(grpc_pollset *ps, char *reason) { |
|
|
|
|
gpr_mu_lock(&ps->pi_mu); |
|
|
|
|
if (ps->polling_island != NULL) { |
|
|
|
|
PI_UNREF(ps->polling_island, reason); |
|
|
|
|
} |
|
|
|
|
ps->polling_island = NULL; |
|
|
|
|
gpr_mu_unlock(&ps->pi_mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
@ -1215,7 +1224,6 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
* here */ |
|
|
|
|
static void pollset_destroy(grpc_pollset *pollset) { |
|
|
|
|
GPR_ASSERT(!pollset_has_workers(pollset)); |
|
|
|
|
gpr_mu_destroy(&pollset->pi_mu); |
|
|
|
|
gpr_mu_destroy(&pollset->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1250,22 +1258,25 @@ static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
|
|
|
|
|
latest polling island pointed by pollset->polling_island. |
|
|
|
|
Acquire the following locks: |
|
|
|
|
- pollset->mu (which we already have) |
|
|
|
|
- pollset->pi_mu |
|
|
|
|
- pollset->polling_island lock */ |
|
|
|
|
gpr_mu_lock(&pollset->pi_mu); |
|
|
|
|
|
|
|
|
|
Since epoll_fd is immutable, we can read it without obtaining the polling |
|
|
|
|
island lock. There is however a possibility that the polling island (from |
|
|
|
|
which we got the epoll_fd) got merged with another island while we are |
|
|
|
|
in this function. This is still okay because in such a case, we will wakeup |
|
|
|
|
right-away from epoll_wait() and pick up the latest polling_island the next |
|
|
|
|
this function (i.e pollset_work_and_unlock()) is called. |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
if (pollset->polling_island == NULL) { |
|
|
|
|
pollset->polling_island = polling_island_create(NULL); |
|
|
|
|
PI_ADD_REF(pollset->polling_island, "ps"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pi = polling_island_lock(pollset->polling_island); |
|
|
|
|
pi = polling_island_maybe_get_latest(pollset->polling_island); |
|
|
|
|
epoll_fd = pi->epoll_fd; |
|
|
|
|
|
|
|
|
|
/* Update the pollset->polling_island since the island being pointed by
|
|
|
|
|
pollset->polling_island may not be the latest (i.e pi) */ |
|
|
|
|
pollset->polling_island maybe older than the one pointed by pi) */ |
|
|
|
|
if (pollset->polling_island != pi) { |
|
|
|
|
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
|
|
|
|
|
polling island to be deleted */ |
|
|
|
@ -1278,9 +1289,6 @@ static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, |
|
|
|
|
the epoll_fd won't be closed) while we are are doing an epoll_wait() on the |
|
|
|
|
epoll_fd */ |
|
|
|
|
PI_ADD_REF(pi, "ps_work"); |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&pi->mu); |
|
|
|
|
gpr_mu_unlock(&pollset->pi_mu); |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
|
|
|
|
|
do { |
|
|
|
@ -1413,7 +1421,6 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
grpc_fd *fd) { |
|
|
|
|
gpr_mu_lock(&pollset->mu); |
|
|
|
|
gpr_mu_lock(&pollset->pi_mu); |
|
|
|
|
gpr_mu_lock(&fd->pi_mu); |
|
|
|
|
|
|
|
|
|
polling_island *pi_new = NULL; |
|
|
|
@ -1465,7 +1472,6 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&fd->pi_mu); |
|
|
|
|
gpr_mu_unlock(&pollset->pi_mu); |
|
|
|
|
gpr_mu_unlock(&pollset->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1627,9 +1633,9 @@ void *grpc_fd_get_polling_island(grpc_fd *fd) { |
|
|
|
|
void *grpc_pollset_get_polling_island(grpc_pollset *ps) { |
|
|
|
|
polling_island *pi; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&ps->pi_mu); |
|
|
|
|
gpr_mu_lock(&ps->mu); |
|
|
|
|
pi = ps->polling_island; |
|
|
|
|
gpr_mu_unlock(&ps->pi_mu); |
|
|
|
|
gpr_mu_unlock(&ps->mu); |
|
|
|
|
|
|
|
|
|
return pi; |
|
|
|
|
} |
|
|
|
|