pollset_kick optimization (do not kick any other thread if the current thread can be kicked)
pull/6803/head
Sree Kuchibhotla 9 years ago
parent f325ee367a
commit 8e4926c0ee
  1. src/core/lib/iomgr/ev_epoll_linux.c (153)

@@ -844,6 +844,8 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/*******************************************************************************
* Pollset Definitions
*/
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
@@ -859,11 +861,15 @@ static void poller_kick_init() {
/* Global state management */
static void pollset_global_init(void) {
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
static void pollset_worker_kick(grpc_pollset_worker *worker) {
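As an aside, the two new thread-local slots follow the standard gpr TLS pattern visible in this hunk. A minimal sketch of that pattern, assuming the helpers come from <grpc/support/tls.h> and using a hypothetical slot name:

/* Minimal sketch of the gpr TLS pattern used above (hypothetical slot name;
   values are stored as intptr_t, so pointers are cast in and out). */
#include <grpc/support/tls.h>

GPR_TLS_DECL(g_current_request);

static void module_init(void) {
  gpr_tls_init(&g_current_request); /* once, before any thread touches it */
}

static void module_shutdown(void) {
  gpr_tls_destroy(&g_current_request); /* once, after all threads are done */
}

static void handle_request(void *req) {
  gpr_tls_set(&g_current_request, (intptr_t)req); /* publish for this thread */
  /* ... code running on this thread may consult gpr_tls_get(&g_current_request) ... */
  gpr_tls_set(&g_current_request, (intptr_t)0); /* clear before returning */
}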
@@ -915,7 +921,9 @@ static void pollset_kick(grpc_pollset *p,
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          pollset_worker_kick(worker);
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            pollset_worker_kick(worker);
          }
        }
      } else {
        p->kicked_without_pollers = true;
@@ -923,9 +931,18 @@ static void pollset_kick(grpc_pollset *p,
      GPR_TIMER_END("pollset_kick.broadcast", 0);
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      pollset_worker_kick(worker);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        pollset_worker_kick(worker);
      }
    }
  } else {
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, we can kick "any" worker on this pollset 'p'. If
       'p' happens to be the pollset this thread is currently polling (i.e.,
       inside pollset_work()), there is no need to kick any other worker since
       the current thread can absorb the kick itself. This is why we enter this
       case only when g_current_thread_pollset != p */
    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
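The two checks above plus the new else-if are the whole optimization: a kick aimed at the worker running on the calling thread, or an anonymous kick on the pollset the calling thread is already polling, is simply dropped because the caller will pick up the work itself. A condensed restatement of that decision, using a hypothetical helper name (not part of the commit) and relying on the declarations in this file:

/* Hypothetical helper (illustration only): returns true when the kick can be
   absorbed by the calling thread and no signal needs to be sent. */
static bool kick_can_be_absorbed_locally(grpc_pollset *p,
                                         grpc_pollset_worker *specific) {
  if (specific != NULL) {
    /* A specific worker was named: skip the signal if it is this thread's own
       worker, since this thread is already awake. */
    return gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific;
  }
  /* "Kick anyone": if this thread is currently polling 'p' (it is inside
     pollset_work() on 'p'), it can absorb the kick itself. */
  return gpr_tls_get(&g_current_thread_pollset) == (intptr_t)p;
}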
@@ -999,6 +1016,69 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_unlock(&fd->mu);
}

/* Release the reference to pollset->polling_island and set it to NULL.
   pollset->mu must be held */
static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->pi_mu);
  if (pollset->polling_island) {
    pollset->polling_island =
        polling_island_update_and_lock(pollset->polling_island, 1, 0);
    polling_island_unref_and_unlock(pollset->polling_island, 1);
    pollset->polling_island = NULL;
  }
  gpr_mu_unlock(&pollset->pi_mu);
}

static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;
  pollset_release_polling_island_locked(pollset);

  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}

/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
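The comment above defers the actual island release to the last worker. This hunk does not show that side, but the counterpart check the comment refers to lives at the tail of pollset_work(); a hedged sketch of what that check looks like, using only names that appear in this diff (exact placement may differ):

/* Sketch: the exiting worker, after removing itself from the worker list and
   while still holding pollset->mu, completes the shutdown that
   pollset_shutdown() deferred because workers were still present. */
if (pollset->shutting_down && !pollset_has_workers(pollset) &&
    !pollset->finish_shutdown_called) {
  finish_shutdown_locked(exec_ctx, pollset);
}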
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  pollset_release_polling_island_locked(pollset);
}
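pollset_destroy() above relies on pollset_shutdown() having run first, while pollset_reset() offers the alternative of recycling a fully shut-down pollset. An illustrative caller-side ordering, not code from this commit (shutdown_done_closure is a placeholder for the caller's grpc_closure; locking elided):

/* Shutdown-then-destroy ordering (sketch). pollset->mu is held where the
   per-function comments above require it. */
pollset_shutdown(exec_ctx, pollset, shutdown_done_closure); /* kicks workers */
/* ... once shutdown_done_closure has been scheduled (last worker has left) ... */
pollset_destroy(pollset); /* only the mutexes remain to be torn down */

/* Or, instead of destroying, reuse the pollset after shutdown completes: */
pollset_reset(pollset);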
#define GRPC_EPOLL_MAX_EVENTS 1000
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset, int timeout_ms,
@@ -1103,69 +1183,6 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
  GPR_TIMER_END("pollset_work_and_unlock", 0);
}
/* Release the reference to pollset->polling_island and set it to NULL.
   pollset->mu must be held */
static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->pi_mu);
  if (pollset->polling_island) {
    pollset->polling_island =
        polling_island_update_and_lock(pollset->polling_island, 1, 0);
    polling_island_unref_and_unlock(pollset->polling_island, 1);
    pollset->polling_island = NULL;
  }
  gpr_mu_unlock(&pollset->pi_mu);
}

static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;
  pollset_release_polling_island_locked(pollset);

  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}

/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  pollset_release_polling_island_locked(pollset);
}
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
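The contract stated in the comment above means callers must hold pollset->mu across the call but cannot assume it stays held throughout. A minimal caller-side sketch; now and deadline stand in for the timeout parameters, since this view only shows the first line of the signature:

gpr_mu_lock(&pollset->mu);
/* pollset_work() may drop and re-take pollset->mu internally, so nothing
   protected by mu can be assumed to stay unchanged across this call. */
pollset_work(exec_ctx, pollset, &worker_hdl, now, deadline);
/* mu is held again here on return */
gpr_mu_unlock(&pollset->mu);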
@@ -1184,6 +1201,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
  worker.pt_id = pthread_self();
  *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
@@ -1226,6 +1245,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
  }
  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);
}
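The pair of gpr_tls_set calls above clears the slots that were set on entry to pollset_work(). Keeping the bracketing symmetric matters; a compressed view of the entry/exit pair, condensed from the two hunks above (illustrative):

gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); /* on entry */
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

/* ... poll, run closures, possibly drop and re-take pollset->mu ... */

/* On exit both slots must be cleared: if they stayed set, a later
   pollset_kick() issued from this thread (which is no longer polling) would
   conclude the kick can be absorbed locally and skip the wakeup entirely. */
gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
gpr_tls_set(&g_current_thread_worker, (intptr_t)0);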
