|
|
|
@ -149,7 +149,7 @@ typedef struct epoll_set { |
|
|
|
|
/* Wakeup fd used to wake pollers to check the contents of workqueue_items */ |
|
|
|
|
grpc_wakeup_fd workqueue_wakeup_fd; |
|
|
|
|
|
|
|
|
|
/* Is the polling island shutdown */ |
|
|
|
|
/* Is the epoll set shutdown */ |
|
|
|
|
gpr_atm is_shutdown; |
|
|
|
|
|
|
|
|
|
/* The fd of the underlying epoll set */ |
|
|
|
@ -197,12 +197,12 @@ gpr_thd_id *g_poller_threads = NULL; |
|
|
|
|
* return */ |
|
|
|
|
grpc_pollset g_read_notifier; |
|
|
|
|
|
|
|
|
|
static void add_fd_to_dedicated_eps(grpc_fd *fd); |
|
|
|
|
static bool init_dedicated_epoll_sets(); |
|
|
|
|
static void shutdown_dedicated_epoll_sets(); |
|
|
|
|
static void add_fd_to_eps(grpc_fd *fd); |
|
|
|
|
static bool init_epoll_sets(); |
|
|
|
|
static void shutdown_epoll_sets(); |
|
|
|
|
static void poller_thread_loop(void *arg); |
|
|
|
|
static void start_dedicated_poller_threads(); |
|
|
|
|
static void shutdown_dedicated_poller_threads(); |
|
|
|
|
static void start_poller_threads(); |
|
|
|
|
static void shutdown_poller_threads(); |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* Common helpers |
|
|
|
@ -223,15 +223,15 @@ static bool append_error(grpc_error **composite, grpc_error *error, |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
/* The wakeup fd that is used to wake up all threads in a Polling island. This
|
|
|
|
|
is useful in the polling island merge operation where we need to wakeup all |
|
|
|
|
the threads currently polling the smaller polling island (so that they can |
|
|
|
|
start polling the new/merged polling island) |
|
|
|
|
is useful in the epoll set merge operation where we need to wakeup all |
|
|
|
|
the threads currently polling the smaller epoll set (so that they can |
|
|
|
|
start polling the new/merged epoll set) |
|
|
|
|
|
|
|
|
|
NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the |
|
|
|
|
threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */ |
|
|
|
|
static grpc_wakeup_fd epoll_set_wakeup_fd; |
|
|
|
|
|
|
|
|
|
/* The polling island being polled right now.
|
|
|
|
|
/* The epoll set being polled right now.
|
|
|
|
|
See comments in workqueue_maybe_wakeup for why this is tracked. */ |
|
|
|
|
static __thread epoll_set *g_current_thread_epoll_set; |
|
|
|
|
|
|
|
|
@ -310,9 +310,9 @@ static void eps_add_ref(epoll_set *eps) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps) { |
|
|
|
|
/* If ref count went to zero, delete the polling island. This deletion is
|
|
|
|
|
/* If ref count went to zero, delete the epoll set. This deletion is
|
|
|
|
|
not done under a lock since once the ref count goes to zero, we are |
|
|
|
|
guaranteed that no one else holds a reference to the polling island (and |
|
|
|
|
guaranteed that no one else holds a reference to the epoll set (and |
|
|
|
|
that there is no racing eps_add_ref() call either).*/ |
|
|
|
|
if (1 == gpr_atm_full_fetch_add(&eps->ref_count, -1)) { |
|
|
|
|
epoll_set_delete(eps); |
|
|
|
@ -588,8 +588,8 @@ static grpc_fd *fd_create(int fd, const char *name) { |
|
|
|
|
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name); |
|
|
|
|
gpr_free(fd_name); |
|
|
|
|
|
|
|
|
|
/* Associate the fd with one of the dedicated eps */ |
|
|
|
|
add_fd_to_dedicated_eps(new_fd); |
|
|
|
|
/* Associate the fd with one of the eps */ |
|
|
|
|
add_fd_to_eps(new_fd); |
|
|
|
|
return new_fd; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -625,7 +625,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
|
|
|
|
|
fd->orphaned = true; |
|
|
|
|
|
|
|
|
|
/* Remove the fd from the polling island */ |
|
|
|
|
/* Remove the fd from the epoll set */ |
|
|
|
|
if (fd->eps != NULL) { |
|
|
|
|
epoll_set_remove_fd(fd->eps, fd, is_fd_closed, &error); |
|
|
|
|
unref_eps = fd->eps; |
|
|
|
@ -640,8 +640,8 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
add_fd_to_freelist(fd); |
|
|
|
|
|
|
|
|
|
if (unref_eps != NULL) { |
|
|
|
|
/* Unref stale polling island here, outside the fd lock above.
|
|
|
|
|
The polling island owns a workqueue which owns an fd, and unreffing |
|
|
|
|
/* Unref stale epoll set here, outside the fd lock above.
|
|
|
|
|
The epoll set owns a workqueue which owns an fd, and unreffing |
|
|
|
|
inside the lock can cause an eventual lock loop that makes TSAN very |
|
|
|
|
unhappy. */ |
|
|
|
|
EPS_UNREF(exec_ctx, unref_eps, "fd_orphan"); |
|
|
|
@ -853,7 +853,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |
|
|
|
|
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); |
|
|
|
|
|
|
|
|
|
/* If the pollset has any workers, we cannot call finish_shutdown_locked()
|
|
|
|
|
because it would release the underlying polling island. In such a case, we |
|
|
|
|
because it would release the underlying epoll set. In such a case, we |
|
|
|
|
let the last worker call finish_shutdown_locked() from pollset_work() */ |
|
|
|
|
if (!pollset_has_workers(pollset)) { |
|
|
|
|
GPR_ASSERT(!pollset->finish_shutdown_called); |
|
|
|
@ -958,7 +958,7 @@ static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps, |
|
|
|
|
GPR_TIMER_BEGIN("epoll_set_work", 0); |
|
|
|
|
|
|
|
|
|
/* Since epoll_fd is immutable, it is safe to read it without a lock on the
|
|
|
|
|
polling island. */ |
|
|
|
|
epoll set. */ |
|
|
|
|
epoll_fd = eps->epoll_fd; |
|
|
|
|
|
|
|
|
|
/* Add an extra ref so that the island does not get destroyed (which means
|
|
|
|
@ -979,9 +979,9 @@ static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps, |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Before leaving, release the extra ref we added to the polling island. It
|
|
|
|
|
/* Before leaving, release the extra ref we added to the epoll set. It
|
|
|
|
|
is important to use "eps" here (i.e our old copy of pollset->eps |
|
|
|
|
that we got before releasing the polling island lock). This is because |
|
|
|
|
that we got before releasing the epoll set lock). This is because |
|
|
|
|
pollset->eps pointer might get udpated in other parts of the |
|
|
|
|
code when there is an island merge while we are doing epoll_wait() above */ |
|
|
|
|
EPS_UNREF(exec_ctx, eps, "ps_work"); |
|
|
|
@ -1108,8 +1108,8 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
static void shutdown_engine(void) { |
|
|
|
|
shutdown_dedicated_poller_threads(); |
|
|
|
|
shutdown_dedicated_epoll_sets(); |
|
|
|
|
shutdown_poller_threads(); |
|
|
|
|
shutdown_epoll_sets(); |
|
|
|
|
fd_global_shutdown(); |
|
|
|
|
pollset_global_shutdown(); |
|
|
|
|
epoll_set_global_shutdown(); |
|
|
|
@ -1157,9 +1157,9 @@ static const grpc_event_engine_vtable vtable = { |
|
|
|
|
/*****************************************************************************
|
|
|
|
|
* Dedicated polling threads and pollsets - Definitions |
|
|
|
|
*/ |
|
|
|
|
static void add_fd_to_dedicated_eps(grpc_fd *fd) { |
|
|
|
|
static void add_fd_to_eps(grpc_fd *fd) { |
|
|
|
|
GPR_ASSERT(fd->eps == NULL); |
|
|
|
|
GPR_TIMER_BEGIN("add_fd_to_dedicated_eps", 0); |
|
|
|
|
GPR_TIMER_BEGIN("add_fd_to_eps", 0); |
|
|
|
|
|
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
|
size_t idx = ((size_t)rand()) % g_num_eps; |
|
|
|
@ -1176,15 +1176,15 @@ static void add_fd_to_dedicated_eps(grpc_fd *fd) { |
|
|
|
|
EPS_ADD_REF(eps, "fd"); |
|
|
|
|
fd->eps = eps; |
|
|
|
|
|
|
|
|
|
GRPC_POLLING_TRACE("add_fd_to_dedicated_eps (fd: %d, eps idx = %ld)", fd->fd, |
|
|
|
|
GRPC_POLLING_TRACE("add_fd_to_eps (fd: %d, eps idx = %ld)", fd->fd, |
|
|
|
|
idx); |
|
|
|
|
gpr_mu_unlock(&fd->mu); |
|
|
|
|
|
|
|
|
|
GRPC_LOG_IF_ERROR("add_fd_to_dedicated_eps", error); |
|
|
|
|
GPR_TIMER_END("add_fd_to_dedicated_eps", 0); |
|
|
|
|
GRPC_LOG_IF_ERROR("add_fd_to_eps", error); |
|
|
|
|
GPR_TIMER_END("add_fd_to_eps", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool init_dedicated_epoll_sets() { |
|
|
|
|
static bool init_epoll_sets() { |
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
|
bool is_success = true; |
|
|
|
|
|
|
|
|
@ -1193,32 +1193,32 @@ static bool init_dedicated_epoll_sets() { |
|
|
|
|
for (size_t i = 0; i < g_num_eps; i++) { |
|
|
|
|
g_epoll_sets[i] = epoll_set_create(&error); |
|
|
|
|
if (g_epoll_sets[i] == NULL) { |
|
|
|
|
gpr_log(GPR_ERROR, "Error in creating a dedicated polling island"); |
|
|
|
|
gpr_log(GPR_ERROR, "Error in creating a epoll set"); |
|
|
|
|
g_num_eps = i; /* Helps cleanup */ |
|
|
|
|
shutdown_dedicated_epoll_sets(); |
|
|
|
|
shutdown_epoll_sets(); |
|
|
|
|
is_success = false; |
|
|
|
|
goto done; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
EPS_ADD_REF(g_epoll_sets[i], "init_dedicated_epoll_sets"); |
|
|
|
|
EPS_ADD_REF(g_epoll_sets[i], "init_epoll_sets"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu *mu; |
|
|
|
|
pollset_init(&g_read_notifier, &mu); |
|
|
|
|
|
|
|
|
|
done: |
|
|
|
|
GRPC_LOG_IF_ERROR("init_dedicated_epoll_sets", error); |
|
|
|
|
GRPC_LOG_IF_ERROR("init_epoll_sets", error); |
|
|
|
|
return is_success; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void shutdown_dedicated_epoll_sets() { |
|
|
|
|
static void shutdown_epoll_sets() { |
|
|
|
|
if (!g_epoll_sets) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
|
|
|
|
for (size_t i = 0; i < g_num_eps; i++) { |
|
|
|
|
EPS_UNREF(&exec_ctx, g_epoll_sets[i], "shutdown_dedicated_epoll_sets"); |
|
|
|
|
EPS_UNREF(&exec_ctx, g_epoll_sets[i], "shutdown_epoll_sets"); |
|
|
|
|
} |
|
|
|
|
grpc_exec_ctx_finish(&exec_ctx); |
|
|
|
|
|
|
|
|
@ -1242,7 +1242,7 @@ static void poller_thread_loop(void *arg) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* g_epoll_sets MUST be initialized before calling this */ |
|
|
|
|
static void start_dedicated_poller_threads() { |
|
|
|
|
static void start_poller_threads() { |
|
|
|
|
GPR_ASSERT(g_epoll_sets); |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "Starting poller threads"); |
|
|
|
@ -1258,7 +1258,7 @@ static void start_dedicated_poller_threads() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void shutdown_dedicated_poller_threads() { |
|
|
|
|
static void shutdown_poller_threads() { |
|
|
|
|
GPR_ASSERT(g_poller_threads); |
|
|
|
|
GPR_ASSERT(g_epoll_sets); |
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
@ -1275,8 +1275,8 @@ static void shutdown_dedicated_poller_threads() { |
|
|
|
|
gpr_thd_join(g_poller_threads[i]); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_ERROR, "polling island delete called"); |
|
|
|
|
GRPC_LOG_IF_ERROR("shutdown_dedicated_poller_threads", error); |
|
|
|
|
gpr_log(GPR_ERROR, "epoll set delete called"); |
|
|
|
|
GRPC_LOG_IF_ERROR("shutdown_poller_threads", error); |
|
|
|
|
gpr_free(g_poller_threads); |
|
|
|
|
g_poller_threads = NULL; |
|
|
|
|
} |
|
|
|
@ -1317,14 +1317,14 @@ const grpc_event_engine_vtable *grpc_init_epoll_thread_pool_linux(void) { |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (!init_dedicated_epoll_sets()) { |
|
|
|
|
if (!init_epoll_sets()) { |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* TODO (sreek): Maynot be a good idea to start threads here (especially if
|
|
|
|
|
* this engine doesn't get picked. Consider introducing an engine_init |
|
|
|
|
* function in the vtable */ |
|
|
|
|
start_dedicated_poller_threads(); |
|
|
|
|
start_poller_threads(); |
|
|
|
|
return &vtable; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|