Merge pull request #8982 from kpayson64/cache_poll_threads2

Cache Poller threads for cv-poll engine
pull/12060/merge
Jan Tattermusch 8 years ago committed by GitHub
commit 0fcd99e168
  1. 398
      src/core/lib/iomgr/ev_poll_posix.c
  2. 6
      src/core/lib/iomgr/wakeup_fd_cv.h

@ -42,6 +42,7 @@
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/murmur_hash.h"
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
@ -239,22 +240,43 @@ struct grpc_pollset_set {
* condition variable polling definitions
*/
#define POLLCV_THREAD_GRACE_MS 1000
#define CV_POLL_PERIOD_MS 1000
#define CV_DEFAULT_TABLE_SIZE 16
typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t;
typedef struct poll_args {
typedef struct poll_result {
gpr_refcount refcount;
gpr_cv *cv;
cv_node *watchers;
int watchcount;
struct pollfd *fds;
nfds_t nfds;
int timeout;
int retval;
int err;
gpr_atm status;
int completed;
} poll_result;
typedef struct poll_args {
gpr_cv trigger;
int trigger_set;
struct pollfd *fds;
nfds_t nfds;
poll_result *result;
struct poll_args *next;
struct poll_args *prev;
} poll_args;
// This is a 2-tiered cache, we mantain a hash table
// of active poll calls, so we can wait on the result
// of that call. We also maintain a freelist of inactive
// poll threads.
typedef struct poll_hash_table {
poll_args *free_pollers;
poll_args **active_pollers;
unsigned int size;
unsigned int count;
} poll_hash_table;
poll_hash_table poll_cache;
cv_fd_table g_cvfds;
/*******************************************************************************
@ -1277,43 +1299,205 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
* Condition Variable polling extensions
*/
static void decref_poll_args(poll_args *args) {
if (gpr_unref(&args->refcount)) {
gpr_free(args->fds);
gpr_cv_destroy(args->cv);
gpr_free(args->cv);
gpr_free(args);
static void run_poll(void *args);
static void cache_poller_locked(poll_args *args);
static void cache_insert_locked(poll_args *args) {
uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd),
0xDEADBEEF);
key = key % poll_cache.size;
if (poll_cache.active_pollers[key]) {
poll_cache.active_pollers[key]->prev = args;
}
args->next = poll_cache.active_pollers[key];
args->prev = NULL;
poll_cache.active_pollers[key] = args;
poll_cache.count++;
}
// Poll in a background thread
static void run_poll(void *arg) {
int timeout, retval;
poll_args *pargs = (poll_args *)arg;
while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
if (pargs->timeout < 0) {
timeout = CV_POLL_PERIOD_MS;
} else {
timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout);
pargs->timeout -= timeout;
static void init_result(poll_args *pargs) {
pargs->result = gpr_malloc(sizeof(poll_result));
gpr_ref_init(&pargs->result->refcount, 1);
pargs->result->watchers = NULL;
pargs->result->watchcount = 0;
pargs->result->fds = gpr_malloc(sizeof(struct pollfd) * pargs->nfds);
memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds);
pargs->result->nfds = pargs->nfds;
pargs->result->retval = 0;
pargs->result->err = 0;
pargs->result->completed = 0;
}
// Creates a poll_args object for a given arguments to poll().
// This object may return a poll_args in the cache.
static poll_args *get_poller_locked(struct pollfd *fds, nfds_t count) {
uint32_t key =
gpr_murmur_hash3(fds, count * sizeof(struct pollfd), 0xDEADBEEF);
key = key % poll_cache.size;
poll_args *curr = poll_cache.active_pollers[key];
while (curr) {
if (curr->nfds == count &&
memcmp(curr->fds, fds, count * sizeof(struct pollfd)) == 0) {
gpr_free(fds);
return curr;
}
retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout);
if (retval != 0 || pargs->timeout == 0) {
pargs->retval = retval;
pargs->err = errno;
break;
curr = curr->next;
}
if (poll_cache.free_pollers) {
poll_args *pargs = poll_cache.free_pollers;
poll_cache.free_pollers = pargs->next;
if (poll_cache.free_pollers) {
poll_cache.free_pollers->prev = NULL;
}
pargs->fds = fds;
pargs->nfds = count;
pargs->next = NULL;
pargs->prev = NULL;
init_result(pargs);
cache_poller_locked(pargs);
return pargs;
}
poll_args *pargs = gpr_malloc(sizeof(struct poll_args));
gpr_cv_init(&pargs->trigger);
pargs->fds = fds;
pargs->nfds = count;
pargs->next = NULL;
pargs->prev = NULL;
pargs->trigger_set = 0;
init_result(pargs);
cache_poller_locked(pargs);
gpr_thd_id t_id;
gpr_thd_options opt = gpr_thd_options_default();
gpr_ref(&g_cvfds.pollcount);
gpr_thd_options_set_detached(&opt);
GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
return pargs;
}
static void cache_delete_locked(poll_args *args) {
if (!args->prev) {
uint32_t key = gpr_murmur_hash3(
args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF);
key = key % poll_cache.size;
GPR_ASSERT(poll_cache.active_pollers[key] == args);
poll_cache.active_pollers[key] = args->next;
} else {
args->prev->next = args->next;
}
gpr_mu_lock(&g_cvfds.mu);
if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
// Signal main thread that the poll completed
gpr_atm_no_barrier_store(&pargs->status, COMPLETED);
gpr_cv_signal(pargs->cv);
if (args->next) {
args->next->prev = args->prev;
}
decref_poll_args(pargs);
g_cvfds.pollcount--;
if (g_cvfds.shutdown && g_cvfds.pollcount == 0) {
gpr_cv_signal(&g_cvfds.shutdown_complete);
poll_cache.count--;
if (poll_cache.free_pollers) {
poll_cache.free_pollers->prev = args;
}
args->prev = NULL;
args->next = poll_cache.free_pollers;
gpr_free(args->fds);
poll_cache.free_pollers = args;
}
static void cache_poller_locked(poll_args *args) {
if (poll_cache.count + 1 > poll_cache.size / 2) {
poll_args **old_active_pollers = poll_cache.active_pollers;
poll_cache.size = poll_cache.size * 2;
poll_cache.count = 0;
poll_cache.active_pollers = gpr_malloc(sizeof(void *) * poll_cache.size);
for (unsigned int i = 0; i < poll_cache.size; i++) {
poll_cache.active_pollers[i] = NULL;
}
for (unsigned int i = 0; i < poll_cache.size / 2; i++) {
poll_args *curr = old_active_pollers[i];
poll_args *next = NULL;
while (curr) {
next = curr->next;
cache_insert_locked(curr);
curr = next;
}
}
gpr_free(old_active_pollers);
}
cache_insert_locked(args);
}
static void cache_destroy_locked(poll_args *args) {
if (args->next) {
args->next->prev = args->prev;
}
if (args->prev) {
args->prev->next = args->next;
} else {
poll_cache.free_pollers = args->next;
}
gpr_free(args);
}
static void decref_poll_result(poll_result *res) {
if (gpr_unref(&res->refcount)) {
GPR_ASSERT(!res->watchers);
gpr_free(res->fds);
gpr_free(res);
}
}
void remove_cvn(cv_node **head, cv_node *target) {
if (target->next) {
target->next->prev = target->prev;
}
if (target->prev) {
target->prev->next = target->next;
} else {
*head = target->next;
}
}
gpr_timespec thread_grace;
// Poll in a background thread
static void run_poll(void *args) {
poll_args *pargs = (poll_args *)args;
while (1) {
poll_result *result = pargs->result;
int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS);
gpr_mu_lock(&g_cvfds.mu);
if (retval != 0) {
result->completed = 1;
result->retval = retval;
result->err = errno;
cv_node *watcher = result->watchers;
while (watcher) {
gpr_cv_signal(watcher->cv);
watcher = watcher->next;
}
}
if (result->watchcount == 0 || result->completed) {
cache_delete_locked(pargs);
decref_poll_result(result);
// Leave this polling thread alive for a grace period to do another poll()
// op
gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
deadline = gpr_time_add(deadline, thread_grace);
pargs->trigger_set = 0;
gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline);
if (!pargs->trigger_set) {
cache_destroy_locked(pargs);
break;
}
}
gpr_mu_unlock(&g_cvfds.mu);
}
// We still have the lock here
if (gpr_unref(&g_cvfds.pollcount)) {
gpr_cv_signal(&g_cvfds.shutdown_cv);
}
gpr_mu_unlock(&g_cvfds.mu);
}
@ -1322,24 +1506,29 @@ static void run_poll(void *arg) {
static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
unsigned int i;
int res, idx;
gpr_cv *pollcv;
cv_node *cvn, *prev;
cv_node *pollcv;
int skip_poll = 0;
nfds_t nsockfds = 0;
gpr_thd_id t_id;
gpr_thd_options opt;
poll_args *pargs = NULL;
poll_result *result = NULL;
gpr_mu_lock(&g_cvfds.mu);
pollcv = gpr_malloc(sizeof(gpr_cv));
gpr_cv_init(pollcv);
pollcv = gpr_malloc(sizeof(cv_node));
pollcv->next = NULL;
gpr_cv pollcv_cv;
gpr_cv_init(&pollcv_cv);
pollcv->cv = &pollcv_cv;
cv_node *fd_cvs = gpr_malloc(nfds * sizeof(cv_node));
for (i = 0; i < nfds; i++) {
fds[i].revents = 0;
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
idx = FD_TO_IDX(fds[i].fd);
cvn = gpr_malloc(sizeof(cv_node));
cvn->cv = pollcv;
cvn->next = g_cvfds.cvfds[idx].cvs;
g_cvfds.cvfds[idx].cvs = cvn;
fd_cvs[i].cv = &pollcv_cv;
fd_cvs[i].prev = NULL;
fd_cvs[i].next = g_cvfds.cvfds[idx].cvs;
if (g_cvfds.cvfds[idx].cvs) {
g_cvfds.cvfds[idx].cvs->prev = &(fd_cvs[i]);
}
g_cvfds.cvfds[idx].cvs = &(fd_cvs[i]);
// Don't bother polling if a wakeup fd is ready
if (g_cvfds.cvfds[idx].is_set) {
skip_poll = 1;
@ -1349,81 +1538,68 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
}
}
gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
if (timeout < 0) {
deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
} else {
deadline =
gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
}
res = 0;
if (!skip_poll && nsockfds > 0) {
pargs = gpr_malloc(sizeof(struct poll_args));
// Both the main thread and calling thread get a reference
gpr_ref_init(&pargs->refcount, 2);
pargs->cv = pollcv;
pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
pargs->nfds = nsockfds;
pargs->timeout = timeout;
pargs->retval = 0;
pargs->err = 0;
gpr_atm_no_barrier_store(&pargs->status, INPROGRESS);
struct pollfd *pollfds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd >= 0) {
pargs->fds[idx].fd = fds[i].fd;
pargs->fds[idx].events = fds[i].events;
pargs->fds[idx].revents = 0;
pollfds[idx].fd = fds[i].fd;
pollfds[idx].events = fds[i].events;
pollfds[idx].revents = 0;
idx++;
}
}
g_cvfds.pollcount++;
opt = gpr_thd_options_default();
gpr_thd_options_set_detached(&opt);
GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
// We want the poll() thread to trigger the deadline, so wait forever here
gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
res = pargs->retval;
errno = pargs->err;
} else {
errno = 0;
gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
poll_args *pargs = get_poller_locked(pollfds, nsockfds);
result = pargs->result;
pollcv->next = result->watchers;
pollcv->prev = NULL;
if (result->watchers) {
result->watchers->prev = pollcv;
}
result->watchers = pollcv;
result->watchcount++;
gpr_ref(&result->refcount);
pargs->trigger_set = 1;
gpr_cv_signal(&pargs->trigger);
gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
res = result->retval;
errno = result->err;
result->watchcount--;
remove_cvn(&result->watchers, pollcv);
} else if (!skip_poll) {
gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
deadline =
gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
}
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs;
prev = NULL;
while (cvn->cv != pollcv) {
prev = cvn;
cvn = cvn->next;
GPR_ASSERT(cvn);
}
if (!prev) {
g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next;
} else {
prev->next = cvn->next;
}
gpr_free(cvn);
remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
fds[i].revents = POLLIN;
if (res >= 0) res++;
}
} else if (!skip_poll && fds[i].fd >= 0 &&
gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
fds[i].revents = pargs->fds[idx].revents;
} else if (!skip_poll && fds[i].fd >= 0 && result->completed) {
fds[i].revents = result->fds[idx].revents;
idx++;
}
}
if (pargs) {
decref_poll_args(pargs);
} else {
gpr_cv_destroy(pollcv);
gpr_free(pollcv);
gpr_free(fd_cvs);
gpr_free(pollcv);
if (result) {
decref_poll_result(result);
}
gpr_mu_unlock(&g_cvfds.mu);
return res;
@ -1432,12 +1608,12 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
static void global_cv_fd_table_init() {
gpr_mu_init(&g_cvfds.mu);
gpr_mu_lock(&g_cvfds.mu);
gpr_cv_init(&g_cvfds.shutdown_complete);
g_cvfds.shutdown = 0;
g_cvfds.pollcount = 0;
gpr_cv_init(&g_cvfds.shutdown_cv);
gpr_ref_init(&g_cvfds.pollcount, 1);
g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
g_cvfds.free_fds = NULL;
thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN);
for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
g_cvfds.cvfds[i].is_set = 0;
g_cvfds.cvfds[i].cvs = NULL;
@ -1447,23 +1623,35 @@ static void global_cv_fd_table_init() {
// Override the poll function with one that supports cvfds
g_cvfds.poll = grpc_poll_function;
grpc_poll_function = &cvfd_poll;
// Initialize the cache
poll_cache.size = 32;
poll_cache.count = 0;
poll_cache.free_pollers = NULL;
poll_cache.active_pollers = gpr_malloc(sizeof(void *) * 32);
for (unsigned int i = 0; i < poll_cache.size; i++) {
poll_cache.active_pollers[i] = NULL;
}
gpr_mu_unlock(&g_cvfds.mu);
}
static void global_cv_fd_table_shutdown() {
gpr_mu_lock(&g_cvfds.mu);
g_cvfds.shutdown = 1;
// Attempt to wait for all abandoned poll() threads to terminate
// Not doing so will result in reported memory leaks
if (g_cvfds.pollcount > 0) {
int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu,
if (!gpr_unref(&g_cvfds.pollcount)) {
int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(3, GPR_TIMESPAN)));
GPR_ASSERT(res == 0);
}
gpr_cv_destroy(&g_cvfds.shutdown_complete);
gpr_cv_destroy(&g_cvfds.shutdown_cv);
grpc_poll_function = g_cvfds.poll;
gpr_free(g_cvfds.cvfds);
gpr_free(poll_cache.active_pollers);
gpr_mu_unlock(&g_cvfds.mu);
gpr_mu_destroy(&g_cvfds.mu);
}

@ -43,6 +43,7 @@
typedef struct cv_node {
gpr_cv* cv;
struct cv_node* next;
struct cv_node* prev;
} cv_node;
typedef struct fd_node {
@ -53,9 +54,8 @@ typedef struct fd_node {
typedef struct cv_fd_table {
gpr_mu mu;
int pollcount;
int shutdown;
gpr_cv shutdown_complete;
gpr_refcount pollcount;
gpr_cv shutdown_cv;
fd_node* cvfds;
fd_node* free_fds;
unsigned int size;

Loading…
Cancel
Save