diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index e7fc744ceb8..1cdf3da0d62 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -177,6 +177,10 @@ void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
   socket->orphan = 1;
 }
 
+/* Calling notify_on_read or write means one of two things:
+   -) The IOCP already completed in the background, and we need to call
+      the callback now.
+   -) The IOCP hasn't completed yet, and we're queuing it for later. */
 static void socket_notify_on_iocp(grpc_winsocket *socket,
                                   void(*cb)(void *, int), void *opaque,
                                   grpc_winsocket_callback_info *info) {
diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c
index f130ab9a078..74cd5a829b2 100644
--- a/src/core/iomgr/iomgr_windows.c
+++ b/src/core/iomgr/iomgr_windows.c
@@ -43,6 +43,10 @@
 #include "src/core/iomgr/iocp_windows.h"
 #include "src/core/iomgr/iomgr.h"
 
+/* Windows' io manager is designed entirely around IO completion ports.
+   All we're doing here is making sure that Windows sockets are
+   initialized and cleaned up. */
+
 static void winsock_init(void) {
   WSADATA wsaData;
   int status = WSAStartup(MAKEWORD(2, 0), &wsaData);
diff --git a/src/core/iomgr/pollset_kick_windows.h b/src/core/iomgr/pollset_kick_windows.h
index 3836aa00820..c675c119aba 100644
--- a/src/core/iomgr/pollset_kick_windows.h
+++ b/src/core/iomgr/pollset_kick_windows.h
@@ -36,6 +36,9 @@
 #include
 
+/* There isn't really any such thing as a pollset under Windows, due to the
+   nature of the IO completion ports. */
+
 struct grpc_kick_fd_info;
 
 typedef struct grpc_pollset_kick_state {
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index bea67116116..5af0685f9d9 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -41,6 +41,11 @@
 #include "src/core/iomgr/iomgr_internal.h"
 #include "src/core/iomgr/pollset_windows.h"
 
+/* There isn't really any such thing as a pollset under Windows, due to the
+   nature of the IO completion ports. We still provide a minimal set of
+   features for the sake of the rest of grpc, but grpc_pollset_work won't
+   actually do any polling, and will return as quickly as possible. */
+
 void grpc_pollset_init(grpc_pollset *pollset) {
   gpr_mu_init(&pollset->mu);
   gpr_cv_init(&pollset->cv);
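To make the completion-port-centric design above concrete, here is a minimal, self-contained sketch of the IOCP pattern the io manager relies on: one port for the whole process, sockets associated with it, and a background thread dequeuing completion packets. All names are illustrative and this is not grpc code; a fake completion is posted so the example actually runs.

#include <winsock2.h>
#include <windows.h>
#include <stdio.h>

static HANDLE g_iocp;

/* One completion port for the whole process; the IOCP thread blocks on it. */
static void example_iocp_init(void) {
  g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
}

/* Associating a socket hands its completions to the port; the completion
   key travels back with every packet and identifies our per-socket state. */
static void example_iocp_add_socket(SOCKET sock, void *per_socket_state) {
  CreateIoCompletionPort((HANDLE)sock, g_iocp, (ULONG_PTR)per_socket_state, 0);
}

/* The background thread loop body: dequeue one completion and dispatch it. */
static void example_iocp_poll_once(void) {
  DWORD bytes = 0;
  ULONG_PTR key = 0;
  LPOVERLAPPED overlapped = NULL;
  BOOL ok = GetQueuedCompletionStatus(g_iocp, &bytes, &key, &overlapped,
                                      INFINITE);
  /* 'overlapped' points into per-operation state; grpc recovers its
     callback info from it at roughly this point. */
  printf("completion: key=%p, %lu bytes, ok=%d\n", (void *)key,
         (unsigned long)bytes, (int)ok);
}

int main(void) {
  example_iocp_init();
  /* Fake a completion so the loop has something to dequeue. */
  PostQueuedCompletionStatus(g_iocp, 42, (ULONG_PTR)0, NULL);
  example_iocp_poll_once();
  CloseHandle(g_iocp);
  return 0;
}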
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index 266175abfb9..e1115bac4ff 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -40,10 +40,10 @@
 #include "src/core/iomgr/pollset_kick.h"
 #include "src/core/iomgr/socket_windows.h"
 
-/* forward declare only in this file to avoid leaking impl details via
-   pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not
-   use the struct tag */
-struct grpc_fd;
+/* There isn't really any such thing as a pollset under Windows, due to the
+   nature of the IO completion ports. A Windows "pollset" is merely a mutex
+   and a condition variable, as this is the minimal set of features we need
+   implemented for the rest of grpc. But we won't use them directly. */
 
 typedef struct grpc_pollset {
   gpr_mu mu;
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index fe0196d99cd..9306310d435 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -41,9 +41,9 @@
 #include "src/core/iomgr/iocp_windows.h"
 #include "src/core/iomgr/iomgr.h"
 #include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/socket_windows.h"
 #include "src/core/iomgr/pollset.h"
 #include "src/core/iomgr/pollset_windows.h"
+#include "src/core/iomgr/socket_windows.h"
 
 grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
   grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket));
@@ -55,16 +55,29 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
   return r;
 }
 
-static void shutdown_op(grpc_winsocket_callback_info *info) {
-  if (!info->cb) return;
-  grpc_iomgr_add_delayed_callback(info->cb, info->opaque, 0);
-}
-
+/* Schedule a shutdown of the socket operations. This queues the pending
+   operations' callbacks so they can abort. We need to do it this way
+   because the various call sites of this function may be holding
+   different mutexes, and calling the callbacks directly would be unsafe. */
void grpc_winsocket_shutdown(grpc_winsocket *socket) {
-  shutdown_op(&socket->read_info);
-  shutdown_op(&socket->write_info);
+  gpr_mu_lock(&socket->state_mu);
+  if (socket->read_info.cb) {
+    grpc_iomgr_add_delayed_callback(socket->read_info.cb,
+                                    socket->read_info.opaque, 0);
+  }
+  if (socket->write_info.cb) {
+    grpc_iomgr_add_delayed_callback(socket->write_info.cb,
+                                    socket->write_info.opaque, 0);
+  }
+  gpr_mu_unlock(&socket->state_mu);
 }
 
+/* Abandons a socket. Either we queue it up for garbage collection from the
+   IO Completion Port thread, or we destroy it immediately. Note that this
+   mechanism assumes that we're either always waiting for an operation, or we
+   explicitly know that we aren't. If there is ever a case where we can have
+   an "idle" socket which is neither trying to read nor write, we'd start
+   leaking both memory and sockets. */
 void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
   SOCKET socket = winsocket->socket;
   if (!winsocket->closed_early) {
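grpc_winsocket_shutdown above deliberately goes through grpc_iomgr_add_delayed_callback instead of invoking the callbacks in place, because its callers may hold assorted mutexes. A rough sketch of that idea follows, with hypothetical names and a plain linked list standing in for grpc's internals; a real version would guard the list with a lock.

#include <stdlib.h>

typedef void (*delayed_cb)(void *opaque, int success);

typedef struct delayed_item {
  delayed_cb cb;
  void *opaque;
  int success;
  struct delayed_item *next;
} delayed_item;

static delayed_item *g_delayed = NULL; /* assume a lock guards this list */

/* Safe to call with arbitrary mutexes held: it only records the callback. */
static void example_add_delayed_callback(delayed_cb cb, void *opaque,
                                         int success) {
  delayed_item *item = malloc(sizeof(*item));
  item->cb = cb;
  item->opaque = opaque;
  item->success = success;
  item->next = g_delayed;
  g_delayed = item;
}

/* Run later by the io manager loop, with no locks held, so callbacks can
   freely grab whatever mutexes they need. */
static void example_flush_delayed_callbacks(void) {
  while (g_delayed) {
    delayed_item *item = g_delayed;
    g_delayed = item->next;
    item->cb(item->opaque, item->success);
    free(item);
  }
}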
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index ee090668ea7..6e778a776a3 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -39,21 +39,43 @@
 #include
 #include
 
+/* This holds the data for an outstanding read or write on a socket.
+   The mutex that protects concurrent access to that data is the one
+   inside the winsocket wrapper. */
 typedef struct grpc_winsocket_callback_info {
   /* This is supposed to be a WSAOVERLAPPED, but in order to get that
-   * definition, we need to include ws2tcpip.h, which needs to be included
-   * from the top, otherwise it'll clash with a previous inclusion of
-   * windows.h that in turns includes winsock.h. If anyone knows a way
-   * to do it properly, feel free to send a patch.
-   */
+     definition, we need to include ws2tcpip.h, which needs to be included
+     from the top, otherwise it'll clash with a previous inclusion of
+     windows.h that in turn includes winsock.h. If anyone knows a way
+     to do it properly, feel free to send a patch. */
   OVERLAPPED overlapped;
+  /* The callback information for the pending operation. May be empty if the
+     caller hasn't registered a callback yet. */
   void(*cb)(void *opaque, int success);
   void *opaque;
+  /* A boolean describing whether the IO Completion Port already got a
+     notification for that operation. This happens if the operation completed
+     before the caller had time to register a callback. We could avoid that
+     behavior altogether by forcing the caller to always register its callback
+     before queuing an operation, but it is frequent for an IO Completion
+     Port to trigger quickly. This way we avoid a context switch for calling
+     the callback, and we also simplify the read / write operations so they
+     don't have to hold a mutex for a long amount of time. */
   int has_pending_iocp;
+  /* The results of the overlapped operation. */
   DWORD bytes_transfered;
   int wsa_error;
 } grpc_winsocket_callback_info;
 
+/* This is a wrapper around a Windows socket. A socket can have one
+   outstanding read and one outstanding write. Doing an asynchronous accept
+   means waiting for a read operation; doing an asynchronous connect means
+   waiting for a write operation. These ties between the operation and the
+   kind of event are completely arbitrary, since we keep one overlapped
+   structure per pending operation, whatever its nature. So we could have
+   dedicated pending-operation callbacks for connect and listen, but given
+   the scope of listen and accept, we don't need to go to that extent and
+   waste memory. Also, this is closer to what happens in the posix world. */
 typedef struct grpc_winsocket {
   SOCKET socket;
 
@@ -62,17 +84,35 @@ typedef struct grpc_winsocket {
   gpr_mu state_mu;
 
+  /* You can't add the same socket twice to the same IO Completion Port.
+     This prevents that. */
   int added_to_iocp;
+  /* A boolean to indicate that the caller has abandoned that socket, but
+     there is a pending operation that the IO Completion Port will have to
+     wait for. The socket will be collected at that time. */
   int orphan;
+  /* A boolean to indicate that the socket was already closed somehow, and
+     that no operation is going to be pending. Trying to abandon a socket in
+     that state won't result in an orphan; the socket will instead be
+     destroyed without further delay. We could avoid that boolean by adding
+     one into grpc_winsocket_callback_info describing that the operation is
+     pending, but that would 1) waste more memory and 2) obfuscate the intent
+     a bit more. */
   int closed_early;
 } grpc_winsocket;
 
-/* Create a wrapped windows handle.
-This takes ownership of closing it. */
+/* Create a wrapped windows handle. This takes ownership of it, meaning that
+   it will be responsible for closing it. */
 grpc_winsocket *grpc_winsocket_create(SOCKET socket);
 
+/* Initiate an asynchronous shutdown of the socket. This will invoke the
+   callbacks of any pending operations so they can abort. */
 void grpc_winsocket_shutdown(grpc_winsocket *socket);
+
+/* Abandon a socket. */
 void grpc_winsocket_orphan(grpc_winsocket *socket);
+
+/* Destroy a socket. Should only be called by the IO Completion Port thread,
+   or by grpc_winsocket_orphan if there's no pending operation. */
 void grpc_winsocket_destroy(grpc_winsocket *socket);
 
 #endif  /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */
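The has_pending_iocp flag described above is a small rendezvous between two racing events: the IOCP completion and the caller registering its callback; whichever comes second fires the callback. A standalone sketch of that handshake, using a Win32 CRITICAL_SECTION instead of gpr_mu (the names are illustrative, not grpc's):

#include <windows.h>

typedef void (*op_cb)(void *opaque, int success);

typedef struct {
  CRITICAL_SECTION mu;
  op_cb cb;             /* set by notify_on(); NULL until then */
  void *opaque;
  int has_pending_iocp; /* set by on_iocp_completion() if it won the race */
} op_state;

static void op_state_init(op_state *op) {
  InitializeCriticalSection(&op->mu);
  op->cb = NULL;
  op->opaque = NULL;
  op->has_pending_iocp = 0;
}

/* Called from the IOCP thread when the overlapped operation completes. */
static void on_iocp_completion(op_state *op) {
  op_cb cb = NULL;
  void *opaque = NULL;
  EnterCriticalSection(&op->mu);
  if (op->cb) {
    /* The caller registered first: fire its callback now. */
    cb = op->cb;
    opaque = op->opaque;
    op->cb = NULL;
  } else {
    /* We won the race: remember it for when the caller shows up. */
    op->has_pending_iocp = 1;
  }
  LeaveCriticalSection(&op->mu);
  if (cb) cb(opaque, 1);
}

/* Called by the caller once it wants to hear about completion. */
static void notify_on(op_state *op, op_cb cb, void *opaque) {
  int fire = 0;
  EnterCriticalSection(&op->mu);
  if (op->has_pending_iocp) {
    /* The completion already happened in the background: call back now. */
    op->has_pending_iocp = 0;
    fire = 1;
  } else {
    op->cb = cb;
    op->opaque = opaque;
  }
  LeaveCriticalSection(&op->mu);
  if (fire) cb(opaque, 1);
}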
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index c4d3293e83a..c6137e1e1d6 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -109,7 +109,8 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
   /* First, shutdown all fd's. This will queue abortion calls for all of the
      pending accepts. */
   for (i = 0; i < s->nports; i++) {
-    grpc_winsocket_shutdown(s->ports[i].socket);
+    server_port *sp = &s->ports[i];
+    grpc_winsocket_shutdown(sp->socket);
   }
   /* This happens asynchronously. Wait while that happens. */
   while (s->active_ports) {
@@ -275,22 +276,28 @@ static void on_accept(void *arg, int from_iocp) {
     /* If we're not notified from the IOCP, it means we are asked to shutdown.
        This will initiate that shutdown. Calling closesocket will trigger an
        IOCP notification, that will call this function a second time, from
-       the IOCP thread. */
+       the IOCP thread. Of course, this only works if the socket was, in fact,
+       listening. If that's not the case, we'd wait indefinitely. That's a bit
+       of a degenerate case, but it can happen if you create a server but
+       never start it. So let's support that by recursing once. */
     sp->shutting_down = 1;
     sp->new_socket = INVALID_SOCKET;
-    closesocket(sock);
+    if (sock != INVALID_SOCKET) {
+      closesocket(sock);
+    } else {
+      on_accept(sp, 1);
+    }
+    return;
   }
 
   /* The only time we should call our callback, is where we successfully
      managed to accept a connection, and created an endpoint. */
   if (ep) sp->server->cb(sp->server->cb_arg, ep);
-  if (from_iocp) {
-    /* As we were notified from the IOCP of one and exactly one accept,
-       the former socked we created has now either been destroy or assigned
-       to the new connection. We need to create a new one for the next
-       connection. */
-    start_accept(sp);
-  }
+  /* As we were notified from the IOCP of exactly one accept, the socket we
+     previously created has now either been destroyed or assigned to the new
+     connection. We need to create a new one for the next connection. */
+  start_accept(sp);
 }
 
 static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@@ -332,6 +339,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
   sp->socket = grpc_winsocket_create(sock);
   sp->shutting_down = 0;
   sp->AcceptEx = AcceptEx;
+  sp->new_socket = INVALID_SOCKET;
   GPR_ASSERT(sp->socket);
   gpr_mu_unlock(&s->mu);
 }
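For context on what start_accept has to arrange before on_accept can ever run, here is a reduced sketch of an overlapped AcceptEx call. It is illustrative only: grpc obtains AcceptEx through WSAIoctl and tracks far more state, while this version hard-codes IPv4, keeps error handling minimal, and expects linking against ws2_32.lib and mswsock.lib.

#include <winsock2.h>
#include <mswsock.h>
#include <string.h>

/* Queue one overlapped accept on 'listening'. Returns the pre-created accept
   socket, or INVALID_SOCKET on hard failure. 'addr_buffer' and 'overlapped'
   must stay alive until the completion is dequeued; the buffer must be at
   least 2 * (sizeof(struct sockaddr_in) + 16) bytes. */
static SOCKET example_start_accept(SOCKET listening, char *addr_buffer,
                                   OVERLAPPED *overlapped) {
  DWORD bytes_received = 0;
  /* AcceptEx requires the accepted socket to be created up front. */
  SOCKET accepting = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
                               WSA_FLAG_OVERLAPPED);
  if (accepting == INVALID_SOCKET) return INVALID_SOCKET;

  memset(overlapped, 0, sizeof(*overlapped));
  if (!AcceptEx(listening, accepting, addr_buffer, 0,
                sizeof(struct sockaddr_in) + 16,
                sizeof(struct sockaddr_in) + 16,
                &bytes_received, overlapped)) {
    if (WSAGetLastError() != WSA_IO_PENDING) {
      /* Hard failure: nothing will be queued on the IOCP for this socket. */
      closesocket(accepting);
      return INVALID_SOCKET;
    }
  }
  /* Success or WSA_IO_PENDING: the IOCP will get exactly one notification
     for this accept, which is when something like on_accept would run. */
  return accepting;
}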
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index d5b06e7b0b2..c8483bd891c 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -76,8 +76,11 @@ int grpc_tcp_prepare_socket(SOCKET sock) {
 }
 
 typedef struct grpc_tcp {
+  /* This is our C++ class derivation emulation. */
   grpc_endpoint base;
+  /* The one socket this endpoint is using. */
   grpc_winsocket *socket;
+  /* Refcounting how many operations are in progress. */
   gpr_refcount refcount;
 
   grpc_endpoint_read_cb read_cb;
@@ -90,6 +93,10 @@ typedef struct grpc_tcp {
   gpr_slice_buffer write_slices;
   int outstanding_write;
 
+  /* The IO Completion Port runs from another thread. We need some mechanism
+     to protect ourselves when requesting a shutdown. */
+  gpr_mu mu;
+  int shutting_down;
 } grpc_tcp;
 
 static void tcp_ref(grpc_tcp *tcp) {
@@ -100,11 +107,13 @@ static void tcp_unref(grpc_tcp *tcp) {
   if (gpr_unref(&tcp->refcount)) {
     gpr_slice_buffer_destroy(&tcp->write_slices);
     grpc_winsocket_orphan(tcp->socket);
+    gpr_mu_destroy(&tcp->mu);
     gpr_free(tcp);
   }
 }
 
-static void on_read(void *tcpp, int success) {
+/* Asynchronous callback from the IOCP, or the background thread. */
+static void on_read(void *tcpp, int from_iocp) {
   grpc_tcp *tcp = (grpc_tcp *) tcpp;
   grpc_winsocket *socket = tcp->socket;
   gpr_slice sub;
@@ -114,16 +123,25 @@ static void on_read(void *tcpp, int success) {
   grpc_endpoint_read_cb cb = tcp->read_cb;
   grpc_winsocket_callback_info *info = &socket->read_info;
   void *opaque = tcp->read_user_data;
+  int do_abort = 0;
+
+  gpr_mu_lock(&tcp->mu);
+  if (!from_iocp || tcp->shutting_down) {
+    /* If we are here with from_iocp set to true, it means we were raced by
+       a shutdown of the endpoint. No actual abort callback will happen in
+       that case, so we're going to do it from here. */
+    do_abort = 1;
+  }
+  gpr_mu_unlock(&tcp->mu);
 
-  GPR_ASSERT(tcp->outstanding_read);
-
-  if (!success) {
+  if (do_abort) {
+    if (from_iocp) gpr_slice_unref(tcp->read_slice);
     tcp_unref(tcp);
     cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
     return;
   }
 
-  tcp->outstanding_read = 0;
+  GPR_ASSERT(tcp->outstanding_read);
 
   if (socket->read_info.wsa_error != 0) {
     char *utf8_message = gpr_format_message(info->wsa_error);
@@ -142,6 +160,9 @@
       status = GRPC_ENDPOINT_CB_EOF;
     }
   }
+
+  tcp->outstanding_read = 0;
+
   tcp_unref(tcp);
   cb(opaque, slice, nslices, status);
 }
@@ -158,6 +179,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
   WSABUF buffer;
 
   GPR_ASSERT(!tcp->outstanding_read);
+  GPR_ASSERT(!tcp->shutting_down);
   tcp_ref(tcp);
   tcp->outstanding_read = 1;
   tcp->read_cb = cb;
@@ -168,10 +190,12 @@
   buffer.len = GPR_SLICE_LENGTH(tcp->read_slice);
   buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
 
+  /* First let's try a synchronous, non-blocking read. */
   status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
                    NULL, NULL);
   info->wsa_error = status == 0 ? 0 : WSAGetLastError();
 
+  /* Did we get data immediately? Yay. */
   if (info->wsa_error != WSAEWOULDBLOCK) {
     info->bytes_transfered = bytes_read;
     /* This might heavily recurse. */
@@ -179,6 +203,7 @@
     return;
   }
 
+  /* Otherwise, let's retry by queuing an overlapped read. */
   memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
   status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
                    &info->overlapped, NULL);
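The read path above first attempts an immediate WSARecv and only then queues an overlapped one. A reduced sketch of that overlapped fallback and its three possible outcomes, outside of grpc's data structures (names are illustrative):

#include <winsock2.h>
#include <string.h>

/* Returns 1 if the IOCP will deliver a completion for this read, 0 on hard
   failure. 'buf' and 'overlapped' must stay alive until the completion is
   dequeued from the completion port. */
static int example_queue_overlapped_read(SOCKET sock, char *buf, ULONG len,
                                         OVERLAPPED *overlapped) {
  WSABUF wsabuf;
  DWORD bytes_read = 0;
  DWORD flags = 0;
  int status;

  wsabuf.buf = buf;
  wsabuf.len = len;
  memset(overlapped, 0, sizeof(*overlapped));

  status = WSARecv(sock, &wsabuf, 1, &bytes_read, &flags, overlapped, NULL);
  if (status == 0) {
    /* Completed immediately; a completion packet is still queued on the
       IOCP, so we simply wait for it rather than consuming the result here. */
    return 1;
  }
  if (WSAGetLastError() == WSA_IO_PENDING) {
    /* The normal case: the read is in flight, the IOCP will tell us. */
    return 1;
  }
  /* Anything else is a hard failure: no completion will ever show up. */
  return 0;
}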
@@ -192,30 +217,53 @@
   if (error != WSA_IO_PENDING) {
     char *utf8_message = gpr_format_message(WSAGetLastError());
-    __debugbreak();
-    gpr_log(GPR_ERROR, "WSARecv error: %s", utf8_message);
+    gpr_log(GPR_ERROR, "WSARecv error: %s - this means we're going to leak.",
+            utf8_message);
     gpr_free(utf8_message);
-    /* would the IO completion port be called anyway... ? Let's assume not. */
+    /* I'm pretty sure this is a very bad situation; hence the log. What will
+       happen now is that the socket will be waiting on neither read nor
+       write, unless the caller retries, which is unlikely, and not
+       guaranteed either. There might also be a write pending. This means
+       that the future orphaning of that socket will be in limbo, and we're
+       going to leak it. I have no idea what could cause this specific case,
+       aside from a parameter error in our call. Normal read errors would
+       actually surface during the overlapped operation, which is the
+       supported way to go for that. */
     tcp->outstanding_read = 0;
     tcp_unref(tcp);
     cb(arg, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
+    /* Per the comment above, I'm going to treat that case as a hard failure
+       for now, and leave the option to catch it in a debugger. */
+    __debugbreak();
     return;
   }
 
   grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
 }
 
-static void on_write(void *tcpp, int success) {
+/* Asynchronous callback from the IOCP, or the background thread. */
+static void on_write(void *tcpp, int from_iocp) {
   grpc_tcp *tcp = (grpc_tcp *) tcpp;
   grpc_winsocket *handle = tcp->socket;
   grpc_winsocket_callback_info *info = &handle->write_info;
   grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
   grpc_endpoint_write_cb cb = tcp->write_cb;
   void *opaque = tcp->write_user_data;
+  int do_abort = 0;
+
+  gpr_mu_lock(&tcp->mu);
+  if (!from_iocp || tcp->shutting_down) {
+    /* If we are here with from_iocp set to true, it means we were raced by
+       a shutdown of the endpoint. No actual abort callback will happen in
+       that case, so we're going to do it from here. */
+    do_abort = 1;
+  }
+  gpr_mu_unlock(&tcp->mu);
 
   GPR_ASSERT(tcp->outstanding_write);
 
-  if (!success) {
+  if (do_abort) {
+    if (from_iocp) gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
     tcp_unref(tcp);
     cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
     return;
@@ -238,6 +286,7 @@
   cb(opaque, status);
 }
 
+/* Initiates a write. */
 static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
                                             gpr_slice *slices, size_t nslices,
                                             grpc_endpoint_write_cb cb,
@@ -253,11 +302,13 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
   WSABUF *buffers = local_buffers;
 
   GPR_ASSERT(!tcp->outstanding_write);
+  GPR_ASSERT(!tcp->shutting_down);
   tcp_ref(tcp);
   tcp->outstanding_write = 1;
   tcp->write_cb = cb;
   tcp->write_user_data = arg;
+
   gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
 
   if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
@@ -270,10 +321,14 @@
     buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
   }
 
+  /* First, let's try a synchronous, non-blocking write. */
   status = WSASend(socket->socket, buffers, tcp->write_slices.count,
                    &bytes_sent, 0, NULL, NULL);
   info->wsa_error = status == 0 ? 0 : WSAGetLastError();
 
+  /* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
+     connection that has its send queue filled up. But if we don't, then we can
+     avoid doing an async write operation at all. */
   if (info->wsa_error != WSAEWOULDBLOCK) {
     grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR;
     if (status == 0) {
@@ -291,25 +346,42 @@
     return ret;
   }
 
+  /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
+     operation, this time asynchronously. */
   memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
   status = WSASend(socket->socket, buffers, tcp->write_slices.count,
                    &bytes_sent, 0, &socket->write_info.overlapped, NULL);
   if (allocated) gpr_free(allocated);
 
+  /* It is possible the operation completed right away. But we'd still get an
+     IOCP notification, so let's ignore the result here and wait for the IOCP. */
   if (status != 0) {
     int error = WSAGetLastError();
     if (error != WSA_IO_PENDING) {
       char *utf8_message = gpr_format_message(WSAGetLastError());
-      __debugbreak();
-      gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message);
+      gpr_log(GPR_ERROR, "WSASend error: %s - this means we're going to leak.",
+              utf8_message);
       gpr_free(utf8_message);
-      /* would the IO completion port be called anyway ? Let's assume not. */
+      /* I'm pretty sure this is a very bad situation; hence the log. What
+         will happen now is that the socket will be waiting on neither read
+         nor write, unless the caller retries, which is unlikely, and not
+         guaranteed either. There might also be a read pending. This means
+         that the future orphaning of that socket will be in limbo, and we're
+         going to leak it. I have no idea what could cause this specific case,
+         aside from a parameter error in our call. Normal write errors would
+         actually surface during the overlapped operation, which is the
+         supported way to go for that. */
      tcp->outstanding_write = 0;
      tcp_unref(tcp);
+      /* Per the comment above, I'm going to treat that case as a hard failure
+         for now, and leave the option to catch it in a debugger. */
+      __debugbreak();
      return GRPC_ENDPOINT_WRITE_ERROR;
    }
  }
 
+  /* Now that everything is set up, we can ask for the IOCP notification. It
+     may trigger the callback immediately, but no matter. */
   grpc_socket_notify_on_write(socket, on_write, tcp);
   return GRPC_ENDPOINT_WRITE_PENDING;
 }
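win_write gathers every pending slice into an array of WSABUFs and hands them to a single WSASend, spilling to the heap only when the small on-stack array is not enough. A standalone sketch of that gather step with plain arrays instead of gpr_slice_buffer (illustrative names, minimal error handling):

#include <winsock2.h>
#include <stdlib.h>
#include <string.h>

/* Send several buffers with one overlapped WSASend. Returns 1 if a completion
   will be delivered to the IOCP, 0 on hard failure. */
static int example_gather_write(SOCKET sock, char **bufs, ULONG *lens,
                                DWORD count, OVERLAPPED *overlapped) {
  WSABUF stack_buffers[4];
  WSABUF *buffers = stack_buffers;
  DWORD bytes_sent = 0;
  DWORD i;
  int status;
  int ret = 1;

  /* Spill to the heap only when we have more buffers than the common case. */
  if (count > 4) buffers = malloc(count * sizeof(WSABUF));
  for (i = 0; i < count; i++) {
    buffers[i].buf = bufs[i];
    buffers[i].len = lens[i];
  }

  memset(overlapped, 0, sizeof(*overlapped));
  status = WSASend(sock, buffers, count, &bytes_sent, 0, overlapped, NULL);
  if (status != 0 && WSAGetLastError() != WSA_IO_PENDING) {
    /* Hard failure: no IOCP notification will ever come for this write. */
    ret = 0;
  }
  if (buffers != stack_buffers) free(buffers);
  return ret;
}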
@@ -319,9 +391,20 @@ static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
   grpc_tcp *tcp = (grpc_tcp *) ep;
   grpc_iocp_add_socket(tcp->socket);
 }
 
+/* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
+   for the potential read and write operations. It is up to the caller to
+   guarantee this isn't called in parallel to a read or write request, so
+   we're not going to protect against that. However, the IO Completion Port
+   callback will happen from another thread, so we do need to protect against
+   concurrent access to the data structure in that respect. */
 static void win_shutdown(grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *) ep;
+  gpr_mu_lock(&tcp->mu);
+  /* At that point, what may happen is that we're already inside the IOCP
+     callback. See the comments in on_read and on_write. */
+  tcp->shutting_down = 1;
   grpc_winsocket_shutdown(tcp->socket);
+  gpr_mu_unlock(&tcp->mu);
 }
 
 static void win_destroy(grpc_endpoint *ep) {
@@ -338,6 +421,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) {
   memset(tcp, 0, sizeof(grpc_tcp));
   tcp->base.vtable = &vtable;
   tcp->socket = socket;
+  gpr_mu_init(&tcp->mu);
   gpr_slice_buffer_init(&tcp->write_slices);
   gpr_ref_init(&tcp->refcount, 1);
   return &tcp->base;
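The interplay between win_shutdown and the on_read / on_write callbacks above boils down to a flag checked under a mutex by the IOCP thread. A final bare-bones sketch of that pattern, again with illustrative names and Win32 primitives rather than grpc's types:

#include <windows.h>

typedef struct {
  CRITICAL_SECTION mu;
  int shutting_down;
} endpoint_state;

static void endpoint_state_init(endpoint_state *ep) {
  InitializeCriticalSection(&ep->mu);
  ep->shutting_down = 0;
}

/* Caller thread: flag the endpoint, then kick off the socket-level shutdown
   while still holding the lock so the IOCP callback can't see a half state. */
static void example_shutdown(endpoint_state *ep) {
  EnterCriticalSection(&ep->mu);
  ep->shutting_down = 1;
  /* ... queue the socket-level shutdown here ... */
  LeaveCriticalSection(&ep->mu);
}

/* IOCP thread: decide under the lock whether this completion should be
   delivered normally or turned into an abort. */
static int example_completion_should_abort(endpoint_state *ep, int from_iocp) {
  int do_abort;
  EnterCriticalSection(&ep->mu);
  do_abort = !from_iocp || ep->shutting_down;
  LeaveCriticalSection(&ep->mu);
  return do_abort;
}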