diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 3877ceb1e26..f9808bbda18 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -82,7 +82,7 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep); void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb); -/* Causes any pending read/write callbacks to run immediately with +/* Causes any pending and future read/write callbacks to run immediately with success==0 */ void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 943c404f917..ef8bbd62772 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -515,7 +515,9 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { - if (*st == CLOSURE_NOT_READY) { + if (fd->shutdown) { + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); + } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { @@ -557,13 +559,24 @@ static void set_read_notifier_pollset_locked( static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); - GPR_ASSERT(!fd->shutdown); - fd->shutdown = 1; - set_ready_locked(exec_ctx, fd, &fd->read_closure); - set_ready_locked(exec_ctx, fd, &fd->write_closure); + /* only shutdown once */ + if (!fd->shutdown) { + fd->shutdown = 1; + /* signal read/write closed to OS so that future operations fail */ + shutdown(fd->fd, SHUT_RDWR); + set_ready_locked(exec_ctx, fd, &fd->read_closure); + set_ready_locked(exec_ctx, fd, &fd->write_closure); + } gpr_mu_unlock(&fd->mu); } +static bool fd_is_shutdown(grpc_fd *fd) { + gpr_mu_lock(&fd->mu); + bool r = fd->shutdown; + gpr_mu_unlock(&fd->mu); + return r; +} + static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { gpr_mu_lock(&fd->mu); @@ -1938,6 +1951,7 @@ static const grpc_event_engine_vtable vtable = { .fd_wrapped_fd = fd_wrapped_fd, .fd_orphan = fd_orphan, .fd_shutdown = fd_shutdown, + .fd_is_shutdown = fd_is_shutdown, .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 0167999dada..63130328693 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -421,7 +421,9 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { - if (*st == CLOSURE_NOT_READY) { + if (fd->shutdown) { + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); + } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { @@ -463,13 +465,24 @@ static void set_read_notifier_pollset_locked( static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); - GPR_ASSERT(!fd->shutdown); - fd->shutdown = 1; - set_ready_locked(exec_ctx, fd, &fd->read_closure); - set_ready_locked(exec_ctx, fd, &fd->write_closure); + /* only shutdown once */ + if (!fd->shutdown) { + fd->shutdown = 1; + /* signal read/write closed to OS so that future operations fail */ + shutdown(fd->fd, SHUT_RDWR); + set_ready_locked(exec_ctx, fd, &fd->read_closure); + set_ready_locked(exec_ctx, fd, &fd->write_closure); + } gpr_mu_unlock(&fd->mu); } +static bool fd_is_shutdown(grpc_fd *fd) { + gpr_mu_lock(&fd->mu); + bool r = fd->shutdown; + gpr_mu_unlock(&fd->mu); + return r; +} + static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { gpr_mu_lock(&fd->mu); @@ -1172,6 +1185,7 @@ static const grpc_event_engine_vtable vtable = { .fd_wrapped_fd = fd_wrapped_fd, .fd_orphan = fd_orphan, .fd_shutdown = fd_shutdown, + .fd_is_shutdown = fd_is_shutdown, .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 6477b05dcd8..a17ee17945a 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -153,6 +153,10 @@ void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { g_event_engine->fd_shutdown(exec_ctx, fd); } +bool grpc_fd_is_shutdown(grpc_fd *fd) { + return g_event_engine->fd_is_shutdown(fd); +} + void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { g_event_engine->fd_notify_on_read(exec_ctx, fd, closure); diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 344bf63438a..53457aa9d53 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -55,6 +55,7 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); + bool (*fd_is_shutdown)(grpc_fd *fd); grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, grpc_fd *fd); @@ -116,7 +117,10 @@ int grpc_fd_wrapped_fd(grpc_fd *fd); void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, const char *reason); -/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */ +/* Has grpc_fd_shutdown been called on an fd? */ +bool grpc_fd_is_shutdown(grpc_fd *fd); + +/* Cause any current and future callbacks to fail. */ void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd); /* Register read interest, causing read_cb to be called once when fd becomes diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index e2869224f1f..1bfc190f7aa 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -408,7 +408,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL); + grpc_exec_ctx_enqueue(exec_ctx, cb, !grpc_fd_is_shutdown(tcp->em_fd), NULL); return; } tcp->outgoing_buffer = buf; diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 6d15ecf141d..0f26c0edfdd 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -222,7 +222,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, even when bytes_written is unsigned. */ state.bytes_written -= state.current_write_size; read_and_write_test_write_handler(&exec_ctx, &state, 1); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming, &state.done_read); @@ -233,7 +233,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, gpr_log(GPR_DEBUG, "shutdown write"); grpc_endpoint_shutdown(&exec_ctx, state.write_ep); } - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); while (!state.read_done || !state.write_done) { @@ -243,7 +243,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, gpr_now(GPR_CLOCK_MONOTONIC), deadline); } gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); end_test(config); gpr_slice_buffer_destroy(&state.outgoing); @@ -253,11 +253,64 @@ static void read_and_write_test(grpc_endpoint_test_config config, grpc_exec_ctx_finish(&exec_ctx); } +static void inc_on_failure(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + *(int *)arg += (success == false); +} + +static void wait_for_fail_count(grpc_exec_ctx *exec_ctx, int *fail_count, + int want_fail_count) { + grpc_exec_ctx_flush(exec_ctx); + for (int i = 0; i < 5 && *fail_count < want_fail_count; i++) { + grpc_pollset_worker *worker = NULL; + gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); + gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN)); + gpr_mu_lock(g_mu); + grpc_pollset_work(exec_ctx, g_pollset, &worker, now, deadline); + gpr_mu_unlock(g_mu); + grpc_exec_ctx_flush(exec_ctx); + } + GPR_ASSERT(*fail_count == want_fail_count); +} + +static void multiple_shutdown_test(grpc_endpoint_test_config config) { + grpc_endpoint_test_fixture f = + begin_test(config, "multiple_shutdown_test", 128); + int fail_count = 0; + + gpr_slice_buffer slice_buffer; + gpr_slice_buffer_init(&slice_buffer); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_closure_create(inc_on_failure, &fail_count)); + wait_for_fail_count(&exec_ctx, &fail_count, 0); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep); + wait_for_fail_count(&exec_ctx, &fail_count, 1); + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_closure_create(inc_on_failure, &fail_count)); + wait_for_fail_count(&exec_ctx, &fail_count, 2); + gpr_slice_buffer_add(&slice_buffer, gpr_slice_from_copied_string("a")); + grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer, + grpc_closure_create(inc_on_failure, &fail_count)); + wait_for_fail_count(&exec_ctx, &fail_count, 3); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep); + wait_for_fail_count(&exec_ctx, &fail_count, 3); + + gpr_slice_buffer_destroy(&slice_buffer); + + grpc_endpoint_destroy(&exec_ctx, f.client_ep); + grpc_endpoint_destroy(&exec_ctx, f.server_ep); + grpc_exec_ctx_finish(&exec_ctx); +} + void grpc_endpoint_tests(grpc_endpoint_test_config config, grpc_pollset *pollset, gpr_mu *mu) { size_t i; g_pollset = pollset; g_mu = mu; + multiple_shutdown_test(config); read_and_write_test(config, 10000000, 100000, 8192, 0); read_and_write_test(config, 1000000, 100000, 1, 0); read_and_write_test(config, 100000000, 100000, 1, 1); @@ -265,4 +318,5 @@ void grpc_endpoint_tests(grpc_endpoint_test_config config, read_and_write_test(config, 40320, i, i, 0); } g_pollset = NULL; + g_mu = NULL; } diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 19ba258303e..1112e8a3aa5 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -517,8 +517,8 @@ int main(int argc, char **argv) { grpc_init(); g_pollset = gpr_malloc(grpc_pollset_size()); grpc_pollset_init(g_pollset, &g_mu); - run_tests(); grpc_endpoint_tests(configs[0], g_pollset, g_mu); + run_tests(); grpc_closure_init(&destroyed, destroy_pollset, g_pollset); grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx);