Fix races on tcp server shutdown on Windows

- we were treating an int as atomic, which is dubious at best
- it was possible to shutdown while an accept was being handled, and
process that shutdown accept before the real accept finished, leading to
a use-after-free up the stack
pull/8903/head
Craig Tiller 8 years ago
parent da81d1a43d
commit cbafdd1084
  1. 71
      src/core/lib/iomgr/tcp_server_windows.c

@ -73,6 +73,7 @@ struct grpc_tcp_listener {
/* The cached AcceptEx for that port. */ /* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx; LPFN_ACCEPTEX AcceptEx;
int shutting_down; int shutting_down;
int outstanding_calls;
/* closure for socket notification of accept being ready */ /* closure for socket notification of accept being ready */
grpc_closure on_accept; grpc_closure on_accept;
/* linked list */ /* linked list */
@ -140,10 +141,8 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
if (s->shutdown_complete != NULL) { grpc_tcp_server *s = arg;
grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
}
/* Now that the accepts have been aborted, we can destroy the sockets. /* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already The IOCP won't get notified on these, so we can flag them as already
@ -159,6 +158,14 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(s); gpr_free(s);
} }
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
}
grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s), GRPC_ERROR_NONE, NULL);
}
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
gpr_ref_non_zero(&s->refs); gpr_ref_non_zero(&s->refs);
return s; return s;
@ -180,17 +187,14 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* First, shutdown all fd's. This will queue abortion calls for all /* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts due to the normal operation mechanism. */ of the pending accepts due to the normal operation mechanism. */
if (s->active_ports == 0) { if (s->active_ports == 0) {
immediately_done = 1; finish_shutdown_locked(exec_ctx, s);
} } else {
for (sp = s->head; sp; sp = sp->next) { for (sp = s->head; sp; sp = sp->next) {
sp->shutting_down = 1; sp->shutting_down = 1;
grpc_winsocket_shutdown(sp->socket); grpc_winsocket_shutdown(sp->socket);
} }
gpr_mu_unlock(&s->mu);
if (immediately_done) {
finish_shutdown(exec_ctx, s);
} }
gpr_mu_unlock(&s->mu);
} }
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
@ -251,24 +255,19 @@ failure:
return error; return error;
} }
static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, static void decrement_active_ports_and_notify_locked(grpc_exec_ctx *exec_ctx,
grpc_tcp_listener *sp) { grpc_tcp_listener *sp) {
int notify = 0; int notify = 0;
sp->shutting_down = 0; sp->shutting_down = 0;
gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->active_ports > 0); GPR_ASSERT(sp->server->active_ports > 0);
if (0 == --sp->server->active_ports) { if (0 == --sp->server->active_ports) {
notify = 1; finish_shutdown_locked(exec_ctx, sp->server);
}
gpr_mu_unlock(&sp->server->mu);
if (notify) {
finish_shutdown(exec_ctx, sp->server);
} }
} }
/* In order to do an async accept, we need to create a socket first which /* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */ will be the one assigned to the new incoming connection. */
static grpc_error *start_accept(grpc_exec_ctx *exec_ctx, static grpc_error *start_accept_locked(grpc_exec_ctx *exec_ctx,
grpc_tcp_listener *port) { grpc_tcp_listener *port) {
SOCKET sock = INVALID_SOCKET; SOCKET sock = INVALID_SOCKET;
BOOL success; BOOL success;
@ -276,6 +275,10 @@ static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
DWORD bytes_received = 0; DWORD bytes_received = 0;
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
if (port->shutting_down) {
return GRPC_ERROR_NONE;
}
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED); WSA_FLAG_OVERLAPPED);
if (sock == INVALID_SOCKET) { if (sock == INVALID_SOCKET) {
@ -305,20 +308,11 @@ static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
immediately process an accept that happened in the meantime. */ immediately process an accept that happened in the meantime. */
port->new_socket = sock; port->new_socket = sock;
grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept); grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept);
port->outstanding_calls++;
return error; return error;
failure: failure:
GPR_ASSERT(error != GRPC_ERROR_NONE); GPR_ASSERT(error != GRPC_ERROR_NONE);
if (port->shutting_down) {
/* We are abandoning the listener port, take that into account to prevent
occasional hangs on shutdown. The hang happens when sp->shutting_down
change is not seen by on_accept and we proceed to trying new accept,
but we fail there because the listening port has been closed in the
meantime. */
decrement_active_ports_and_notify(exec_ctx, port);
GRPC_ERROR_UNREF(error);
return GRPC_ERROR_NONE;
}
if (sock != INVALID_SOCKET) closesocket(sock); if (sock != INVALID_SOCKET) closesocket(sock);
return error; return error;
} }
@ -338,6 +332,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
BOOL wsa_success; BOOL wsa_success;
int err; int err;
gpr_mu_lock(&sp->server->mu);
peer_name.len = sizeof(struct sockaddr_storage); peer_name.len = sizeof(struct sockaddr_storage);
/* The general mechanism for shutting down is to queue abortion calls. While /* The general mechanism for shutting down is to queue abortion calls. While
@ -347,6 +343,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
const char *msg = grpc_error_string(error); const char *msg = grpc_error_string(error);
gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg); gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
grpc_error_free_string(msg); grpc_error_free_string(msg);
gpr_mu_unlock(&sp->server->mu);
return; return;
} }
@ -356,17 +353,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE, &flags); &transfered_bytes, FALSE, &flags);
if (!wsa_success) { if (!wsa_success) {
if (sp->shutting_down) { if (!sp->shutting_down) {
/* During the shutdown case, we ARE expecting an error. So that's well,
and we can wake up the shutdown thread. */
decrement_active_ports_and_notify(exec_ctx, sp);
return;
} else {
char *utf8_message = gpr_format_message(WSAGetLastError()); char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message); gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
closesocket(sock);
} }
closesocket(sock);
} else { } else {
if (!sp->shutting_down) { if (!sp->shutting_down) {
peer_name_string = NULL; peer_name_string = NULL;
@ -408,7 +400,11 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
the former socked we created has now either been destroy or assigned the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next to the new connection. We need to create a new one for the next
connection. */ connection. */
GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp))); GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
if (0 == --sp->outstanding_calls) {
decrement_active_ports_and_notify_locked(exec_ctx, sp);
}
gpr_mu_unlock(&sp->server->mu);
} }
static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@ -456,6 +452,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->server = s; sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener"); sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0; sp->shutting_down = 0;
sp->outstanding_calls = 0;
sp->AcceptEx = AcceptEx; sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET; sp->new_socket = INVALID_SOCKET;
sp->port = port; sp->port = port;
@ -553,7 +550,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb = on_accept_cb; s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg; s->on_accept_cb_arg = on_accept_cb_arg;
for (sp = s->head; sp; sp = sp->next) { for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp))); GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
s->active_ports++; s->active_ports++;
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);

Loading…
Cancel
Save