Merge branch 'iq2-pi' into delayed-write

reviewable/pr6737/r3
Craig Tiller 9 years ago
commit 4c3e8e49e4
  1. 173
      src/core/lib/iomgr/ev_epoll_linux.c
  2. 13
      src/core/lib/iomgr/tcp_windows.c

@ -57,6 +57,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@ -156,12 +157,13 @@ static void fd_global_shutdown(void);
#ifdef GRPC_PI_REF_COUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */
#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(p, r) pi_unref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */
@ -184,6 +186,9 @@ typedef struct polling_island {
* (except mu and ref_count) are invalid and must be ignored. */
gpr_atm merged_to;
/* The workqueue associated with this polling island */
grpc_workqueue *workqueue;
/* The fd of the underlying epoll set */
int epoll_fd;
@ -191,11 +196,6 @@ typedef struct polling_island {
size_t fd_cnt;
size_t fd_capacity;
grpc_fd **fds;
/* Polling islands that are no longer needed are kept in a freelist so that
they can be reused. This field points to the next polling island in the
free list */
struct polling_island *next_free;
} polling_island;
/*******************************************************************************
@ -275,11 +275,8 @@ static void append_error(grpc_error **composite, grpc_error *error,
threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;
/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;
static void polling_island_delete(); /* Forward declaration */
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@ -293,27 +290,29 @@ gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
#ifdef GRPC_PI_REF_COUNT_DEBUG
void pi_add_ref(polling_island *pi);
void pi_unref(polling_island *pi);
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) {
static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
int line) {
long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
pi_add_ref(pi);
gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}
void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) {
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
char *reason, char *file, int line) {
long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
pi_unref(pi);
pi_unref(exec_ctx, pi);
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif
void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); }
static void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); }
void pi_unref(polling_island *pi) {
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
/* If ref count went to zero, delete the polling island.
Note that this deletion not be done under a lock. Once the ref count goes
to zero, we are guaranteed that no one else holds a reference to the
@ -324,7 +323,7 @@ void pi_unref(polling_island *pi) {
*/
if (gpr_unref(&pi->ref_count)) {
polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
polling_island_delete(pi);
polling_island_delete(exec_ctx, pi);
if (next != NULL) {
PI_UNREF(next, "pi_delete"); /* Recursive call */
}
@ -462,29 +461,22 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
}
/* Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_fd *initial_fd,
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
grpc_fd *initial_fd,
grpc_error **error) {
polling_island *pi = NULL;
char *err_msg;
const char *err_desc = "polling_island_create";
/* Try to get one from the polling island freelist */
gpr_mu_lock(&g_pi_freelist_mu);
if (g_pi_freelist != NULL) {
pi = g_pi_freelist;
g_pi_freelist = g_pi_freelist->next_free;
pi->next_free = NULL;
}
gpr_mu_unlock(&g_pi_freelist_mu);
/* Create new polling island if we could not get one from the free list */
if (pi == NULL) {
pi = gpr_malloc(sizeof(*pi));
gpr_mu_init(&pi->mu);
pi->fd_cnt = 0;
pi->fd_capacity = 0;
pi->fds = NULL;
}
*error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi));
gpr_mu_init(&pi->mu);
pi->fd_cnt = 0;
pi->fd_capacity = 0;
pi->fds = NULL;
pi->epoll_fd = -1;
pi->workqueue = NULL;
gpr_ref_init(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
@ -492,39 +484,49 @@ static polling_island *polling_island_create(grpc_fd *initial_fd,
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno,
strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
} else {
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
pi->next_free = NULL;
append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
goto done;
}
if (initial_fd != NULL) {
/* Lock the polling island here just in case we got this structure from
the freelist and the polling island lock was not released yet (by the
code that adds the polling island to the freelist) */
gpr_mu_lock(&pi->mu);
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
gpr_mu_unlock(&pi->mu);
}
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
if (initial_fd != NULL) {
/* Lock the polling island here just in case we got this structure from
the freelist and the polling island lock was not released yet (by the
code that adds the polling island to the freelist) */
gpr_mu_lock(&pi->mu);
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
gpr_mu_unlock(&pi->mu);
}
append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
err_desc);
done:
if (*error != GRPC_ERROR_NONE) {
if (pi->epoll_fd < 0) {
close(pi->epoll_fd);
}
if (pi->workqueue != NULL) {
GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
}
gpr_mu_destroy(&pi->mu);
gpr_free(pi);
pi = NULL;
}
return pi;
}
static void polling_island_delete(polling_island *pi) {
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
GPR_ASSERT(pi->fd_cnt == 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
close(pi->epoll_fd);
pi->epoll_fd = -1;
gpr_mu_lock(&g_pi_freelist_mu);
pi->next_free = g_pi_freelist;
g_pi_freelist = pi;
gpr_mu_unlock(&g_pi_freelist_mu);
GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
gpr_mu_destroy(&pi->mu);
gpr_free(pi->fds);
gpr_free(pi);
}
/* Attempts to gets the last polling island in the linked list (liked by the
@ -704,9 +706,6 @@ static polling_island *polling_island_merge(polling_island *p,
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_init(&g_pi_freelist_mu);
g_pi_freelist = NULL;
error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
if (error == GRPC_ERROR_NONE) {
error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
@ -716,18 +715,6 @@ static grpc_error *polling_island_global_init() {
}
static void polling_island_global_shutdown() {
polling_island *next;
gpr_mu_lock(&g_pi_freelist_mu);
gpr_mu_unlock(&g_pi_freelist_mu);
while (g_pi_freelist != NULL) {
next = g_pi_freelist->next_free;
gpr_mu_destroy(&g_pi_freelist->mu);
gpr_free(g_pi_freelist->fds);
gpr_free(g_pi_freelist);
g_pi_freelist = next;
}
gpr_mu_destroy(&g_pi_freelist_mu);
grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
@ -929,7 +916,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
PI_UNREF(fd->polling_island, "fd_orphan");
PI_UNREF(exec_ctx, fd->polling_island, "fd_orphan");
fd->polling_island = NULL;
}
gpr_mu_unlock(&fd->pi_mu);
@ -1037,7 +1024,16 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
gpr_mu_lock(&fd->pi_mu);
grpc_workqueue *workqueue = NULL;
if (fd->polling_island != NULL) {
workqueue =
GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
}
gpr_mu_unlock(&fd->pi_mu);
return workqueue;
}
/*******************************************************************************
* Pollset Definitions
@ -1229,9 +1225,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_unlock(&fd->mu);
}
static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
grpc_pollset *ps, char *reason) {
if (ps->polling_island != NULL) {
PI_UNREF(ps->polling_island, reason);
PI_UNREF(exec_ctx, ps->polling_island, reason);
}
ps->polling_island = NULL;
}
@ -1244,7 +1241,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
pollset->finish_shutdown_called = true;
/* Release the ref and set pollset->polling_island to NULL */
pollset_release_polling_island(pollset, "ps_shutdown");
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
@ -1276,14 +1273,14 @@ static void pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
static void pollset_reset(grpc_pollset *pollset) {
static void pollset_reset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
GPR_ASSERT(!pollset_has_workers(pollset));
pollset->shutting_down = false;
pollset->finish_shutdown_called = false;
pollset->kicked_without_pollers = false;
pollset->shutdown_done = NULL;
pollset_release_polling_island(pollset, "ps_reset");
GPR_ASSERT(pollset->polling_island == NULL);
}
#define GRPC_EPOLL_MAX_EVENTS 1000
@ -1311,7 +1308,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
this function (i.e pollset_work_and_unlock()) is called */
if (pollset->polling_island == NULL) {
pollset->polling_island = polling_island_create(NULL, error);
pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
if (pollset->polling_island == NULL) {
GPR_TIMER_END("pollset_work_and_unlock", 0);
return; /* Fatal error. We cannot continue */
@ -1331,7 +1328,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */
PI_ADD_REF(pi, "ps");
PI_UNREF(pollset->polling_island, "ps");
PI_UNREF(exec_ctx, pollset->polling_island, "ps");
pollset->polling_island = pi;
}
@ -1402,7 +1399,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
that we got before releasing the polling island lock). This is because
pollset->polling_island pointer might get udpated in other parts of the
code when there is an island merge while we are doing epoll_wait() above */
PI_UNREF(pi, "ps_work");
PI_UNREF(exec_ctx, pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0);
}
@ -1540,7 +1537,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (fd->polling_island == pollset->polling_island) {
pi_new = fd->polling_island;
if (pi_new == NULL) {
pi_new = polling_island_create(fd, &error);
pi_new = polling_island_create(exec_ctx, fd, &error);
GRPC_POLLING_TRACE(
"pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
@ -1581,7 +1578,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (fd->polling_island != pi_new) {
PI_ADD_REF(pi_new, "fd");
if (fd->polling_island != NULL) {
PI_UNREF(fd->polling_island, "fd");
PI_UNREF(exec_ctx, fd->polling_island, "fd");
}
fd->polling_island = pi_new;
}
@ -1589,7 +1586,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (pollset->polling_island != pi_new) {
PI_ADD_REF(pi_new, "ps");
if (pollset->polling_island != NULL) {
PI_UNREF(pollset->polling_island, "ps");
PI_UNREF(exec_ctx, pollset->polling_island, "ps");
}
pollset->polling_island = pi_new;
}

@ -389,9 +389,16 @@ static char *win_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
static grpc_endpoint_vtable vtable = {
win_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
win_shutdown, win_destroy, win_get_peer};
static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; }
static grpc_endpoint_vtable vtable = {win_read,
win_write,
win_get_workqueue,
win_add_to_pollset,
win_add_to_pollset_set,
win_shutdown,
win_destroy,
win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));

Loading…
Cancel
Save