merge with head

pull/1464/head
Yang Gao 10 years ago
commit 181ad88508
  1. 2
      src/core/iomgr/endpoint_pair_windows.c
  2. 4
      src/core/iomgr/iocp_windows.c
  3. 4
      src/core/iomgr/iomgr_windows.c
  4. 3
      src/core/iomgr/pollset_kick_windows.h
  5. 5
      src/core/iomgr/pollset_windows.c
  6. 8
      src/core/iomgr/pollset_windows.h
  7. 29
      src/core/iomgr/socket_windows.c
  8. 54
      src/core/iomgr/socket_windows.h
  9. 28
      src/core/iomgr/tcp_server_windows.c
  10. 110
      src/core/iomgr/tcp_windows.c
  11. 94
      src/core/profiling/basic_timers.c
  12. 4
      src/core/profiling/timers.h
  13. 23
      src/core/profiling/timers_preciseclock.h
  14. 11
      tools/run_tests/run_sanity.sh

@ -50,7 +50,7 @@ static void create_sockets(SOCKET sv[2]) {
SOCKET lst_sock = INVALID_SOCKET; SOCKET lst_sock = INVALID_SOCKET;
SOCKET cli_sock = INVALID_SOCKET; SOCKET cli_sock = INVALID_SOCKET;
SOCKADDR_IN addr; SOCKADDR_IN addr;
int addr_len; int addr_len = sizeof(addr);
lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
GPR_ASSERT(lst_sock != INVALID_SOCKET); GPR_ASSERT(lst_sock != INVALID_SOCKET);

@ -177,6 +177,10 @@ void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
socket->orphan = 1; socket->orphan = 1;
} }
/* Calling notify_on_read or write means either of two things:
-) The IOCP already completed in the background, and we need to call
the callback now.
-) The IOCP hasn't completed yet, and we're queuing it for later. */
static void socket_notify_on_iocp(grpc_winsocket *socket, static void socket_notify_on_iocp(grpc_winsocket *socket,
void(*cb)(void *, int), void *opaque, void(*cb)(void *, int), void *opaque,
grpc_winsocket_callback_info *info) { grpc_winsocket_callback_info *info) {

@ -43,6 +43,10 @@
#include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
/* Windows' io manager is going to be fully designed using IO completion
ports. All of what we're doing here is basically make sure that
Windows sockets are initialized in and out. */
static void winsock_init(void) { static void winsock_init(void) {
WSADATA wsaData; WSADATA wsaData;
int status = WSAStartup(MAKEWORD(2, 0), &wsaData); int status = WSAStartup(MAKEWORD(2, 0), &wsaData);

@ -36,6 +36,9 @@
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. */
struct grpc_kick_fd_info; struct grpc_kick_fd_info;
typedef struct grpc_pollset_kick_state { typedef struct grpc_pollset_kick_state {

@ -41,6 +41,11 @@
#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset_windows.h" #include "src/core/iomgr/pollset_windows.h"
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
won't actually do any polling, and return as quickly as possible. */
void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu); gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv); gpr_cv_init(&pollset->cv);

@ -40,10 +40,10 @@
#include "src/core/iomgr/pollset_kick.h" #include "src/core/iomgr/pollset_kick.h"
#include "src/core/iomgr/socket_windows.h" #include "src/core/iomgr/socket_windows.h"
/* forward declare only in this file to avoid leaking impl details via /* There isn't really any such thing as a pollset under Windows, due to the
pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not nature of the IO completion ports. A Windows "pollset" is merely a mutex
use the struct tag */ and a condition variable, as this is the minimal set of features we need
struct grpc_fd; implemented for the rest of grpc. But we won't use them directly. */
typedef struct grpc_pollset { typedef struct grpc_pollset {
gpr_mu mu; gpr_mu mu;

@ -41,9 +41,9 @@
#include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_windows.h"
#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h" #include "src/core/iomgr/pollset_windows.h"
#include "src/core/iomgr/socket_windows.h"
grpc_winsocket *grpc_winsocket_create(SOCKET socket) { grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket)); grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket));
@ -55,16 +55,29 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
return r; return r;
} }
static void shutdown_op(grpc_winsocket_callback_info *info) { /* Schedule a shutdown of the socket operations. Will call the pending
if (!info->cb) return; operations to abort them. We need to do that this way because of the
grpc_iomgr_add_delayed_callback(info->cb, info->opaque, 0); various callsites of that function, which happens to be in various
} mutex hold states, and that'd be unsafe to call them directly. */
void grpc_winsocket_shutdown(grpc_winsocket *socket) { void grpc_winsocket_shutdown(grpc_winsocket *socket) {
shutdown_op(&socket->read_info); gpr_mu_lock(&socket->state_mu);
shutdown_op(&socket->write_info); if (socket->read_info.cb) {
grpc_iomgr_add_delayed_callback(socket->read_info.cb,
socket->read_info.opaque, 0);
}
if (socket->write_info.cb) {
grpc_iomgr_add_delayed_callback(socket->write_info.cb,
socket->write_info.opaque, 0);
}
gpr_mu_unlock(&socket->state_mu);
} }
/* Abandons a socket. Either we're going to queue it up for garbage collecting
from the IO Completion Port thread, or destroy it immediately. Note that this
mechanisms assumes that we're either always waiting for an operation, or we
explicitely know that we don't. If there is a future case where we can have
an "idle" socket which is neither trying to read or write, we'd start leaking
both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) { void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
SOCKET socket = winsocket->socket; SOCKET socket = winsocket->socket;
if (!winsocket->closed_early) { if (!winsocket->closed_early) {

@ -39,21 +39,43 @@
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
/* This holds the data for an outstanding read or write on a socket.
The mutex to protect the concurrent access to that data is the one
inside the winsocket wrapper. */
typedef struct grpc_winsocket_callback_info { typedef struct grpc_winsocket_callback_info {
/* This is supposed to be a WSAOVERLAPPED, but in order to get that /* This is supposed to be a WSAOVERLAPPED, but in order to get that
* definition, we need to include ws2tcpip.h, which needs to be included definition, we need to include ws2tcpip.h, which needs to be included
* from the top, otherwise it'll clash with a previous inclusion of from the top, otherwise it'll clash with a previous inclusion of
* windows.h that in turns includes winsock.h. If anyone knows a way windows.h that in turns includes winsock.h. If anyone knows a way
* to do it properly, feel free to send a patch. to do it properly, feel free to send a patch. */
*/
OVERLAPPED overlapped; OVERLAPPED overlapped;
/* The callback information for the pending operation. May be empty if the
caller hasn't registered a callback yet. */
void(*cb)(void *opaque, int success); void(*cb)(void *opaque, int success);
void *opaque; void *opaque;
/* A boolean to describe if the IO Completion Port got a notification for
that operation. This will happen if the operation completed before the
called had time to register a callback. We could avoid that behavior
altogether by forcing the caller to always register its callback before
proceeding queue an operation, but it is frequent for an IO Completion
Port to trigger quickly. This way we avoid a context switch for calling
the callback. We also simplify the read / write operations to avoid having
to hold a mutex for a long amount of time. */
int has_pending_iocp; int has_pending_iocp;
/* The results of the overlapped operation. */
DWORD bytes_transfered; DWORD bytes_transfered;
int wsa_error; int wsa_error;
} grpc_winsocket_callback_info; } grpc_winsocket_callback_info;
/* This is a wrapper to a Windows socket. A socket can have one outstanding
read, and one outstanding write. Doing an asynchronous accept means waiting
for a read operation. Doing an asynchronous connect means waiting for a
write operation. These are completely abitrary ties between the operation
and the kind of event, because we can have one overlapped per pending
operation, whichever its nature is. So we could have more dedicated pending
operation callbacks for connect and listen. But given the scope of listen
and accept, we don't need to go to that extent and waste memory. Also, this
is closer to what happens in posix world. */
typedef struct grpc_winsocket { typedef struct grpc_winsocket {
SOCKET socket; SOCKET socket;
@ -62,17 +84,35 @@ typedef struct grpc_winsocket {
gpr_mu state_mu; gpr_mu state_mu;
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
int added_to_iocp; int added_to_iocp;
/* A boolean to indicate that the caller has abandonned that socket, but
there is a pending operation that the IO Completion Port will have to
wait for. The socket will be collected at that time. */
int orphan; int orphan;
/* A boolean to indicate that the socket was already closed somehow, and
that no operation is going to be pending. Trying to abandon a socket in
that state won't result in an orphan, but will instead be destroyed
without further delay. We could avoid that boolean by adding one into
grpc_winsocket_callback_info describing that the operation is pending,
but that 1) waste memory more and 2) obfuscate the intent a bit more. */
int closed_early; int closed_early;
} grpc_winsocket; } grpc_winsocket;
/* Create a wrapped windows handle. /* Create a wrapped windows handle. This takes ownership of it, meaning that
This takes ownership of closing it. */ it will be responsible for closing it. */
grpc_winsocket *grpc_winsocket_create(SOCKET socket); grpc_winsocket *grpc_winsocket_create(SOCKET socket);
/* Initiate an asynchronous shutdown of the socket. Will call off any pending
operation to cancel them. */
void grpc_winsocket_shutdown(grpc_winsocket *socket); void grpc_winsocket_shutdown(grpc_winsocket *socket);
/* Abandon a socket. */
void grpc_winsocket_orphan(grpc_winsocket *socket); void grpc_winsocket_orphan(grpc_winsocket *socket);
/* Destroy a socket. Should only be called by the IO Completion Port thread,
or by grpc_winsocket_orphan if there's no pending operation. */
void grpc_winsocket_destroy(grpc_winsocket *socket); void grpc_winsocket_destroy(grpc_winsocket *socket);
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */ #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */

@ -109,7 +109,8 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
/* First, shutdown all fd's. This will queue abortion calls for all /* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts. */ of the pending accepts. */
for (i = 0; i < s->nports; i++) { for (i = 0; i < s->nports; i++) {
grpc_winsocket_shutdown(s->ports[i].socket); server_port *sp = &s->ports[i];
grpc_winsocket_shutdown(sp->socket);
} }
/* This happens asynchronously. Wait while that happens. */ /* This happens asynchronously. Wait while that happens. */
while (s->active_ports) { while (s->active_ports) {
@ -275,22 +276,28 @@ static void on_accept(void *arg, int from_iocp) {
/* If we're not notified from the IOCP, it means we are asked to shutdown. /* If we're not notified from the IOCP, it means we are asked to shutdown.
This will initiate that shutdown. Calling closesocket will trigger an This will initiate that shutdown. Calling closesocket will trigger an
IOCP notification, that will call this function a second time, from IOCP notification, that will call this function a second time, from
the IOCP thread. */ the IOCP thread. Of course, this only works if the socket was, in fact,
listening. If that's not the case, we'd wait indefinitely. That's a bit
of a degenerate case, but it can happen if you create a server, but
don't start it. So let's support that by recursing once. */
sp->shutting_down = 1; sp->shutting_down = 1;
sp->new_socket = INVALID_SOCKET; sp->new_socket = INVALID_SOCKET;
closesocket(sock); if (sock != INVALID_SOCKET) {
closesocket(sock);
} else {
on_accept(sp, 1);
}
return;
} }
/* The only time we should call our callback, is where we successfully /* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */ managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep); if (ep) sp->server->cb(sp->server->cb_arg, ep);
if (from_iocp) { /* As we were notified from the IOCP of one and exactly one accept,
/* As we were notified from the IOCP of one and exactly one accept, the former socked we created has now either been destroy or assigned
the former socked we created has now either been destroy or assigned to the new connection. We need to create a new one for the next
to the new connection. We need to create a new one for the next connection. */
connection. */ start_accept(sp);
start_accept(sp);
}
} }
static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@ -332,6 +339,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->socket = grpc_winsocket_create(sock); sp->socket = grpc_winsocket_create(sock);
sp->shutting_down = 0; sp->shutting_down = 0;
sp->AcceptEx = AcceptEx; sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
GPR_ASSERT(sp->socket); GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} }

@ -76,8 +76,11 @@ int grpc_tcp_prepare_socket(SOCKET sock) {
} }
typedef struct grpc_tcp { typedef struct grpc_tcp {
/* This is our C++ class derivation emulation. */
grpc_endpoint base; grpc_endpoint base;
/* The one socket this endpoint is using. */
grpc_winsocket *socket; grpc_winsocket *socket;
/* Refcounting how many operations are in progress. */
gpr_refcount refcount; gpr_refcount refcount;
grpc_endpoint_read_cb read_cb; grpc_endpoint_read_cb read_cb;
@ -90,6 +93,10 @@ typedef struct grpc_tcp {
gpr_slice_buffer write_slices; gpr_slice_buffer write_slices;
int outstanding_write; int outstanding_write;
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
} grpc_tcp; } grpc_tcp;
static void tcp_ref(grpc_tcp *tcp) { static void tcp_ref(grpc_tcp *tcp) {
@ -100,11 +107,13 @@ static void tcp_unref(grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) { if (gpr_unref(&tcp->refcount)) {
gpr_slice_buffer_destroy(&tcp->write_slices); gpr_slice_buffer_destroy(&tcp->write_slices);
grpc_winsocket_orphan(tcp->socket); grpc_winsocket_orphan(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp); gpr_free(tcp);
} }
} }
static void on_read(void *tcpp, int success) { /* Asynchronous callback from the IOCP, or the background thread. */
static void on_read(void *tcpp, int from_iocp) {
grpc_tcp *tcp = (grpc_tcp *) tcpp; grpc_tcp *tcp = (grpc_tcp *) tcpp;
grpc_winsocket *socket = tcp->socket; grpc_winsocket *socket = tcp->socket;
gpr_slice sub; gpr_slice sub;
@ -114,16 +123,25 @@ static void on_read(void *tcpp, int success) {
grpc_endpoint_read_cb cb = tcp->read_cb; grpc_endpoint_read_cb cb = tcp->read_cb;
grpc_winsocket_callback_info *info = &socket->read_info; grpc_winsocket_callback_info *info = &socket->read_info;
void *opaque = tcp->read_user_data; void *opaque = tcp->read_user_data;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
if (!from_iocp || tcp->shutting_down) {
/* If we are here with from_iocp set to true, it means we got raced to
shutting down the endpoint. No actual abort callback will happen
though, so we're going to do it from here. */
do_abort = 1;
}
gpr_mu_unlock(&tcp->mu);
GPR_ASSERT(tcp->outstanding_read); if (do_abort) {
if (from_iocp) gpr_slice_unref(tcp->read_slice);
if (!success) {
tcp_unref(tcp); tcp_unref(tcp);
cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
return; return;
} }
tcp->outstanding_read = 0; GPR_ASSERT(tcp->outstanding_read);
if (socket->read_info.wsa_error != 0) { if (socket->read_info.wsa_error != 0) {
char *utf8_message = gpr_format_message(info->wsa_error); char *utf8_message = gpr_format_message(info->wsa_error);
@ -142,6 +160,9 @@ static void on_read(void *tcpp, int success) {
status = GRPC_ENDPOINT_CB_EOF; status = GRPC_ENDPOINT_CB_EOF;
} }
} }
tcp->outstanding_read = 0;
tcp_unref(tcp); tcp_unref(tcp);
cb(opaque, slice, nslices, status); cb(opaque, slice, nslices, status);
} }
@ -158,6 +179,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
WSABUF buffer; WSABUF buffer;
GPR_ASSERT(!tcp->outstanding_read); GPR_ASSERT(!tcp->outstanding_read);
GPR_ASSERT(!tcp->shutting_down);
tcp_ref(tcp); tcp_ref(tcp);
tcp->outstanding_read = 1; tcp->outstanding_read = 1;
tcp->read_cb = cb; tcp->read_cb = cb;
@ -168,10 +190,12 @@ static void win_notify_on_read(grpc_endpoint *ep,
buffer.len = GPR_SLICE_LENGTH(tcp->read_slice); buffer.len = GPR_SLICE_LENGTH(tcp->read_slice);
buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice); buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
/* First let's try a synchronous, non-blocking read. */
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
NULL, NULL); NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError(); info->wsa_error = status == 0 ? 0 : WSAGetLastError();
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) { if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read; info->bytes_transfered = bytes_read;
/* This might heavily recurse. */ /* This might heavily recurse. */
@ -179,6 +203,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
return; return;
} }
/* Otherwise, let's retry, by queuing a read. */
memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED)); memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
&info->overlapped, NULL); &info->overlapped, NULL);
@ -192,30 +217,53 @@ static void win_notify_on_read(grpc_endpoint *ep,
if (error != WSA_IO_PENDING) { if (error != WSA_IO_PENDING) {
char *utf8_message = gpr_format_message(WSAGetLastError()); char *utf8_message = gpr_format_message(WSAGetLastError());
__debugbreak(); gpr_log(GPR_ERROR, "WSARecv error: %s - this means we're going to leak.",
gpr_log(GPR_ERROR, "WSARecv error: %s", utf8_message); utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
/* would the IO completion port be called anyway... ? Let's assume not. */ /* I'm pretty sure this is a very bad situation there. Hence the log.
What will happen now is that the socket will neither wait for read
or write, unless the caller retry, which is unlikely, but I am not
sure if that's guaranteed. And there might also be a write pending.
This means that the future orphanage of that socket will be in limbo,
and we're going to leak it. I have no idea what could cause this
specific case however, aside from a parameter error from our call.
Normal read errors would actually happen during the overlapped
operation, which is the supported way to go for that. */
tcp->outstanding_read = 0; tcp->outstanding_read = 0;
tcp_unref(tcp); tcp_unref(tcp);
cb(arg, NULL, 0, GRPC_ENDPOINT_CB_ERROR); cb(arg, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
/* Per the comment above, I'm going to treat that case as a hard failure
for now, and leave the option to catch that and debug. */
__debugbreak();
return; return;
} }
grpc_socket_notify_on_read(tcp->socket, on_read, tcp); grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
} }
static void on_write(void *tcpp, int success) { /* Asynchronous callback from the IOCP, or the background thread. */
static void on_write(void *tcpp, int from_iocp) {
grpc_tcp *tcp = (grpc_tcp *) tcpp; grpc_tcp *tcp = (grpc_tcp *) tcpp;
grpc_winsocket *handle = tcp->socket; grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info; grpc_winsocket_callback_info *info = &handle->write_info;
grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK; grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
grpc_endpoint_write_cb cb = tcp->write_cb; grpc_endpoint_write_cb cb = tcp->write_cb;
void *opaque = tcp->write_user_data; void *opaque = tcp->write_user_data;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
if (!from_iocp || tcp->shutting_down) {
/* If we are here with from_iocp set to true, it means we got raced to
shutting down the endpoint. No actual abort callback will happen
though, so we're going to do it from here. */
do_abort = 1;
}
gpr_mu_unlock(&tcp->mu);
GPR_ASSERT(tcp->outstanding_write); GPR_ASSERT(tcp->outstanding_write);
if (!success) { if (do_abort) {
if (from_iocp) gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
tcp_unref(tcp); tcp_unref(tcp);
cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN); cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
return; return;
@ -238,6 +286,7 @@ static void on_write(void *tcpp, int success) {
cb(opaque, status); cb(opaque, status);
} }
/* Initiates a write. */
static grpc_endpoint_write_status win_write(grpc_endpoint *ep, static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
gpr_slice *slices, size_t nslices, gpr_slice *slices, size_t nslices,
grpc_endpoint_write_cb cb, grpc_endpoint_write_cb cb,
@ -253,11 +302,13 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
WSABUF *buffers = local_buffers; WSABUF *buffers = local_buffers;
GPR_ASSERT(!tcp->outstanding_write); GPR_ASSERT(!tcp->outstanding_write);
GPR_ASSERT(!tcp->shutting_down);
tcp_ref(tcp); tcp_ref(tcp);
tcp->outstanding_write = 1; tcp->outstanding_write = 1;
tcp->write_cb = cb; tcp->write_cb = cb;
tcp->write_user_data = arg; tcp->write_user_data = arg;
gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices); gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) { if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
@ -270,10 +321,14 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]); buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
} }
/* First, let's try a synchronous, non-blocking write. */
status = WSASend(socket->socket, buffers, tcp->write_slices.count, status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, NULL, NULL); &bytes_sent, 0, NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError(); info->wsa_error = status == 0 ? 0 : WSAGetLastError();
/* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
connection that has its send queue filled up. But if we don't, then we can
avoid doing an async write operation at all. */
if (info->wsa_error != WSAEWOULDBLOCK) { if (info->wsa_error != WSAEWOULDBLOCK) {
grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR; grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR;
if (status == 0) { if (status == 0) {
@ -291,25 +346,42 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
return ret; return ret;
} }
/* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
operation, this time asynchronously. */
memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED)); memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSASend(socket->socket, buffers, tcp->write_slices.count, status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, &socket->write_info.overlapped, NULL); &bytes_sent, 0, &socket->write_info.overlapped, NULL);
if (allocated) gpr_free(allocated); if (allocated) gpr_free(allocated);
/* It is possible the operation completed then. But we'd still get an IOCP
notification. So let's ignore it and wait for the IOCP. */
if (status != 0) { if (status != 0) {
int error = WSAGetLastError(); int error = WSAGetLastError();
if (error != WSA_IO_PENDING) { if (error != WSA_IO_PENDING) {
char *utf8_message = gpr_format_message(WSAGetLastError()); char *utf8_message = gpr_format_message(WSAGetLastError());
__debugbreak(); gpr_log(GPR_ERROR, "WSASend error: %s - this means we're going to leak.",
gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message); utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
/* would the IO completion port be called anyway ? Let's assume not. */ /* I'm pretty sure this is a very bad situation there. Hence the log.
What will happen now is that the socket will neither wait for read
or write, unless the caller retry, which is unlikely, but I am not
sure if that's guaranteed. And there might also be a read pending.
This means that the future orphanage of that socket will be in limbo,
and we're going to leak it. I have no idea what could cause this
specific case however, aside from a parameter error from our call.
Normal read errors would actually happen during the overlapped
operation, which is the supported way to go for that. */
tcp->outstanding_write = 0; tcp->outstanding_write = 0;
tcp_unref(tcp); tcp_unref(tcp);
/* Per the comment above, I'm going to treat that case as a hard failure
for now, and leave the option to catch that and debug. */
__debugbreak();
return GRPC_ENDPOINT_WRITE_ERROR; return GRPC_ENDPOINT_WRITE_ERROR;
} }
} }
/* As all is now setup, we can now ask for the IOCP notification. It may
trigger the callback immediately however, but no matter. */
grpc_socket_notify_on_write(socket, on_write, tcp); grpc_socket_notify_on_write(socket, on_write, tcp);
return GRPC_ENDPOINT_WRITE_PENDING; return GRPC_ENDPOINT_WRITE_PENDING;
} }
@ -319,9 +391,20 @@ static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_iocp_add_socket(tcp->socket); grpc_iocp_add_socket(tcp->socket);
} }
/* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
for the potential read and write operations. It is up to the caller to
guarantee this isn't called in parallel to a read or write request, so
we're not going to protect against these. However the IO Completion Port
callback will happen from another thread, so we need to protect against
concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint *ep) { static void win_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *) ep; grpc_tcp *tcp = (grpc_tcp *) ep;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1;
grpc_winsocket_shutdown(tcp->socket); grpc_winsocket_shutdown(tcp->socket);
gpr_mu_unlock(&tcp->mu);
} }
static void win_destroy(grpc_endpoint *ep) { static void win_destroy(grpc_endpoint *ep) {
@ -338,6 +421,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) {
memset(tcp, 0, sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable; tcp->base.vtable = &vtable;
tcp->socket = socket; tcp->socket = socket;
gpr_mu_init(&tcp->mu);
gpr_slice_buffer_init(&tcp->write_slices); gpr_slice_buffer_init(&tcp->write_slices);
gpr_ref_init(&tcp->refcount, 1); gpr_ref_init(&tcp->refcount, 1);
return &tcp->base; return &tcp->base;

@ -45,15 +45,10 @@
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include <stdio.h> #include <stdio.h>
typedef enum { typedef enum { BEGIN = '{', END = '}', MARK = '.' } marker_type;
BEGIN = '{',
END = '}',
MARK = '.'
} marker_type;
typedef struct grpc_timer_entry { typedef struct grpc_timer_entry {
grpc_precise_clock tm; grpc_precise_clock tm;
gpr_thd_id thd;
int tag; int tag;
marker_type type; marker_type type;
void* id; void* id;
@ -61,71 +56,35 @@ typedef struct grpc_timer_entry {
int line; int line;
} grpc_timer_entry; } grpc_timer_entry;
struct grpc_timers_log { #define MAX_COUNT (1024 * 1024 / sizeof(grpc_timer_entry))
gpr_mu mu;
grpc_timer_entry* log;
int num_entries;
int capacity;
int capacity_limit;
FILE* fp;
};
grpc_timers_log* grpc_timers_log_global = NULL; static __thread grpc_timer_entry log[MAX_COUNT];
static __thread int count;
static grpc_timers_log* grpc_timers_log_create(int capacity_limit, FILE* dump) { static void log_report() {
grpc_timers_log* log = gpr_malloc(sizeof(*log));
/* TODO (vpai): Allow allocation below limit */
log->log = gpr_malloc(capacity_limit * sizeof(*log->log));
/* TODO (vpai): Improve concurrency, do per-thread logging? */
gpr_mu_init(&log->mu);
log->num_entries = 0;
log->capacity = log->capacity_limit = capacity_limit;
log->fp = dump;
return log;
}
static void log_report_locked(grpc_timers_log* log) {
FILE* fp = log->fp;
int i; int i;
for (i = 0; i < log->num_entries; i++) { for (i = 0; i < count; i++) {
grpc_timer_entry* entry = &(log->log[i]); grpc_timer_entry* entry = &(log[i]);
fprintf(fp, "GRPC_LAT_PROF "); printf("GRPC_LAT_PROF " GRPC_PRECISE_CLOCK_FORMAT " %p %c %d %p %s %d\n",
grpc_precise_clock_print(&entry->tm, fp); GRPC_PRECISE_CLOCK_PRINTF_ARGS(&entry->tm),
fprintf(fp, " %p %c %d %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->type, entry->tag, (void*)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tag,
entry->id, entry->file, entry->line); entry->id, entry->file, entry->line);
} }
/* Now clear out the log */ /* Now clear out the log */
log->num_entries = 0; count = 0;
}
static void grpc_timers_log_destroy(grpc_timers_log* log) {
gpr_mu_lock(&log->mu);
log_report_locked(log);
gpr_mu_unlock(&log->mu);
gpr_free(log->log);
gpr_mu_destroy(&log->mu);
gpr_free(log);
} }
static void grpc_timers_log_add(grpc_timers_log* log, int tag, marker_type type, void* id, static void grpc_timers_log_add(int tag, marker_type type, void* id,
const char* file, int line) { const char* file, int line) {
grpc_timer_entry* entry; grpc_timer_entry* entry;
/* TODO (vpai) : Improve concurrency */ /* TODO (vpai) : Improve concurrency */
gpr_mu_lock(&log->mu); if (count == MAX_COUNT) {
if (log->num_entries == log->capacity_limit) { log_report();
log_report_locked(log);
} }
entry = &log->log[log->num_entries++]; entry = &log[count++];
grpc_precise_clock_now(&entry->tm); grpc_precise_clock_now(&entry->tm);
entry->tag = tag; entry->tag = tag;
@ -133,38 +92,31 @@ static void grpc_timers_log_add(grpc_timers_log* log, int tag, marker_type type,
entry->id = id; entry->id = id;
entry->file = file; entry->file = file;
entry->line = line; entry->line = line;
entry->thd = gpr_thd_currentid();
gpr_mu_unlock(&log->mu);
} }
/* Latency profiler API implementation. */ /* Latency profiler API implementation. */
void grpc_timer_add_mark(int tag, void* id, const char* file, int line) { void grpc_timer_add_mark(int tag, void* id, const char* file, int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
grpc_timers_log_add(grpc_timers_log_global, tag, MARK, id, file, line); grpc_timers_log_add(tag, MARK, id, file, line);
} }
} }
void grpc_timer_begin(int tag, void* id, const char *file, int line) { void grpc_timer_begin(int tag, void* id, const char* file, int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
grpc_timers_log_add(grpc_timers_log_global, tag, BEGIN, id, file, line); grpc_timers_log_add(tag, BEGIN, id, file, line);
} }
} }
void grpc_timer_end(int tag, void* id, const char *file, int line) { void grpc_timer_end(int tag, void* id, const char* file, int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
grpc_timers_log_add(grpc_timers_log_global, tag, END, id, file, line); grpc_timers_log_add(tag, END, id, file, line);
} }
} }
/* Basic profiler specific API functions. */ /* Basic profiler specific API functions. */
void grpc_timers_global_init(void) { void grpc_timers_global_init(void) {}
grpc_timers_log_global = grpc_timers_log_create(100000, stdout);
}
void grpc_timers_global_destroy(void) { void grpc_timers_global_destroy(void) {}
grpc_timers_log_destroy(grpc_timers_log_global);
}
#else /* !GRPC_BASIC_PROFILER */ #else /* !GRPC_BASIC_PROFILER */
void grpc_timers_global_init(void) {} void grpc_timers_global_init(void) {}

@ -112,9 +112,7 @@ enum grpc_profiling_tags {
#endif /* GRPC_STAP_PROFILER */ #endif /* GRPC_STAP_PROFILER */
#ifdef GRPC_BASIC_PROFILER #ifdef GRPC_BASIC_PROFILER
typedef struct grpc_timers_log grpc_timers_log; /* Empty placeholder for now. */
extern grpc_timers_log *grpc_timers_log_global;
#endif /* GRPC_BASIC_PROFILER */ #endif /* GRPC_BASIC_PROFILER */
#endif /* at least one profiler requested. */ #endif /* at least one profiler requested. */

@ -43,7 +43,7 @@ typedef long long int grpc_precise_clock;
#if defined(__i386__) #if defined(__i386__)
static void grpc_precise_clock_now(grpc_precise_clock *clk) { static void grpc_precise_clock_now(grpc_precise_clock *clk) {
grpc_precise_clock ret; grpc_precise_clock ret;
__asm__ volatile("rdtsc" : "=A" (ret) ); __asm__ volatile("rdtsc" : "=A"(ret));
*clk = ret; *clk = ret;
} }
@ -51,7 +51,7 @@ static void grpc_precise_clock_now(grpc_precise_clock *clk) {
#elif defined(__x86_64__) || defined(__amd64__) #elif defined(__x86_64__) || defined(__amd64__)
static void grpc_precise_clock_now(grpc_precise_clock *clk) { static void grpc_precise_clock_now(grpc_precise_clock *clk) {
unsigned long long low, high; unsigned long long low, high;
__asm__ volatile("rdtsc" : "=a" (low), "=d" (high)); __asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
*clk = (high << 32) | low; *clk = (high << 32) | low;
} }
#endif #endif
@ -61,19 +61,21 @@ static void grpc_precise_clock_init() {
time_t start = time(NULL); time_t start = time(NULL);
grpc_precise_clock start_time; grpc_precise_clock start_time;
grpc_precise_clock end_time; grpc_precise_clock end_time;
while (time(NULL) == start); while (time(NULL) == start)
;
grpc_precise_clock_now(&start_time); grpc_precise_clock_now(&start_time);
while (time(NULL) == start+1); while (time(NULL) == start + 1)
;
grpc_precise_clock_now(&end_time); grpc_precise_clock_now(&end_time);
cycles_per_second = end_time - start_time; cycles_per_second = end_time - start_time;
} }
static double grpc_precise_clock_scaling_factor() { static double grpc_precise_clock_scaling_factor() {
gpr_once_init(&precise_clock_init, grpc_precise_clock_init); gpr_once_init(&precise_clock_init, grpc_precise_clock_init);
return 1e6 / cycles_per_second; return 1e6 / cycles_per_second;
}
static void grpc_precise_clock_print(const grpc_precise_clock* clk, FILE* fp) {
fprintf(fp, "%f", *clk * grpc_precise_clock_scaling_factor());
} }
#define GRPC_PRECISE_CLOCK_FORMAT "%f"
#define GRPC_PRECISE_CLOCK_PRINTF_ARGS(clk) \
(*(clk)*grpc_precise_clock_scaling_factor())
#else #else
typedef struct grpc_precise_clock grpc_precise_clock; typedef struct grpc_precise_clock grpc_precise_clock;
struct grpc_precise_clock { struct grpc_precise_clock {
@ -82,6 +84,9 @@ struct grpc_precise_clock {
static void grpc_precise_clock_now(grpc_precise_clock* clk) { static void grpc_precise_clock_now(grpc_precise_clock* clk) {
clk->clock = gpr_now(); clk->clock = gpr_now();
} }
#define GRPC_PRECISE_CLOCK_FORMAT "%ld.%09d"
#define GRPC_PRECISE_CLOCK_PRINTF_ARGS(clk) \
(clk)->clock.tv_sec, (clk)->clock.tv_nsec
static void grpc_precise_clock_print(const grpc_precise_clock* clk, FILE* fp) { static void grpc_precise_clock_print(const grpc_precise_clock* clk, FILE* fp) {
fprintf(fp, "%ld.%09d", clk->clock.tv_sec, clk->clock.tv_nsec); fprintf(fp, "%ld.%09d", clk->clock.tv_sec, clk->clock.tv_nsec);
} }

@ -37,3 +37,14 @@ export TEST=true
cd `dirname $0`/../.. cd `dirname $0`/../..
./tools/buildgen/generate_projects.sh ./tools/buildgen/generate_projects.sh
submodules=`mktemp`
git submodule > $submodules
diff -u $submodules - << EOF
05b155ff59114735ec8cd089f669c4c3d8f59029 third_party/gflags (v2.1.0-45-g05b155f)
3df69d3aefde7671053d4e3c242b228e5d79c83f third_party/openssl (OpenSSL_1_0_2a)
644a6a1da71385e9d7a7a26b3476c93fdd71788c third_party/protobuf (v3.0.0-alpha-1-35-g644a6a1)
50893291621658f355bc5b4d450a8d06a563053d third_party/zlib (v1.2.8)
EOF

Loading…
Cancel
Save