Merge remote-tracking branch 'upstream/master'

pull/1469/head
David Garcia Quintas 10 years ago
commit 7c2ba271f9
  1. 1
      .travis.yml
  2. 2
      gRPC.podspec
  3. 19
      src/compiler/cpp_generator.cc
  4. 2
      src/core/iomgr/endpoint_pair_windows.c
  5. 6
      src/core/iomgr/iocp_windows.c
  6. 4
      src/core/iomgr/iomgr_windows.c
  7. 3
      src/core/iomgr/pollset_kick_windows.h
  8. 5
      src/core/iomgr/pollset_windows.c
  9. 8
      src/core/iomgr/pollset_windows.h
  10. 53
      src/core/iomgr/socket_windows.c
  11. 55
      src/core/iomgr/socket_windows.h
  12. 48
      src/core/iomgr/tcp_client_windows.c
  13. 156
      src/core/iomgr/tcp_posix.c
  14. 93
      src/core/iomgr/tcp_server_windows.c
  15. 112
      src/core/iomgr/tcp_windows.c
  16. 95
      src/core/profiling/basic_timers.c
  17. 31
      src/core/profiling/timers.h
  18. 23
      src/core/profiling/timers_preciseclock.h
  19. 7
      src/core/surface/call.c
  20. 1
      src/core/surface/init.c
  21. 29
      src/core/transport/chttp2_transport.c
  22. 1
      src/core/transport/chttp2_transport.h
  23. 2
      tools/gce_setup/cloud_prod_runner.sh
  24. 11
      tools/run_tests/run_sanity.sh
  25. 13
      vsprojects/openssl.props
  26. 13
      vsprojects/zlib.props

@ -25,7 +25,6 @@ env:
- CONFIG=opt TEST=python - CONFIG=opt TEST=python
- CONFIG=opt TEST=csharp - CONFIG=opt TEST=csharp
- USE_GCC=4.4 CONFIG=opt TEST=build - USE_GCC=4.4 CONFIG=opt TEST=build
- USE_GCC=4.5 CONFIG=opt TEST=build
script: script:
- rvm use $RUBY_VERSION - rvm use $RUBY_VERSION
- gem install bundler - gem install bundler

@ -4,7 +4,7 @@ Pod::Spec.new do |s|
s.summary = 'Generic gRPC client library for iOS' s.summary = 'Generic gRPC client library for iOS'
s.homepage = 'https://www.grpc.io' s.homepage = 'https://www.grpc.io'
s.license = 'New BSD' s.license = 'New BSD'
s.authors = { 'Jorge Canizales' => 'jcanizales@google.com' s.authors = { 'Jorge Canizales' => 'jcanizales@google.com',
'Michael Lumish' => 'mlumish@google.com' } 'Michael Lumish' => 'mlumish@google.com' }
# s.source = { :git => 'https://github.com/grpc/grpc.git', :tag => 'release-0_5_0' } # s.source = { :git => 'https://github.com/grpc/grpc.git', :tag => 'release-0_5_0' }

@ -828,9 +828,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, " "$Request$, "
"$Response$>(\n" "$Response$>(\n"
" std::function< ::grpc::Status($ns$$Service$::Service*, " " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
"::grpc::ServerContext*, const $Request$*, $Response$*)>("
"&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n"); " new $Request$, new $Response$));\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(
@ -840,10 +838,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::CLIENT_STREAMING,\n" " ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< " " new ::grpc::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n" "$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::function< ::grpc::Status($ns$$Service$::Service*, " " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
"::grpc::ServerContext*, "
"::grpc::ServerReader< $Request$>*, $Response$*)>("
"&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n"); " new $Request$, new $Response$));\n");
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print( printer->Print(
@ -853,10 +848,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::SERVER_STREAMING,\n" " ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< " " new ::grpc::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n" "$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::function< ::grpc::Status($ns$$Service$::Service*, " " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
"::grpc::ServerContext*, "
"const $Request$*, ::grpc::ServerWriter< $Response$>*)>("
"&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n"); " new $Request$, new $Response$));\n");
} else if (BidiStreaming(method)) { } else if (BidiStreaming(method)) {
printer->Print( printer->Print(
@ -866,10 +858,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::BIDI_STREAMING,\n" " ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< " " new ::grpc::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n" "$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::function< ::grpc::Status($ns$$Service$::Service*, " " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
"::grpc::ServerContext*, "
"::grpc::ServerReaderWriter< $Response$, $Request$>*)>("
"&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n"); " new $Request$, new $Response$));\n");
} }
} }

@ -50,7 +50,7 @@ static void create_sockets(SOCKET sv[2]) {
SOCKET lst_sock = INVALID_SOCKET; SOCKET lst_sock = INVALID_SOCKET;
SOCKET cli_sock = INVALID_SOCKET; SOCKET cli_sock = INVALID_SOCKET;
SOCKADDR_IN addr; SOCKADDR_IN addr;
int addr_len; int addr_len = sizeof(addr);
lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
GPR_ASSERT(lst_sock != INVALID_SOCKET); GPR_ASSERT(lst_sock != INVALID_SOCKET);

@ -172,9 +172,15 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
} }
void grpc_iocp_socket_orphan(grpc_winsocket *socket) { void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
GPR_ASSERT(!socket->orphan);
gpr_atm_full_fetch_add(&g_orphans, 1); gpr_atm_full_fetch_add(&g_orphans, 1);
socket->orphan = 1;
} }
/* Calling notify_on_read or write means either of two things:
-) The IOCP already completed in the background, and we need to call
the callback now.
-) The IOCP hasn't completed yet, and we're queuing it for later. */
static void socket_notify_on_iocp(grpc_winsocket *socket, static void socket_notify_on_iocp(grpc_winsocket *socket,
void(*cb)(void *, int), void *opaque, void(*cb)(void *, int), void *opaque,
grpc_winsocket_callback_info *info) { grpc_winsocket_callback_info *info) {

@ -43,6 +43,10 @@
#include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
/* Windows' io manager is going to be fully designed using IO completion
ports. All of what we're doing here is basically make sure that
Windows sockets are initialized in and out. */
static void winsock_init(void) { static void winsock_init(void) {
WSADATA wsaData; WSADATA wsaData;
int status = WSAStartup(MAKEWORD(2, 0), &wsaData); int status = WSAStartup(MAKEWORD(2, 0), &wsaData);

@ -36,6 +36,9 @@
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. */
struct grpc_kick_fd_info; struct grpc_kick_fd_info;
typedef struct grpc_pollset_kick_state { typedef struct grpc_pollset_kick_state {

@ -41,6 +41,11 @@
#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset_windows.h" #include "src/core/iomgr/pollset_windows.h"
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
won't actually do any polling, and return as quickly as possible. */
void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu); gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv); gpr_cv_init(&pollset->cv);

@ -40,10 +40,10 @@
#include "src/core/iomgr/pollset_kick.h" #include "src/core/iomgr/pollset_kick.h"
#include "src/core/iomgr/socket_windows.h" #include "src/core/iomgr/socket_windows.h"
/* forward declare only in this file to avoid leaking impl details via /* There isn't really any such thing as a pollset under Windows, due to the
pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not nature of the IO completion ports. A Windows "pollset" is merely a mutex
use the struct tag */ and a condition variable, as this is the minimal set of features we need
struct grpc_fd; implemented for the rest of grpc. But we won't use them directly. */
typedef struct grpc_pollset { typedef struct grpc_pollset {
gpr_mu mu; gpr_mu mu;

@ -32,17 +32,18 @@
*/ */
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#ifdef GPR_WINSOCK_SOCKET #ifdef GPR_WINSOCK_SOCKET
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_windows.h"
#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h" #include "src/core/iomgr/pollset_windows.h"
#include "src/core/iomgr/socket_windows.h"
grpc_winsocket *grpc_winsocket_create(SOCKET socket) { grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket)); grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket));
@ -54,26 +55,44 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
return r; return r;
} }
static void shutdown_op(grpc_winsocket_callback_info *info) { /* Schedule a shutdown of the socket operations. Will call the pending
if (!info->cb) return; operations to abort them. We need to do that this way because of the
grpc_iomgr_add_delayed_callback(info->cb, info->opaque, 0); various callsites of that function, which happens to be in various
} mutex hold states, and that'd be unsafe to call them directly. */
void grpc_winsocket_shutdown(grpc_winsocket *socket) { void grpc_winsocket_shutdown(grpc_winsocket *socket) {
shutdown_op(&socket->read_info); gpr_mu_lock(&socket->state_mu);
shutdown_op(&socket->write_info); if (socket->read_info.cb) {
grpc_iomgr_add_delayed_callback(socket->read_info.cb,
socket->read_info.opaque, 0);
}
if (socket->write_info.cb) {
grpc_iomgr_add_delayed_callback(socket->write_info.cb,
socket->write_info.opaque, 0);
}
gpr_mu_unlock(&socket->state_mu);
} }
void grpc_winsocket_orphan(grpc_winsocket *socket) { /* Abandons a socket. Either we're going to queue it up for garbage collecting
grpc_iocp_socket_orphan(socket); from the IO Completion Port thread, or destroy it immediately. Note that this
socket->orphan = 1; mechanisms assumes that we're either always waiting for an operation, or we
explicitely know that we don't. If there is a future case where we can have
an "idle" socket which is neither trying to read or write, we'd start leaking
both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
SOCKET socket = winsocket->socket;
if (!winsocket->closed_early) {
grpc_iocp_socket_orphan(winsocket);
}
if (winsocket->closed_early) {
grpc_winsocket_destroy(winsocket);
}
closesocket(socket);
grpc_iomgr_unref(); grpc_iomgr_unref();
closesocket(socket->socket);
} }
void grpc_winsocket_destroy(grpc_winsocket *socket) { void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
gpr_mu_destroy(&socket->state_mu); gpr_mu_destroy(&winsocket->state_mu);
gpr_free(socket); gpr_free(winsocket);
} }
#endif /* GPR_WINSOCK_SOCKET */ #endif /* GPR_WINSOCK_SOCKET */

@ -39,21 +39,43 @@
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
/* This holds the data for an outstanding read or write on a socket.
The mutex to protect the concurrent access to that data is the one
inside the winsocket wrapper. */
typedef struct grpc_winsocket_callback_info { typedef struct grpc_winsocket_callback_info {
/* This is supposed to be a WSAOVERLAPPED, but in order to get that /* This is supposed to be a WSAOVERLAPPED, but in order to get that
* definition, we need to include ws2tcpip.h, which needs to be included definition, we need to include ws2tcpip.h, which needs to be included
* from the top, otherwise it'll clash with a previous inclusion of from the top, otherwise it'll clash with a previous inclusion of
* windows.h that in turns includes winsock.h. If anyone knows a way windows.h that in turns includes winsock.h. If anyone knows a way
* to do it properly, feel free to send a patch. to do it properly, feel free to send a patch. */
*/
OVERLAPPED overlapped; OVERLAPPED overlapped;
/* The callback information for the pending operation. May be empty if the
caller hasn't registered a callback yet. */
void(*cb)(void *opaque, int success); void(*cb)(void *opaque, int success);
void *opaque; void *opaque;
/* A boolean to describe if the IO Completion Port got a notification for
that operation. This will happen if the operation completed before the
called had time to register a callback. We could avoid that behavior
altogether by forcing the caller to always register its callback before
proceeding queue an operation, but it is frequent for an IO Completion
Port to trigger quickly. This way we avoid a context switch for calling
the callback. We also simplify the read / write operations to avoid having
to hold a mutex for a long amount of time. */
int has_pending_iocp; int has_pending_iocp;
/* The results of the overlapped operation. */
DWORD bytes_transfered; DWORD bytes_transfered;
int wsa_error; int wsa_error;
} grpc_winsocket_callback_info; } grpc_winsocket_callback_info;
/* This is a wrapper to a Windows socket. A socket can have one outstanding
read, and one outstanding write. Doing an asynchronous accept means waiting
for a read operation. Doing an asynchronous connect means waiting for a
write operation. These are completely abitrary ties between the operation
and the kind of event, because we can have one overlapped per pending
operation, whichever its nature is. So we could have more dedicated pending
operation callbacks for connect and listen. But given the scope of listen
and accept, we don't need to go to that extent and waste memory. Also, this
is closer to what happens in posix world. */
typedef struct grpc_winsocket { typedef struct grpc_winsocket {
SOCKET socket; SOCKET socket;
@ -62,16 +84,35 @@ typedef struct grpc_winsocket {
gpr_mu state_mu; gpr_mu state_mu;
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
int added_to_iocp; int added_to_iocp;
/* A boolean to indicate that the caller has abandonned that socket, but
there is a pending operation that the IO Completion Port will have to
wait for. The socket will be collected at that time. */
int orphan; int orphan;
/* A boolean to indicate that the socket was already closed somehow, and
that no operation is going to be pending. Trying to abandon a socket in
that state won't result in an orphan, but will instead be destroyed
without further delay. We could avoid that boolean by adding one into
grpc_winsocket_callback_info describing that the operation is pending,
but that 1) waste memory more and 2) obfuscate the intent a bit more. */
int closed_early;
} grpc_winsocket; } grpc_winsocket;
/* Create a wrapped windows handle. /* Create a wrapped windows handle. This takes ownership of it, meaning that
This takes ownership of closing it. */ it will be responsible for closing it. */
grpc_winsocket *grpc_winsocket_create(SOCKET socket); grpc_winsocket *grpc_winsocket_create(SOCKET socket);
/* Initiate an asynchronous shutdown of the socket. Will call off any pending
operation to cancel them. */
void grpc_winsocket_shutdown(grpc_winsocket *socket); void grpc_winsocket_shutdown(grpc_winsocket *socket);
/* Abandon a socket. */
void grpc_winsocket_orphan(grpc_winsocket *socket); void grpc_winsocket_orphan(grpc_winsocket *socket);
/* Destroy a socket. Should only be called by the IO Completion Port thread,
or by grpc_winsocket_orphan if there's no pending operation. */
void grpc_winsocket_destroy(grpc_winsocket *socket); void grpc_winsocket_destroy(grpc_winsocket *socket);
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */ #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */

@ -59,6 +59,7 @@ typedef struct {
gpr_timespec deadline; gpr_timespec deadline;
grpc_alarm alarm; grpc_alarm alarm;
int refs; int refs;
int aborted;
} async_connect; } async_connect;
static void async_connect_cleanup(async_connect *ac) { static void async_connect_cleanup(async_connect *ac) {
@ -70,26 +71,31 @@ static void async_connect_cleanup(async_connect *ac) {
} }
} }
static void on_alarm(void *acp, int success) { static void on_alarm(void *acp, int occured) {
async_connect *ac = acp; async_connect *ac = acp;
gpr_mu_lock(&ac->mu); gpr_mu_lock(&ac->mu);
if (ac->socket != NULL && success) { /* If the alarm didn't occor, it got cancelled. */
if (ac->socket != NULL && occured) {
grpc_winsocket_shutdown(ac->socket); grpc_winsocket_shutdown(ac->socket);
} }
async_connect_cleanup(ac); async_connect_cleanup(ac);
} }
static void on_connect(void *acp, int success) { static void on_connect(void *acp, int from_iocp) {
async_connect *ac = acp; async_connect *ac = acp;
SOCKET sock = ac->socket->socket; SOCKET sock = ac->socket->socket;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
grpc_winsocket_callback_info *info = &ac->socket->write_info; grpc_winsocket_callback_info *info = &ac->socket->write_info;
void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
void *cb_arg = ac->cb_arg; void *cb_arg = ac->cb_arg;
int aborted;
grpc_alarm_cancel(&ac->alarm); grpc_alarm_cancel(&ac->alarm);
if (success) { gpr_mu_lock(&ac->mu);
aborted = ac->aborted;
if (from_iocp) {
DWORD transfered_bytes = 0; DWORD transfered_bytes = 0;
DWORD flags; DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
@ -107,20 +113,40 @@ static void on_connect(void *acp, int success) {
} }
} else { } else {
gpr_log(GPR_ERROR, "on_connect is shutting down"); gpr_log(GPR_ERROR, "on_connect is shutting down");
goto finish; /* If the connection timeouts, we will still get a notification from
the IOCP whatever happens. So we're just going to flag that connection
as being in the process of being aborted, and wait for the IOCP. We
can't just orphan the socket now, because the IOCP might already have
gotten a successful connection, which is our worst-case scenario.
We need to call our callback now to respect the deadline. */
ac->aborted = 1;
gpr_mu_unlock(&ac->mu);
cb(cb_arg, NULL);
return;
} }
abort(); abort();
finish: finish:
gpr_mu_lock(&ac->mu); /* If we don't have an endpoint, it means the connection failed,
if (!ep) { so it doesn't matter if it aborted or failed. We need to orphan
that socket. */
if (!ep || aborted) {
/* If the connection failed, it means we won't get an IOCP notification,
so let's flag it as already closed. But if the connection was aborted,
while we still got an endpoint, we have to wait for the IOCP to collect
that socket. So let's properly flag that. */
ac->socket->closed_early = !ep;
grpc_winsocket_orphan(ac->socket); grpc_winsocket_orphan(ac->socket);
} }
async_connect_cleanup(ac); async_connect_cleanup(ac);
cb(cb_arg, ep); /* If the connection was aborted, the callback was already called when
the deadline was met. */
if (!aborted) cb(cb_arg, ep);
} }
/* Tries to issue one async connection, then schedules both an IOCP
notification request for the connection, and one timeout alert. */
void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
void *arg, const struct sockaddr *addr, void *arg, const struct sockaddr *addr,
int addr_len, gpr_timespec deadline) { int addr_len, gpr_timespec deadline) {
@ -156,6 +182,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
goto failure; goto failure;
} }
/* Grab the function pointer for ConnectEx for that specific socket.
It may change depending on the interface. */
status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx), &guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx),
&ioctl_num_bytes, NULL, NULL); &ioctl_num_bytes, NULL, NULL);
@ -178,6 +206,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
info = &socket->write_info; info = &socket->write_info;
success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
/* It wouldn't be unusual to get a success immediately. But we'll still get
an IOCP notification, so let's ignore it. */
if (!success) { if (!success) {
int error = WSAGetLastError(); int error = WSAGetLastError();
if (error != ERROR_IO_PENDING) { if (error != ERROR_IO_PENDING) {
@ -192,6 +222,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
ac->socket = socket; ac->socket = socket;
gpr_mu_init(&ac->mu); gpr_mu_init(&ac->mu);
ac->refs = 2; ac->refs = 2;
ac->aborted = 0;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
grpc_socket_notify_on_write(socket, on_connect, ac); grpc_socket_notify_on_write(socket, on_connect, ac);
@ -202,6 +233,7 @@ failure:
gpr_log(GPR_ERROR, message, utf8_message); gpr_log(GPR_ERROR, message, utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
if (socket) { if (socket) {
socket->closed_early = 1;
grpc_winsocket_orphan(socket); grpc_winsocket_orphan(socket);
} else if (sock != INVALID_SOCKET) { } else if (sock != INVALID_SOCKET) {
closesocket(sock); closesocket(sock);

@ -258,6 +258,8 @@ typedef struct {
grpc_endpoint base; grpc_endpoint base;
grpc_fd *em_fd; grpc_fd *em_fd;
int fd; int fd;
int iov_size; /* Number of slices to allocate per read attempt */
int finished_edge;
size_t slice_size; size_t slice_size;
gpr_refcount refcount; gpr_refcount refcount;
@ -315,9 +317,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
#define INLINE_SLICE_BUFFER_SIZE 8 #define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4 #define MAX_READ_IOVEC 4
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { static void grpc_tcp_continue_read(grpc_tcp *tcp) {
grpc_tcp *tcp = (grpc_tcp *)arg;
int iov_size = 1;
gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
struct msghdr msg; struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC]; struct iovec iov[MAX_READ_IOVEC];
@ -327,88 +327,103 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
gpr_slice *final_slices; gpr_slice *final_slices;
size_t final_nslices; size_t final_nslices;
GPR_ASSERT(!tcp->finished_edge);
GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0); GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
0); 0);
if (!success) { allocated_bytes = slice_state_append_blocks_into_iovec(
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); &read_state, iov, tcp->iov_size, tcp->slice_size);
grpc_tcp_unref(tcp);
return; msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = tcp->iov_size;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
GRPC_TIMER_MARK(RECVMSG_BEGIN, 0);
do {
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
GRPC_TIMER_MARK(RECVMSG_END, 0);
if (read_bytes < allocated_bytes) {
/* TODO(klempner): Consider a second read first, in hopes of getting a
* quick EAGAIN and saving a bunch of allocations. */
slice_state_remove_last(&read_state, read_bytes < 0
? allocated_bytes
: allocated_bytes - read_bytes);
} }
/* TODO(klempner): Limit the amount we read at once. */ if (read_bytes < 0) {
for (;;) { /* NB: After calling the user_cb a parallel call of the read handler may
allocated_bytes = slice_state_append_blocks_into_iovec( * be running. */
&read_state, iov, iov_size, tcp->slice_size); if (errno == EAGAIN) {
if (tcp->iov_size > 1) {
msg.msg_name = NULL; tcp->iov_size /= 2;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iov_size;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0);
do {
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
if (read_bytes < allocated_bytes) {
/* TODO(klempner): Consider a second read first, in hopes of getting a
* quick EAGAIN and saving a bunch of allocations. */
slice_state_remove_last(&read_state, read_bytes < 0
? allocated_bytes
: allocated_bytes - read_bytes);
}
if (read_bytes < 0) {
/* NB: After calling the user_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
if (slice_state_has_available(&read_state)) {
/* TODO(klempner): We should probably do the call into the application
without all this junk on the stack */
/* FIXME(klempner): Refcount properly */
slice_state_transfer_ownership(&read_state, &final_slices,
&final_nslices);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} else {
/* Spurious read event, consume it here */
slice_state_destroy(&read_state);
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
}
} else {
/* TODO(klempner): Log interesting errors */
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} }
return;
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
if (slice_state_has_available(&read_state)) { if (slice_state_has_available(&read_state)) {
/* there were bytes already read: pass them up to the application */ /* TODO(klempner): We should probably do the call into the application
without all this junk on the stack */
/* FIXME(klempner): Refcount properly */
slice_state_transfer_ownership(&read_state, &final_slices, slice_state_transfer_ownership(&read_state, &final_slices,
&final_nslices); &final_nslices);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF); tcp->finished_edge = 1;
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} else { } else {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF); /* We've consumed the edge, request a new one */
slice_state_destroy(&read_state);
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} }
} else {
/* TODO(klempner): Log interesting errors */
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
slice_state_destroy(&read_state); slice_state_destroy(&read_state);
grpc_tcp_unref(tcp); grpc_tcp_unref(tcp);
return;
} else if (iov_size < MAX_READ_IOVEC) {
++iov_size;
} }
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
if (slice_state_has_available(&read_state)) {
/* there were bytes already read: pass them up to the application */
slice_state_transfer_ownership(&read_state, &final_slices,
&final_nslices);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
} else {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
}
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} else {
if (tcp->iov_size < MAX_READ_IOVEC) {
++tcp->iov_size;
}
GPR_ASSERT(slice_state_has_available(&read_state));
slice_state_transfer_ownership(&read_state, &final_slices,
&final_nslices);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
} }
GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
} }
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
if (!success) {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
grpc_tcp_unref(tcp);
} else {
grpc_tcp_continue_read(tcp);
}
}
static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
void *user_data) { void *user_data) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
@ -416,7 +431,12 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
tcp->read_cb = cb; tcp->read_cb = cb;
tcp->read_user_data = user_data; tcp->read_user_data = user_data;
gpr_ref(&tcp->refcount); gpr_ref(&tcp->refcount);
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); if (tcp->finished_edge) {
tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp);
}
} }
#define MAX_WRITE_IOVEC 16 #define MAX_WRITE_IOVEC 16
@ -554,6 +574,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
tcp->read_user_data = NULL; tcp->read_user_data = NULL;
tcp->write_user_data = NULL; tcp->write_user_data = NULL;
tcp->slice_size = slice_size; tcp->slice_size = slice_size;
tcp->iov_size = 1;
tcp->finished_edge = 1;
slice_state_init(&tcp->write_state, NULL, 0, 0); slice_state_init(&tcp->write_state, NULL, 0, 0);
/* paired with unref in grpc_tcp_destroy */ /* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1); gpr_ref_init(&tcp->refcount, 1);

@ -55,11 +55,17 @@
/* one listening port */ /* one listening port */
typedef struct server_port { typedef struct server_port {
gpr_uint8 addresses[sizeof(struct sockaddr_in6) * 2 + 32]; /* This seemingly magic number comes from AcceptEx's documentation. each
address buffer needs to have at least 16 more bytes at their end. */
gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2];
/* This will hold the socket for the next accept. */
SOCKET new_socket; SOCKET new_socket;
/* The listener winsocked. */
grpc_winsocket *socket; grpc_winsocket *socket;
grpc_tcp_server *server; grpc_tcp_server *server;
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx; LPFN_ACCEPTEX AcceptEx;
int shutting_down;
} server_port; } server_port;
/* the overall server */ /* the overall server */
@ -79,6 +85,8 @@ struct grpc_tcp_server {
size_t port_capacity; size_t port_capacity;
}; };
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *grpc_tcp_server_create(void) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_mu_init(&s->mu); gpr_mu_init(&s->mu);
@ -92,24 +100,30 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
return s; return s;
} }
/* Public function. Stops and destroys a grpc_tcp_server. */
void grpc_tcp_server_destroy(grpc_tcp_server *s, void grpc_tcp_server_destroy(grpc_tcp_server *s,
void (*shutdown_done)(void *shutdown_done_arg), void (*shutdown_done)(void *shutdown_done_arg),
void *shutdown_done_arg) { void *shutdown_done_arg) {
size_t i; size_t i;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
/* shutdown all fd's */ /* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts. */
for (i = 0; i < s->nports; i++) { for (i = 0; i < s->nports; i++) {
grpc_winsocket_shutdown(s->ports[i].socket); server_port *sp = &s->ports[i];
grpc_winsocket_shutdown(sp->socket);
} }
/* wait while that happens */ /* This happens asynchronously. Wait while that happens. */
while (s->active_ports) { while (s->active_ports) {
gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
/* delete ALL the things */ /* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
closed by the system. */
for (i = 0; i < s->nports; i++) { for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i]; server_port *sp = &s->ports[i];
sp->socket->closed_early = 1;
grpc_winsocket_orphan(sp->socket); grpc_winsocket_orphan(sp->socket);
} }
gpr_free(s->ports); gpr_free(s->ports);
@ -120,7 +134,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
} }
} }
/* Prepare a recently-created socket for listening. */ /* Prepare (bind) a recently-created socket for listening. */
static int prepare_socket(SOCKET sock, const struct sockaddr *addr, static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
int addr_len) { int addr_len) {
struct sockaddr_storage sockname_temp; struct sockaddr_storage sockname_temp;
@ -168,8 +182,11 @@ error:
return -1; return -1;
} }
static void on_accept(void *arg, int success); /* start_accept will reference that for the IOCP notification request. */
static void on_accept(void *arg, int from_iocp);
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
static void start_accept(server_port *port) { static void start_accept(server_port *port) {
SOCKET sock = INVALID_SOCKET; SOCKET sock = INVALID_SOCKET;
char *message; char *message;
@ -191,12 +208,13 @@ static void start_accept(server_port *port) {
goto failure; goto failure;
} }
/* TODO(jtattermusch): probably a race here, we regularly get use-after-free on server shutdown */ /* Start the "accept" asynchronously. */
GPR_ASSERT(port->socket != (grpc_winsocket*)0xfeeefeee);
success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0, success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
addrlen, addrlen, &bytes_received, addrlen, addrlen, &bytes_received,
&port->socket->read_info.overlapped); &port->socket->read_info.overlapped);
/* It is possible to get an accept immediately without delay. However, we
will still get an IOCP notification for it. So let's just ignore it. */
if (!success) { if (!success) {
int error = WSAGetLastError(); int error = WSAGetLastError();
if (error != ERROR_IO_PENDING) { if (error != ERROR_IO_PENDING) {
@ -205,6 +223,8 @@ static void start_accept(server_port *port) {
} }
} }
/* We're ready to do the accept. Calling grpc_socket_notify_on_read may
immediately process an accept that happened in the meantime. */
port->new_socket = sock; port->new_socket = sock;
grpc_socket_notify_on_read(port->socket, on_accept, port); grpc_socket_notify_on_read(port->socket, on_accept, port);
return; return;
@ -216,14 +236,30 @@ failure:
if (sock != INVALID_SOCKET) closesocket(sock); if (sock != INVALID_SOCKET) closesocket(sock);
} }
/* event manager callback when reads are ready */ /* Event manager callback when reads are ready. */
static void on_accept(void *arg, int success) { static void on_accept(void *arg, int from_iocp) {
server_port *sp = arg; server_port *sp = arg;
SOCKET sock = sp->new_socket; SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
if (success) { /* The shutdown sequence is done in two parts. This is the second
part here, acknowledging the IOCP notification, and doing nothing
else, especially not queuing a new accept. */
if (sp->shutting_down) {
GPR_ASSERT(from_iocp);
sp->shutting_down = 0;
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_cv_broadcast(&sp->server->cv);
}
gpr_mu_unlock(&sp->server->mu);
return;
}
if (from_iocp) {
/* The IOCP notified us of a completed operation. Let's grab the results,
and act accordingly. */
DWORD transfered_bytes = 0; DWORD transfered_bytes = 0;
DWORD flags; DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
@ -237,18 +273,31 @@ static void on_accept(void *arg, int success) {
ep = grpc_tcp_create(grpc_winsocket_create(sock)); ep = grpc_tcp_create(grpc_winsocket_create(sock));
} }
} else { } else {
closesocket(sock); /* If we're not notified from the IOCP, it means we are asked to shutdown.
gpr_mu_lock(&sp->server->mu); This will initiate that shutdown. Calling closesocket will trigger an
if (0 == --sp->server->active_ports) { IOCP notification, that will call this function a second time, from
gpr_cv_broadcast(&sp->server->cv); the IOCP thread. Of course, this only works if the socket was, in fact,
listening. If that's not the case, we'd wait indefinitely. That's a bit
of a degenerate case, but it can happen if you create a server, but
don't start it. So let's support that by recursing once. */
sp->shutting_down = 1;
sp->new_socket = INVALID_SOCKET;
if (sock != INVALID_SOCKET) {
closesocket(sock);
} else {
on_accept(sp, 1);
} }
gpr_mu_unlock(&sp->server->mu); return;
} }
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep); if (ep) sp->server->cb(sp->server->cb_arg, ep);
if (success) { /* As we were notified from the IOCP of one and exactly one accept,
start_accept(sp); the former socked we created has now either been destroy or assigned
} to the new connection. We need to create a new one for the next
connection. */
start_accept(sp);
} }
static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@ -262,6 +311,8 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
if (sock == INVALID_SOCKET) return -1; if (sock == INVALID_SOCKET) return -1;
/* We need to grab the AcceptEx pointer for that port, as it may be
interface-dependent. We'll cache it to avoid doing that again. */
status = status =
WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL); &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
@ -286,7 +337,9 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp = &s->ports[s->nports++]; sp = &s->ports[s->nports++];
sp->server = s; sp->server = s;
sp->socket = grpc_winsocket_create(sock); sp->socket = grpc_winsocket_create(sock);
sp->shutting_down = 0;
sp->AcceptEx = AcceptEx; sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
GPR_ASSERT(sp->socket); GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} }

@ -76,8 +76,11 @@ int grpc_tcp_prepare_socket(SOCKET sock) {
} }
typedef struct grpc_tcp { typedef struct grpc_tcp {
/* This is our C++ class derivation emulation. */
grpc_endpoint base; grpc_endpoint base;
/* The one socket this endpoint is using. */
grpc_winsocket *socket; grpc_winsocket *socket;
/* Refcounting how many operations are in progress. */
gpr_refcount refcount; gpr_refcount refcount;
grpc_endpoint_read_cb read_cb; grpc_endpoint_read_cb read_cb;
@ -90,6 +93,10 @@ typedef struct grpc_tcp {
gpr_slice_buffer write_slices; gpr_slice_buffer write_slices;
int outstanding_write; int outstanding_write;
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
} grpc_tcp; } grpc_tcp;
static void tcp_ref(grpc_tcp *tcp) { static void tcp_ref(grpc_tcp *tcp) {
@ -100,11 +107,13 @@ static void tcp_unref(grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) { if (gpr_unref(&tcp->refcount)) {
gpr_slice_buffer_destroy(&tcp->write_slices); gpr_slice_buffer_destroy(&tcp->write_slices);
grpc_winsocket_orphan(tcp->socket); grpc_winsocket_orphan(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp); gpr_free(tcp);
} }
} }
static void on_read(void *tcpp, int success) { /* Asynchronous callback from the IOCP, or the background thread. */
static void on_read(void *tcpp, int from_iocp) {
grpc_tcp *tcp = (grpc_tcp *) tcpp; grpc_tcp *tcp = (grpc_tcp *) tcpp;
grpc_winsocket *socket = tcp->socket; grpc_winsocket *socket = tcp->socket;
gpr_slice sub; gpr_slice sub;
@ -114,22 +123,32 @@ static void on_read(void *tcpp, int success) {
grpc_endpoint_read_cb cb = tcp->read_cb; grpc_endpoint_read_cb cb = tcp->read_cb;
grpc_winsocket_callback_info *info = &socket->read_info; grpc_winsocket_callback_info *info = &socket->read_info;
void *opaque = tcp->read_user_data; void *opaque = tcp->read_user_data;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
if (!from_iocp || tcp->shutting_down) {
/* If we are here with from_iocp set to true, it means we got raced to
shutting down the endpoint. No actual abort callback will happen
though, so we're going to do it from here. */
do_abort = 1;
}
gpr_mu_unlock(&tcp->mu);
GPR_ASSERT(tcp->outstanding_read); if (do_abort) {
if (from_iocp) gpr_slice_unref(tcp->read_slice);
if (!success) {
tcp_unref(tcp); tcp_unref(tcp);
cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
return; return;
} }
tcp->outstanding_read = 0; GPR_ASSERT(tcp->outstanding_read);
if (socket->read_info.wsa_error != 0) { if (socket->read_info.wsa_error != 0) {
char *utf8_message = gpr_format_message(info->wsa_error); char *utf8_message = gpr_format_message(info->wsa_error);
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR; status = GRPC_ENDPOINT_CB_ERROR;
socket->closed_early = 1;
} else { } else {
if (info->bytes_transfered != 0) { if (info->bytes_transfered != 0) {
sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered); sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered);
@ -141,6 +160,9 @@ static void on_read(void *tcpp, int success) {
status = GRPC_ENDPOINT_CB_EOF; status = GRPC_ENDPOINT_CB_EOF;
} }
} }
tcp->outstanding_read = 0;
tcp_unref(tcp); tcp_unref(tcp);
cb(opaque, slice, nslices, status); cb(opaque, slice, nslices, status);
} }
@ -157,6 +179,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
WSABUF buffer; WSABUF buffer;
GPR_ASSERT(!tcp->outstanding_read); GPR_ASSERT(!tcp->outstanding_read);
GPR_ASSERT(!tcp->shutting_down);
tcp_ref(tcp); tcp_ref(tcp);
tcp->outstanding_read = 1; tcp->outstanding_read = 1;
tcp->read_cb = cb; tcp->read_cb = cb;
@ -167,10 +190,12 @@ static void win_notify_on_read(grpc_endpoint *ep,
buffer.len = GPR_SLICE_LENGTH(tcp->read_slice); buffer.len = GPR_SLICE_LENGTH(tcp->read_slice);
buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice); buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
/* First let's try a synchronous, non-blocking read. */
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
NULL, NULL); NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError(); info->wsa_error = status == 0 ? 0 : WSAGetLastError();
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) { if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read; info->bytes_transfered = bytes_read;
/* This might heavily recurse. */ /* This might heavily recurse. */
@ -178,6 +203,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
return; return;
} }
/* Otherwise, let's retry, by queuing a read. */
memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED)); memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
&info->overlapped, NULL); &info->overlapped, NULL);
@ -191,30 +217,53 @@ static void win_notify_on_read(grpc_endpoint *ep,
if (error != WSA_IO_PENDING) { if (error != WSA_IO_PENDING) {
char *utf8_message = gpr_format_message(WSAGetLastError()); char *utf8_message = gpr_format_message(WSAGetLastError());
__debugbreak(); gpr_log(GPR_ERROR, "WSARecv error: %s - this means we're going to leak.",
gpr_log(GPR_ERROR, "WSARecv error: %s", utf8_message); utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
/* would the IO completion port be called anyway... ? Let's assume not. */ /* I'm pretty sure this is a very bad situation there. Hence the log.
What will happen now is that the socket will neither wait for read
or write, unless the caller retry, which is unlikely, but I am not
sure if that's guaranteed. And there might also be a write pending.
This means that the future orphanage of that socket will be in limbo,
and we're going to leak it. I have no idea what could cause this
specific case however, aside from a parameter error from our call.
Normal read errors would actually happen during the overlapped
operation, which is the supported way to go for that. */
tcp->outstanding_read = 0; tcp->outstanding_read = 0;
tcp_unref(tcp); tcp_unref(tcp);
cb(arg, NULL, 0, GRPC_ENDPOINT_CB_ERROR); cb(arg, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
/* Per the comment above, I'm going to treat that case as a hard failure
for now, and leave the option to catch that and debug. */
__debugbreak();
return; return;
} }
grpc_socket_notify_on_read(tcp->socket, on_read, tcp); grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
} }
static void on_write(void *tcpp, int success) { /* Asynchronous callback from the IOCP, or the background thread. */
static void on_write(void *tcpp, int from_iocp) {
grpc_tcp *tcp = (grpc_tcp *) tcpp; grpc_tcp *tcp = (grpc_tcp *) tcpp;
grpc_winsocket *handle = tcp->socket; grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info; grpc_winsocket_callback_info *info = &handle->write_info;
grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK; grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
grpc_endpoint_write_cb cb = tcp->write_cb; grpc_endpoint_write_cb cb = tcp->write_cb;
void *opaque = tcp->write_user_data; void *opaque = tcp->write_user_data;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
if (!from_iocp || tcp->shutting_down) {
/* If we are here with from_iocp set to true, it means we got raced to
shutting down the endpoint. No actual abort callback will happen
though, so we're going to do it from here. */
do_abort = 1;
}
gpr_mu_unlock(&tcp->mu);
GPR_ASSERT(tcp->outstanding_write); GPR_ASSERT(tcp->outstanding_write);
if (!success) { if (do_abort) {
if (from_iocp) gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
tcp_unref(tcp); tcp_unref(tcp);
cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN); cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
return; return;
@ -225,6 +274,7 @@ static void on_write(void *tcpp, int success) {
gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message); gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR; status = GRPC_ENDPOINT_CB_ERROR;
tcp->socket->closed_early = 1;
} else { } else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length); GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
} }
@ -236,6 +286,7 @@ static void on_write(void *tcpp, int success) {
cb(opaque, status); cb(opaque, status);
} }
/* Initiates a write. */
static grpc_endpoint_write_status win_write(grpc_endpoint *ep, static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
gpr_slice *slices, size_t nslices, gpr_slice *slices, size_t nslices,
grpc_endpoint_write_cb cb, grpc_endpoint_write_cb cb,
@ -251,11 +302,13 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
WSABUF *buffers = local_buffers; WSABUF *buffers = local_buffers;
GPR_ASSERT(!tcp->outstanding_write); GPR_ASSERT(!tcp->outstanding_write);
GPR_ASSERT(!tcp->shutting_down);
tcp_ref(tcp); tcp_ref(tcp);
tcp->outstanding_write = 1; tcp->outstanding_write = 1;
tcp->write_cb = cb; tcp->write_cb = cb;
tcp->write_user_data = arg; tcp->write_user_data = arg;
gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices); gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) { if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
@ -268,10 +321,14 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]); buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
} }
/* First, let's try a synchronous, non-blocking write. */
status = WSASend(socket->socket, buffers, tcp->write_slices.count, status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, NULL, NULL); &bytes_sent, 0, NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError(); info->wsa_error = status == 0 ? 0 : WSAGetLastError();
/* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
connection that has its send queue filled up. But if we don't, then we can
avoid doing an async write operation at all. */
if (info->wsa_error != WSAEWOULDBLOCK) { if (info->wsa_error != WSAEWOULDBLOCK) {
grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR; grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR;
if (status == 0) { if (status == 0) {
@ -289,25 +346,42 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
return ret; return ret;
} }
/* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
operation, this time asynchronously. */
memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED)); memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSASend(socket->socket, buffers, tcp->write_slices.count, status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, &socket->write_info.overlapped, NULL); &bytes_sent, 0, &socket->write_info.overlapped, NULL);
if (allocated) gpr_free(allocated); if (allocated) gpr_free(allocated);
/* It is possible the operation completed then. But we'd still get an IOCP
notification. So let's ignore it and wait for the IOCP. */
if (status != 0) { if (status != 0) {
int error = WSAGetLastError(); int error = WSAGetLastError();
if (error != WSA_IO_PENDING) { if (error != WSA_IO_PENDING) {
char *utf8_message = gpr_format_message(WSAGetLastError()); char *utf8_message = gpr_format_message(WSAGetLastError());
__debugbreak(); gpr_log(GPR_ERROR, "WSASend error: %s - this means we're going to leak.",
gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message); utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
/* would the IO completion port be called anyway ? Let's assume not. */ /* I'm pretty sure this is a very bad situation there. Hence the log.
What will happen now is that the socket will neither wait for read
or write, unless the caller retry, which is unlikely, but I am not
sure if that's guaranteed. And there might also be a read pending.
This means that the future orphanage of that socket will be in limbo,
and we're going to leak it. I have no idea what could cause this
specific case however, aside from a parameter error from our call.
Normal read errors would actually happen during the overlapped
operation, which is the supported way to go for that. */
tcp->outstanding_write = 0; tcp->outstanding_write = 0;
tcp_unref(tcp); tcp_unref(tcp);
/* Per the comment above, I'm going to treat that case as a hard failure
for now, and leave the option to catch that and debug. */
__debugbreak();
return GRPC_ENDPOINT_WRITE_ERROR; return GRPC_ENDPOINT_WRITE_ERROR;
} }
} }
/* As all is now setup, we can now ask for the IOCP notification. It may
trigger the callback immediately however, but no matter. */
grpc_socket_notify_on_write(socket, on_write, tcp); grpc_socket_notify_on_write(socket, on_write, tcp);
return GRPC_ENDPOINT_WRITE_PENDING; return GRPC_ENDPOINT_WRITE_PENDING;
} }
@ -317,9 +391,20 @@ static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_iocp_add_socket(tcp->socket); grpc_iocp_add_socket(tcp->socket);
} }
/* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
for the potential read and write operations. It is up to the caller to
guarantee this isn't called in parallel to a read or write request, so
we're not going to protect against these. However the IO Completion Port
callback will happen from another thread, so we need to protect against
concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint *ep) { static void win_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *) ep; grpc_tcp *tcp = (grpc_tcp *) ep;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1;
grpc_winsocket_shutdown(tcp->socket); grpc_winsocket_shutdown(tcp->socket);
gpr_mu_unlock(&tcp->mu);
} }
static void win_destroy(grpc_endpoint *ep) { static void win_destroy(grpc_endpoint *ep) {
@ -336,6 +421,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) {
memset(tcp, 0, sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable; tcp->base.vtable = &vtable;
tcp->socket = socket; tcp->socket = socket;
gpr_mu_init(&tcp->mu);
gpr_slice_buffer_init(&tcp->write_slices); gpr_slice_buffer_init(&tcp->write_slices);
gpr_ref_init(&tcp->refcount, 1); gpr_ref_init(&tcp->refcount, 1);
return &tcp->base; return &tcp->base;

@ -45,15 +45,10 @@
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include <stdio.h> #include <stdio.h>
typedef enum { typedef enum { BEGIN = '{', END = '}', MARK = '.' } marker_type;
BEGIN = '{',
END = '}',
MARK = '.'
} marker_type;
typedef struct grpc_timer_entry { typedef struct grpc_timer_entry {
grpc_precise_clock tm; grpc_precise_clock tm;
gpr_thd_id thd;
int tag; int tag;
marker_type type; marker_type type;
void* id; void* id;
@ -61,71 +56,35 @@ typedef struct grpc_timer_entry {
int line; int line;
} grpc_timer_entry; } grpc_timer_entry;
struct grpc_timers_log { #define MAX_COUNT (1024 * 1024 / sizeof(grpc_timer_entry))
gpr_mu mu;
grpc_timer_entry* log;
int num_entries;
int capacity;
int capacity_limit;
FILE* fp;
};
grpc_timers_log* grpc_timers_log_global = NULL; static __thread grpc_timer_entry log[MAX_COUNT];
static __thread int count;
static grpc_timers_log* grpc_timers_log_create(int capacity_limit, FILE* dump) { static void log_report() {
grpc_timers_log* log = gpr_malloc(sizeof(*log));
/* TODO (vpai): Allow allocation below limit */
log->log = gpr_malloc(capacity_limit * sizeof(*log->log));
/* TODO (vpai): Improve concurrency, do per-thread logging? */
gpr_mu_init(&log->mu);
log->num_entries = 0;
log->capacity = log->capacity_limit = capacity_limit;
log->fp = dump;
return log;
}
static void log_report_locked(grpc_timers_log* log) {
FILE* fp = log->fp;
int i; int i;
for (i = 0; i < log->num_entries; i++) { for (i = 0; i < count; i++) {
grpc_timer_entry* entry = &(log->log[i]); grpc_timer_entry* entry = &(log[i]);
fprintf(fp, "GRPC_LAT_PROF "); printf("GRPC_LAT_PROF " GRPC_PRECISE_CLOCK_FORMAT " %p %c %d %p %s %d\n",
grpc_precise_clock_print(&entry->tm, fp); GRPC_PRECISE_CLOCK_PRINTF_ARGS(&entry->tm),
fprintf(fp, " %p %c %d %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->type, entry->tag, (void*)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tag,
entry->id, entry->file, entry->line); entry->id, entry->file, entry->line);
} }
/* Now clear out the log */ /* Now clear out the log */
log->num_entries = 0; count = 0;
} }
static void grpc_timers_log_destroy(grpc_timers_log* log) { static void grpc_timers_log_add(int tag, marker_type type, void* id,
gpr_mu_lock(&log->mu);
log_report_locked(log);
gpr_mu_unlock(&log->mu);
gpr_free(log->log);
gpr_mu_destroy(&log->mu);
gpr_free(log);
}
static void grpc_timers_log_add(grpc_timers_log* log, int tag, marker_type type, void* id,
const char* file, int line) { const char* file, int line) {
grpc_timer_entry* entry; grpc_timer_entry* entry;
/* TODO (vpai) : Improve concurrency */ /* TODO (vpai) : Improve concurrency */
gpr_mu_lock(&log->mu); if (count == MAX_COUNT) {
if (log->num_entries == log->capacity_limit) { log_report();
log_report_locked(log);
} }
entry = &log->log[log->num_entries++]; entry = &log[count++];
grpc_precise_clock_now(&entry->tm); grpc_precise_clock_now(&entry->tm);
entry->tag = tag; entry->tag = tag;
@ -133,39 +92,31 @@ static void grpc_timers_log_add(grpc_timers_log* log, int tag, marker_type type,
entry->id = id; entry->id = id;
entry->file = file; entry->file = file;
entry->line = line; entry->line = line;
entry->thd = gpr_thd_currentid();
gpr_mu_unlock(&log->mu);
} }
/* Latency profiler API implementation. */ /* Latency profiler API implementation. */
void grpc_timer_add_mark(int tag, void* id, const char* file, int line) { void grpc_timer_add_mark(int tag, void* id, const char* file, int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
grpc_timers_log_add(grpc_timers_log_global, tag, MARK, id, file, line); grpc_timers_log_add(tag, MARK, id, file, line);
} }
} }
void grpc_timer_begin(int tag, void* id, const char *file, int line) { void grpc_timer_begin(int tag, void* id, const char* file, int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
grpc_timers_log_add(grpc_timers_log_global, tag, BEGIN, id, file, line); grpc_timers_log_add(tag, BEGIN, id, file, line);
} }
} }
void grpc_timer_end(int tag, void* id, const char *file, int line) { void grpc_timer_end(int tag, void* id, const char* file, int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
grpc_timers_log_add(grpc_timers_log_global, tag, END, id, file, line); grpc_timers_log_add(tag, END, id, file, line);
} }
} }
/* Basic profiler specific API functions. */ /* Basic profiler specific API functions. */
void grpc_timers_global_init(void) { void grpc_timers_global_init(void) {}
grpc_timers_log_global = grpc_timers_log_create(100000, stdout);
}
void grpc_timers_global_destroy(void) {
grpc_timers_log_destroy(grpc_timers_log_global);
}
void grpc_timers_global_destroy(void) {}
#else /* !GRPC_BASIC_PROFILER */ #else /* !GRPC_BASIC_PROFILER */
void grpc_timers_global_init(void) {} void grpc_timers_global_init(void) {}

@ -41,9 +41,9 @@ extern "C" {
void grpc_timers_global_init(void); void grpc_timers_global_init(void);
void grpc_timers_global_destroy(void); void grpc_timers_global_destroy(void);
void grpc_timer_add_mark(int tag, void* id, const char *file, int line); void grpc_timer_add_mark(int tag, void *id, const char *file, int line);
void grpc_timer_begin(int tag, void* id, const char *file, int line); void grpc_timer_begin(int tag, void *id, const char *file, int line);
void grpc_timer_end(int tag, void* id, const char *file, int line); void grpc_timer_end(int tag, void *id, const char *file, int line);
enum grpc_profiling_tags { enum grpc_profiling_tags {
/* Any GRPC_PTAG_* >= than the threshold won't generate any profiling mark. */ /* Any GRPC_PTAG_* >= than the threshold won't generate any profiling mark. */
@ -74,13 +74,16 @@ enum grpc_profiling_tags {
#if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER)) #if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER))
/* No profiling. No-op all the things. */ /* No profiling. No-op all the things. */
#define GRPC_TIMER_MARK(tag, id) \ #define GRPC_TIMER_MARK(tag, id) \
do {} while(0) do { \
} while (0)
#define GRPC_TIMER_BEGIN(tag, id) \ #define GRPC_TIMER_BEGIN(tag, id) \
do {} while(0) do { \
} while (0)
#define GRPC_TIMER_END(tag, id) \ #define GRPC_TIMER_END(tag, id) \
do {} while(0) do { \
} while (0)
#else /* at least one profiler requested... */ #else /* at least one profiler requested... */
/* ... hopefully only one. */ /* ... hopefully only one. */
@ -94,14 +97,14 @@ enum grpc_profiling_tags {
grpc_timer_add_mark(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ grpc_timer_add_mark(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
} }
#define GRPC_TIMER_BEGIN(tag, id) \ #define GRPC_TIMER_BEGIN(tag, id) \
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
grpc_timer_begin(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ grpc_timer_begin(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
} }
#define GRPC_TIMER_END(tag, id) \ #define GRPC_TIMER_END(tag, id) \
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
grpc_timer_end(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ grpc_timer_end(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
} }
#ifdef GRPC_STAP_PROFILER #ifdef GRPC_STAP_PROFILER
@ -109,9 +112,7 @@ enum grpc_profiling_tags {
#endif /* GRPC_STAP_PROFILER */ #endif /* GRPC_STAP_PROFILER */
#ifdef GRPC_BASIC_PROFILER #ifdef GRPC_BASIC_PROFILER
typedef struct grpc_timers_log grpc_timers_log; /* Empty placeholder for now. */
extern grpc_timers_log *grpc_timers_log_global;
#endif /* GRPC_BASIC_PROFILER */ #endif /* GRPC_BASIC_PROFILER */
#endif /* at least one profiler requested. */ #endif /* at least one profiler requested. */

@ -43,7 +43,7 @@ typedef long long int grpc_precise_clock;
#if defined(__i386__) #if defined(__i386__)
static void grpc_precise_clock_now(grpc_precise_clock *clk) { static void grpc_precise_clock_now(grpc_precise_clock *clk) {
grpc_precise_clock ret; grpc_precise_clock ret;
__asm__ volatile("rdtsc" : "=A" (ret) ); __asm__ volatile("rdtsc" : "=A"(ret));
*clk = ret; *clk = ret;
} }
@ -51,7 +51,7 @@ static void grpc_precise_clock_now(grpc_precise_clock *clk) {
#elif defined(__x86_64__) || defined(__amd64__) #elif defined(__x86_64__) || defined(__amd64__)
static void grpc_precise_clock_now(grpc_precise_clock *clk) { static void grpc_precise_clock_now(grpc_precise_clock *clk) {
unsigned long long low, high; unsigned long long low, high;
__asm__ volatile("rdtsc" : "=a" (low), "=d" (high)); __asm__ volatile("rdtsc" : "=a"(low), "=d"(high));
*clk = (high << 32) | low; *clk = (high << 32) | low;
} }
#endif #endif
@ -61,19 +61,21 @@ static void grpc_precise_clock_init() {
time_t start = time(NULL); time_t start = time(NULL);
grpc_precise_clock start_time; grpc_precise_clock start_time;
grpc_precise_clock end_time; grpc_precise_clock end_time;
while (time(NULL) == start); while (time(NULL) == start)
;
grpc_precise_clock_now(&start_time); grpc_precise_clock_now(&start_time);
while (time(NULL) == start+1); while (time(NULL) == start + 1)
;
grpc_precise_clock_now(&end_time); grpc_precise_clock_now(&end_time);
cycles_per_second = end_time - start_time; cycles_per_second = end_time - start_time;
} }
static double grpc_precise_clock_scaling_factor() { static double grpc_precise_clock_scaling_factor() {
gpr_once_init(&precise_clock_init, grpc_precise_clock_init); gpr_once_init(&precise_clock_init, grpc_precise_clock_init);
return 1e6 / cycles_per_second; return 1e6 / cycles_per_second;
}
static void grpc_precise_clock_print(const grpc_precise_clock* clk, FILE* fp) {
fprintf(fp, "%f", *clk * grpc_precise_clock_scaling_factor());
} }
#define GRPC_PRECISE_CLOCK_FORMAT "%f"
#define GRPC_PRECISE_CLOCK_PRINTF_ARGS(clk) \
(*(clk)*grpc_precise_clock_scaling_factor())
#else #else
typedef struct grpc_precise_clock grpc_precise_clock; typedef struct grpc_precise_clock grpc_precise_clock;
struct grpc_precise_clock { struct grpc_precise_clock {
@ -82,6 +84,9 @@ struct grpc_precise_clock {
static void grpc_precise_clock_now(grpc_precise_clock* clk) { static void grpc_precise_clock_now(grpc_precise_clock* clk) {
clk->clock = gpr_now(); clk->clock = gpr_now();
} }
#define GRPC_PRECISE_CLOCK_FORMAT "%ld.%09d"
#define GRPC_PRECISE_CLOCK_PRINTF_ARGS(clk) \
(clk)->clock.tv_sec, (clk)->clock.tv_nsec
static void grpc_precise_clock_print(const grpc_precise_clock* clk, FILE* fp) { static void grpc_precise_clock_print(const grpc_precise_clock* clk, FILE* fp) {
fprintf(fp, "%ld.%09d", clk->clock.tv_sec, clk->clock.tv_nsec); fprintf(fp, "%ld.%09d", clk->clock.tv_sec, clk->clock.tv_nsec);
} }

@ -34,6 +34,7 @@
#include "src/core/surface/call.h" #include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h" #include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h" #include "src/core/iomgr/alarm.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h" #include "src/core/surface/byte_buffer_queue.h"
#include "src/core/surface/channel.h" #include "src/core/surface/channel.h"
@ -405,14 +406,14 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) { static int need_more_data(grpc_call *call) {
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) || (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS) || is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) || grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client && (call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
call->read_state != READ_STATE_STREAM_CLOSED); call->read_state < READ_STATE_GOT_INITIAL_METADATA);
} }
static void unlock(grpc_call *call) { static void unlock(grpc_call *call) {
@ -685,6 +686,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
static void call_on_done_recv(void *pc, int success) { static void call_on_done_recv(void *pc, int success) {
grpc_call *call = pc; grpc_call *call = pc;
size_t i; size_t i;
GRPC_TIMER_MARK(CALL_ON_DONE_RECV_BEGIN, 0);
lock(call); lock(call);
call->receiving = 0; call->receiving = 0;
if (success) { if (success) {
@ -729,6 +731,7 @@ static void call_on_done_recv(void *pc, int success) {
unlock(call); unlock(call);
GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0); GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
GRPC_TIMER_MARK(CALL_ON_DONE_RECV_END, 0);
} }
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,

@ -59,6 +59,7 @@ void grpc_init(void) {
grpc_register_tracer("channel", &grpc_trace_channel); grpc_register_tracer("channel", &grpc_trace_channel);
grpc_register_tracer("surface", &grpc_surface_trace); grpc_register_tracer("surface", &grpc_surface_trace);
grpc_register_tracer("http", &grpc_http_trace); grpc_register_tracer("http", &grpc_http_trace);
grpc_register_tracer("flowctl", &grpc_flowctl_trace);
grpc_register_tracer("batch", &grpc_trace_batch); grpc_register_tracer("batch", &grpc_trace_batch);
grpc_security_pre_init(); grpc_security_pre_init();
grpc_iomgr_init(); grpc_iomgr_init();

@ -37,6 +37,7 @@
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
#include "src/core/transport/chttp2/frame_data.h" #include "src/core/transport/chttp2/frame_data.h"
#include "src/core/transport/chttp2/frame_goaway.h" #include "src/core/transport/chttp2/frame_goaway.h"
@ -64,6 +65,7 @@
#define CLIENT_CONNECT_STRLEN 24 #define CLIENT_CONNECT_STRLEN 24
int grpc_http_trace = 0; int grpc_http_trace = 0;
int grpc_flowctl_trace = 0;
typedef struct transport transport; typedef struct transport transport;
typedef struct stream stream; typedef struct stream stream;
@ -74,6 +76,12 @@ typedef struct stream stream;
else \ else \
stmt stmt
#define FLOWCTL_TRACE(t, obj, dir, id, delta) \
if (!grpc_flowctl_trace) \
; \
else \
flowctl_trace(t, #dir, obj->dir##_window, id, delta)
/* streams are kept in various linked lists depending on what things need to /* streams are kept in various linked lists depending on what things need to
happen to them... this enum labels each list */ happen to them... this enum labels each list */
typedef enum { typedef enum {
@ -382,6 +390,12 @@ static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op); static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
static void add_metadata_batch(transport *t, stream *s); static void add_metadata_batch(transport *t, stream *s);
static void flowctl_trace(transport *t, const char *flow, gpr_int32 window,
gpr_uint32 id, gpr_int32 delta) {
gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%d:%s: %d + %d = %d", t, id, flow, window,
delta, window + delta);
}
/* /*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING * CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/ */
@ -772,6 +786,8 @@ static void unlock(transport *t) {
grpc_stream_op_buffer nuke_now; grpc_stream_op_buffer nuke_now;
const grpc_transport_callbacks *cb = t->cb; const grpc_transport_callbacks *cb = t->cb;
GRPC_TIMER_MARK(HTTP2_UNLOCK_BEGIN, 0);
grpc_sopb_init(&nuke_now); grpc_sopb_init(&nuke_now);
if (t->nuke_later_sopb.nops) { if (t->nuke_later_sopb.nops) {
grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb); grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
@ -820,6 +836,8 @@ static void unlock(transport *t) {
/* finally unlock */ /* finally unlock */
gpr_mu_unlock(&t->mu); gpr_mu_unlock(&t->mu);
GRPC_TIMER_MARK(HTTP2_UNLOCK_CLEANUP, 0);
/* perform some callbacks if necessary */ /* perform some callbacks if necessary */
for (i = 0; i < num_goaways; i++) { for (i = 0; i < num_goaways; i++) {
cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug); cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
@ -850,6 +868,8 @@ static void unlock(transport *t) {
grpc_sopb_destroy(&nuke_now); grpc_sopb_destroy(&nuke_now);
gpr_free(goaways); gpr_free(goaways);
GRPC_TIMER_MARK(HTTP2_UNLOCK_END, 0);
} }
/* /*
@ -896,6 +916,8 @@ static int prepare_write(transport *t) {
window_delta = grpc_chttp2_preencode( window_delta = grpc_chttp2_preencode(
s->outgoing_sopb->ops, &s->outgoing_sopb->nops, s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
t->outgoing_window -= window_delta; t->outgoing_window -= window_delta;
s->outgoing_window -= window_delta; s->outgoing_window -= window_delta;
@ -924,6 +946,7 @@ static int prepare_write(transport *t) {
if (!s->read_closed && window_delta) { if (!s->read_closed && window_delta) {
gpr_slice_buffer_add( gpr_slice_buffer_add(
&t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
s->incoming_window += window_delta; s->incoming_window += window_delta;
} }
} }
@ -933,6 +956,7 @@ static int prepare_write(transport *t) {
window_delta = t->connection_window_target - t->incoming_window; window_delta = t->connection_window_target - t->incoming_window;
gpr_slice_buffer_add(&t->outbuf, gpr_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(0, window_delta)); grpc_chttp2_window_update_create(0, window_delta));
FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
t->incoming_window += window_delta; t->incoming_window += window_delta;
} }
@ -1259,6 +1283,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
return GRPC_CHTTP2_CONNECTION_ERROR; return GRPC_CHTTP2_CONNECTION_ERROR;
} }
FLOWCTL_TRACE(t, t, incoming, 0, -(gpr_int64)t->incoming_frame_size);
FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size);
t->incoming_window -= t->incoming_frame_size; t->incoming_window -= t->incoming_frame_size;
s->incoming_window -= t->incoming_frame_size; s->incoming_window -= t->incoming_frame_size;
@ -1608,6 +1634,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
for (i = 0; i < t->stream_map.count; i++) { for (i = 0; i < t->stream_map.count; i++) {
stream *s = (stream *)(t->stream_map.values[i]); stream *s = (stream *)(t->stream_map.values[i]);
int was_window_empty = s->outgoing_window <= 0; int was_window_empty = s->outgoing_window <= 0;
FLOWCTL_TRACE(t, s, outgoing, s->id, st.initial_window_update);
s->outgoing_window += st.initial_window_update; s->outgoing_window += st.initial_window_update;
if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb && if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
s->outgoing_sopb->nops > 0) { s->outgoing_sopb->nops > 0) {
@ -1626,6 +1653,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
GRPC_CHTTP2_FLOW_CONTROL_ERROR), GRPC_CHTTP2_FLOW_CONTROL_ERROR),
GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1); GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
} else { } else {
FLOWCTL_TRACE(t, s, outgoing, s->id, st.window_update);
s->outgoing_window += st.window_update; s->outgoing_window += st.window_update;
/* if this window update makes outgoing ops writable again, /* if this window update makes outgoing ops writable again,
flag that */ flag that */
@ -1640,6 +1668,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (!is_window_update_legal(st.window_update, t->outgoing_window)) { if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
drop_connection(t); drop_connection(t);
} else { } else {
FLOWCTL_TRACE(t, t, outgoing, 0, st.window_update);
t->outgoing_window += st.window_update; t->outgoing_window += st.window_update;
} }
} }

@ -38,6 +38,7 @@
#include "src/core/transport/transport.h" #include "src/core/transport/transport.h"
extern int grpc_http_trace; extern int grpc_http_trace;
extern int grpc_flowctl_trace;
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg, void *arg,

@ -38,7 +38,7 @@ main() {
source grpc_docker.sh source grpc_docker.sh
test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response) test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response)
auth_test_cases=(service_account_creds compute_engine_creds jwt_token_creds) auth_test_cases=(service_account_creds compute_engine_creds jwt_token_creds)
clients=(cxx java go ruby node csharp_mono python) clients=(cxx java go ruby node csharp_mono python php)
for test_case in "${test_cases[@]}" for test_case in "${test_cases[@]}"
do do
for client in "${clients[@]}" for client in "${clients[@]}"

@ -37,3 +37,14 @@ export TEST=true
cd `dirname $0`/../.. cd `dirname $0`/../..
./tools/buildgen/generate_projects.sh ./tools/buildgen/generate_projects.sh
submodules=`mktemp`
git submodule > $submodules
diff -u $submodules - << EOF
05b155ff59114735ec8cd089f669c4c3d8f59029 third_party/gflags (v2.1.0-45-g05b155f)
3df69d3aefde7671053d4e3c242b228e5d79c83f third_party/openssl (OpenSSL_1_0_2a)
644a6a1da71385e9d7a7a26b3476c93fdd71788c third_party/protobuf (v3.0.0-alpha-1-35-g644a6a1)
50893291621658f355bc5b4d450a8d06a563053d third_party/zlib (v1.2.8)
EOF

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ImportGroup Label="PropertySheets" />
<PropertyGroup Label="UserMacros" />
<PropertyGroup />
<ItemDefinitionGroup>
<Link>
<AdditionalDependencies>ssleay32.lib;libeay32.lib;%(AdditionalDependencies)</AdditionalDependencies>
<AdditionalLibraryDirectories>$(MSBuildProjectDirectory)\..\packages\grpc.dependencies.openssl.1.0.2.2\build\native\lib\$(PlatformToolset)\$(Platform)\$(Configuration)\static;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
</Link>
</ItemDefinitionGroup>
<ItemGroup />
</Project>

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ImportGroup Label="PropertySheets" />
<PropertyGroup Label="UserMacros" />
<PropertyGroup />
<ItemDefinitionGroup>
<Link>
<AdditionalDependencies>zlib.lib;%(AdditionalDependencies)</AdditionalDependencies>
<AdditionalLibraryDirectories>$(MSBuildProjectDirectory)\..\packages\grpc.dependencies.zlib.1.2.8.9\build\native\lib\$(PlatformToolset)\$(Platform)\$(Configuration)\static\cdecl;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
</Link>
</ItemDefinitionGroup>
<ItemGroup />
</Project>
Loading…
Cancel
Save