diff --git a/src/core/lib/iomgr/ev_epoll_posix.c b/src/core/lib/iomgr/ev_epoll_posix.c index ce8d3981b3f..cb4a00a75c4 100644 --- a/src/core/lib/iomgr/ev_epoll_posix.c +++ b/src/core/lib/iomgr/ev_epoll_posix.c @@ -185,11 +185,6 @@ struct grpc_pollset_worker { }; struct grpc_pollset { - /* pollsets under posix can mutate representation as fds are added and - removed. - For example, we may choose a poll() based implementation on linux for - few fds, and an epoll() based implementation for many fds */ - const grpc_pollset_vtable *vtable; gpr_mu mu; grpc_pollset_worker root_worker; int in_flight_cbs; @@ -248,8 +243,6 @@ typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd **fds, size_t fd_count); -static platform_become_multipoller_type platform_become_multipoller; - /* Return 1 if the pollset has active threads in pollset_work (pollset must * be locked) */ @@ -796,8 +789,6 @@ static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } /* main interface */ -static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); - static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->mu); *mu = &pollset->mu; @@ -809,14 +800,20 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; pollset->local_wakeup_cache = NULL; pollset->kicked_without_pollers = 0; - become_basic_pollset(pollset, NULL); + pollset->data.ptr = NULL; } +/* TODO(sreek): Maybe merge multipoll_*_destroy() with pollset_destroy() + * function */ +static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset); + static void pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(pollset->in_flight_cbs == 0); GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); - pollset->vtable->destroy(pollset); + + multipoll_with_epoll_pollset_destroy(pollset); + while (pollset->local_wakeup_cache) { grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); @@ -831,17 +828,24 @@ static void pollset_reset(grpc_pollset *pollset) { GPR_ASSERT(pollset->in_flight_cbs == 0); GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); - pollset->vtable->destroy(pollset); + + multipoll_with_epoll_pollset_destroy(pollset); + pollset->shutting_down = 0; pollset->called_shutdown = 0; pollset->kicked_without_pollers = 0; - become_basic_pollset(pollset, NULL); } +/* TODO (sreek): Remove multipoll_with_epoll_add_fd declaration*/ +static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_fd *fd, + int and_unlock_pollset); + static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); - pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); + multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fd, 1); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to add_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -852,12 +856,21 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, #endif } +/* TODO (sreek): Remove multipoll_with_epoll_finish_shutdown() declaration */ +static void multipoll_with_epoll_pollset_finish_shutdown(grpc_pollset *pollset); + static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); - pollset->vtable->finish_shutdown(pollset); + multipoll_with_epoll_pollset_finish_shutdown(pollset); grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); } +/* TODO(sreek): Remove multipoll_with_epoll_*_maybe_work_and_unlock declaration + */ +static void multipoll_with_epoll_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now); + static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { @@ -915,8 +928,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, - deadline, now); + + multipoll_with_epoll_pollset_maybe_work_and_unlock( + exec_ctx, pollset, &worker, deadline, now); + GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); @@ -1013,233 +1028,6 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } -/* - * basic_pollset - a vtable that provides polling for zero or one file - * descriptor via poll() - */ - -typedef struct grpc_unary_promote_args { - const grpc_pollset_vtable *original_vtable; - grpc_pollset *pollset; - grpc_fd *fd; - grpc_closure promotion_closure; -} grpc_unary_promote_args; - -static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, - bool success) { - grpc_unary_promote_args *up_args = args; - const grpc_pollset_vtable *original_vtable = up_args->original_vtable; - grpc_pollset *pollset = up_args->pollset; - grpc_fd *fd = up_args->fd; - - /* - * This is quite tricky. There are a number of cases to keep in mind here: - * 1. fd may have been orphaned - * 2. The pollset may no longer be a unary poller (and we can't let case #1 - * leak to other pollset types!) - * 3. pollset's fd (which may have changed) may have been orphaned - * 4. The pollset may be shutting down. - */ - - gpr_mu_lock(&pollset->mu); - /* First we need to ensure that nobody is polling concurrently */ - GPR_ASSERT(!pollset_has_workers(pollset)); - - gpr_free(up_args); - /* At this point the pollset may no longer be a unary poller. In that case - * we should just call the right add function and be done. */ - /* TODO(klempner): If we're not careful this could cause infinite recursion. - * That's not a problem for now because empty_pollset has a trivial poller - * and we don't have any mechanism to unbecome multipoller. */ - pollset->in_flight_cbs--; - if (pollset->shutting_down) { - /* We don't care about this pollset anymore. */ - if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { - pollset->called_shutdown = 1; - finish_shutdown(exec_ctx, pollset); - } - } else if (fd_is_orphaned(fd)) { - /* Don't try to add it to anything, we'll drop our ref on it below */ - } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(exec_ctx, pollset, fd, 0); - } else if (fd != pollset->data.ptr) { - grpc_fd *fds[2]; - fds[0] = pollset->data.ptr; - fds[1] = fd; - - if (fds[0] && !fd_is_orphaned(fds[0])) { - platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds)); - GRPC_FD_UNREF(fds[0], "basicpoll"); - } else { - /* old fd is orphaned and we haven't cleaned it up until now, so remain a - * unary poller */ - /* Note that it is possible that fds[1] is also orphaned at this point. - * That's okay, we'll correct it at the next add or poll. */ - if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll"); - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } - } - - gpr_mu_unlock(&pollset->mu); - - /* Matching ref in basic_pollset_add_fd */ - GRPC_FD_UNREF(fd, "basicpoll_add"); -} - -static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd, int and_unlock_pollset) { - grpc_unary_promote_args *up_args; - GPR_ASSERT(fd); - if (fd == pollset->data.ptr) goto exit; - - if (!pollset_has_workers(pollset)) { - /* Fast path -- no in flight cbs */ - /* TODO(klempner): Comment this out and fix any test failures or establish - * they are due to timing issues */ - grpc_fd *fds[2]; - fds[0] = pollset->data.ptr; - fds[1] = fd; - - if (fds[0] == NULL) { - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } else if (!fd_is_orphaned(fds[0])) { - platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds)); - GRPC_FD_UNREF(fds[0], "basicpoll"); - } else { - /* old fd is orphaned and we haven't cleaned it up until now, so remain a - * unary poller */ - GRPC_FD_UNREF(fds[0], "basicpoll"); - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } - goto exit; - } - - /* Now we need to promote. This needs to happen when we're not polling. Since - * this may be called from poll, the wait needs to happen asynchronously. */ - GRPC_FD_REF(fd, "basicpoll_add"); - pollset->in_flight_cbs++; - up_args = gpr_malloc(sizeof(*up_args)); - up_args->fd = fd; - up_args->original_vtable = pollset->vtable; - up_args->pollset = pollset; - up_args->promotion_closure.cb = basic_do_promote; - up_args->promotion_closure.cb_arg = up_args; - - grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); - pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - -exit: - if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); - } -} - -static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, - gpr_timespec now) { -#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) -#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) - - struct pollfd pfd[3]; - grpc_fd *fd; - grpc_fd_watcher fd_watcher; - int timeout; - int r; - nfds_t nfds; - - fd = pollset->data.ptr; - if (fd && fd_is_orphaned(fd)) { - GRPC_FD_UNREF(fd, "basicpoll"); - fd = pollset->data.ptr = NULL; - } - timeout = poll_deadline_to_millis_timeout(deadline, now); - pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); - pfd[0].events = POLLIN; - pfd[0].revents = 0; - pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); - pfd[1].events = POLLIN; - pfd[1].revents = 0; - nfds = 2; - if (fd) { - pfd[2].fd = fd->fd; - pfd[2].revents = 0; - GRPC_FD_REF(fd, "basicpoll_begin"); - gpr_mu_unlock(&pollset->mu); - pfd[2].events = - (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &fd_watcher); - if (pfd[2].events != 0) { - nfds++; - } - } else { - gpr_mu_unlock(&pollset->mu); - } - - /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid - even going into the blocking annotation if possible */ - /* poll fd count (argument 2) is shortened by one if we have no events - to poll on - such that it only includes the kicker */ - GPR_TIMER_BEGIN("poll", 0); - GRPC_SCHEDULING_START_BLOCKING_REGION; - r = grpc_poll_function(pfd, nfds, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; - GPR_TIMER_END("poll", 0); - - if (r < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); - } - if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); - } - } else if (r == 0) { - if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); - } - } else { - if (pfd[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); - } - if (pfd[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); - } - if (nfds > 2) { - fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, - pfd[2].revents & POLLOUT_CHECK); - } else if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); - } - } - - if (fd) { - GRPC_FD_UNREF(fd, "basicpoll_begin"); - } -} - -static void basic_pollset_destroy(grpc_pollset *pollset) { - if (pollset->data.ptr != NULL) { - GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); - pollset->data.ptr = NULL; - } -} - -static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock, - basic_pollset_destroy, basic_pollset_destroy}; - -static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { - pollset->vtable = &basic_pollset; - pollset->data.ptr = fd_or_null; - if (fd_or_null != NULL) { - GRPC_FD_REF(fd_or_null, "basicpoll"); - } -} - - /******************************************************************************* * pollset_multipoller_with_epoll_posix.c */ @@ -1274,6 +1062,7 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { set_ready(exec_ctx, fd, &fd->write_closure); } +/* TODO (sreek): Maybe this global list is not required. Double check*/ struct epoll_fd_list { int *epoll_fds; size_t count; @@ -1390,10 +1179,48 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(da); } +/* Creates an epoll fd and initializes the pollset */ +static void multipoll_with_epoll_pollset_create_efd(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + epoll_hdr *h = gpr_malloc(sizeof(epoll_hdr)); + struct epoll_event ev; + int err; + + /* Ensuring that the pollset is infact empty (with no epoll fd either) */ + GPR_ASSERT(pollset->data.ptr == NULL); + + pollset->data.ptr = h; + h->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (h->epoll_fd < 0) { + gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); + abort(); + } + add_epoll_fd_to_global_list(h->epoll_fd); + + ev.events = (uint32_t)(EPOLLIN | EPOLLET); + ev.data.ptr = NULL; + + /* TODO (sreek): Double-check the use of grpc_global_wakeup_fd here (right now + * I do not know why this is used. I just copied this code from + * epoll_become_mutipoller() function in ev_poll_and_epoll_posix.c file */ + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, + GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev); + if (err < 0) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", + GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), + strerror(errno)); + } +} + static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd, int and_unlock_pollset) { + /* If there is no epoll fd on the pollset, create one */ + if (pollset->data.ptr == NULL) { + multipoll_with_epoll_pollset_create_efd(exec_ctx, pollset); + } + if (and_unlock_pollset) { gpr_mu_unlock(&pollset->mu); finally_add_fd(exec_ctx, pollset, fd); @@ -1500,45 +1327,6 @@ static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { gpr_free(h); } -static const grpc_pollset_vtable multipoll_with_epoll_pollset = { - multipoll_with_epoll_pollset_add_fd, - multipoll_with_epoll_pollset_maybe_work_and_unlock, - multipoll_with_epoll_pollset_finish_shutdown, - multipoll_with_epoll_pollset_destroy}; - -static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, grpc_fd **fds, - size_t nfds) { - size_t i; - epoll_hdr *h = gpr_malloc(sizeof(epoll_hdr)); - struct epoll_event ev; - int err; - - pollset->vtable = &multipoll_with_epoll_pollset; - pollset->data.ptr = h; - h->epoll_fd = epoll_create1(EPOLL_CLOEXEC); - if (h->epoll_fd < 0) { - /* TODO(klempner): Fall back to poll here, especially on ENOSYS */ - gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); - abort(); - } - add_epoll_fd_to_global_list(h->epoll_fd); - - ev.events = (uint32_t)(EPOLLIN | EPOLLET); - ev.data.ptr = NULL; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, - GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev); - if (err < 0) { - gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", - GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), - strerror(errno)); - } - - for (i = 0; i < nfds; i++) { - multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0); - } -} - /******************************************************************************* * pollset_set_posix.c */ @@ -1724,7 +1512,6 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_epoll_posix(void) { - platform_become_multipoller = epoll_become_multipoller; fd_global_init(); pollset_global_init(); return &vtable;