diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 51842fc2081..c60ff85898f 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -68,7 +68,7 @@ static int grpc_polling_trace = 0; /* Disabled by default */ gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \ } -/* Uncomment the following enable extra checks on poll_object operations */ +/* Uncomment the following to enable extra checks on poll_object operations */ /* #define PO_DEBUG */ static int grpc_wakeup_signal = -1; @@ -141,23 +141,26 @@ struct grpc_fd { gpr_atm refst; /* Indicates that the fd is shutdown and that any pending read/write closures - should fail */ - bool shutdown; - grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */ + should fail. */ + // TODO: sreek storing bool and grpc_error* + gpr_atm shutdown1; + gpr_atm shutdown_error1; /* reason for shutdown: set iff shutdown==true */ /* The fd is either closed or we relinquished control of it. In either cases, this indicates that the 'fd' on this structure is no longer valid */ bool orphaned; - /* TODO: sreek - Move this to a lockfree implementation */ - grpc_closure *read_closure; - grpc_closure *write_closure; + /* Closures to call when the fd is readable or writable. The actual type + stored in these is (grpc_closure *) */ + gpr_atm read_closure; + gpr_atm write_closure; struct grpc_fd *freelist_next; grpc_closure *on_done_closure; - /* The pollset that last noticed that the fd is readable */ - grpc_pollset *read_notifier_pollset; + /* The pollset that last noticed that the fd is readable. The actual type + * stored in this is (grpc_pollset *) */ + gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; }; @@ -180,8 +183,8 @@ static void fd_unref(grpc_fd *fd); static void fd_global_init(void); static void fd_global_shutdown(void); -#define CLOSURE_NOT_READY ((grpc_closure *)0) -#define CLOSURE_READY ((grpc_closure *)1) +#define CLOSURE_NOT_READY ((gpr_atm)0) +#define CLOSURE_READY ((gpr_atm)1) /******************************************************************************* * Polling island Declarations @@ -908,7 +911,12 @@ static void unref_by(grpc_fd *fd, int n) { fd->freelist_next = fd_freelist; fd_freelist = fd; grpc_iomgr_unregister_object(&fd->iomgr_object); - if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error); + + if ((bool)gpr_atm_acq_load(&fd->shutdown1)) { + grpc_error *err = + (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error1); + GRPC_ERROR_UNREF(err); + } gpr_mu_unlock(&fd_freelist_mu); } else { @@ -972,13 +980,15 @@ static grpc_fd *fd_create(int fd, const char *name) { gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; - new_fd->shutdown = false; + gpr_atm_rel_store(&new_fd->shutdown1, (gpr_atm) false); + gpr_atm_rel_store(&new_fd->shutdown_error1, (gpr_atm)GRPC_ERROR_NONE); new_fd->orphaned = false; - new_fd->read_closure = CLOSURE_NOT_READY; - new_fd->write_closure = CLOSURE_NOT_READY; + gpr_atm_rel_store(&new_fd->read_closure, CLOSURE_NOT_READY); + gpr_atm_rel_store(&new_fd->write_closure, CLOSURE_NOT_READY); + gpr_atm_rel_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); + new_fd->freelist_next = NULL; new_fd->on_done_closure = NULL; - new_fd->read_notifier_pollset = NULL; gpr_mu_unlock(&new_fd->po.mu); @@ -1061,100 +1071,159 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } static grpc_error *fd_shutdown_error(grpc_fd *fd) { + grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error1); + if (err != GRPC_ERROR_NONE) { + err = GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &err, 1); + } + + return err; + + /* TODO sreek - delete this */ + /* if (!fd->shutdown) { return GRPC_ERROR_NONE; } else { return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1); } -} + */ +} + +static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, + grpc_closure *closure) { + bool is_done = false; + while (!is_done) { + is_done = true; + if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { + // CAS failed because the current value of 'state' is not + // 'CLOSURE_NOT_READY' + gpr_atm curr = gpr_atm_acq_load(state); + + switch (curr) { + case CLOSURE_NOT_READY: { + // The CAS above failed because the state was not 'CLOSURE_NOT_READY' + // but it seems to be back to 'CLOSURE_NOT_READY'. Lets retry CAS + // again + is_done = false; + break; + } -static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure **st, grpc_closure *closure) { - if (fd->shutdown) { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown")); - } else if (*st == CLOSURE_NOT_READY) { - /* not ready ==> switch to a waiting state by setting the closure */ - *st = closure; - } else if (*st == CLOSURE_READY) { - /* already ready ==> queue the closure to run immediately */ - *st = CLOSURE_NOT_READY; - grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd)); - } else { - /* upcallptr was set to a different closure. This is an error! */ - gpr_log(GPR_ERROR, - "User called a notify_on function with a previous callback still " - "pending"); - abort(); + case CLOSURE_READY: { + // Change the state to CLOSURE_NOT_READY and if successful, schedule + // the closure + if (gpr_atm_rel_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) { + grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd)); + } else { + // Looks like the current state is not CLOSURE_READY anymore. Retry + // from the beginning + is_done = false; + } + } + + default: { + // The current state already contains a closure. This is a fatal error + gpr_log( + GPR_ERROR, + "User called notify_on function with a previous callback still " + "pending"); + abort(); + break; + } + } + } } } -/* returns 1 if state becomes not ready */ -static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure **st) { - if (*st == CLOSURE_READY) { - /* duplicate ready ==> ignore */ - return 0; - } else if (*st == CLOSURE_NOT_READY) { - /* not ready, and not waiting ==> flag ready */ - *st = CLOSURE_READY; - return 0; - } else { - /* waiting ==> queue closure */ - grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd)); - *st = CLOSURE_NOT_READY; - return 1; - } +static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) { + /* Try the fast-path first (i.e expect current value to be CLOSURE_NOT_READY + * and then try to change it to CLOSURE_READY) */ + if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { + /* CAS failed since the current state is not CLOSURE_NOT_READY. Find out + what is the current state */ + gpr_atm curr = gpr_atm_acq_load(state); + switch (curr) { + case CLOSURE_READY: { + /* Already ready. We are done here */ + break; + } + + case CLOSURE_NOT_READY: { + /* The state was not CLOSURE_NOT_READY when we checked initially at the + beginning of this function but now it is CLOSURE_NOT_READY. This is + only possible if the state transitioned out of CLOSURE_NOT_READY to + either CLOSURE_READY or and then back to + CLOSURE_NOT_READY again. So there is no need to make the state + CLOSURE_READY now */ + break; + } + + default: { + /* 'curr' is a closure. This closure should be enqueued and the current + state should be changed to CLOSURE_NOT_READY */ + if (gpr_atm_rel_cas(state, curr, CLOSURE_NOT_READY)) { + grpc_closure_sched(exec_ctx, (grpc_closure *)*state, + fd_shutdown_error(fd)); + } /* else the state changed again. This can only happen due to another + racing set_ready function (which means, we do not have to do + anything else here */ + break; + } + } + } /* else fast-path succeeded. We are done */ } static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_pollset *notifier = NULL; - - gpr_mu_lock(&fd->po.mu); - notifier = fd->read_notifier_pollset; - gpr_mu_unlock(&fd->po.mu); - - return notifier; + gpr_atm notifier = gpr_atm_no_barrier_load(&fd->read_closure); + return (grpc_pollset *)notifier; } static bool fd_is_shutdown(grpc_fd *fd) { - gpr_mu_lock(&fd->po.mu); - const bool r = fd->shutdown; - gpr_mu_unlock(&fd->po.mu); - return r; + return (bool)gpr_atm_acq_load(&fd->shutdown1); } /* Might be called multiple times */ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { - gpr_mu_lock(&fd->po.mu); - /* Do the actual shutdown only once */ - if (!fd->shutdown) { - fd->shutdown = true; - fd->shutdown_error = why; + if (gpr_atm_acq_cas(&fd->shutdown1, (gpr_atm) false, (gpr_atm) true)) { + gpr_atm_rel_store(&fd->shutdown_error1, (gpr_atm)why); shutdown(fd->fd, SHUT_RDWR); - /* Flush any pending read and write closures. Since fd->shutdown is 'true' - at this point, the closures would be called with 'success = false' */ - set_ready_locked(exec_ctx, fd, &fd->read_closure); - set_ready_locked(exec_ctx, fd, &fd->write_closure); + + /* Flush any pending read and write closures at this point. Since + fd->shutdown_error1 is set, both the closures would be called with + success = false */ + set_ready(exec_ctx, fd, &fd->read_closure); + set_ready(exec_ctx, fd, &fd->write_closure); + } else { + // Shutdown already called GRPC_ERROR_UNREF(why); } - gpr_mu_unlock(&fd->po.mu); + + // gpr_mu_lock(&fd->po.mu); + /* Do the actual shutdown only once */ + // if (!fd->shutdown) { + // fd->shutdown = true; + // fd->shutdown_error = why; + + // shutdown(fd->fd, SHUT_RDWR); + /* Flush any pending read and write closures. Since fd->shutdown is 'true' + at this point, the closures would be called with 'success = false' */ + // set_ready(exec_ctx, fd, &fd->read_closure); + // set_ready(exec_ctx, fd, &fd->write_closure); + // } else { + // GRPC_ERROR_UNREF(why); + // } + // gpr_mu_unlock(&fd->po.mu); } static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - gpr_mu_lock(&fd->po.mu); - notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); - gpr_mu_unlock(&fd->po.mu); + notify_on(exec_ctx, fd, &fd->read_closure, closure); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - gpr_mu_lock(&fd->po.mu); - notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); - gpr_mu_unlock(&fd->po.mu); + notify_on(exec_ctx, fd, &fd->write_closure, closure); } static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { @@ -1343,18 +1412,18 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - /* Need the fd->po.mu since we might be racing with fd_notify_on_read */ - gpr_mu_lock(&fd->po.mu); - set_ready_locked(exec_ctx, fd, &fd->read_closure); - fd->read_notifier_pollset = notifier; - gpr_mu_unlock(&fd->po.mu); + set_ready(exec_ctx, fd, &fd->read_closure); + + // Note, it is possible that fd_become_readable might be called twice with + // different 'notifier's when an fd becomes readable and it is in two epoll + // sets (This can happen briefly during polling island merges). In such cases + // it does not really matter which notifer is set as the read_notifier_pollset + // (They would both point to the same polling island anyway) + gpr_atm_no_barrier_store(&fd->read_notifier_pollset, (gpr_atm)notifier); } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - /* Need the fd->po.mu since we might be racing with fd_notify_on_write */ - gpr_mu_lock(&fd->po.mu); - set_ready_locked(exec_ctx, fd, &fd->write_closure); - gpr_mu_unlock(&fd->po.mu); + set_ready(exec_ctx, fd, &fd->write_closure); } static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,