Add an error to fd_shutdown (and recursively)

Allows diagnosing WHY a file descriptor was shutdown prematurely.
pull/9486/head
Craig Tiller 8 years ago
parent 46357c882d
commit cda759d658
  1. 6
      src/core/ext/client_channel/connector.c
  2. 7
      src/core/ext/client_channel/connector.h
  3. 10
      src/core/ext/client_channel/http_connect_handshaker.c
  4. 3
      src/core/ext/client_channel/subchannel.c
  5. 14
      src/core/ext/transport/chttp2/client/chttp2_connector.c
  6. 12
      src/core/ext/transport/chttp2/server/chttp2_server.c
  7. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  8. 14
      src/core/lib/channel/handshaker.c
  9. 8
      src/core/lib/channel/handshaker.h
  10. 5
      src/core/lib/iomgr/endpoint.c
  11. 5
      src/core/lib/iomgr/endpoint.h
  12. 17
      src/core/lib/iomgr/ev_epoll_linux.c
  13. 17
      src/core/lib/iomgr/ev_poll_posix.c
  14. 4
      src/core/lib/iomgr/ev_posix.c
  15. 4
      src/core/lib/iomgr/ev_posix.h
  16. 3
      src/core/lib/iomgr/network_status_tracker.c
  17. 3
      src/core/lib/iomgr/tcp_client_posix.c
  18. 5
      src/core/lib/iomgr/tcp_posix.c
  19. 6
      src/core/lib/iomgr/tcp_server_posix.c
  20. 3
      src/core/lib/iomgr/udp_server.c
  21. 6
      src/core/lib/security/transport/secure_endpoint.c
  22. 13
      src/core/lib/security/transport/security_handshaker.c
  23. 6
      test/core/bad_client/bad_client.c
  24. 4
      test/core/client_channel/set_initial_connect_string_test.c
  25. 3
      test/core/end2end/bad_server_response_test.c
  26. 9
      test/core/end2end/fixtures/http_proxy.c
  27. 2
      test/core/internal_api_canaries/iomgr.c
  28. 12
      test/core/iomgr/endpoint_tests.c
  29. 3
      test/core/iomgr/ev_epoll_linux_test.c
  30. 3
      test/core/iomgr/fd_posix_test.c
  31. 3
      test/core/iomgr/tcp_client_posix_test.c
  32. 2
      test/core/iomgr/tcp_server_posix_test.c
  33. 6
      test/core/security/secure_endpoint_test.c
  34. 3
      test/core/security/ssl_server_fuzzer.c
  35. 2
      test/core/surface/concurrent_connectivity_test.c
  36. 8
      test/core/util/mock_endpoint.c
  37. 10
      test/core/util/passthru_endpoint.c
  38. 2
      test/core/util/reconnect_server.c

@ -49,7 +49,7 @@ void grpc_connector_connect(grpc_exec_ctx* exec_ctx, grpc_connector* connector,
connector->vtable->connect(exec_ctx, connector, in_args, out_args, notify);
}
void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx,
grpc_connector* connector) {
connector->vtable->shutdown(exec_ctx, connector);
void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx, grpc_connector* connector,
grpc_error* why) {
connector->vtable->shutdown(exec_ctx, connector, why);
}

@ -68,7 +68,8 @@ struct grpc_connector_vtable {
void (*ref)(grpc_connector *connector);
void (*unref)(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
/** Implementation of grpc_connector_shutdown */
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
grpc_error *why);
/** Implementation of grpc_connector_connect */
void (*connect)(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
const grpc_connect_in_args *in_args,
@ -83,7 +84,7 @@ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
grpc_connect_out_args *out_args,
grpc_closure *notify);
/** Cancel any pending connection */
void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_connector *connector);
void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
grpc_error *why);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H */

@ -123,7 +123,8 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint,
GRPC_ERROR_REF(error));
// Not shutting down, so the handshake failed. Clean up before
// invoking the callback.
cleanup_args_for_failure_locked(exec_ctx, handshaker);
@ -251,15 +252,18 @@ static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
}
static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker_in) {
grpc_handshaker* handshaker_in,
grpc_error* why) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
gpr_mu_lock(&handshaker->mu);
if (!handshaker->shutdown) {
handshaker->shutdown = true;
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint,
GRPC_ERROR_REF(why));
cleanup_args_for_failure_locked(exec_ctx, handshaker);
}
gpr_mu_unlock(&handshaker->mu);
GRPC_ERROR_UNREF(why);
}
static void http_connect_handshaker_do_handshake(

@ -273,7 +273,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
c->disconnected = true;
grpc_connector_shutdown(exec_ctx, c->connector);
grpc_connector_shutdown(exec_ctx, c->connector,
GRPC_ERROR_CREATE("Subchannel disconnected"));
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");

@ -92,19 +92,21 @@ static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx,
}
static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_connector *con) {
grpc_connector *con, grpc_error *why) {
chttp2_connector *c = (chttp2_connector *)con;
gpr_mu_lock(&c->mu);
c->shutdown = true;
if (c->handshake_mgr != NULL) {
grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr);
grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr,
GRPC_ERROR_REF(why));
}
// If handshaking is not yet in progress, shutdown the endpoint.
// Otherwise, the handshaker will do this for us.
if (!c->connecting && c->endpoint != NULL) {
grpc_endpoint_shutdown(exec_ctx, c->endpoint);
grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(why));
}
gpr_mu_unlock(&c->mu);
GRPC_ERROR_UNREF(why);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
@ -121,7 +123,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, args->endpoint);
grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_REF(error));
grpc_endpoint_destroy(exec_ctx, args->endpoint);
grpc_channel_args_destroy(exec_ctx, args->args);
grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
@ -195,7 +197,9 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_closure_sched(exec_ctx, notify, error);
if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
if (c->endpoint != NULL) {
grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error));
}
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(exec_ctx, arg);
} else {

@ -101,16 +101,19 @@ static void pending_handshake_manager_remove_locked(
}
static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx,
server_state *state) {
server_state *state,
grpc_error *why) {
pending_handshake_manager_node *prev_node = NULL;
for (pending_handshake_manager_node *node = state->pending_handshake_mgrs;
node != NULL; node = node->next) {
grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr);
grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr,
GRPC_ERROR_REF(why));
gpr_free(prev_node);
prev_node = node;
}
gpr_free(prev_node);
state->pending_handshake_mgrs = NULL;
GRPC_ERROR_UNREF(why);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
@ -129,7 +132,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, args->endpoint);
grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_NONE);
grpc_endpoint_destroy(exec_ctx, args->endpoint);
grpc_channel_args_destroy(exec_ctx, args->args);
grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
@ -210,7 +213,8 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&state->mu);
grpc_closure *destroy_done = state->server_destroy_listener_done;
GPR_ASSERT(state->shutdown);
pending_handshake_manager_shutdown_locked(exec_ctx, state);
pending_handshake_manager_shutdown_locked(exec_ctx, state,
GRPC_ERROR_REF(error));
gpr_mu_unlock(&state->mu);
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.

@ -417,7 +417,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
t->closed = 1;
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
grpc_endpoint_shutdown(exec_ctx, t->ep);
grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream *s;

@ -55,8 +55,8 @@ void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
}
void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker) {
handshaker->vtable->shutdown(exec_ctx, handshaker);
grpc_handshaker* handshaker, grpc_error* why) {
handshaker->vtable->shutdown(exec_ctx, handshaker, why);
}
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
@ -141,14 +141,17 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
}
void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr) {
grpc_handshake_manager* mgr,
grpc_error* why) {
gpr_mu_lock(&mgr->mu);
// Shutdown the handshaker that's currently in progress, if any.
if (!mgr->shutdown && mgr->index > 0) {
mgr->shutdown = true;
grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]);
grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1],
GRPC_ERROR_REF(why));
}
gpr_mu_unlock(&mgr->mu);
GRPC_ERROR_UNREF(why);
}
// Helper function to call either the next handshaker or the
@ -197,7 +200,8 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg,
static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_handshake_manager* mgr = arg;
if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled.
grpc_handshake_manager_shutdown(exec_ctx, mgr);
grpc_handshake_manager_shutdown(exec_ctx, mgr,
GRPC_ERROR_CREATE("Handshake timed out"));
}
grpc_handshake_manager_unref(exec_ctx, mgr);
}

@ -86,7 +86,8 @@ typedef struct {
/// Shuts down the handshaker (e.g., to clean up when the operation is
/// aborted in the middle).
void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
grpc_error* why);
/// Performs handshaking, modifying \a args as needed (e.g., to
/// replace \a endpoint with a wrapped endpoint).
@ -111,7 +112,7 @@ void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
grpc_handshaker* handshaker, grpc_error* why);
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_tcp_server_acceptor* acceptor,
@ -141,7 +142,8 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
/// The caller must still call grpc_handshake_manager_destroy() after
/// calling this function.
void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr);
grpc_handshake_manager* mgr,
grpc_error* why);
/// Invokes handshakers in the order they were added.
/// Takes ownership of \a endpoint, and then passes that ownership to

@ -54,8 +54,9 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx,
ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set);
}
void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
ep->vtable->shutdown(exec_ctx, ep);
void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
grpc_error* why) {
ep->vtable->shutdown(exec_ctx, ep, why);
}
void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {

@ -57,7 +57,7 @@ struct grpc_endpoint_vtable {
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep);
char *(*get_peer)(grpc_endpoint *ep);
@ -96,7 +96,8 @@ void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Causes any pending and future read/write callbacks to run immediately with
success==0 */
void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why);
void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
/* Add an endpoint to a pollset, so that when the pollset is polled, events from

@ -143,6 +143,7 @@ struct grpc_fd {
/* Indicates that the fd is shutdown and that any pending read/write closures
should fail */
bool shutdown;
grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */
/* The fd is either closed or we relinquished control of it. In either cases,
this indicates that the 'fd' on this structure is no longer valid */
@ -907,6 +908,7 @@ static void unref_by(grpc_fd *fd, int n) {
fd->freelist_next = fd_freelist;
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_mu_unlock(&fd_freelist_mu);
} else {
@ -1058,11 +1060,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
GRPC_ERROR_UNREF(error);
}
static grpc_error *fd_shutdown_error(bool shutdown) {
if (!shutdown) {
static grpc_error *fd_shutdown_error(grpc_fd *fd) {
if (!fd->shutdown) {
return GRPC_ERROR_NONE;
} else {
return GRPC_ERROR_CREATE("FD shutdown");
return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
}
}
@ -1076,7 +1078,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
@ -1098,7 +1100,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
*st = CLOSURE_NOT_READY;
return 1;
}
@ -1123,17 +1125,20 @@ static bool fd_is_shutdown(grpc_fd *fd) {
}
/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
gpr_mu_lock(&fd->po.mu);
/* Do the actual shutdown only once */
if (!fd->shutdown) {
fd->shutdown = true;
fd->shutdown_error = why;
shutdown(fd->fd, SHUT_RDWR);
/* Flush any pending read and write closures. Since fd->shutdown is 'true'
at this point, the closures would be called with 'success = false' */
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
} else {
GRPC_ERROR_UNREF(why);
}
gpr_mu_unlock(&fd->po.mu);
}

@ -82,6 +82,7 @@ struct grpc_fd {
int shutdown;
int closed;
int released;
grpc_error *shutdown_error;
/* The watcher list.
@ -306,6 +307,7 @@ static void unref_by(grpc_fd *fd, int n) {
if (old == n) {
gpr_mu_destroy(&fd->mu);
grpc_iomgr_unregister_object(&fd->iomgr_object);
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_free(fd);
} else {
GPR_ASSERT(old > n);
@ -444,11 +446,11 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static grpc_error *fd_shutdown_error(bool shutdown) {
if (!shutdown) {
static grpc_error *fd_shutdown_error(grpc_fd *fd) {
if (!fd->shutdown) {
return GRPC_ERROR_NONE;
} else {
return GRPC_ERROR_CREATE("FD shutdown");
return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
}
}
@ -462,7 +464,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@ -485,7 +487,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
*st = CLOSURE_NOT_READY;
return 1;
}
@ -496,15 +498,18 @@ static void set_read_notifier_pollset_locked(
fd->read_notifier_pollset = read_notifier_pollset;
}
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
gpr_mu_lock(&fd->mu);
/* only shutdown once */
if (!fd->shutdown) {
fd->shutdown = 1;
fd->shutdown_error = why;
/* signal read/write closed to OS so that future operations fail */
shutdown(fd->fd, SHUT_RDWR);
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
} else {
GRPC_ERROR_UNREF(why);
}
gpr_mu_unlock(&fd->mu);
}

@ -162,8 +162,8 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason);
}
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
g_event_engine->fd_shutdown(exec_ctx, fd);
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
g_event_engine->fd_shutdown(exec_ctx, fd, why);
}
bool grpc_fd_is_shutdown(grpc_fd *fd) {

@ -51,7 +51,7 @@ typedef struct grpc_event_engine_vtable {
int (*fd_wrapped_fd)(grpc_fd *fd);
void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason);
void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
@ -140,7 +140,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
bool grpc_fd_is_shutdown(grpc_fd *fd);
/* Cause any current and future callbacks to fail. */
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
/* Register read interest, causing read_cb to be called once when fd becomes
readable, on deadline specified by deadline, or on shutdown triggered by

@ -117,7 +117,8 @@ void grpc_network_status_shutdown_all_endpoints() {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
for (endpoint_ll_node *curr = head; curr != NULL; curr = curr->next) {
curr->ep->vtable->shutdown(&exec_ctx, curr->ep);
curr->ep->vtable->shutdown(&exec_ctx, curr->ep,
GRPC_ERROR_CREATE("Network unavailable"));
}
gpr_mu_unlock(&g_endpoint_mutex);
grpc_exec_ctx_finish(&exec_ctx);

@ -121,7 +121,8 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
}
gpr_mu_lock(&ac->mu);
if (ac->fd != NULL) {
grpc_fd_shutdown(exec_ctx, ac->fd);
grpc_fd_shutdown(exec_ctx, ac->fd,
GRPC_ERROR_CREATE("connect() timed out"));
}
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);

@ -119,9 +119,10 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_fd_shutdown(exec_ctx, tcp->em_fd);
grpc_fd_shutdown(exec_ctx, tcp->em_fd, why);
grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
}

@ -276,7 +276,8 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->active_ports) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_fd_shutdown(exec_ctx, sp->emfd);
grpc_fd_shutdown(exec_ctx, sp->emfd,
GRPC_ERROR_CREATE("Server destroyed"));
}
gpr_mu_unlock(&s->mu);
} else {
@ -773,7 +774,8 @@ void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
if (s->active_ports) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_fd_shutdown(exec_ctx, sp->emfd);
grpc_fd_shutdown(exec_ctx, sp->emfd,
GRPC_ERROR_CREATE("Server shutdown"));
}
}
gpr_mu_unlock(&s->mu);

@ -203,7 +203,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(sp->emfd);
grpc_fd_shutdown(exec_ctx, sp->emfd);
grpc_fd_shutdown(exec_ctx, sp->emfd,
GRPC_ERROR_CREATE("Server destroyed"));
}
gpr_mu_unlock(&s->mu);
} else {

@ -341,10 +341,10 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
GPR_TIMER_END("secure_endpoint.endpoint_write", 0);
}
static void endpoint_shutdown(grpc_exec_ctx *exec_ctx,
grpc_endpoint *secure_ep) {
static void endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
grpc_error *why) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep);
grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep, why);
}
static void endpoint_destroy(grpc_exec_ctx *exec_ctx,

@ -130,7 +130,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(error));
// Not shutting down, so the write failed. Clean up before
// invoking the callback.
cleanup_args_for_failure_locked(exec_ctx, h);
@ -347,15 +347,17 @@ static void security_handshaker_destroy(grpc_exec_ctx *exec_ctx,
}
static void security_handshaker_shutdown(grpc_exec_ctx *exec_ctx,
grpc_handshaker *handshaker) {
grpc_handshaker *handshaker,
grpc_error *why) {
security_handshaker *h = (security_handshaker *)handshaker;
gpr_mu_lock(&h->mu);
if (!h->shutdown) {
h->shutdown = true;
grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(why));
cleanup_args_for_failure_locked(exec_ctx, h);
}
gpr_mu_unlock(&h->mu);
GRPC_ERROR_UNREF(why);
}
static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
@ -417,7 +419,10 @@ static void fail_handshaker_destroy(grpc_exec_ctx *exec_ctx,
}
static void fail_handshaker_shutdown(grpc_exec_ctx *exec_ctx,
grpc_handshaker *handshaker) {}
grpc_handshaker *handshaker,
grpc_error *why) {
GRPC_ERROR_UNREF(why);
}
static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_handshaker *handshaker,

@ -163,7 +163,8 @@ void grpc_run_bad_client_test(
gpr_event_wait(&a.done_write, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)));
if (flags & GRPC_BAD_CLIENT_DISCONNECT) {
grpc_endpoint_shutdown(&exec_ctx, sfd.client);
grpc_endpoint_shutdown(&exec_ctx, sfd.client,
GRPC_ERROR_CREATE("Forced Disconnect"));
grpc_endpoint_destroy(&exec_ctx, sfd.client);
grpc_exec_ctx_finish(&exec_ctx);
sfd.client = NULL;
@ -189,7 +190,8 @@ void grpc_run_bad_client_test(
grpc_slice_buffer_destroy_internal(&exec_ctx, &args.incoming);
}
// Shutdown.
grpc_endpoint_shutdown(&exec_ctx, sfd.client);
grpc_endpoint_shutdown(&exec_ctx, sfd.client,
GRPC_ERROR_CREATE("Test Shutdown"));
grpc_endpoint_destroy(&exec_ctx, sfd.client);
grpc_exec_ctx_finish(&exec_ctx);
}

@ -81,7 +81,9 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
state.incoming_buffer.length, strlen(magic_connect_string));
if (state.incoming_buffer.length > strlen(magic_connect_string)) {
gpr_atm_rel_store(&state.done_atm, 1);
grpc_endpoint_shutdown(exec_ctx, state.tcp);
grpc_endpoint_shutdown(
exec_ctx, state.tcp,
GRPC_ERROR_CREATE("Incoming buffer longer than magic_connect_string"));
grpc_endpoint_destroy(exec_ctx, state.tcp);
} else {
grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer,

@ -298,7 +298,8 @@ static void run_test(const char *response_payload,
gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
/* clean up */
grpc_endpoint_shutdown(&exec_ctx, state.tcp);
grpc_endpoint_shutdown(&exec_ctx, state.tcp,
GRPC_ERROR_CREATE("Test Shutdown"));
grpc_endpoint_destroy(&exec_ctx, state.tcp);
cleanup_rpc(&exec_ctx);
grpc_exec_ctx_finish(&exec_ctx);

@ -133,9 +133,12 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
const char* msg = grpc_error_string(error);
gpr_log(GPR_INFO, "%s: %s", prefix, msg);
grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint);
if (conn->server_endpoint != NULL)
grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint);
grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint,
GRPC_ERROR_REF(error));
if (conn->server_endpoint != NULL) {
grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint,
GRPC_ERROR_REF(error));
}
proxy_connection_unref(exec_ctx, conn);
}

@ -92,7 +92,7 @@ static void test_code(void) {
grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL);
grpc_endpoint_get_peer(&endpoint);
grpc_endpoint_write(&exec_ctx, &endpoint, NULL, NULL);
grpc_endpoint_shutdown(&exec_ctx, &endpoint);
grpc_endpoint_shutdown(&exec_ctx, &endpoint, GRPC_ERROR_CANCELLED);
grpc_endpoint_destroy(&exec_ctx, &endpoint);
grpc_endpoint_add_to_pollset(&exec_ctx, &endpoint, NULL);
grpc_endpoint_add_to_pollset_set(&exec_ctx, &endpoint, NULL);

@ -233,9 +233,11 @@ static void read_and_write_test(grpc_endpoint_test_config config,
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
grpc_endpoint_shutdown(&exec_ctx, state.read_ep);
grpc_endpoint_shutdown(&exec_ctx, state.read_ep,
GRPC_ERROR_CREATE("Test Shutdown"));
gpr_log(GPR_DEBUG, "shutdown write");
grpc_endpoint_shutdown(&exec_ctx, state.write_ep);
grpc_endpoint_shutdown(&exec_ctx, state.write_ep,
GRPC_ERROR_CREATE("Test Shutdown"));
}
grpc_exec_ctx_flush(&exec_ctx);
@ -296,7 +298,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_closure_create(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx));
wait_for_fail_count(&exec_ctx, &fail_count, 0);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
GRPC_ERROR_CREATE("Test Shutdown"));
wait_for_fail_count(&exec_ctx, &fail_count, 1);
grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer,
grpc_closure_create(inc_on_failure, &fail_count,
@ -307,7 +310,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_closure_create(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx));
wait_for_fail_count(&exec_ctx, &fail_count, 3);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
GRPC_ERROR_CREATE("Test Shutdown"));
wait_for_fail_count(&exec_ctx, &fail_count, 3);
grpc_slice_buffer_destroy_internal(&exec_ctx, &slice_buffer);

@ -89,7 +89,8 @@ static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *tfds,
int i;
for (i = 0; i < num_fds; i++) {
grpc_fd_shutdown(exec_ctx, tfds[i].fd);
grpc_fd_shutdown(exec_ctx, tfds[i].fd,
GRPC_ERROR_CREATE("test_fd_cleanup"));
grpc_exec_ctx_flush(exec_ctx);
grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup");

@ -132,7 +132,8 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a");
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(exec_ctx, sv->em_fd);
grpc_fd_shutdown(exec_ctx, sv->em_fd,
GRPC_ERROR_CREATE("session_shutdown_cb"));
}
/* Called when data become readable in a session. */

@ -72,7 +72,8 @@ static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_ASSERT(g_connecting != NULL);
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_endpoint_shutdown(exec_ctx, g_connecting);
grpc_endpoint_shutdown(exec_ctx, g_connecting,
GRPC_ERROR_CREATE("must_succeed called"));
grpc_endpoint_destroy(exec_ctx, g_connecting);
g_connecting = NULL;
finish_connection();

@ -121,7 +121,7 @@ static void server_weak_ref_set(server_weak_ref *weak_ref,
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_pollset *pollset,
grpc_tcp_server_acceptor *acceptor) {
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected"));
grpc_endpoint_destroy(exec_ctx, tcp);
on_connect_result temp_result;

@ -166,8 +166,10 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
GPR_ASSERT(incoming.count == 1);
GPR_ASSERT(grpc_slice_eq(s, incoming.slices[0]));
grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
grpc_endpoint_shutdown(&exec_ctx, f.server_ep);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
GRPC_ERROR_CREATE("test_leftover end"));
grpc_endpoint_shutdown(&exec_ctx, f.server_ep,
GRPC_ERROR_CREATE("test_leftover end"));
grpc_endpoint_destroy(&exec_ctx, f.client_ep);
grpc_endpoint_destroy(&exec_ctx, f.server_ep);
grpc_exec_ctx_finish(&exec_ctx);

@ -121,7 +121,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
// server will wait for more data. Explicitly fail the server by shutting down
// the endpoint.
if (!state.done_callback_called) {
grpc_endpoint_shutdown(&exec_ctx, mock_endpoint);
grpc_endpoint_shutdown(&exec_ctx, mock_endpoint,
GRPC_ERROR_CREATE("Explicit close"));
grpc_exec_ctx_flush(&exec_ctx);
}

@ -107,7 +107,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp,
grpc_tcp_server_acceptor *acceptor) {
gpr_free(acceptor);
struct server_thread_args *args = (struct server_thread_args *)vargs;
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected"));
grpc_endpoint_destroy(exec_ctx, tcp);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL));
}

@ -78,16 +78,18 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
gpr_mu_lock(&m->mu);
if (m->on_read) {
grpc_closure_sched(exec_ctx, m->on_read,
GRPC_ERROR_CREATE("Endpoint Shutdown"));
grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE_REFERENCING(
"Endpoint Shutdown", &why, 1));
m->on_read = NULL;
}
gpr_mu_unlock(&m->mu);
grpc_resource_user_shutdown(exec_ctx, m->resource_user);
GRPC_ERROR_UNREF(why);
}
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {

@ -109,21 +109,25 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
half *m = (half *)ep;
gpr_mu_lock(&m->parent->mu);
m->parent->shutdown = true;
if (m->on_read) {
grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"));
grpc_closure_sched(exec_ctx, m->on_read,
GRPC_ERROR_CREATE_REFERENCING("Shutdown", &why, 1));
m->on_read = NULL;
}
m = other_half(m);
if (m->on_read) {
grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"));
grpc_closure_sched(exec_ctx, m->on_read,
GRPC_ERROR_CREATE_REFERENCING("Shutdown", &why, 1));
m->on_read = NULL;
}
gpr_mu_unlock(&m->parent->mu);
grpc_resource_user_shutdown(exec_ctx, m->resource_user);
GRPC_ERROR_UNREF(why);
}
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {

@ -80,7 +80,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
timestamp_list *new_tail;
peer = grpc_endpoint_get_peer(tcp);
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected"));
grpc_endpoint_destroy(exec_ctx, tcp);
if (peer) {
last_colon = strrchr(peer, ':');

Loading…
Cancel
Save