[iomgr] Fix Windows socket abandonment due to premature IOCP shutdown (#33099)

In collaboration with @Vignesh2208. This supersedes
https://github.com/grpc/grpc/pull/32622. The original description is
below.

---- 

The current endpoint semantics are as follows:

On endpoint shutdown, the socket is closed immediately, but any pending
registered read/write closures are not executed immediately. Instead, the
pending read/write closures get executed with an aborted error the next
time grpc_iocp_work runs.

However, grpc_iocp_work may run only during grpc_shutdown, and
grpc_shutdown may only get scheduled after the pending registered
read/write closures execute. In that case neither can make progress, and
the socket is abandoned.

This PR changes the shutdown semantics to match the shutdown semantics used
on POSIX, i.e., on endpoint shutdown, the socket is immediately closed
and any pending registered read/write closures are executed immediately.
Additional care is taken to ensure that the socket is not immediately
deleted, because the pending I/O ops still need to be flushed later
during grpc_shutdown.

---------

Co-authored-by: Vignesh Babu <vigneshbabu@google.com>
pull/33105/head
AJ Heller 2 years ago committed by GitHub
parent eecc8978b6
commit d71858487d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      src/core/ext/transport/chttp2/transport/internal.h
  2. 27
      src/core/lib/iomgr/iocp_windows.cc
  3. 11
      src/core/lib/iomgr/iocp_windows.h
  4. 68
      src/core/lib/iomgr/socket_windows.cc
  5. 11
      src/core/lib/iomgr/socket_windows.h

@ -244,21 +244,7 @@ typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;
struct grpc_chttp2_transport
// TODO(ctiller): #31319 fixed a crash on Linux & Mac whereby iomgr was
// accessed after shutdown by chttp2. We've not seen similar behavior on
// Windows afaik, but this fix has exposed another refcounting bug whereby
// transports leak on Windows and prevent test shutdown.
// This hack attempts to compromise between two things that are blocking our CI
// from giving us a good quality signal, but are unlikely to be problems for
// most customers. We should continue tracking down what's causing the failure,
// but this gives us some runway to do so - and given that we're actively
// working on removing the problematic code paths, it may be that effort brings
// the result we need.
#ifndef GPR_WINDOWS
: public grpc_core::KeepsGrpcInitialized
#endif
{
struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_chttp2_transport(const grpc_core::ChannelArgs& channel_args,
grpc_endpoint* ep, bool is_client);
~grpc_chttp2_transport();

@ -43,6 +43,7 @@ static ULONG g_iocp_kick_token;
static OVERLAPPED g_iocp_custom_overlap;
static gpr_atm g_custom_events = 0;
static gpr_atm g_pending_socket_shutdowns = 0;
static HANDLE g_iocp;
@ -90,6 +91,7 @@ grpc_iocp_work_status grpc_iocp_work(grpc_core::Timestamp deadline) {
} else {
abort();
}
gpr_mu_lock(&socket->state_mu);
if (socket->shutdown_called) {
info->bytes_transferred = 0;
info->wsa_error = WSA_OPERATION_ABORTED;
@ -100,7 +102,11 @@ grpc_iocp_work_status grpc_iocp_work(grpc_core::Timestamp deadline) {
info->wsa_error = success ? 0 : WSAGetLastError();
}
GPR_ASSERT(overlapped == &info->overlapped);
grpc_socket_become_ready(socket, info);
bool should_destroy = grpc_socket_become_ready(socket, info);
gpr_mu_unlock(&socket->state_mu);
if (should_destroy) {
grpc_winsocket_finish(socket);
}
return GRPC_IOCP_WORK_WORK;
}
@ -122,11 +128,13 @@ void grpc_iocp_kick(void) {
void grpc_iocp_flush(void) {
grpc_core::ExecCtx exec_ctx;
grpc_iocp_work_status work_status;
// This method is called during grpc_shutdown. We make the loop
// spin until any pending socket shutdowns are complete.
do {
work_status = grpc_iocp_work(grpc_core::Timestamp::InfPast());
} while (work_status == GRPC_IOCP_WORK_KICK ||
grpc_core::ExecCtx::Get()->Flush());
grpc_core::ExecCtx::Get()->Flush() ||
gpr_atm_acq_load(&g_pending_socket_shutdowns) != 0);
}
void grpc_iocp_shutdown(void) {
@ -155,4 +163,17 @@ void grpc_iocp_add_socket(grpc_winsocket* socket) {
GPR_ASSERT(ret == g_iocp);
}
// Records that `socket` has begun shutting down. The first call for a given
// socket sets its shutdown_registered flag and increments
// g_pending_socket_shutdowns; any later call for the same socket is a no-op,
// so each socket contributes at most one to the counter.
// The `_locked` suffix indicates the caller already holds socket->state_mu.
void grpc_iocp_register_socket_shutdown_socket_locked(grpc_winsocket* socket) {
  // Already counted: nothing more to do.
  if (socket->shutdown_registered) return;
  socket->shutdown_registered = true;
  gpr_atm_full_fetch_add(&g_pending_socket_shutdowns, 1);
}
// Records that `socket` has finished shutting down. If the socket previously
// registered a pending shutdown, the global g_pending_socket_shutdowns counter
// is decremented by one; sockets that never registered leave it untouched.
void grpc_iocp_finish_socket_shutdown(grpc_winsocket* socket) {
  // Only sockets that were counted on registration are un-counted here.
  if (!socket->shutdown_registered) return;
  gpr_atm_full_fetch_add(&g_pending_socket_shutdowns, -1);
}
#endif // GRPC_WINSOCK_SOCKET

@ -43,6 +43,17 @@ void grpc_iocp_flush(void);
void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket*);
// Register that this socket has started shutting down.
// This prevents gRPC from completing its own shutdown until this socket's
// shutdown is finished. IOCP must continue doing work until all such sockets
// have finished shutting down. The socket's state_mu must be locked.
void grpc_iocp_register_socket_shutdown_socket_locked(grpc_winsocket* socket);
// Mark that this socket has finished shutting down.
// The socket's state lock does not need to be held since this function is only
// called once the socket is ready to be destroyed.
void grpc_iocp_finish_socket_shutdown(grpc_winsocket* socket);
#endif
#endif // GRPC_SRC_CORE_LIB_IOMGR_IOCP_WINDOWS_H

@ -76,6 +76,51 @@ void grpc_winsocket_shutdown(grpc_winsocket* winsocket) {
return;
}
winsocket->shutdown_called = true;
bool register_shutdown = false;
// If there is already a scheduled read closure, run it immediately. This
// follows the same semantics applied to posix endpoint which also runs any
// already registered closure immediately in the event of a shutdown.
if (winsocket->read_info.closure && !winsocket->read_info.has_pending_iocp) {
winsocket->read_info.bytes_transferred = 0;
winsocket->read_info.wsa_error = WSA_OPERATION_ABORTED;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, winsocket->read_info.closure,
absl::OkStatus());
// Note that while read_info.closure is run, it is not set to NULL here.
// This ensures that the socket cannot be deleted until any pending I/O
// operations are flushed by the thread executing grpc_iocp_work. We set
// read_info.closure_already_executed_at_shutdown to true so that when the
// pending read I/O operations are flushed, the associated closure is not
// executed again in the grpc_socket_become_ready function.
winsocket->read_info.closure_already_executed_at_shutdown = true;
register_shutdown = true;
}
// If there is already a scheduled write closure, run it immediately. This
// follows the same semantics applied to posix endpoint which also runs any
// already registered closure immediately in the event of a shutdown.
if (winsocket->write_info.closure &&
!winsocket->write_info.has_pending_iocp) {
winsocket->write_info.bytes_transferred = 0;
winsocket->write_info.wsa_error = WSA_OPERATION_ABORTED;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, winsocket->write_info.closure,
absl::OkStatus());
// Note that while write_info.closure is run, it is not set to NULL here.
// This ensures that the socket cannot be deleted until any pending I/O
// operations are flushed by the thread executing grpc_iocp_work. We set
// write_info.closure_already_executed_at_shutdown to true so that when the
// pending write I/O operations are flushed, the associated closure is not
// executed again in the grpc_socket_become_ready function.
winsocket->write_info.closure_already_executed_at_shutdown = true;
register_shutdown = true;
}
if (register_shutdown) {
// Instruct gRPC to avoid completing any shutdowns until this socket is
// cleaned up.
grpc_iocp_register_socket_shutdown_socket_locked(winsocket);
}
gpr_mu_unlock(&winsocket->state_mu);
status = WSAIoctl(winsocket->socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
@ -90,6 +135,8 @@ void grpc_winsocket_shutdown(grpc_winsocket* winsocket) {
utf8_message);
gpr_free(utf8_message);
}
// Calling closesocket triggers invocation of any pending I/O operations with
// ABORTED status.
closesocket(winsocket->socket);
}
@ -105,13 +152,20 @@ static bool check_destroyable(grpc_winsocket* winsocket) {
winsocket->read_info.closure == NULL;
}
// Final cleanup for a socket that is ready to go away: first tells the IOCP
// layer that this socket's shutdown (if one was registered) is finished, then
// hands the socket to destroy().
void grpc_winsocket_finish(grpc_winsocket* winsocket) {
// Must run before destroy(): it reads a field of `winsocket`.
// NOTE(review): destroy() is defined outside this view; presumably it frees
// the socket, which is why the ordering matters - confirm.
grpc_iocp_finish_socket_shutdown(winsocket);
destroy(winsocket);
}
void grpc_winsocket_destroy(grpc_winsocket* winsocket) {
gpr_mu_lock(&winsocket->state_mu);
GPR_ASSERT(!winsocket->destroy_called);
winsocket->destroy_called = true;
bool should_destroy = check_destroyable(winsocket);
gpr_mu_unlock(&winsocket->state_mu);
if (should_destroy) destroy(winsocket);
if (should_destroy) {
grpc_winsocket_finish(winsocket);
}
}
// Calling notify_on_read or write means either of two things:
@ -140,19 +194,19 @@ void grpc_socket_notify_on_read(grpc_winsocket* socket, grpc_closure* closure) {
socket_notify_on_iocp(socket, closure, &socket->read_info);
}
void grpc_socket_become_ready(grpc_winsocket* socket,
bool grpc_socket_become_ready(grpc_winsocket* socket,
grpc_winsocket_callback_info* info) {
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, info->closure, absl::OkStatus());
// Only run the closure once at shutdown.
if (!info->closure_already_executed_at_shutdown) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, info->closure, absl::OkStatus());
}
info->closure = NULL;
} else {
info->has_pending_iocp = 1;
}
bool should_destroy = check_destroyable(socket);
gpr_mu_unlock(&socket->state_mu);
if (should_destroy) destroy(socket);
return check_destroyable(socket);
}
static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;

@ -61,6 +61,9 @@ typedef struct grpc_winsocket_callback_info {
// The results of the overlapped operation.
DWORD bytes_transferred;
int wsa_error;
// Tracks whether the final closure has already been run when the socket is
// shut down. This allows closures to be run immediately upon socket shutdown.
bool closure_already_executed_at_shutdown = false;
} grpc_winsocket_callback_info;
// This is a wrapper to a Windows socket. A socket can have one outstanding
@ -81,6 +84,7 @@ typedef struct grpc_winsocket {
gpr_mu state_mu;
bool shutdown_called;
bool shutdown_registered;
// You can't add the same socket twice to the same IO Completion Port.
// This prevents that.
@ -109,8 +113,8 @@ void grpc_socket_notify_on_write(grpc_winsocket* winsocket,
void grpc_socket_notify_on_read(grpc_winsocket* winsocket,
grpc_closure* closure);
void grpc_socket_become_ready(grpc_winsocket* winsocket,
grpc_winsocket_callback_info* ci);
bool grpc_socket_become_ready(grpc_winsocket* socket,
grpc_winsocket_callback_info* info);
// Returns true if this system can create AF_INET6 sockets bound to ::1.
// The value is probed once, and cached for the life of the process.
@ -120,6 +124,9 @@ void grpc_wsa_socket_flags_init();
DWORD grpc_get_default_wsa_socket_flags();
// Final cleanup operations on the socket prior to deletion.
void grpc_winsocket_finish(grpc_winsocket*);
#endif
#endif // GRPC_SRC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H

Loading…
Cancel
Save