|
|
@ -150,28 +150,84 @@ typedef struct polling_island { |
|
|
|
static gpr_mu g_pi_freelist_mu; |
|
|
|
static gpr_mu g_pi_freelist_mu; |
|
|
|
static polling_island *g_pi_freelist = NULL; |
|
|
|
static polling_island *g_pi_freelist = NULL; |
|
|
|
|
|
|
|
|
|
|
|
/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ? */ |
|
|
|
/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ?
|
|
|
|
static void add_fd_to_polling_island_locked(polling_island *pi, grpc_fd *fd) { |
|
|
|
* TODO: sreek - Should this add a ref to the grpc_fd ? */ |
|
|
|
|
|
|
|
/* The caller is expected to hold pi->mu lock before calling this function */ |
|
|
|
|
|
|
|
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds, |
|
|
|
|
|
|
|
size_t fd_count) { |
|
|
|
int err; |
|
|
|
int err; |
|
|
|
|
|
|
|
size_t i; |
|
|
|
struct epoll_event ev; |
|
|
|
struct epoll_event ev; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < fd_count; i++) { |
|
|
|
ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); |
|
|
|
ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); |
|
|
|
ev.data.ptr = fd; |
|
|
|
ev.data.ptr = fds[i]; |
|
|
|
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); |
|
|
|
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev); |
|
|
|
|
|
|
|
|
|
|
|
if (err < 0 && errno != EEXIST) { |
|
|
|
if (err < 0 && errno != EEXIST) { |
|
|
|
gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s", fd->fd, |
|
|
|
gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s", |
|
|
|
strerror(errno)); |
|
|
|
fds[i]->fd, strerror(errno)); |
|
|
|
return; |
|
|
|
/* TODO: sreek - Not sure if it is a good idea to continue here. We need a
|
|
|
|
|
|
|
|
* better way to bubble up this error instead of doing an abort() */ |
|
|
|
|
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (pi->fd_cnt == pi->fd_capacity) { |
|
|
|
pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2); |
|
|
|
pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2); |
|
|
|
pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity); |
|
|
|
pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity); |
|
|
|
pi->fds[pi->fd_cnt++] = fd; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pi->fds[pi->fd_cnt++] = fds[i]; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ?
|
|
|
|
|
|
|
|
* TODO: sreek - Might have to unref the fds (assuming whether we add a ref to |
|
|
|
|
|
|
|
* the fd when adding it to the epollset) */ |
|
|
|
|
|
|
|
/* The caller is expected to hold pi->mu lock before calling this function */ |
|
|
|
|
|
|
|
static void polling_island_clear_fds_locked(polling_island *pi) { |
|
|
|
|
|
|
|
int err; |
|
|
|
|
|
|
|
size_t i; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < pi->fd_cnt; i++) { |
|
|
|
|
|
|
|
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (err < 0 && errno != ENOENT) { |
|
|
|
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
|
|
|
"epoll_ctl delete for fds[i]: %d failed with error: %s", i, |
|
|
|
|
|
|
|
pi->fds[i]->fd, strerror(errno)); |
|
|
|
|
|
|
|
/* TODO: sreek - Not sure if it is a good idea to continue here. We need a
|
|
|
|
|
|
|
|
* better way to bubble up this error instead of doing an abort() */ |
|
|
|
|
|
|
|
continue; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pi->fd_cnt = 0; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ?
|
|
|
|
|
|
|
|
* TODO: sreek - Might have to unref the fd (assuming whether we add a ref to |
|
|
|
|
|
|
|
* the fd when adding it to the epollset) */ |
|
|
|
|
|
|
|
/* The caller is expected to hold pi->mu lock before calling this function */ |
|
|
|
|
|
|
|
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd) { |
|
|
|
|
|
|
|
int err; |
|
|
|
|
|
|
|
size_t i; |
|
|
|
|
|
|
|
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); |
|
|
|
|
|
|
|
if (err < 0 && errno != ENOENT) { |
|
|
|
|
|
|
|
gpr_log(GPR_ERROR, "epoll_ctl delete for fd: %d failed with error; %s", |
|
|
|
|
|
|
|
fd->fd, strerror(errno)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < pi->fd_cnt; i++) { |
|
|
|
|
|
|
|
if (pi->fds[i] == fd) { |
|
|
|
|
|
|
|
pi->fds[i] = pi->fds[--pi->fd_cnt]; |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static polling_island *polling_island_create(int initial_ref_cnt, |
|
|
|
static polling_island *polling_island_create(grpc_fd *initial_fd, |
|
|
|
grpc_fd *initial_fd) { |
|
|
|
int initial_ref_cnt) { |
|
|
|
polling_island *pi = NULL; |
|
|
|
polling_island *pi = NULL; |
|
|
|
gpr_mu_lock(&g_pi_freelist_mu); |
|
|
|
gpr_mu_lock(&g_pi_freelist_mu); |
|
|
|
if (g_pi_freelist != NULL) { |
|
|
|
if (g_pi_freelist != NULL) { |
|
|
@ -202,17 +258,151 @@ static polling_island *polling_island_create(int initial_ref_cnt, |
|
|
|
pi->next_free = NULL; |
|
|
|
pi->next_free = NULL; |
|
|
|
|
|
|
|
|
|
|
|
if (initial_fd != NULL) { |
|
|
|
if (initial_fd != NULL) { |
|
|
|
/* add_fd_to_polling_island_locked() expects the caller to hold a pi->mu
|
|
|
|
/* polling_island_add_fds_locked() expects the caller to hold a pi->mu
|
|
|
|
* lock. However, since this is a new polling island (and no one has a |
|
|
|
* lock. However, since this is a new polling island (and no one has a |
|
|
|
* reference to it yet), it is okay to not acquire pi->mu here */ |
|
|
|
* reference to it yet), it is okay to not acquire pi->mu here */ |
|
|
|
add_fd_to_polling_island_locked(pi, initial_fd); |
|
|
|
polling_island_add_fds_locked(pi, &initial_fd, 1); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return pi; |
|
|
|
return pi; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void polling_island_delete(polling_island *pi) { |
|
|
|
|
|
|
|
GPR_ASSERT(pi->ref_cnt == 0); |
|
|
|
|
|
|
|
GPR_ASSERT(pi->fd_cnt == 0); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pi->merged_to = NULL; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&g_pi_freelist_mu); |
|
|
|
|
|
|
|
pi->next_free = g_pi_freelist; |
|
|
|
|
|
|
|
g_pi_freelist = pi; |
|
|
|
|
|
|
|
gpr_mu_unlock(&g_pi_freelist_mu); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void polling_island_unref_and_unlock(polling_island *pi, int unref_by) { |
|
|
|
|
|
|
|
pi->ref_cnt -= unref_by; |
|
|
|
|
|
|
|
int ref_cnt = pi->ref_cnt; |
|
|
|
|
|
|
|
GPR_ASSERT(ref_cnt >= 0); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&pi->mu); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (ref_cnt == 0) { |
|
|
|
|
|
|
|
polling_island_delete(pi); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by, |
|
|
|
|
|
|
|
int add_ref_by) { |
|
|
|
|
|
|
|
polling_island *next = NULL; |
|
|
|
|
|
|
|
gpr_mu_lock(&pi->mu); |
|
|
|
|
|
|
|
while (pi->merged_to != NULL) { |
|
|
|
|
|
|
|
next = pi->merged_to; |
|
|
|
|
|
|
|
polling_island_unref_and_unlock(pi, unref_by); |
|
|
|
|
|
|
|
pi = next; |
|
|
|
|
|
|
|
gpr_mu_lock(&pi->mu); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pi->ref_cnt += add_ref_by; |
|
|
|
|
|
|
|
return pi; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void polling_island_pair_update_and_lock(polling_island **p, |
|
|
|
|
|
|
|
polling_island **q) { |
|
|
|
|
|
|
|
polling_island *pi_1 = *p; |
|
|
|
|
|
|
|
polling_island *pi_2 = *q; |
|
|
|
|
|
|
|
polling_island *temp = NULL; |
|
|
|
|
|
|
|
bool pi_1_locked = false; |
|
|
|
|
|
|
|
bool pi_2_locked = false; |
|
|
|
|
|
|
|
int num_swaps = 0; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) { |
|
|
|
|
|
|
|
// pi_1 is NOT equal to pi_2
|
|
|
|
|
|
|
|
// pi_1 MAY be locked
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (pi_1 > pi_2) { |
|
|
|
|
|
|
|
if (pi_1_locked) { |
|
|
|
|
|
|
|
gpr_mu_unlock(&pi_1->mu); |
|
|
|
|
|
|
|
pi_1_locked = false; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
GPR_SWAP(polling_island *, pi_1, pi_2); |
|
|
|
|
|
|
|
num_swaps++; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// p1 < p2
|
|
|
|
|
|
|
|
// p1 MAY BE locked
|
|
|
|
|
|
|
|
// p2 is NOT locked
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (!pi_1_locked) { |
|
|
|
|
|
|
|
gpr_mu_lock(&pi_1->mu); |
|
|
|
|
|
|
|
pi_1_locked = true; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (pi_1->merged_to != NULL) { |
|
|
|
|
|
|
|
temp = pi_1->merged_to; |
|
|
|
|
|
|
|
polling_island_unref_and_unlock(pi_1, 1); |
|
|
|
|
|
|
|
pi_1 = temp; |
|
|
|
|
|
|
|
pi_1_locked = false; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
continue; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// p1 is LOCKED
|
|
|
|
|
|
|
|
// p2 is UNLOCKED
|
|
|
|
|
|
|
|
// p1 != p2
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&pi_2->mu); |
|
|
|
|
|
|
|
pi_2_locked = true; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (pi_2->merged_to != NULL) { |
|
|
|
|
|
|
|
temp = pi_2->merged_to; |
|
|
|
|
|
|
|
polling_island_unref_and_unlock(pi_2, 1); |
|
|
|
|
|
|
|
pi_2 = temp; |
|
|
|
|
|
|
|
pi_2_locked = false; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Either pi_1 == pi_2 OR we got both locks!
|
|
|
|
|
|
|
|
if (pi_1 == pi_2) { |
|
|
|
|
|
|
|
GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked)); |
|
|
|
|
|
|
|
if (!pi_1_locked) { |
|
|
|
|
|
|
|
pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
GPR_ASSERT(pi_1_locked && pi_2_locked); |
|
|
|
|
|
|
|
if (num_swaps % 2 > 0) { |
|
|
|
|
|
|
|
GPR_SWAP(polling_island *, pi_1, pi_2); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*p = pi_1; |
|
|
|
|
|
|
|
*q = pi_2; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
polling_island *polling_island_merge(polling_island *p, polling_island *q) { |
|
|
|
|
|
|
|
polling_island *merged = NULL; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
polling_island_pair_update_and_lock(&p, &q); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* TODO: sreek: Think about this scenario some more. Is it possible ?. what
|
|
|
|
|
|
|
|
* does it mean, when would this happen */ |
|
|
|
|
|
|
|
if (p == q) { |
|
|
|
|
|
|
|
merged = p; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Move all the fds from polling_island p to polling_island q
|
|
|
|
|
|
|
|
polling_island_add_fds_locked(q, p->fds, p->fd_cnt); |
|
|
|
|
|
|
|
polling_island_clear_fds_locked(p); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
q->ref_cnt += p->ref_cnt; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
|
|
|
gpr_mu_unlock(&q->mu); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return merged; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void polling_island_global_init() { |
|
|
|
static void polling_island_global_init() { |
|
|
|
polling_island_create(0, NULL); /* TODO(sreek): Delete this line */ |
|
|
|
|
|
|
|
gpr_mu_init(&g_pi_freelist_mu); |
|
|
|
gpr_mu_init(&g_pi_freelist_mu); |
|
|
|
g_pi_freelist = NULL; |
|
|
|
g_pi_freelist = NULL; |
|
|
|
} |
|
|
|
} |
|
|
@ -319,7 +509,8 @@ struct grpc_pollset_set { |
|
|
|
* fd_posix.c |
|
|
|
* fd_posix.c |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
|
|
/* We need to keep a freelist not because of any concerns of malloc performance
|
|
|
|
/* We need to keep a freelist not because of any concerns of malloc
|
|
|
|
|
|
|
|
* performance |
|
|
|
* but instead so that implementations with multiple threads in (for example) |
|
|
|
* but instead so that implementations with multiple threads in (for example) |
|
|
|
* epoll_wait deal with the race between pollset removal and incoming poll |
|
|
|
* epoll_wait deal with the race between pollset removal and incoming poll |
|
|
|
* notifications. |
|
|
|
* notifications. |
|
|
@ -434,6 +625,7 @@ static void fd_global_shutdown(void) { |
|
|
|
|
|
|
|
|
|
|
|
static grpc_fd *fd_create(int fd, const char *name) { |
|
|
|
static grpc_fd *fd_create(int fd, const char *name) { |
|
|
|
grpc_fd *r = alloc_fd(fd); |
|
|
|
grpc_fd *r = alloc_fd(fd); |
|
|
|
|
|
|
|
|
|
|
|
char *name2; |
|
|
|
char *name2; |
|
|
|
gpr_asprintf(&name2, "%s fd=%d", name, fd); |
|
|
|
gpr_asprintf(&name2, "%s fd=%d", name, fd); |
|
|
|
grpc_iomgr_register_object(&r->iomgr_object, name2); |
|
|
|
grpc_iomgr_register_object(&r->iomgr_object, name2); |
|
|
@ -453,6 +645,20 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
if (!fd->released) { |
|
|
|
if (!fd->released) { |
|
|
|
close(fd->fd); |
|
|
|
close(fd->fd); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
|
|
|
|
/* TODO: sreek - Check for deadlocks */ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&fd->pi_mu); |
|
|
|
|
|
|
|
fd->polling_island = |
|
|
|
|
|
|
|
polling_island_update_and_lock(fd->polling_island, 1, 0); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
polling_island_remove_fd_locked(fd->polling_island, fd); |
|
|
|
|
|
|
|
polling_island_unref_and_unlock(fd->polling_island, 1); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fd->polling_island = NULL; |
|
|
|
|
|
|
|
gpr_mu_unlock(&fd->pi_mu); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* TODO: sreek - This should be no longer needed */ |
|
|
|
remove_fd_from_all_epoll_sets(fd->fd); |
|
|
|
remove_fd_from_all_epoll_sets(fd->fd); |
|
|
|
} |
|
|
|
} |
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); |
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); |
|
|
@ -752,7 +958,8 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { |
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); |
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* TODO(sreek): Remove multipoll_with_epoll_*_maybe_work_and_unlock declaration
|
|
|
|
/* TODO(sreek): Remove multipoll_with_epoll_*_maybe_work_and_unlock
|
|
|
|
|
|
|
|
* declaration |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
static void multipoll_with_epoll_pollset_maybe_work_and_unlock( |
|
|
|
static void multipoll_with_epoll_pollset_maybe_work_and_unlock( |
|
|
|
grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, |
|
|
|
grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, |
|
|
@ -979,50 +1186,30 @@ static void remove_fd_from_all_epoll_sets(int fd) { |
|
|
|
* finally_add_fd() in ev_poll_and_epoll_posix.c */ |
|
|
|
* finally_add_fd() in ev_poll_and_epoll_posix.c */ |
|
|
|
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
grpc_fd *fd) { |
|
|
|
grpc_fd *fd) { |
|
|
|
|
|
|
|
|
|
|
|
/* TODO sreek - Check if we need to get a pollset->mu lock here */ |
|
|
|
/* TODO sreek - Check if we need to get a pollset->mu lock here */ |
|
|
|
|
|
|
|
gpr_mu_lock(&pollset->pi_mu); |
|
|
|
|
|
|
|
gpr_mu_lock(&fd->pi_mu); |
|
|
|
|
|
|
|
|
|
|
|
struct epoll_event ev; |
|
|
|
polling_island *pi_new = NULL; |
|
|
|
int err; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* Hold a ref to the fd to keep it from being closed during the add. This may
|
|
|
|
|
|
|
|
result in a spurious wakeup being assigned to this pollset whilst adding, |
|
|
|
|
|
|
|
but that should be benign. */ |
|
|
|
|
|
|
|
/* TODO: (sreek): Understand how a spurious wake up migh be assinged to this
|
|
|
|
|
|
|
|
* pollset..and how holding a reference will prevent the fd from being closed |
|
|
|
|
|
|
|
* (and perhaps more importantly, see how can an fd be closed while being |
|
|
|
|
|
|
|
* added to the epollset */ |
|
|
|
|
|
|
|
GRPC_FD_REF(fd, "add fd"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&fd->mu); |
|
|
|
if (fd->polling_island == pollset->polling_island) { |
|
|
|
if (fd->shutdown) { |
|
|
|
pi_new = fd->polling_island; |
|
|
|
gpr_mu_unlock(&fd->mu); |
|
|
|
if (pi_new == NULL) { |
|
|
|
GRPC_FD_UNREF(fd, "add fd"); |
|
|
|
pi_new = polling_island_create(fd, 2); |
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
gpr_mu_unlock(&fd->mu); |
|
|
|
} else if (fd->polling_island == NULL) { |
|
|
|
|
|
|
|
pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1); |
|
|
|
|
|
|
|
|
|
|
|
ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); |
|
|
|
} else if (pollset->polling_island == NULL) { |
|
|
|
ev.data.ptr = fd; |
|
|
|
pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1); |
|
|
|
err = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); |
|
|
|
} else { // Non null and different
|
|
|
|
if (err < 0) { |
|
|
|
pi_new = polling_island_merge(fd->polling_island, pollset->polling_island); |
|
|
|
/* FDs may be added to a pollset multiple times, so EEXIST is normal. */ |
|
|
|
|
|
|
|
if (errno != EEXIST) { |
|
|
|
|
|
|
|
gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, |
|
|
|
|
|
|
|
strerror(errno)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* The fd might have been orphaned while we were adding it to the epoll set.
|
|
|
|
fd->polling_island = pollset->polling_island = pi_new; |
|
|
|
Close the fd in such a case (which will also take care of removing it from |
|
|
|
|
|
|
|
the epoll set */ |
|
|
|
|
|
|
|
gpr_mu_lock(&fd->mu); |
|
|
|
|
|
|
|
if (fd_is_orphaned(fd) && !fd->closed) { |
|
|
|
|
|
|
|
close_fd_locked(exec_ctx, fd); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
gpr_mu_unlock(&fd->mu); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
GRPC_FD_UNREF(fd, "add fd"); |
|
|
|
gpr_mu_unlock(&fd->pi_mu); |
|
|
|
|
|
|
|
gpr_mu_unlock(&pollset->pi_mu); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* Creates an epoll fd and initializes the pollset */ |
|
|
|
/* Creates an epoll fd and initializes the pollset */ |
|
|
|