Start sketching hierarchical turnstile

pull/10932/head
Craig Tiller 8 years ago
parent 7068eafd89
commit 6de0593e82
  1. 53
      src/core/lib/iomgr/ev_epoll1_linux.c

@ -48,6 +48,7 @@
#include <unistd.h>
#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
@ -113,18 +114,32 @@ struct grpc_pollset_worker {
gpr_cv cv;
};
typedef struct pollset_neighbourhood {
gpr_mu mu;
grpc_pollset *active_root;
grpc_pollset *inactive_root;
bool seen_inactive;
char pad[GPR_CACHELINE_SIZE];
} pollset_neighbourhood;
struct grpc_pollset {
gpr_mu mu;
pollset_neighbourhood *neighbourhood;
grpc_pollset_worker *root_worker;
bool kicked_without_poller;
bool seen_inactive;
bool shutting_down; /* Is the pollset shutting down ? */
bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
grpc_pollset *next;
grpc_pollset *prev;
};
/*******************************************************************************
* Pollset-set Declarations
*/
struct grpc_pollset_set {};
/*******************************************************************************
@ -303,6 +318,11 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
* Pollset Definitions
*/
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static gpr_atm g_active_poller;
static pollset_neighbourhood *g_neighbourhoods;
/* Return true if first in list */
static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link,
grpc_pollset_worker *worker) {
@ -342,15 +362,10 @@ static worker_remove_result worker_remove(grpc_pollset_worker **root,
}
}
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static gpr_mu g_pollset_mu;
static grpc_pollset_worker *g_root_worker;
static grpc_error *pollset_global_init(void) {
gpr_mu_init(&g_pollset_mu);
gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker);
gpr_atm_no_barrier_store(&g_active_poller, 0);
global_wakeup_fd.read_fd = -1;
grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
if (err != GRPC_ERROR_NONE) return err;
@ -363,14 +378,32 @@ static grpc_error *pollset_global_init(void) {
}
static void pollset_global_shutdown(void) {
gpr_mu_destroy(&g_pollset_mu);
gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker);
if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &g_pollset_mu;
gpr_mu_init(&pollset->mu);
*mu = &pollset->mu;
pollset->neighbourhood = &g_neighbourhoods[gpr_cpu_current_cpu()];
pollset->seen_inactive = true;
pollset->next = pollset->prev = pollset;
}
static void pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
gpr_mu_lock(&pollset->neighbourhood->mu);
pollset->prev->next = pollset->next;
pollset->next->prev = pollset->prev;
if (pollset == pollset->neighbourhood->active_root) {
pollset->neighbourhood->active_root =
pollset->next == pollset ? NULL : pollset->next;
} else if (pollset == pollset->neighbourhood->inactive_root) {
pollset->neighbourhood->inactive_root =
pollset->next == pollset ? NULL : pollset->next;
}
gpr_mu_unlock(&pollset->neighbourhood->mu);
}
static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
@ -408,8 +441,6 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
static void pollset_destroy(grpc_pollset *pollset) {}
#define MAX_EPOLL_EVENTS 100
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,

Loading…
Cancel
Save