Merge pull request #2416 from nicolasnoble/attempt-number-2-at-windows-kick

Better socket kick for Windows.
pull/2421/head
Jan Tattermusch 10 years ago
commit 478f55fe71
  1. 30
      src/core/iomgr/socket_windows.c
  2. 50
      src/core/iomgr/tcp_server_windows.c

@ -37,6 +37,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iomgr_internal.h"
@ -61,22 +62,27 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) {
operations to abort them. We need to do that this way because of the operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various various callsites of that function, which happens to be in various
mutex hold states, and that'd be unsafe to call them directly. */ mutex hold states, and that'd be unsafe to call them directly. */
int grpc_winsocket_shutdown(grpc_winsocket *socket) { int grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
int callbacks_set = 0; int callbacks_set = 0;
gpr_mu_lock(&socket->state_mu); SOCKET socket;
if (socket->read_info.cb) { gpr_mu_lock(&winsocket->state_mu);
socket = winsocket->socket;
if (winsocket->read_info.cb) {
callbacks_set++; callbacks_set++;
grpc_iomgr_closure_init(&socket->shutdown_closure, socket->read_info.cb, grpc_iomgr_closure_init(&winsocket->shutdown_closure,
socket->read_info.opaque); winsocket->read_info.cb,
grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0); winsocket->read_info.opaque);
grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
} }
if (socket->write_info.cb) { if (winsocket->write_info.cb) {
callbacks_set++; callbacks_set++;
grpc_iomgr_closure_init(&socket->shutdown_closure, socket->write_info.cb, grpc_iomgr_closure_init(&winsocket->shutdown_closure,
socket->write_info.opaque); winsocket->write_info.cb,
grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0); winsocket->write_info.opaque);
grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
} }
gpr_mu_unlock(&socket->state_mu); gpr_mu_unlock(&winsocket->state_mu);
closesocket(socket);
return callbacks_set; return callbacks_set;
} }
@ -87,14 +93,12 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) {
an "idle" socket which is neither trying to read or write, we'd start leaking an "idle" socket which is neither trying to read or write, we'd start leaking
both memory and sockets. */ both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) { void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
SOCKET socket = winsocket->socket;
grpc_iomgr_unregister_object(&winsocket->iomgr_object); grpc_iomgr_unregister_object(&winsocket->iomgr_object);
if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) { if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) {
grpc_iocp_socket_orphan(winsocket); grpc_iocp_socket_orphan(winsocket);
} else { } else {
grpc_winsocket_destroy(winsocket); grpc_winsocket_destroy(winsocket);
} }
closesocket(socket);
} }
void grpc_winsocket_destroy(grpc_winsocket *winsocket) { void grpc_winsocket_destroy(grpc_winsocket *winsocket) {

@ -108,9 +108,10 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
size_t i; size_t i;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
/* First, shutdown all fd's. This will queue abortion calls for all /* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts. */ of the pending accepts due to the normal operation mechanism. */
for (i = 0; i < s->nports; i++) { for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i]; server_port *sp = &s->ports[i];
sp->shutting_down = 1;
grpc_winsocket_shutdown(sp->socket); grpc_winsocket_shutdown(sp->socket);
} }
/* This happens asynchronously. Wait while that happens. */ /* This happens asynchronously. Wait while that happens. */
@ -243,11 +244,21 @@ static void on_accept(void *arg, int from_iocp) {
grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
/* The shutdown sequence is done in two parts. This is the second /* The general mechanism for shutting down is to queue abortion calls. While
part here, acknowledging the IOCP notification, and doing nothing this is necessary in the read/write case, it's useless for the accept
else, especially not queuing a new accept. */ case. Let's do nothing. */
if (!from_iocp) return;
/* The IOCP notified us of a completed operation. Let's grab the results,
and act accordingly. */
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE, &flags);
if (!wsa_success) {
if (sp->shutting_down) { if (sp->shutting_down) {
GPR_ASSERT(from_iocp); /* During the shutdown case, we ARE expecting an error. So that's swell,
and we can wake up the shutdown thread. */
sp->shutting_down = 0; sp->shutting_down = 0;
sp->socket->read_info.outstanding = 0; sp->socket->read_info.outstanding = 0;
gpr_mu_lock(&sp->server->mu); gpr_mu_lock(&sp->server->mu);
@ -256,40 +267,17 @@ static void on_accept(void *arg, int from_iocp) {
} }
gpr_mu_unlock(&sp->server->mu); gpr_mu_unlock(&sp->server->mu);
return; return;
} } else {
if (from_iocp) {
/* The IOCP notified us of a completed operation. Let's grab the results,
and act accordingly. */
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE, &flags);
if (!wsa_success) {
char *utf8_message = gpr_format_message(WSAGetLastError()); char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message); gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
closesocket(sock); closesocket(sock);
}
} else { } else {
if (!sp->shutting_down) {
/* TODO(ctiller): add sockaddr address to label */ /* TODO(ctiller): add sockaddr address to label */
ep = grpc_tcp_create(grpc_winsocket_create(sock, "server")); ep = grpc_tcp_create(grpc_winsocket_create(sock, "server"));
} }
} else {
/* If we're not notified from the IOCP, it means we are asked to shutdown.
This will initiate that shutdown. Calling closesocket will trigger an
IOCP notification, that will call this function a second time, from
the IOCP thread. Of course, this only works if the socket was, in fact,
listening. If that's not the case, we'd wait indefinitely. That's a bit
of a degenerate case, but it can happen if you create a server, but
don't start it. So let's support that by recursing once. */
sp->shutting_down = 1;
sp->new_socket = INVALID_SOCKET;
if (sock != INVALID_SOCKET) {
closesocket(sock);
} else {
on_accept(sp, 1);
}
return;
} }
/* The only time we should call our callback, is where we successfully /* The only time we should call our callback, is where we successfully

Loading…
Cancel
Save