From e503cd53984eab4b11da1193bc4f35df8dff008e Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Tue, 14 Jul 2015 02:27:23 +0200 Subject: [PATCH] Better socket kick for Windows. Now calling tcp_shutdown will in fact close the socket, which cascades into properly cleaning out all the pending requests. The tcp_server_windows's shutdown logic had to be rewritted (simplified) in order to take this into account. --- src/core/iomgr/socket_windows.c | 30 +++++++----- src/core/iomgr/tcp_server_windows.c | 76 ++++++++++++----------------- 2 files changed, 49 insertions(+), 57 deletions(-) diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index 897408ded29..f6ddfff0ad9 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -37,6 +37,7 @@ #include #include +#include #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iomgr_internal.h" @@ -61,22 +62,27 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) { operations to abort them. We need to do that this way because of the various callsites of that function, which happens to be in various mutex hold states, and that'd be unsafe to call them directly. */ -int grpc_winsocket_shutdown(grpc_winsocket *socket) { +int grpc_winsocket_shutdown(grpc_winsocket *winsocket) { int callbacks_set = 0; - gpr_mu_lock(&socket->state_mu); - if (socket->read_info.cb) { + SOCKET socket; + gpr_mu_lock(&winsocket->state_mu); + socket = winsocket->socket; + if (winsocket->read_info.cb) { callbacks_set++; - grpc_iomgr_closure_init(&socket->shutdown_closure, socket->read_info.cb, - socket->read_info.opaque); - grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0); + grpc_iomgr_closure_init(&winsocket->shutdown_closure, + winsocket->read_info.cb, + winsocket->read_info.opaque); + grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0); } - if (socket->write_info.cb) { + if (winsocket->write_info.cb) { callbacks_set++; - grpc_iomgr_closure_init(&socket->shutdown_closure, socket->write_info.cb, - socket->write_info.opaque); - grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0); + grpc_iomgr_closure_init(&winsocket->shutdown_closure, + winsocket->write_info.cb, + winsocket->write_info.opaque); + grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0); } - gpr_mu_unlock(&socket->state_mu); + gpr_mu_unlock(&winsocket->state_mu); + closesocket(socket); return callbacks_set; } @@ -87,14 +93,12 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) { an "idle" socket which is neither trying to read or write, we'd start leaking both memory and sockets. */ void grpc_winsocket_orphan(grpc_winsocket *winsocket) { - SOCKET socket = winsocket->socket; grpc_iomgr_unregister_object(&winsocket->iomgr_object); if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) { grpc_iocp_socket_orphan(winsocket); } else { grpc_winsocket_destroy(winsocket); } - closesocket(socket); } void grpc_winsocket_destroy(grpc_winsocket *winsocket) { diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index d70968de885..dfd56f9c402 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -108,9 +108,10 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, size_t i; gpr_mu_lock(&s->mu); /* First, shutdown all fd's. This will queue abortion calls for all - of the pending accepts. */ + of the pending accepts due to the normal operation mechanism. */ for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; + sp->shutting_down = 1; grpc_winsocket_shutdown(sp->socket); } /* This happens asynchronously. Wait while that happens. */ @@ -243,62 +244,49 @@ static void on_accept(void *arg, int from_iocp) { grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; - /* The shutdown sequence is done in two parts. This is the second - part here, acknowledging the IOCP notification, and doing nothing - else, especially not queuing a new accept. */ - if (sp->shutting_down) { - GPR_ASSERT(from_iocp); - sp->shutting_down = 0; - sp->socket->read_info.outstanding = 0; - gpr_mu_lock(&sp->server->mu); - if (0 == --sp->server->active_ports) { - gpr_cv_broadcast(&sp->server->cv); - } - gpr_mu_unlock(&sp->server->mu); - return; - } - - if (from_iocp) { - /* The IOCP notified us of a completed operation. Let's grab the results, - and act accordingly. */ - DWORD transfered_bytes = 0; - DWORD flags; - BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, - &transfered_bytes, FALSE, &flags); - if (!wsa_success) { + /* The general mechanism for shutting down is to queue abortion calls. While + this is necessary in the read/write case, it's useless for the accept + case. Let's do nothing. */ + if (!from_iocp) return; + + /* The IOCP notified us of a completed operation. Let's grab the results, + and act accordingly. */ + DWORD transfered_bytes = 0; + DWORD flags; + BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, + &transfered_bytes, FALSE, &flags); + if (!wsa_success) { + if (sp->shutting_down) { + /* During the shutdown case, we ARE expecting an error. So that's swell, + and we can wake up the shutdown thread. */ + sp->shutting_down = 0; + sp->socket->read_info.outstanding = 0; + gpr_mu_lock(&sp->server->mu); + if (0 == --sp->server->active_ports) { + gpr_cv_broadcast(&sp->server->cv); + } + gpr_mu_unlock(&sp->server->mu); + return; + } else { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message); gpr_free(utf8_message); closesocket(sock); - } else { - /* TODO(ctiller): add sockaddr address to label */ - ep = grpc_tcp_create(grpc_winsocket_create(sock, "server")); } } else { - /* If we're not notified from the IOCP, it means we are asked to shutdown. - This will initiate that shutdown. Calling closesocket will trigger an - IOCP notification, that will call this function a second time, from - the IOCP thread. Of course, this only works if the socket was, in fact, - listening. If that's not the case, we'd wait indefinitely. That's a bit - of a degenerate case, but it can happen if you create a server, but - don't start it. So let's support that by recursing once. */ - sp->shutting_down = 1; - sp->new_socket = INVALID_SOCKET; - if (sock != INVALID_SOCKET) { - closesocket(sock); - } else { - on_accept(sp, 1); + if (!sp->shutting_down) { + /* TODO(ctiller): add sockaddr address to label */ + ep = grpc_tcp_create(grpc_winsocket_create(sock, "server")); } - return; } /* The only time we should call our callback, is where we successfully managed to accept a connection, and created an endpoint. */ if (ep) sp->server->cb(sp->server->cb_arg, ep); /* As we were notified from the IOCP of one and exactly one accept, - the former socked we created has now either been destroy or assigned - to the new connection. We need to create a new one for the next - connection. */ + the former socked we created has now either been destroy or assigned + to the new connection. We need to create a new one for the next + connection. */ start_accept(sp); }