Merge pull request #6911 from ctiller/idempotent_endpoint_shutdown

Idempotent endpoint shutdown
pull/6352/head^2
Jan Tattermusch 9 years ago committed by GitHub
commit fa1433a4b6
  1. 2
      src/core/lib/iomgr/endpoint.h
  2. 24
      src/core/lib/iomgr/ev_poll_and_epoll_posix.c
  3. 24
      src/core/lib/iomgr/ev_poll_posix.c
  4. 4
      src/core/lib/iomgr/ev_posix.c
  5. 6
      src/core/lib/iomgr/ev_posix.h
  6. 2
      src/core/lib/iomgr/tcp_posix.c
  7. 60
      test/core/iomgr/endpoint_tests.c
  8. 2
      test/core/iomgr/tcp_posix_test.c

@ -82,7 +82,7 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer *slices, grpc_closure *cb); gpr_slice_buffer *slices, grpc_closure *cb);
/* Causes any pending read/write callbacks to run immediately with /* Causes any pending and future read/write callbacks to run immediately with
success==0 */ success==0 */
void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);

@ -515,7 +515,9 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) { grpc_closure **st, grpc_closure *closure) {
if (*st == CLOSURE_NOT_READY) { if (fd->shutdown) {
grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */ /* not ready ==> switch to a waiting state by setting the closure */
*st = closure; *st = closure;
} else if (*st == CLOSURE_READY) { } else if (*st == CLOSURE_READY) {
@ -557,13 +559,24 @@ static void set_read_notifier_pollset_locked(
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->mu);
GPR_ASSERT(!fd->shutdown); /* only shutdown once */
fd->shutdown = 1; if (!fd->shutdown) {
set_ready_locked(exec_ctx, fd, &fd->read_closure); fd->shutdown = 1;
set_ready_locked(exec_ctx, fd, &fd->write_closure); /* signal read/write closed to OS so that future operations fail */
shutdown(fd->fd, SHUT_RDWR);
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
}
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->mu);
} }
static bool fd_is_shutdown(grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
bool r = fd->shutdown;
gpr_mu_unlock(&fd->mu);
return r;
}
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) { grpc_closure *closure) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->mu);
@ -1938,6 +1951,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_wrapped_fd = fd_wrapped_fd, .fd_wrapped_fd = fd_wrapped_fd,
.fd_orphan = fd_orphan, .fd_orphan = fd_orphan,
.fd_shutdown = fd_shutdown, .fd_shutdown = fd_shutdown,
.fd_is_shutdown = fd_is_shutdown,
.fd_notify_on_read = fd_notify_on_read, .fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write, .fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

@ -421,7 +421,9 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) { grpc_closure **st, grpc_closure *closure) {
if (*st == CLOSURE_NOT_READY) { if (fd->shutdown) {
grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */ /* not ready ==> switch to a waiting state by setting the closure */
*st = closure; *st = closure;
} else if (*st == CLOSURE_READY) { } else if (*st == CLOSURE_READY) {
@ -463,13 +465,24 @@ static void set_read_notifier_pollset_locked(
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->mu);
GPR_ASSERT(!fd->shutdown); /* only shutdown once */
fd->shutdown = 1; if (!fd->shutdown) {
set_ready_locked(exec_ctx, fd, &fd->read_closure); fd->shutdown = 1;
set_ready_locked(exec_ctx, fd, &fd->write_closure); /* signal read/write closed to OS so that future operations fail */
shutdown(fd->fd, SHUT_RDWR);
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
}
gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&fd->mu);
} }
static bool fd_is_shutdown(grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
bool r = fd->shutdown;
gpr_mu_unlock(&fd->mu);
return r;
}
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) { grpc_closure *closure) {
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->mu);
@ -1172,6 +1185,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_wrapped_fd = fd_wrapped_fd, .fd_wrapped_fd = fd_wrapped_fd,
.fd_orphan = fd_orphan, .fd_orphan = fd_orphan,
.fd_shutdown = fd_shutdown, .fd_shutdown = fd_shutdown,
.fd_is_shutdown = fd_is_shutdown,
.fd_notify_on_read = fd_notify_on_read, .fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write, .fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

@ -153,6 +153,10 @@ void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
g_event_engine->fd_shutdown(exec_ctx, fd); g_event_engine->fd_shutdown(exec_ctx, fd);
} }
bool grpc_fd_is_shutdown(grpc_fd *fd) {
return g_event_engine->fd_is_shutdown(fd);
}
void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) { grpc_closure *closure) {
g_event_engine->fd_notify_on_read(exec_ctx, fd, closure); g_event_engine->fd_notify_on_read(exec_ctx, fd, closure);

@ -55,6 +55,7 @@ typedef struct grpc_event_engine_vtable {
grpc_closure *closure); grpc_closure *closure);
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure); grpc_closure *closure);
bool (*fd_is_shutdown)(grpc_fd *fd);
grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
grpc_fd *fd); grpc_fd *fd);
@ -116,7 +117,10 @@ int grpc_fd_wrapped_fd(grpc_fd *fd);
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason); int *release_fd, const char *reason);
/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */ /* Has grpc_fd_shutdown been called on an fd? */
bool grpc_fd_is_shutdown(grpc_fd *fd);
/* Cause any current and future callbacks to fail. */
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd); void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
/* Register read interest, causing read_cb to be called once when fd becomes /* Register read interest, causing read_cb to be called once when fd becomes

@ -408,7 +408,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) { if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0); GPR_TIMER_END("tcp_write", 0);
grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL); grpc_exec_ctx_enqueue(exec_ctx, cb, !grpc_fd_is_shutdown(tcp->em_fd), NULL);
return; return;
} }
tcp->outgoing_buffer = buf; tcp->outgoing_buffer = buf;

@ -222,7 +222,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
even when bytes_written is unsigned. */ even when bytes_written is unsigned. */
state.bytes_written -= state.current_write_size; state.bytes_written -= state.current_write_size;
read_and_write_test_write_handler(&exec_ctx, &state, 1); read_and_write_test_write_handler(&exec_ctx, &state, 1);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming, grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming,
&state.done_read); &state.done_read);
@ -233,7 +233,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_log(GPR_DEBUG, "shutdown write"); gpr_log(GPR_DEBUG, "shutdown write");
grpc_endpoint_shutdown(&exec_ctx, state.write_ep); grpc_endpoint_shutdown(&exec_ctx, state.write_ep);
} }
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu); gpr_mu_lock(g_mu);
while (!state.read_done || !state.write_done) { while (!state.read_done || !state.write_done) {
@ -243,7 +243,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_now(GPR_CLOCK_MONOTONIC), deadline); gpr_now(GPR_CLOCK_MONOTONIC), deadline);
} }
gpr_mu_unlock(g_mu); gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
end_test(config); end_test(config);
gpr_slice_buffer_destroy(&state.outgoing); gpr_slice_buffer_destroy(&state.outgoing);
@ -253,11 +253,64 @@ static void read_and_write_test(grpc_endpoint_test_config config,
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
static void inc_on_failure(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
*(int *)arg += (success == false);
}
static void wait_for_fail_count(grpc_exec_ctx *exec_ctx, int *fail_count,
int want_fail_count) {
grpc_exec_ctx_flush(exec_ctx);
for (int i = 0; i < 5 && *fail_count < want_fail_count; i++) {
grpc_pollset_worker *worker = NULL;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
gpr_timespec deadline =
gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN));
gpr_mu_lock(g_mu);
grpc_pollset_work(exec_ctx, g_pollset, &worker, now, deadline);
gpr_mu_unlock(g_mu);
grpc_exec_ctx_flush(exec_ctx);
}
GPR_ASSERT(*fail_count == want_fail_count);
}
static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_endpoint_test_fixture f =
begin_test(config, "multiple_shutdown_test", 128);
int fail_count = 0;
gpr_slice_buffer slice_buffer;
gpr_slice_buffer_init(&slice_buffer);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset);
grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer,
grpc_closure_create(inc_on_failure, &fail_count));
wait_for_fail_count(&exec_ctx, &fail_count, 0);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
wait_for_fail_count(&exec_ctx, &fail_count, 1);
grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer,
grpc_closure_create(inc_on_failure, &fail_count));
wait_for_fail_count(&exec_ctx, &fail_count, 2);
gpr_slice_buffer_add(&slice_buffer, gpr_slice_from_copied_string("a"));
grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer,
grpc_closure_create(inc_on_failure, &fail_count));
wait_for_fail_count(&exec_ctx, &fail_count, 3);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
wait_for_fail_count(&exec_ctx, &fail_count, 3);
gpr_slice_buffer_destroy(&slice_buffer);
grpc_endpoint_destroy(&exec_ctx, f.client_ep);
grpc_endpoint_destroy(&exec_ctx, f.server_ep);
grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_endpoint_tests(grpc_endpoint_test_config config, void grpc_endpoint_tests(grpc_endpoint_test_config config,
grpc_pollset *pollset, gpr_mu *mu) { grpc_pollset *pollset, gpr_mu *mu) {
size_t i; size_t i;
g_pollset = pollset; g_pollset = pollset;
g_mu = mu; g_mu = mu;
multiple_shutdown_test(config);
read_and_write_test(config, 10000000, 100000, 8192, 0); read_and_write_test(config, 10000000, 100000, 8192, 0);
read_and_write_test(config, 1000000, 100000, 1, 0); read_and_write_test(config, 1000000, 100000, 1, 0);
read_and_write_test(config, 100000000, 100000, 1, 1); read_and_write_test(config, 100000000, 100000, 1, 1);
@ -265,4 +318,5 @@ void grpc_endpoint_tests(grpc_endpoint_test_config config,
read_and_write_test(config, 40320, i, i, 0); read_and_write_test(config, 40320, i, i, 0);
} }
g_pollset = NULL; g_pollset = NULL;
g_mu = NULL;
} }

@ -517,8 +517,8 @@ int main(int argc, char **argv) {
grpc_init(); grpc_init();
g_pollset = gpr_malloc(grpc_pollset_size()); g_pollset = gpr_malloc(grpc_pollset_size());
grpc_pollset_init(g_pollset, &g_mu); grpc_pollset_init(g_pollset, &g_mu);
run_tests();
grpc_endpoint_tests(configs[0], g_pollset, g_mu); grpc_endpoint_tests(configs[0], g_pollset, g_mu);
run_tests();
grpc_closure_init(&destroyed, destroy_pollset, g_pollset); grpc_closure_init(&destroyed, destroy_pollset, g_pollset);
grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);

Loading…
Cancel
Save