Merge pull request #1457 from nicolasnoble/win32-fixes-once-again

Another round of win32 fixes and documentation.
pull/1432/head^2
Craig Tiller 10 years ago
commit c832553504
  1. 4
      src/core/iomgr/iocp_windows.c
  2. 4
      src/core/iomgr/iomgr_windows.c
  3. 3
      src/core/iomgr/pollset_kick_windows.h
  4. 5
      src/core/iomgr/pollset_windows.c
  5. 8
      src/core/iomgr/pollset_windows.h
  6. 29
      src/core/iomgr/socket_windows.c
  7. 54
      src/core/iomgr/socket_windows.h
  8. 28
      src/core/iomgr/tcp_server_windows.c
  9. 110
      src/core/iomgr/tcp_windows.c

@ -177,6 +177,10 @@ void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
socket->orphan = 1;
}
/* Calling notify_on_read or write means either of two things:
-) The IOCP already completed in the background, and we need to call
the callback now.
-) The IOCP hasn't completed yet, and we're queuing it for later. */
static void socket_notify_on_iocp(grpc_winsocket *socket,
void(*cb)(void *, int), void *opaque,
grpc_winsocket_callback_info *info) {

@ -43,6 +43,10 @@
#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h"
/* Windows' io manager is going to be fully designed using IO completion
ports. All of what we're doing here is basically make sure that
Windows sockets are initialized in and out. */
static void winsock_init(void) {
WSADATA wsaData;
int status = WSAStartup(MAKEWORD(2, 0), &wsaData);

@ -36,6 +36,9 @@
#include <grpc/support/sync.h>
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. */
struct grpc_kick_fd_info;
typedef struct grpc_pollset_kick_state {

@ -41,6 +41,11 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset_windows.h"
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
won't actually do any polling, and return as quickly as possible. */
void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv);

@ -40,10 +40,10 @@
#include "src/core/iomgr/pollset_kick.h"
#include "src/core/iomgr/socket_windows.h"
/* forward declare only in this file to avoid leaking impl details via
pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not
use the struct tag */
struct grpc_fd;
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. A Windows "pollset" is merely a mutex
and a condition variable, as this is the minimal set of features we need
implemented for the rest of grpc. But we won't use them directly. */
typedef struct grpc_pollset {
gpr_mu mu;

@ -41,9 +41,9 @@
#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_windows.h"
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
#include "src/core/iomgr/socket_windows.h"
grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket));
@ -55,16 +55,29 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
return r;
}
static void shutdown_op(grpc_winsocket_callback_info *info) {
if (!info->cb) return;
grpc_iomgr_add_delayed_callback(info->cb, info->opaque, 0);
}
/* Schedule a shutdown of the socket operations. Will call the pending
operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various
mutex hold states, and that'd be unsafe to call them directly. */
void grpc_winsocket_shutdown(grpc_winsocket *socket) {
shutdown_op(&socket->read_info);
shutdown_op(&socket->write_info);
gpr_mu_lock(&socket->state_mu);
if (socket->read_info.cb) {
grpc_iomgr_add_delayed_callback(socket->read_info.cb,
socket->read_info.opaque, 0);
}
if (socket->write_info.cb) {
grpc_iomgr_add_delayed_callback(socket->write_info.cb,
socket->write_info.opaque, 0);
}
gpr_mu_unlock(&socket->state_mu);
}
/* Abandons a socket. Either we're going to queue it up for garbage collecting
from the IO Completion Port thread, or destroy it immediately. Note that this
mechanisms assumes that we're either always waiting for an operation, or we
explicitely know that we don't. If there is a future case where we can have
an "idle" socket which is neither trying to read or write, we'd start leaking
both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
SOCKET socket = winsocket->socket;
if (!winsocket->closed_early) {

@ -39,21 +39,43 @@
#include <grpc/support/sync.h>
#include <grpc/support/atm.h>
/* This holds the data for an outstanding read or write on a socket.
The mutex to protect the concurrent access to that data is the one
inside the winsocket wrapper. */
typedef struct grpc_winsocket_callback_info {
/* This is supposed to be a WSAOVERLAPPED, but in order to get that
* definition, we need to include ws2tcpip.h, which needs to be included
* from the top, otherwise it'll clash with a previous inclusion of
* windows.h that in turns includes winsock.h. If anyone knows a way
* to do it properly, feel free to send a patch.
*/
definition, we need to include ws2tcpip.h, which needs to be included
from the top, otherwise it'll clash with a previous inclusion of
windows.h that in turns includes winsock.h. If anyone knows a way
to do it properly, feel free to send a patch. */
OVERLAPPED overlapped;
/* The callback information for the pending operation. May be empty if the
caller hasn't registered a callback yet. */
void(*cb)(void *opaque, int success);
void *opaque;
/* A boolean to describe if the IO Completion Port got a notification for
that operation. This will happen if the operation completed before the
called had time to register a callback. We could avoid that behavior
altogether by forcing the caller to always register its callback before
proceeding queue an operation, but it is frequent for an IO Completion
Port to trigger quickly. This way we avoid a context switch for calling
the callback. We also simplify the read / write operations to avoid having
to hold a mutex for a long amount of time. */
int has_pending_iocp;
/* The results of the overlapped operation. */
DWORD bytes_transfered;
int wsa_error;
} grpc_winsocket_callback_info;
/* This is a wrapper to a Windows socket. A socket can have one outstanding
read, and one outstanding write. Doing an asynchronous accept means waiting
for a read operation. Doing an asynchronous connect means waiting for a
write operation. These are completely abitrary ties between the operation
and the kind of event, because we can have one overlapped per pending
operation, whichever its nature is. So we could have more dedicated pending
operation callbacks for connect and listen. But given the scope of listen
and accept, we don't need to go to that extent and waste memory. Also, this
is closer to what happens in posix world. */
typedef struct grpc_winsocket {
SOCKET socket;
@ -62,17 +84,35 @@ typedef struct grpc_winsocket {
gpr_mu state_mu;
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
int added_to_iocp;
/* A boolean to indicate that the caller has abandonned that socket, but
there is a pending operation that the IO Completion Port will have to
wait for. The socket will be collected at that time. */
int orphan;
/* A boolean to indicate that the socket was already closed somehow, and
that no operation is going to be pending. Trying to abandon a socket in
that state won't result in an orphan, but will instead be destroyed
without further delay. We could avoid that boolean by adding one into
grpc_winsocket_callback_info describing that the operation is pending,
but that 1) waste memory more and 2) obfuscate the intent a bit more. */
int closed_early;
} grpc_winsocket;
/* Create a wrapped windows handle.
This takes ownership of closing it. */
/* Create a wrapped windows handle. This takes ownership of it, meaning that
it will be responsible for closing it. */
grpc_winsocket *grpc_winsocket_create(SOCKET socket);
/* Initiate an asynchronous shutdown of the socket. Will call off any pending
operation to cancel them. */
void grpc_winsocket_shutdown(grpc_winsocket *socket);
/* Abandon a socket. */
void grpc_winsocket_orphan(grpc_winsocket *socket);
/* Destroy a socket. Should only be called by the IO Completion Port thread,
or by grpc_winsocket_orphan if there's no pending operation. */
void grpc_winsocket_destroy(grpc_winsocket *socket);
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */

@ -109,7 +109,8 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts. */
for (i = 0; i < s->nports; i++) {
grpc_winsocket_shutdown(s->ports[i].socket);
server_port *sp = &s->ports[i];
grpc_winsocket_shutdown(sp->socket);
}
/* This happens asynchronously. Wait while that happens. */
while (s->active_ports) {
@ -275,22 +276,28 @@ static void on_accept(void *arg, int from_iocp) {
/* If we're not notified from the IOCP, it means we are asked to shutdown.
This will initiate that shutdown. Calling closesocket will trigger an
IOCP notification, that will call this function a second time, from
the IOCP thread. */
the IOCP thread. Of course, this only works if the socket was, in fact,
listening. If that's not the case, we'd wait indefinitely. That's a bit
of a degenerate case, but it can happen if you create a server, but
don't start it. So let's support that by recursing once. */
sp->shutting_down = 1;
sp->new_socket = INVALID_SOCKET;
closesocket(sock);
if (sock != INVALID_SOCKET) {
closesocket(sock);
} else {
on_accept(sp, 1);
}
return;
}
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep);
if (from_iocp) {
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
connection. */
start_accept(sp);
}
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
connection. */
start_accept(sp);
}
static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@ -332,6 +339,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->socket = grpc_winsocket_create(sock);
sp->shutting_down = 0;
sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
}

@ -76,8 +76,11 @@ int grpc_tcp_prepare_socket(SOCKET sock) {
}
typedef struct grpc_tcp {
/* This is our C++ class derivation emulation. */
grpc_endpoint base;
/* The one socket this endpoint is using. */
grpc_winsocket *socket;
/* Refcounting how many operations are in progress. */
gpr_refcount refcount;
grpc_endpoint_read_cb read_cb;
@ -90,6 +93,10 @@ typedef struct grpc_tcp {
gpr_slice_buffer write_slices;
int outstanding_write;
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
} grpc_tcp;
static void tcp_ref(grpc_tcp *tcp) {
@ -100,11 +107,13 @@ static void tcp_unref(grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
gpr_slice_buffer_destroy(&tcp->write_slices);
grpc_winsocket_orphan(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp);
}
}
static void on_read(void *tcpp, int success) {
/* Asynchronous callback from the IOCP, or the background thread. */
static void on_read(void *tcpp, int from_iocp) {
grpc_tcp *tcp = (grpc_tcp *) tcpp;
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
@ -114,16 +123,25 @@ static void on_read(void *tcpp, int success) {
grpc_endpoint_read_cb cb = tcp->read_cb;
grpc_winsocket_callback_info *info = &socket->read_info;
void *opaque = tcp->read_user_data;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
if (!from_iocp || tcp->shutting_down) {
/* If we are here with from_iocp set to true, it means we got raced to
shutting down the endpoint. No actual abort callback will happen
though, so we're going to do it from here. */
do_abort = 1;
}
gpr_mu_unlock(&tcp->mu);
GPR_ASSERT(tcp->outstanding_read);
if (!success) {
if (do_abort) {
if (from_iocp) gpr_slice_unref(tcp->read_slice);
tcp_unref(tcp);
cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
return;
}
tcp->outstanding_read = 0;
GPR_ASSERT(tcp->outstanding_read);
if (socket->read_info.wsa_error != 0) {
char *utf8_message = gpr_format_message(info->wsa_error);
@ -142,6 +160,9 @@ static void on_read(void *tcpp, int success) {
status = GRPC_ENDPOINT_CB_EOF;
}
}
tcp->outstanding_read = 0;
tcp_unref(tcp);
cb(opaque, slice, nslices, status);
}
@ -158,6 +179,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
WSABUF buffer;
GPR_ASSERT(!tcp->outstanding_read);
GPR_ASSERT(!tcp->shutting_down);
tcp_ref(tcp);
tcp->outstanding_read = 1;
tcp->read_cb = cb;
@ -168,10 +190,12 @@ static void win_notify_on_read(grpc_endpoint *ep,
buffer.len = GPR_SLICE_LENGTH(tcp->read_slice);
buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
/* First let's try a synchronous, non-blocking read. */
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError();
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
/* This might heavily recurse. */
@ -179,6 +203,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
return;
}
/* Otherwise, let's retry, by queuing a read. */
memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
&info->overlapped, NULL);
@ -192,30 +217,53 @@ static void win_notify_on_read(grpc_endpoint *ep,
if (error != WSA_IO_PENDING) {
char *utf8_message = gpr_format_message(WSAGetLastError());
__debugbreak();
gpr_log(GPR_ERROR, "WSARecv error: %s", utf8_message);
gpr_log(GPR_ERROR, "WSARecv error: %s - this means we're going to leak.",
utf8_message);
gpr_free(utf8_message);
/* would the IO completion port be called anyway... ? Let's assume not. */
/* I'm pretty sure this is a very bad situation there. Hence the log.
What will happen now is that the socket will neither wait for read
or write, unless the caller retry, which is unlikely, but I am not
sure if that's guaranteed. And there might also be a write pending.
This means that the future orphanage of that socket will be in limbo,
and we're going to leak it. I have no idea what could cause this
specific case however, aside from a parameter error from our call.
Normal read errors would actually happen during the overlapped
operation, which is the supported way to go for that. */
tcp->outstanding_read = 0;
tcp_unref(tcp);
cb(arg, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
/* Per the comment above, I'm going to treat that case as a hard failure
for now, and leave the option to catch that and debug. */
__debugbreak();
return;
}
grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
}
static void on_write(void *tcpp, int success) {
/* Asynchronous callback from the IOCP, or the background thread. */
static void on_write(void *tcpp, int from_iocp) {
grpc_tcp *tcp = (grpc_tcp *) tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
grpc_endpoint_write_cb cb = tcp->write_cb;
void *opaque = tcp->write_user_data;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
if (!from_iocp || tcp->shutting_down) {
/* If we are here with from_iocp set to true, it means we got raced to
shutting down the endpoint. No actual abort callback will happen
though, so we're going to do it from here. */
do_abort = 1;
}
gpr_mu_unlock(&tcp->mu);
GPR_ASSERT(tcp->outstanding_write);
if (!success) {
if (do_abort) {
if (from_iocp) gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
tcp_unref(tcp);
cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
return;
@ -238,6 +286,7 @@ static void on_write(void *tcpp, int success) {
cb(opaque, status);
}
/* Initiates a write. */
static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
gpr_slice *slices, size_t nslices,
grpc_endpoint_write_cb cb,
@ -253,11 +302,13 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
WSABUF *buffers = local_buffers;
GPR_ASSERT(!tcp->outstanding_write);
GPR_ASSERT(!tcp->shutting_down);
tcp_ref(tcp);
tcp->outstanding_write = 1;
tcp->write_cb = cb;
tcp->write_user_data = arg;
gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
@ -270,10 +321,14 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
}
/* First, let's try a synchronous, non-blocking write. */
status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError();
/* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
connection that has its send queue filled up. But if we don't, then we can
avoid doing an async write operation at all. */
if (info->wsa_error != WSAEWOULDBLOCK) {
grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR;
if (status == 0) {
@ -291,25 +346,42 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
return ret;
}
/* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
operation, this time asynchronously. */
memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, &socket->write_info.overlapped, NULL);
if (allocated) gpr_free(allocated);
/* It is possible the operation completed then. But we'd still get an IOCP
notification. So let's ignore it and wait for the IOCP. */
if (status != 0) {
int error = WSAGetLastError();
if (error != WSA_IO_PENDING) {
char *utf8_message = gpr_format_message(WSAGetLastError());
__debugbreak();
gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message);
gpr_log(GPR_ERROR, "WSASend error: %s - this means we're going to leak.",
utf8_message);
gpr_free(utf8_message);
/* would the IO completion port be called anyway ? Let's assume not. */
/* I'm pretty sure this is a very bad situation there. Hence the log.
What will happen now is that the socket will neither wait for read
or write, unless the caller retry, which is unlikely, but I am not
sure if that's guaranteed. And there might also be a read pending.
This means that the future orphanage of that socket will be in limbo,
and we're going to leak it. I have no idea what could cause this
specific case however, aside from a parameter error from our call.
Normal read errors would actually happen during the overlapped
operation, which is the supported way to go for that. */
tcp->outstanding_write = 0;
tcp_unref(tcp);
/* Per the comment above, I'm going to treat that case as a hard failure
for now, and leave the option to catch that and debug. */
__debugbreak();
return GRPC_ENDPOINT_WRITE_ERROR;
}
}
/* As all is now setup, we can now ask for the IOCP notification. It may
trigger the callback immediately however, but no matter. */
grpc_socket_notify_on_write(socket, on_write, tcp);
return GRPC_ENDPOINT_WRITE_PENDING;
}
@ -319,9 +391,20 @@ static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_iocp_add_socket(tcp->socket);
}
/* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
for the potential read and write operations. It is up to the caller to
guarantee this isn't called in parallel to a read or write request, so
we're not going to protect against these. However the IO Completion Port
callback will happen from another thread, so we need to protect against
concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *) ep;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1;
grpc_winsocket_shutdown(tcp->socket);
gpr_mu_unlock(&tcp->mu);
}
static void win_destroy(grpc_endpoint *ep) {
@ -338,6 +421,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) {
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
gpr_slice_buffer_init(&tcp->write_slices);
gpr_ref_init(&tcp->refcount, 1);
return &tcp->base;

Loading…
Cancel
Save