merge with head

pull/1464/head
Yang Gao 10 years ago
commit 1ac95abbf3
  1. 16
      Makefile
  2. 3
      gRPC.podspec
  3. 19
      src/compiler/cpp_generator.cc
  4. 2
      src/core/iomgr/iocp_windows.c
  5. 24
      src/core/iomgr/socket_windows.c
  6. 1
      src/core/iomgr/socket_windows.h
  7. 48
      src/core/iomgr/tcp_client_windows.c
  8. 79
      src/core/iomgr/tcp_server_windows.c
  9. 2
      src/core/iomgr/tcp_windows.c
  10. 29
      src/core/profiling/basic_timers.c
  11. 40
      src/core/profiling/timers_preciseclock.h
  12. 2
      src/core/surface/call_log_batch.c
  13. 14
      src/core/transport/chttp2_transport.c
  14. 157
      src/objective-c/GRPCClient/GRPCCall.m
  15. 8
      src/objective-c/GRPCClient/private/GRPCCompletionQueue.h
  16. 27
      src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
  17. 97
      src/objective-c/GRPCClient/private/GRPCWrappedCall.h
  18. 326
      src/objective-c/GRPCClient/private/GRPCWrappedCall.m
  19. 3
      src/objective-c/GRPCClient/private/NSData+GRPC.m
  20. 4
      src/objective-c/GRPCClient/private/NSDictionary+GRPC.h
  21. 22
      src/objective-c/GRPCClient/private/NSDictionary+GRPC.m
  22. 12
      src/objective-c/GRPCClient/private/NSError+GRPC.h
  23. 7
      templates/Makefile.template
  24. 9
      tools/dockerfile/grpc_php/Dockerfile
  25. 4
      tools/gce_setup/shared_startup_funcs.sh
  26. 13
      vsprojects/openssl.props
  27. 13
      vsprojects/zlib.props

@ -92,7 +92,7 @@ CC_basicprof = $(DEFAULT_CC)
CXX_basicprof = $(DEFAULT_CXX) CXX_basicprof = $(DEFAULT_CXX)
LD_basicprof = $(DEFAULT_CC) LD_basicprof = $(DEFAULT_CC)
LDXX_basicprof = $(DEFAULT_CXX) LDXX_basicprof = $(DEFAULT_CXX)
CPPFLAGS_basicprof = -O2 -DGRPC_BASIC_PROFILER CPPFLAGS_basicprof = -O2 -DGRPC_BASIC_PROFILER -DGRPC_TIMERS_RDTSC
LDFLAGS_basicprof = LDFLAGS_basicprof =
DEFINES_basicprof = NDEBUG DEFINES_basicprof = NDEBUG
@ -2471,7 +2471,7 @@ endif
ifeq ($(SYSTEM),MINGW32) ifeq ($(SYSTEM),MINGW32)
$(LIBDIR)/$(CONFIG)/grpc.$(SHARED_EXT): $(LIBGRPC_OBJS) $(ZLIB_DEP)$(LIBDIR)/$(CONFIG)/gpr.$(SHARED_EXT) $(OPENSSL_DEP) $(LIBDIR)/$(CONFIG)/grpc.$(SHARED_EXT): $(LIBGRPC_OBJS) $(ZLIB_DEP) $(LIBDIR)/$(CONFIG)/gpr.$(SHARED_EXT) $(OPENSSL_DEP)
$(E) "[LD] Linking $@" $(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@` $(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpc.def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpc-imp.a -o $(LIBDIR)/$(CONFIG)/grpc.$(SHARED_EXT) $(LIBGRPC_OBJS) $(LDLIBS) $(LDLIBS_SECURE) $(OPENSSL_MERGE_LIBS) -lgpr-imp $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpc.def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpc-imp.a -o $(LIBDIR)/$(CONFIG)/grpc.$(SHARED_EXT) $(LIBGRPC_OBJS) $(LDLIBS) $(LDLIBS_SECURE) $(OPENSSL_MERGE_LIBS) -lgpr-imp
@ -2697,7 +2697,7 @@ endif
ifeq ($(SYSTEM),MINGW32) ifeq ($(SYSTEM),MINGW32)
$(LIBDIR)/$(CONFIG)/grpc_unsecure.$(SHARED_EXT): $(LIBGRPC_UNSECURE_OBJS) $(ZLIB_DEP)$(LIBDIR)/$(CONFIG)/gpr.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/grpc_unsecure.$(SHARED_EXT): $(LIBGRPC_UNSECURE_OBJS) $(ZLIB_DEP) $(LIBDIR)/$(CONFIG)/gpr.$(SHARED_EXT)
$(E) "[LD] Linking $@" $(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@` $(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpc_unsecure.def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpc_unsecure-imp.a -o $(LIBDIR)/$(CONFIG)/grpc_unsecure.$(SHARED_EXT) $(LIBGRPC_UNSECURE_OBJS) $(LDLIBS) -lgpr-imp $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpc_unsecure.def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpc_unsecure-imp.a -o $(LIBDIR)/$(CONFIG)/grpc_unsecure.$(SHARED_EXT) $(LIBGRPC_UNSECURE_OBJS) $(LDLIBS) -lgpr-imp
@ -2825,12 +2825,12 @@ endif
ifeq ($(SYSTEM),MINGW32) ifeq ($(SYSTEM),MINGW32)
$(LIBDIR)/$(CONFIG)/grpc++.$(SHARED_EXT): $(LIBGRPC++_OBJS) $(ZLIB_DEP)$(LIBDIR)/$(CONFIG)/gpr.$(SHARED_EXT)$(LIBDIR)/$(CONFIG)/grpc.$(SHARED_EXT) $(OPENSSL_DEP) $(LIBDIR)/$(CONFIG)/grpc++.$(SHARED_EXT): $(LIBGRPC++_OBJS) $(ZLIB_DEP) $(PROTOBUF_DEP) $(LIBDIR)/$(CONFIG)/gpr.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/grpc.$(SHARED_EXT) $(OPENSSL_DEP)
$(E) "[LD] Linking $@" $(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@` $(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpc++.def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpc++-imp.a -o $(LIBDIR)/$(CONFIG)/grpc++.$(SHARED_EXT) $(LIBGRPC++_OBJS) $(LDLIBS) $(LDLIBSXX) $(LDLIBS_PROTOBUF) -lgpr-imp -lgrpc-imp $(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpc++.def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpc++-imp.a -o $(LIBDIR)/$(CONFIG)/grpc++.$(SHARED_EXT) $(LIBGRPC++_OBJS) $(LDLIBS) $(LDLIBSXX) $(LDLIBS_PROTOBUF) -lgpr-imp -lgrpc-imp
else else
$(LIBDIR)/$(CONFIG)/libgrpc++.$(SHARED_EXT): $(LIBGRPC++_OBJS) $(ZLIB_DEP) $(LIBDIR)/$(CONFIG)/libgpr.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/libgrpc.$(SHARED_EXT) $(OPENSSL_DEP) $(LIBDIR)/$(CONFIG)/libgrpc++.$(SHARED_EXT): $(LIBGRPC++_OBJS) $(ZLIB_DEP) $(PROTOBUF_DEP) $(LIBDIR)/$(CONFIG)/libgpr.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/libgrpc.$(SHARED_EXT) $(OPENSSL_DEP)
$(E) "[LD] Linking $@" $(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@` $(Q) mkdir -p `dirname $@`
ifeq ($(SYSTEM),Darwin) ifeq ($(SYSTEM),Darwin)
@ -3043,12 +3043,12 @@ endif
ifeq ($(SYSTEM),MINGW32) ifeq ($(SYSTEM),MINGW32)
$(LIBDIR)/$(CONFIG)/grpc++_unsecure.$(SHARED_EXT): $(LIBGRPC++_UNSECURE_OBJS) $(ZLIB_DEP)$(LIBDIR)/$(CONFIG)/gpr.$(SHARED_EXT)$(LIBDIR)/$(CONFIG)/grpc_unsecure.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/grpc++_unsecure.$(SHARED_EXT): $(LIBGRPC++_UNSECURE_OBJS) $(ZLIB_DEP) $(PROTOBUF_DEP) $(LIBDIR)/$(CONFIG)/gpr.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/grpc_unsecure.$(SHARED_EXT)
$(E) "[LD] Linking $@" $(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@` $(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpc++_unsecure.def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpc++_unsecure-imp.a -o $(LIBDIR)/$(CONFIG)/grpc++_unsecure.$(SHARED_EXT) $(LIBGRPC++_UNSECURE_OBJS) $(LDLIBS) $(LDLIBSXX) $(LDLIBS_PROTOBUF) -lgpr-imp -lgrpc_unsecure-imp $(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpc++_unsecure.def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpc++_unsecure-imp.a -o $(LIBDIR)/$(CONFIG)/grpc++_unsecure.$(SHARED_EXT) $(LIBGRPC++_UNSECURE_OBJS) $(LDLIBS) $(LDLIBSXX) $(LDLIBS_PROTOBUF) -lgpr-imp -lgrpc_unsecure-imp
else else
$(LIBDIR)/$(CONFIG)/libgrpc++_unsecure.$(SHARED_EXT): $(LIBGRPC++_UNSECURE_OBJS) $(ZLIB_DEP) $(LIBDIR)/$(CONFIG)/libgpr.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure.$(SHARED_EXT): $(LIBGRPC++_UNSECURE_OBJS) $(ZLIB_DEP) $(PROTOBUF_DEP) $(LIBDIR)/$(CONFIG)/libgpr.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.$(SHARED_EXT)
$(E) "[LD] Linking $@" $(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@` $(Q) mkdir -p `dirname $@`
ifeq ($(SYSTEM),Darwin) ifeq ($(SYSTEM),Darwin)
@ -3451,7 +3451,7 @@ endif
ifeq ($(SYSTEM),MINGW32) ifeq ($(SYSTEM),MINGW32)
$(LIBDIR)/$(CONFIG)/grpc_csharp_ext.$(SHARED_EXT): $(LIBGRPC_CSHARP_EXT_OBJS) $(ZLIB_DEP)$(LIBDIR)/$(CONFIG)/gpr.$(SHARED_EXT)$(LIBDIR)/$(CONFIG)/grpc.$(SHARED_EXT) $(OPENSSL_DEP) $(LIBDIR)/$(CONFIG)/grpc_csharp_ext.$(SHARED_EXT): $(LIBGRPC_CSHARP_EXT_OBJS) $(ZLIB_DEP) $(LIBDIR)/$(CONFIG)/gpr.$(SHARED_EXT) $(LIBDIR)/$(CONFIG)/grpc.$(SHARED_EXT) $(OPENSSL_DEP)
$(E) "[LD] Linking $@" $(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@` $(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpc_csharp_ext.def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext-imp.a -o $(LIBDIR)/$(CONFIG)/grpc_csharp_ext.$(SHARED_EXT) $(LIBGRPC_CSHARP_EXT_OBJS) $(LDLIBS) -lgpr-imp -lgrpc-imp $(Q) $(LD) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpc_csharp_ext.def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext-imp.a -o $(LIBDIR)/$(CONFIG)/grpc_csharp_ext.$(SHARED_EXT) $(LIBGRPC_CSHARP_EXT_OBJS) $(LDLIBS) -lgpr-imp -lgrpc-imp

@ -4,7 +4,8 @@ Pod::Spec.new do |s|
s.summary = 'Generic gRPC client library for iOS' s.summary = 'Generic gRPC client library for iOS'
s.homepage = 'https://www.grpc.io' s.homepage = 'https://www.grpc.io'
s.license = 'New BSD' s.license = 'New BSD'
s.authors = { 'Jorge Canizales' => 'jcanizales@google.com' } s.authors = { 'Jorge Canizales' => 'jcanizales@google.com'
'Michael Lumish' => 'mlumish@google.com' }
# s.source = { :git => 'https://github.com/grpc/grpc.git', :tag => 'release-0_5_0' } # s.source = { :git => 'https://github.com/grpc/grpc.git', :tag => 'release-0_5_0' }

@ -828,9 +828,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, " "$Request$, "
"$Response$>(\n" "$Response$>(\n"
" std::function< ::grpc::Status($ns$$Service$::Service*, " " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
"::grpc::ServerContext*, const $Request$*, $Response$*)>("
"&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n"); " new $Request$, new $Response$));\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(
@ -840,10 +838,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::CLIENT_STREAMING,\n" " ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< " " new ::grpc::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n" "$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::function< ::grpc::Status($ns$$Service$::Service*, " " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
"::grpc::ServerContext*, "
"::grpc::ServerReader< $Request$>*, $Response$*)>("
"&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n"); " new $Request$, new $Response$));\n");
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print( printer->Print(
@ -853,10 +848,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::SERVER_STREAMING,\n" " ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< " " new ::grpc::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n" "$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::function< ::grpc::Status($ns$$Service$::Service*, " " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
"::grpc::ServerContext*, "
"const $Request$*, ::grpc::ServerWriter< $Response$>*)>("
"&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n"); " new $Request$, new $Response$));\n");
} else if (BidiStreaming(method)) { } else if (BidiStreaming(method)) {
printer->Print( printer->Print(
@ -866,10 +858,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::BIDI_STREAMING,\n" " ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< " " new ::grpc::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n" "$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::function< ::grpc::Status($ns$$Service$::Service*, " " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
"::grpc::ServerContext*, "
"::grpc::ServerReaderWriter< $Response$, $Request$>*)>("
"&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n"); " new $Request$, new $Response$));\n");
} }
} }

@ -172,7 +172,9 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
} }
void grpc_iocp_socket_orphan(grpc_winsocket *socket) { void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
GPR_ASSERT(!socket->orphan);
gpr_atm_full_fetch_add(&g_orphans, 1); gpr_atm_full_fetch_add(&g_orphans, 1);
socket->orphan = 1;
} }
static void socket_notify_on_iocp(grpc_winsocket *socket, static void socket_notify_on_iocp(grpc_winsocket *socket,

@ -32,11 +32,12 @@
*/ */
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#ifdef GPR_WINSOCK_SOCKET #ifdef GPR_WINSOCK_SOCKET
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iomgr_internal.h"
@ -64,16 +65,21 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket) {
shutdown_op(&socket->write_info); shutdown_op(&socket->write_info);
} }
void grpc_winsocket_orphan(grpc_winsocket *socket) { void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
grpc_iocp_socket_orphan(socket); SOCKET socket = winsocket->socket;
socket->orphan = 1; if (!winsocket->closed_early) {
grpc_iocp_socket_orphan(winsocket);
}
if (winsocket->closed_early) {
grpc_winsocket_destroy(winsocket);
}
closesocket(socket);
grpc_iomgr_unref(); grpc_iomgr_unref();
closesocket(socket->socket);
} }
void grpc_winsocket_destroy(grpc_winsocket *socket) { void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
gpr_mu_destroy(&socket->state_mu); gpr_mu_destroy(&winsocket->state_mu);
gpr_free(socket); gpr_free(winsocket);
} }
#endif /* GPR_WINSOCK_SOCKET */ #endif /* GPR_WINSOCK_SOCKET */

@ -64,6 +64,7 @@ typedef struct grpc_winsocket {
int added_to_iocp; int added_to_iocp;
int orphan; int orphan;
int closed_early;
} grpc_winsocket; } grpc_winsocket;
/* Create a wrapped windows handle. /* Create a wrapped windows handle.

@ -59,6 +59,7 @@ typedef struct {
gpr_timespec deadline; gpr_timespec deadline;
grpc_alarm alarm; grpc_alarm alarm;
int refs; int refs;
int aborted;
} async_connect; } async_connect;
static void async_connect_cleanup(async_connect *ac) { static void async_connect_cleanup(async_connect *ac) {
@ -70,26 +71,31 @@ static void async_connect_cleanup(async_connect *ac) {
} }
} }
static void on_alarm(void *acp, int success) { static void on_alarm(void *acp, int occured) {
async_connect *ac = acp; async_connect *ac = acp;
gpr_mu_lock(&ac->mu); gpr_mu_lock(&ac->mu);
if (ac->socket != NULL && success) { /* If the alarm didn't occor, it got cancelled. */
if (ac->socket != NULL && occured) {
grpc_winsocket_shutdown(ac->socket); grpc_winsocket_shutdown(ac->socket);
} }
async_connect_cleanup(ac); async_connect_cleanup(ac);
} }
static void on_connect(void *acp, int success) { static void on_connect(void *acp, int from_iocp) {
async_connect *ac = acp; async_connect *ac = acp;
SOCKET sock = ac->socket->socket; SOCKET sock = ac->socket->socket;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
grpc_winsocket_callback_info *info = &ac->socket->write_info; grpc_winsocket_callback_info *info = &ac->socket->write_info;
void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
void *cb_arg = ac->cb_arg; void *cb_arg = ac->cb_arg;
int aborted;
grpc_alarm_cancel(&ac->alarm); grpc_alarm_cancel(&ac->alarm);
if (success) { gpr_mu_lock(&ac->mu);
aborted = ac->aborted;
if (from_iocp) {
DWORD transfered_bytes = 0; DWORD transfered_bytes = 0;
DWORD flags; DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
@ -107,20 +113,40 @@ static void on_connect(void *acp, int success) {
} }
} else { } else {
gpr_log(GPR_ERROR, "on_connect is shutting down"); gpr_log(GPR_ERROR, "on_connect is shutting down");
goto finish; /* If the connection timeouts, we will still get a notification from
the IOCP whatever happens. So we're just going to flag that connection
as being in the process of being aborted, and wait for the IOCP. We
can't just orphan the socket now, because the IOCP might already have
gotten a successful connection, which is our worst-case scenario.
We need to call our callback now to respect the deadline. */
ac->aborted = 1;
gpr_mu_unlock(&ac->mu);
cb(cb_arg, NULL);
return;
} }
abort(); abort();
finish: finish:
gpr_mu_lock(&ac->mu); /* If we don't have an endpoint, it means the connection failed,
if (!ep) { so it doesn't matter if it aborted or failed. We need to orphan
that socket. */
if (!ep || aborted) {
/* If the connection failed, it means we won't get an IOCP notification,
so let's flag it as already closed. But if the connection was aborted,
while we still got an endpoint, we have to wait for the IOCP to collect
that socket. So let's properly flag that. */
ac->socket->closed_early = !ep;
grpc_winsocket_orphan(ac->socket); grpc_winsocket_orphan(ac->socket);
} }
async_connect_cleanup(ac); async_connect_cleanup(ac);
cb(cb_arg, ep); /* If the connection was aborted, the callback was already called when
the deadline was met. */
if (!aborted) cb(cb_arg, ep);
} }
/* Tries to issue one async connection, then schedules both an IOCP
notification request for the connection, and one timeout alert. */
void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
void *arg, const struct sockaddr *addr, void *arg, const struct sockaddr *addr,
int addr_len, gpr_timespec deadline) { int addr_len, gpr_timespec deadline) {
@ -156,6 +182,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
goto failure; goto failure;
} }
/* Grab the function pointer for ConnectEx for that specific socket.
It may change depending on the interface. */
status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx), &guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx),
&ioctl_num_bytes, NULL, NULL); &ioctl_num_bytes, NULL, NULL);
@ -178,6 +206,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
info = &socket->write_info; info = &socket->write_info;
success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
/* It wouldn't be unusual to get a success immediately. But we'll still get
an IOCP notification, so let's ignore it. */
if (!success) { if (!success) {
int error = WSAGetLastError(); int error = WSAGetLastError();
if (error != ERROR_IO_PENDING) { if (error != ERROR_IO_PENDING) {
@ -192,6 +222,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
ac->socket = socket; ac->socket = socket;
gpr_mu_init(&ac->mu); gpr_mu_init(&ac->mu);
ac->refs = 2; ac->refs = 2;
ac->aborted = 0;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
grpc_socket_notify_on_write(socket, on_connect, ac); grpc_socket_notify_on_write(socket, on_connect, ac);
@ -202,6 +233,7 @@ failure:
gpr_log(GPR_ERROR, message, utf8_message); gpr_log(GPR_ERROR, message, utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
if (socket) { if (socket) {
socket->closed_early = 1;
grpc_winsocket_orphan(socket); grpc_winsocket_orphan(socket);
} else if (sock != INVALID_SOCKET) { } else if (sock != INVALID_SOCKET) {
closesocket(sock); closesocket(sock);

@ -55,11 +55,17 @@
/* one listening port */ /* one listening port */
typedef struct server_port { typedef struct server_port {
gpr_uint8 addresses[sizeof(struct sockaddr_in6) * 2 + 32]; /* This seemingly magic number comes from AcceptEx's documentation. each
address buffer needs to have at least 16 more bytes at their end. */
gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2];
/* This will hold the socket for the next accept. */
SOCKET new_socket; SOCKET new_socket;
/* The listener winsocked. */
grpc_winsocket *socket; grpc_winsocket *socket;
grpc_tcp_server *server; grpc_tcp_server *server;
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx; LPFN_ACCEPTEX AcceptEx;
int shutting_down;
} server_port; } server_port;
/* the overall server */ /* the overall server */
@ -79,6 +85,8 @@ struct grpc_tcp_server {
size_t port_capacity; size_t port_capacity;
}; };
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *grpc_tcp_server_create(void) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_mu_init(&s->mu); gpr_mu_init(&s->mu);
@ -92,24 +100,29 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
return s; return s;
} }
/* Public function. Stops and destroys a grpc_tcp_server. */
void grpc_tcp_server_destroy(grpc_tcp_server *s, void grpc_tcp_server_destroy(grpc_tcp_server *s,
void (*shutdown_done)(void *shutdown_done_arg), void (*shutdown_done)(void *shutdown_done_arg),
void *shutdown_done_arg) { void *shutdown_done_arg) {
size_t i; size_t i;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
/* shutdown all fd's */ /* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts. */
for (i = 0; i < s->nports; i++) { for (i = 0; i < s->nports; i++) {
grpc_winsocket_shutdown(s->ports[i].socket); grpc_winsocket_shutdown(s->ports[i].socket);
} }
/* wait while that happens */ /* This happens asynchronously. Wait while that happens. */
while (s->active_ports) { while (s->active_ports) {
gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
/* delete ALL the things */ /* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
closed by the system. */
for (i = 0; i < s->nports; i++) { for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i]; server_port *sp = &s->ports[i];
sp->socket->closed_early = 1;
grpc_winsocket_orphan(sp->socket); grpc_winsocket_orphan(sp->socket);
} }
gpr_free(s->ports); gpr_free(s->ports);
@ -120,7 +133,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
} }
} }
/* Prepare a recently-created socket for listening. */ /* Prepare (bind) a recently-created socket for listening. */
static int prepare_socket(SOCKET sock, const struct sockaddr *addr, static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
int addr_len) { int addr_len) {
struct sockaddr_storage sockname_temp; struct sockaddr_storage sockname_temp;
@ -168,8 +181,11 @@ error:
return -1; return -1;
} }
static void on_accept(void *arg, int success); /* start_accept will reference that for the IOCP notification request. */
static void on_accept(void *arg, int from_iocp);
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
static void start_accept(server_port *port) { static void start_accept(server_port *port) {
SOCKET sock = INVALID_SOCKET; SOCKET sock = INVALID_SOCKET;
char *message; char *message;
@ -191,12 +207,13 @@ static void start_accept(server_port *port) {
goto failure; goto failure;
} }
/* TODO(jtattermusch): probably a race here, we regularly get use-after-free on server shutdown */ /* Start the "accept" asynchronously. */
GPR_ASSERT(port->socket != (grpc_winsocket*)0xfeeefeee);
success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0, success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
addrlen, addrlen, &bytes_received, addrlen, addrlen, &bytes_received,
&port->socket->read_info.overlapped); &port->socket->read_info.overlapped);
/* It is possible to get an accept immediately without delay. However, we
will still get an IOCP notification for it. So let's just ignore it. */
if (!success) { if (!success) {
int error = WSAGetLastError(); int error = WSAGetLastError();
if (error != ERROR_IO_PENDING) { if (error != ERROR_IO_PENDING) {
@ -205,6 +222,8 @@ static void start_accept(server_port *port) {
} }
} }
/* We're ready to do the accept. Calling grpc_socket_notify_on_read may
immediately process an accept that happened in the meantime. */
port->new_socket = sock; port->new_socket = sock;
grpc_socket_notify_on_read(port->socket, on_accept, port); grpc_socket_notify_on_read(port->socket, on_accept, port);
return; return;
@ -216,14 +235,30 @@ failure:
if (sock != INVALID_SOCKET) closesocket(sock); if (sock != INVALID_SOCKET) closesocket(sock);
} }
/* event manager callback when reads are ready */ /* Event manager callback when reads are ready. */
static void on_accept(void *arg, int success) { static void on_accept(void *arg, int from_iocp) {
server_port *sp = arg; server_port *sp = arg;
SOCKET sock = sp->new_socket; SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
if (success) { /* The shutdown sequence is done in two parts. This is the second
part here, acknowledging the IOCP notification, and doing nothing
else, especially not queuing a new accept. */
if (sp->shutting_down) {
GPR_ASSERT(from_iocp);
sp->shutting_down = 0;
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_cv_broadcast(&sp->server->cv);
}
gpr_mu_unlock(&sp->server->mu);
return;
}
if (from_iocp) {
/* The IOCP notified us of a completed operation. Let's grab the results,
and act accordingly. */
DWORD transfered_bytes = 0; DWORD transfered_bytes = 0;
DWORD flags; DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
@ -237,16 +272,23 @@ static void on_accept(void *arg, int success) {
ep = grpc_tcp_create(grpc_winsocket_create(sock)); ep = grpc_tcp_create(grpc_winsocket_create(sock));
} }
} else { } else {
/* If we're not notified from the IOCP, it means we are asked to shutdown.
This will initiate that shutdown. Calling closesocket will trigger an
IOCP notification, that will call this function a second time, from
the IOCP thread. */
sp->shutting_down = 1;
sp->new_socket = INVALID_SOCKET;
closesocket(sock); closesocket(sock);
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_cv_broadcast(&sp->server->cv);
}
gpr_mu_unlock(&sp->server->mu);
} }
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep); if (ep) sp->server->cb(sp->server->cb_arg, ep);
if (success) { if (from_iocp) {
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
connection. */
start_accept(sp); start_accept(sp);
} }
} }
@ -262,6 +304,8 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
if (sock == INVALID_SOCKET) return -1; if (sock == INVALID_SOCKET) return -1;
/* We need to grab the AcceptEx pointer for that port, as it may be
interface-dependent. We'll cache it to avoid doing that again. */
status = status =
WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL); &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
@ -286,6 +330,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp = &s->ports[s->nports++]; sp = &s->ports[s->nports++];
sp->server = s; sp->server = s;
sp->socket = grpc_winsocket_create(sock); sp->socket = grpc_winsocket_create(sock);
sp->shutting_down = 0;
sp->AcceptEx = AcceptEx; sp->AcceptEx = AcceptEx;
GPR_ASSERT(sp->socket); GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);

@ -130,6 +130,7 @@ static void on_read(void *tcpp, int success) {
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR; status = GRPC_ENDPOINT_CB_ERROR;
socket->closed_early = 1;
} else { } else {
if (info->bytes_transfered != 0) { if (info->bytes_transfered != 0) {
sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered); sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered);
@ -225,6 +226,7 @@ static void on_write(void *tcpp, int success) {
gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message); gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR; status = GRPC_ENDPOINT_CB_ERROR;
tcp->socket->closed_early = 1;
} else { } else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length); GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
} }

@ -45,10 +45,17 @@
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include <stdio.h> #include <stdio.h>
typedef enum {
BEGIN = '{',
END = '}',
MARK = '.'
} marker_type;
typedef struct grpc_timer_entry { typedef struct grpc_timer_entry {
grpc_precise_clock tm; grpc_precise_clock tm;
gpr_thd_id thd; gpr_thd_id thd;
int tag; int tag;
marker_type type;
void* id; void* id;
const char* file; const char* file;
int line; int line;
@ -89,7 +96,7 @@ static void log_report_locked(grpc_timers_log* log) {
grpc_timer_entry* entry = &(log->log[i]); grpc_timer_entry* entry = &(log->log[i]);
fprintf(fp, "GRPC_LAT_PROF "); fprintf(fp, "GRPC_LAT_PROF ");
grpc_precise_clock_print(&entry->tm, fp); grpc_precise_clock_print(&entry->tm, fp);
fprintf(fp, " %p %d %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->tag, fprintf(fp, " %p %c %d %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->type, entry->tag,
entry->id, entry->file, entry->line); entry->id, entry->file, entry->line);
} }
@ -108,7 +115,7 @@ static void grpc_timers_log_destroy(grpc_timers_log* log) {
gpr_free(log); gpr_free(log);
} }
static void grpc_timers_log_add(grpc_timers_log* log, int tag, void* id, static void grpc_timers_log_add(grpc_timers_log* log, int tag, marker_type type, void* id,
const char* file, int line) { const char* file, int line) {
grpc_timer_entry* entry; grpc_timer_entry* entry;
@ -122,6 +129,7 @@ static void grpc_timers_log_add(grpc_timers_log* log, int tag, void* id,
grpc_precise_clock_now(&entry->tm); grpc_precise_clock_now(&entry->tm);
entry->tag = tag; entry->tag = tag;
entry->type = type;
entry->id = id; entry->id = id;
entry->file = file; entry->file = file;
entry->line = line; entry->line = line;
@ -132,11 +140,22 @@ static void grpc_timers_log_add(grpc_timers_log* log, int tag, void* id,
/* Latency profiler API implementation. */ /* Latency profiler API implementation. */
void grpc_timer_add_mark(int tag, void* id, const char* file, int line) { void grpc_timer_add_mark(int tag, void* id, const char* file, int line) {
grpc_timers_log_add(grpc_timers_log_global, tag, id, file, line); if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
grpc_timers_log_add(grpc_timers_log_global, tag, MARK, id, file, line);
}
} }
void grpc_timer_begin(int tag, void* id, const char* file, int line) {} void grpc_timer_begin(int tag, void* id, const char *file, int line) {
void grpc_timer_end(int tag, void* id, const char* file, int line) {} if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
grpc_timers_log_add(grpc_timers_log_global, tag, BEGIN, id, file, line);
}
}
void grpc_timer_end(int tag, void* id, const char *file, int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
grpc_timers_log_add(grpc_timers_log_global, tag, END, id, file, line);
}
}
/* Basic profiler specific API functions. */ /* Basic profiler specific API functions. */
void grpc_timers_global_init(void) { void grpc_timers_global_init(void) {

@ -34,14 +34,48 @@
#ifndef GRPC_CORE_PROFILING_TIMERS_PRECISECLOCK_H #ifndef GRPC_CORE_PROFILING_TIMERS_PRECISECLOCK_H
#define GRPC_CORE_PROFILING_TIMERS_PRECISECLOCK_H #define GRPC_CORE_PROFILING_TIMERS_PRECISECLOCK_H
#include <grpc/support/sync.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include <stdio.h> #include <stdio.h>
typedef struct grpc_precise_clock grpc_precise_clock;
#ifdef GRPC_TIMERS_RDTSC #ifdef GRPC_TIMERS_RDTSC
#error RDTSC timers not currently supported typedef long long int grpc_precise_clock;
#if defined(__i386__)
static void grpc_precise_clock_now(grpc_precise_clock *clk) {
grpc_precise_clock ret;
__asm__ volatile("rdtsc" : "=A" (ret) );
*clk = ret;
}
// ----------------------------------------------------------------
#elif defined(__x86_64__) || defined(__amd64__)
static void grpc_precise_clock_now(grpc_precise_clock *clk) {
unsigned long long low, high;
__asm__ volatile("rdtsc" : "=a" (low), "=d" (high));
*clk = (high << 32) | low;
}
#endif
static gpr_once precise_clock_init = GPR_ONCE_INIT;
static double cycles_per_second = 0.0;
static void grpc_precise_clock_init() {
time_t start = time(NULL);
grpc_precise_clock start_time;
grpc_precise_clock end_time;
while (time(NULL) == start);
grpc_precise_clock_now(&start_time);
while (time(NULL) == start+1);
grpc_precise_clock_now(&end_time);
cycles_per_second = end_time - start_time;
}
static double grpc_precise_clock_scaling_factor() {
gpr_once_init(&precise_clock_init, grpc_precise_clock_init);
return 1e6 / cycles_per_second;
}
static void grpc_precise_clock_print(const grpc_precise_clock* clk, FILE* fp) {
fprintf(fp, "%f", *clk * grpc_precise_clock_scaling_factor());
}
#else #else
typedef struct grpc_precise_clock grpc_precise_clock;
struct grpc_precise_clock { struct grpc_precise_clock {
gpr_timespec clock; gpr_timespec clock;
}; };

@ -112,7 +112,7 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
char *tmp; char *tmp;
size_t i; size_t i;
gpr_log(file, line, severity, gpr_log(file, line, severity,
"grpc_call_start_batch(%p, %p, %d, 0x%x)", call, ops, nops, tag); "grpc_call_start_batch(call=%p, ops=%p, nops=%d, tag=%p)", call, ops, nops, tag);
for(i = 0; i < nops; i++) { for(i = 0; i < nops; i++) {
tmp = grpc_op_string(&ops[i]); tmp = grpc_op_string(&ops[i]);
gpr_log(file, line, severity, "ops[%d]: %s", i, tmp); gpr_log(file, line, severity, "ops[%d]: %s", i, tmp);

@ -611,17 +611,19 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
if (!server_data) { if (!server_data) {
lock(t); lock(t);
s->id = 0; s->id = 0;
s->outgoing_window = 0;
s->incoming_window = 0;
} else { } else {
/* already locked */ /* already locked */
s->id = (gpr_uint32)(gpr_uintptr)server_data; s->id = (gpr_uint32)(gpr_uintptr)server_data;
s->outgoing_window =
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->incoming_window =
t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
t->incoming_stream = s; t->incoming_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
} }
s->outgoing_window =
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->incoming_window =
t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->incoming_deadline = gpr_inf_future; s->incoming_deadline = gpr_inf_future;
grpc_sopb_init(&s->writing_sopb); grpc_sopb_init(&s->writing_sopb);
grpc_sopb_init(&s->callback_sopb); grpc_sopb_init(&s->callback_sopb);
@ -1017,6 +1019,10 @@ static void maybe_start_some_streams(transport *t) {
GPR_ASSERT(s->id == 0); GPR_ASSERT(s->id == 0);
s->id = t->next_stream_id; s->id = t->next_stream_id;
t->next_stream_id += 2; t->next_stream_id += 2;
s->outgoing_window =
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->incoming_window =
t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
stream_list_join(t, s, WRITABLE); stream_list_join(t, s, WRITABLE);
} }

@ -41,23 +41,11 @@
#import "private/GRPCCompletionQueue.h" #import "private/GRPCCompletionQueue.h"
#import "private/GRPCDelegateWrapper.h" #import "private/GRPCDelegateWrapper.h"
#import "private/GRPCMethodName+HTTP2Encoding.h" #import "private/GRPCMethodName+HTTP2Encoding.h"
#import "private/GRPCWrappedCall.h"
#import "private/NSData+GRPC.h" #import "private/NSData+GRPC.h"
#import "private/NSDictionary+GRPC.h" #import "private/NSDictionary+GRPC.h"
#import "private/NSError+GRPC.h" #import "private/NSError+GRPC.h"
// A grpc_call_error represents a precondition failure when invoking the
// grpc_call_* functions. If one ever happens, it's a bug in this library.
//
// TODO(jcanizales): Can an application shut down gracefully when a thread other
// than the main one throws an exception?
static void AssertNoErrorInCall(grpc_call_error error) {
if (error != GRPC_CALL_OK) {
@throw [NSException exceptionWithName:NSInternalInconsistencyException
reason:@"Precondition of grpc_call_* not met."
userInfo:nil];
}
}
@interface GRPCCall () <GRXWriteable> @interface GRPCCall () <GRXWriteable>
// Makes it readwrite. // Makes it readwrite.
@property(atomic, strong) NSDictionary *responseMetadata; @property(atomic, strong) NSDictionary *responseMetadata;
@ -65,34 +53,24 @@ static void AssertNoErrorInCall(grpc_call_error error) {
// The following methods of a C gRPC call object aren't reentrant, and thus // The following methods of a C gRPC call object aren't reentrant, and thus
// calls to them must be serialized: // calls to them must be serialized:
// - add_metadata // - start_batch
// - invoke
// - start_write
// - writes_done
// - start_read
// - destroy // - destroy
// The first four are called as part of responding to client commands, but
// start_read we want to call as soon as we're notified that the RPC was
// successfully established (which happens concurrently in the network queue).
// Serialization is achieved by using a private serial queue to operate the
// call object.
// Because add_metadata and invoke are called and return successfully before
// any of the other methods is called, they don't need to use the queue.
// //
// Furthermore, start_write and writes_done can only be called after the // start_batch with a SEND_MESSAGE argument can only be called after the
// WRITE_ACCEPTED event for any previous write is received. This is achieved by // OP_COMPLETE event for any previous write is received. This is achieved by
// pausing the requests writer immediately every time it writes a value, and // pausing the requests writer immediately every time it writes a value, and
// resuming it again when WRITE_ACCEPTED is received. // resuming it again when OP_COMPLETE is received.
// //
// Similarly, start_read can only be called after the READ event for any // Similarly, start_batch with a RECV_MESSAGE argument can only be called after
// previous read is received. This is easier to enforce, as we're writing the // the OP_COMPLETE event for any previous read is received.This is easier to
// received messages into the writeable: start_read is enqueued once upon receiving // enforce, as we're writing the received messages into the writeable:
// the CLIENT_METADATA_READ event, and then once after receiving each READ // start_batch is enqueued once upon receiving the OP_COMPLETE event for the
// event. // RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
// each RECV_MESSAGE batch.
@implementation GRPCCall { @implementation GRPCCall {
dispatch_queue_t _callQueue; dispatch_queue_t _callQueue;
grpc_call *_gRPCCall; GRPCWrappedCall *_wrappedCall;
dispatch_once_t _callAlreadyInvoked; dispatch_once_t _callAlreadyInvoked;
GRPCChannel *_channel; GRPCChannel *_channel;
@ -129,10 +107,10 @@ static void AssertNoErrorInCall(grpc_call_error error) {
_completionQueue = [GRPCCompletionQueue completionQueue]; _completionQueue = [GRPCCompletionQueue completionQueue];
_channel = [GRPCChannel channelToHost:host]; _channel = [GRPCChannel channelToHost:host];
_gRPCCall = grpc_channel_create_call_old(_channel.unmanagedChannel,
method.HTTP2Path.UTF8String, _wrappedCall = [[GRPCWrappedCall alloc] initWithChannel:_channel
host.UTF8String, method:method.HTTP2Path
gpr_inf_future); host:host];
// Serial queue to invoke the non-reentrant methods of the grpc_call object. // Serial queue to invoke the non-reentrant methods of the grpc_call object.
_callQueue = dispatch_queue_create("org.grpc.call", NULL); _callQueue = dispatch_queue_create("org.grpc.call", NULL);
@ -156,7 +134,7 @@ static void AssertNoErrorInCall(grpc_call_error error) {
- (void)cancelCall { - (void)cancelCall {
// Can be called from any thread, any number of times. // Can be called from any thread, any number of times.
AssertNoErrorInCall(grpc_call_cancel(_gRPCCall)); [_wrappedCall cancel];
} }
- (void)cancel { - (void)cancel {
@ -167,9 +145,9 @@ static void AssertNoErrorInCall(grpc_call_error error) {
} }
- (void)dealloc { - (void)dealloc {
grpc_call *gRPCCall = _gRPCCall; __block GRPCWrappedCall *wrappedCall = _wrappedCall;
dispatch_async(_callQueue, ^{ dispatch_async(_callQueue, ^{
grpc_call_destroy(gRPCCall); wrappedCall = nil;
}); });
} }
@ -177,8 +155,9 @@ static void AssertNoErrorInCall(grpc_call_error error) {
// Only called from the call queue. // Only called from the call queue.
// The handler will be called from the network queue. // The handler will be called from the network queue.
- (void)startReadWithHandler:(GRPCEventHandler)handler { - (void)startReadWithHandler:(void(^)(grpc_byte_buffer *))handler {
AssertNoErrorInCall(grpc_call_start_read_old(_gRPCCall, (__bridge_retained void *)handler)); // TODO(jcanizales): Add error handlers for async failures
[_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMessage alloc] initWithHandler:handler]]];
} }
// Called initially from the network queue once response headers are received, // Called initially from the network queue once response headers are received,
@ -195,12 +174,13 @@ static void AssertNoErrorInCall(grpc_call_error error) {
__weak GRPCDelegateWrapper *weakWriteable = _responseWriteable; __weak GRPCDelegateWrapper *weakWriteable = _responseWriteable;
dispatch_async(_callQueue, ^{ dispatch_async(_callQueue, ^{
[weakSelf startReadWithHandler:^(grpc_event *event) { [weakSelf startReadWithHandler:^(grpc_byte_buffer *message) {
if (!event->data.read) { if (message == NULL) {
// No more responses from the server. // No more messages from the server
return; return;
} }
NSData *data = [NSData grpc_dataWithByteBuffer:event->data.read]; NSData *data = [NSData grpc_dataWithByteBuffer:message];
grpc_byte_buffer_destroy(message);
if (!data) { if (!data) {
// The app doesn't have enough memory to hold the server response. We // The app doesn't have enough memory to hold the server response. We
// don't want to throw, because the app shouldn't crash for a behavior // don't want to throw, because the app shouldn't crash for a behavior
@ -225,35 +205,11 @@ static void AssertNoErrorInCall(grpc_call_error error) {
#pragma mark Send headers #pragma mark Send headers
- (void)addHeaderWithName:(NSString *)name binaryValue:(NSData *)value {
grpc_metadata metadata;
// Safe to discard const qualifiers; we're not going to modify the contents.
metadata.key = (char *)name.UTF8String;
metadata.value = (char *)value.bytes;
metadata.value_length = value.length;
grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
}
- (void)addHeaderWithName:(NSString *)name ASCIIValue:(NSString *)value {
grpc_metadata metadata;
// Safe to discard const qualifiers; we're not going to modify the contents.
metadata.key = (char *)name.UTF8String;
metadata.value = (char *)value.UTF8String;
// The trailing \0 isn't encoded in HTTP2.
metadata.value_length = value.length;
grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
}
// TODO(jcanizales): Rename to commitHeaders. // TODO(jcanizales): Rename to commitHeaders.
- (void)sendHeaders:(NSDictionary *)metadata { - (void)sendHeaders:(NSDictionary *)metadata {
for (NSString *name in metadata) { // TODO(jcanizales): Add error handlers for async failures
id value = metadata[name]; [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc]
if ([value isKindOfClass:[NSData class]]) { initWithMetadata:metadata ?: @{} handler:nil]]];
[self addHeaderWithName:name binaryValue:value];
} else if ([value isKindOfClass:[NSString class]]) {
[self addHeaderWithName:name ASCIIValue:value];
}
}
} }
#pragma mark GRXWriteable implementation #pragma mark GRXWriteable implementation
@ -263,24 +219,16 @@ static void AssertNoErrorInCall(grpc_call_error error) {
- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler { - (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
__weak GRPCCall *weakSelf = self; __weak GRPCCall *weakSelf = self;
GRPCEventHandler resumingHandler = ^(grpc_event *event) { void(^resumingHandler)(void) = ^{
if (event->data.write_accepted != GRPC_OP_OK) { // Resume the request writer.
errorHandler();
}
// Resume the request writer (even in the case of error).
// TODO(jcanizales): No need to do it in the case of errors anymore?
GRPCCall *strongSelf = weakSelf; GRPCCall *strongSelf = weakSelf;
if (strongSelf) { if (strongSelf) {
strongSelf->_requestWriter.state = GRXWriterStateStarted; strongSelf->_requestWriter.state = GRXWriterStateStarted;
} }
}; };
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc]
grpc_byte_buffer *buffer = message.grpc_byteBuffer; initWithMessage:message
AssertNoErrorInCall(grpc_call_start_write_old(_gRPCCall, handler:resumingHandler]] errorHandler:errorHandler];
buffer,
(__bridge_retained void *)resumingHandler,
0));
grpc_byte_buffer_destroy(buffer);
} }
- (void)didReceiveValue:(id)value { - (void)didReceiveValue:(id)value {
@ -303,12 +251,8 @@ static void AssertNoErrorInCall(grpc_call_error error) {
// Only called from the call queue. The error handler will be called from the // Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully. // network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)())errorHandler { - (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
GRPCEventHandler handler = ^(grpc_event *event) { [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
if (event->data.finish_accepted != GRPC_OP_OK) { errorHandler:errorHandler];
errorHandler();
}
};
AssertNoErrorInCall(grpc_call_writes_done_old(_gRPCCall, (__bridge_retained void *)handler));
} }
- (void)didFinishWithError:(NSError *)errorOrNil { - (void)didFinishWithError:(NSError *)errorOrNil {
@ -332,32 +276,27 @@ static void AssertNoErrorInCall(grpc_call_error error) {
// after this. // after this.
// The first one (metadataHandler), when the response headers are received. // The first one (metadataHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason. // The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithMetadataHandler:(GRPCEventHandler)metadataHandler - (void)invokeCallWithMetadataHandler:(void(^)(NSDictionary *))metadataHandler
completionHandler:(GRPCEventHandler)completionHandler { completionHandler:(void(^)(NSError *))completionHandler {
AssertNoErrorInCall(grpc_call_invoke_old(_gRPCCall, // TODO(jcanizales): Add error handlers for async failures
_completionQueue.unmanagedQueue, [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc]
(__bridge_retained void *)metadataHandler, initWithHandler:metadataHandler]]];
(__bridge_retained void *)completionHandler, [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc]
0)); initWithHandler:completionHandler]]];
} }
- (void)invokeCall { - (void)invokeCall {
__weak GRPCCall *weakSelf = self; __weak GRPCCall *weakSelf = self;
[self invokeCallWithMetadataHandler:^(grpc_event *event) { [self invokeCallWithMetadataHandler:^(NSDictionary *metadata) {
// Response metadata received. // Response metadata received.
// TODO(jcanizales): Name the type of event->data.client_metadata_read
// in the C library so one can actually pass the object to a method.
grpc_metadata *entries = event->data.client_metadata_read.elements;
size_t count = event->data.client_metadata_read.count;
GRPCCall *strongSelf = weakSelf; GRPCCall *strongSelf = weakSelf;
if (strongSelf) { if (strongSelf) {
strongSelf.responseMetadata = [NSDictionary grpc_dictionaryFromMetadata:entries strongSelf.responseMetadata = metadata;
count:count];
[strongSelf startNextRead]; [strongSelf startNextRead];
} }
} completionHandler:^(grpc_event *event) { } completionHandler:^(NSError *error) {
// TODO(jcanizales): Merge HTTP2 trailers into response metadata. // TODO(jcanizales): Merge HTTP2 trailers into response metadata.
[weakSelf finishWithError:[NSError grpc_errorFromStatus:&event->data.finished]]; [weakSelf finishWithError:error];
}]; }];
// Now that the RPC has been initiated, request writes can start. // Now that the RPC has been initiated, request writes can start.
[_requestWriter startWithWriteable:self]; [_requestWriter startWithWriteable:self];

@ -32,11 +32,9 @@
*/ */
#import <Foundation/Foundation.h> #import <Foundation/Foundation.h>
#include <grpc/grpc.h>
struct grpc_completion_queue; typedef void(^GRPCQueueCompletionHandler)(grpc_op_error error);
struct grpc_event;
typedef void(^GRPCEventHandler)(struct grpc_event *event);
// This class lets one more easily use grpc_completion_queue. To use it, pass // This class lets one more easily use grpc_completion_queue. To use it, pass
// the value of the unmanagedQueue property of an instance of this class to // the value of the unmanagedQueue property of an instance of this class to
@ -48,7 +46,7 @@ typedef void(^GRPCEventHandler)(struct grpc_event *event);
// Release the GRPCCompletionQueue object only after you are not going to pass // Release the GRPCCompletionQueue object only after you are not going to pass
// any more blocks to the grpc_call that's using it. // any more blocks to the grpc_call that's using it.
@interface GRPCCompletionQueue : NSObject @interface GRPCCompletionQueue : NSObject
@property(nonatomic, readonly) struct grpc_completion_queue *unmanagedQueue; @property(nonatomic, readonly) grpc_completion_queue *unmanagedQueue;
+ (instancetype)completionQueue; + (instancetype)completionQueue;
@end @end

@ -66,30 +66,21 @@
while (YES) { while (YES) {
// The following call blocks until an event is available. // The following call blocks until an event is available.
grpc_event *event = grpc_completion_queue_next(unmanagedQueue, gpr_inf_future); grpc_event *event = grpc_completion_queue_next(unmanagedQueue, gpr_inf_future);
GRPCQueueCompletionHandler handler;
switch (event->type) { switch (event->type) {
case GRPC_WRITE_ACCEPTED: case GRPC_OP_COMPLETE:
case GRPC_FINISH_ACCEPTED: handler = (__bridge_transfer GRPCQueueCompletionHandler)event->tag;
case GRPC_CLIENT_METADATA_READ: handler(event->data.op_complete);
case GRPC_READ:
case GRPC_FINISHED:
if (event->tag) {
GRPCEventHandler handler = (__bridge_transfer GRPCEventHandler) event->tag;
handler(event);
}
grpc_event_finish(event); grpc_event_finish(event);
continue; break;
case GRPC_QUEUE_SHUTDOWN: case GRPC_QUEUE_SHUTDOWN:
grpc_completion_queue_destroy(unmanagedQueue);
grpc_event_finish(event); grpc_event_finish(event);
grpc_completion_queue_destroy(unmanagedQueue);
return; return;
case GRPC_SERVER_RPC_NEW: default:
NSAssert(NO, @"C gRPC library produced a server-only event."); grpc_event_finish(event);
continue; [NSException raise:@"Unrecognized completion type" format:@""];
} }
// This means the C gRPC library produced an event that wasn't known
// when this library was written. To preserve evolvability, ignore the
// unknown event on release builds.
NSAssert(NO, @"C gRPC library produced an unknown event.");
}; };
}); });
} }

@ -0,0 +1,97 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#import <Foundation/Foundation.h>
#include <grpc/grpc.h>
#import "GRPCChannel.h"
typedef void(^GRPCCompletionHandler)(NSDictionary *);
@protocol GRPCOp <NSObject>
- (void)getOp:(grpc_op *)op;
- (void)finish;
@end
@interface GRPCOpSendMetadata : NSObject <GRPCOp>
- (instancetype)initWithMetadata:(NSDictionary *)metadata
handler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCOpSendMessage : NSObject <GRPCOp>
- (instancetype)initWithMessage:(NSData *)message
handler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCOpSendClose : NSObject <GRPCOp>
- (instancetype)initWithHandler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCOpRecvMetadata : NSObject <GRPCOp>
- (instancetype)initWithHandler:(void(^)(NSDictionary *))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCOpRecvMessage : NSObject <GRPCOp>
- (instancetype)initWithHandler:(void(^)(grpc_byte_buffer *))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCOpRecvStatus : NSObject <GRPCOp>
- (instancetype)initWithHandler:(void(^)(NSError *))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCWrappedCall : NSObject
- (instancetype)initWithChannel:(GRPCChannel *)channel
method:(NSString *)method
host:(NSString *)host NS_DESIGNATED_INITIALIZER;
- (void)startBatchWithOperations:(NSArray *)ops errorHandler:(void(^)())errorHandler;
- (void)startBatchWithOperations:(NSArray *)ops;
- (void)cancel;
@end

@ -0,0 +1,326 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#import "GRPCWrappedCall.h"
#import <Foundation/Foundation.h>
#include <grpc/grpc.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/alloc.h>
#import "GRPCCompletionQueue.h"
#import "NSDictionary+GRPC.h"
#import "NSData+GRPC.h"
#import "NSError+GRPC.h"
@implementation GRPCOpSendMetadata{
void(^_handler)(void);
grpc_metadata *_sendMetadata;
size_t _count;
}
- (instancetype)init {
return [self initWithMetadata:nil handler:nil];
}
- (instancetype)initWithMetadata:(NSDictionary *)metadata handler:(void (^)(void))handler {
if (self = [super init]) {
_sendMetadata = [metadata grpc_metadataArray];
_count = metadata.count;
_handler = handler;
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = _count;
op->data.send_initial_metadata.metadata = _sendMetadata;
}
- (void)finish {
if (_handler) {
_handler();
}
}
- (void)dealloc {
gpr_free(_sendMetadata);
}
@end
@implementation GRPCOpSendMessage{
void(^_handler)(void);
grpc_byte_buffer *_byteBuffer;
}
- (instancetype)init {
return [self initWithMessage:nil handler:nil];
}
- (instancetype)initWithMessage:(NSData *)message handler:(void (^)(void))handler {
if (!message) {
[NSException raise:NSInvalidArgumentException format:@"message cannot be nil"];
}
if (self = [super init]) {
_byteBuffer = [message grpc_byteBuffer];
_handler = handler;
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = _byteBuffer;
}
- (void)finish {
if (_handler) {
_handler();
}
}
- (void)dealloc {
gpr_free(_byteBuffer);
}
@end
@implementation GRPCOpSendClose{
void(^_handler)(void);
}
- (instancetype)init {
return [self initWithHandler:nil];
}
- (instancetype)initWithHandler:(void (^)(void))handler {
if (self = [super init]) {
_handler = handler;
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
}
- (void)finish {
if (_handler) {
_handler();
}
}
@end
@implementation GRPCOpRecvMetadata{
void(^_handler)(NSDictionary *);
grpc_metadata_array _recvInitialMetadata;
}
- (instancetype) init {
return [self initWithHandler:nil];
}
- (instancetype) initWithHandler:(void (^)(NSDictionary *))handler {
if (self = [super init]) {
_handler = handler;
grpc_metadata_array_init(&_recvInitialMetadata);
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &_recvInitialMetadata;
}
- (void)finish {
NSDictionary *metadata = [NSDictionary
grpc_dictionaryFromMetadata:_recvInitialMetadata.metadata
count:_recvInitialMetadata.count];
if (_handler) {
_handler(metadata);
}
}
- (void)dealloc {
grpc_metadata_array_destroy(&_recvInitialMetadata);
}
@end
@implementation GRPCOpRecvMessage{
void(^_handler)(grpc_byte_buffer *);
grpc_byte_buffer *_recvMessage;
}
- (instancetype)init {
return [self initWithHandler:nil];
}
- (instancetype)initWithHandler:(void (^)(grpc_byte_buffer *))handler {
if (self = [super init]) {
_handler = handler;
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &_recvMessage;
}
- (void)finish {
if (_handler) {
_handler(_recvMessage);
}
}
@end
@implementation GRPCOpRecvStatus{
void(^_handler)(NSError *);
size_t _detailsCapacity;
grpc_status _status;
}
- (instancetype) init {
return [self initWithHandler:nil];
}
- (instancetype) initWithHandler:(void (^)(NSError *))handler {
if (self = [super init]) {
_handler = handler;
grpc_metadata_array_init(&_status.metadata);
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.status = &_status.status;
op->data.recv_status_on_client.status_details = &_status.details;
op->data.recv_status_on_client.status_details_capacity = &_detailsCapacity;
op->data.recv_status_on_client.trailing_metadata = &_status.metadata;
}
- (void)finish {
if (_handler) {
NSError *error = [NSError grpc_errorFromStatus:&_status];
_handler(error);
}
}
- (void)dealloc {
grpc_metadata_array_destroy(&_status.metadata);
gpr_free(_status.details);
}
@end
@implementation GRPCWrappedCall{
grpc_call *_call;
GRPCCompletionQueue *_queue;
}
- (instancetype)init {
return [self initWithChannel:nil method:nil host:nil];
}
- (instancetype)initWithChannel:(GRPCChannel *)channel
method:(NSString *)method
host:(NSString *)host {
if (!channel || !method || !host) {
[NSException raise:NSInvalidArgumentException
format:@"channel, method, and host cannot be nil."];
}
if (self = [super init]) {
static dispatch_once_t initialization;
dispatch_once(&initialization, ^{
grpc_init();
});
_queue = [GRPCCompletionQueue completionQueue];
if (!_queue) {
return nil;
}
_call = grpc_channel_create_call(channel.unmanagedChannel, _queue.unmanagedQueue,
method.UTF8String, host.UTF8String, gpr_inf_future);
if (_call == NULL) {
return nil;
}
}
return self;
}
- (void)startBatchWithOperations:(NSArray *)operations {
[self startBatchWithOperations:operations errorHandler:nil];
}
- (void)startBatchWithOperations:(NSArray *)operations errorHandler:(void (^)())errorHandler {
size_t nops = operations.count;
grpc_op *ops_array = gpr_malloc(nops * sizeof(grpc_op));
size_t i = 0;
for (id op in operations) {
[op getOp:&ops_array[i++]];
}
grpc_call_error error = grpc_call_start_batch(_call, ops_array, nops,
(__bridge_retained void *)(^(grpc_op_error error){
if (error != GRPC_OP_OK) {
if (errorHandler) {
errorHandler();
} else {
return;
}
}
for (id<GRPCOp> operation in operations) {
[operation finish];
}
}));
if (error != GRPC_CALL_OK) {
[NSException raise:NSInternalInconsistencyException
format:@"A precondition for calling grpc_call_start_batch wasn't met"];
}
}
- (void)cancel {
grpc_call_cancel(_call);
}
- (void)dealloc {
grpc_call_destroy(_call);
}
@end

@ -59,6 +59,9 @@ static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array, size_t
@implementation NSData (GRPC) @implementation NSData (GRPC)
+ (instancetype)grpc_dataWithByteBuffer:(grpc_byte_buffer *)buffer { + (instancetype)grpc_dataWithByteBuffer:(grpc_byte_buffer *)buffer {
if (buffer == NULL) {
return nil;
}
NSUInteger length = grpc_byte_buffer_length(buffer); NSUInteger length = grpc_byte_buffer_length(buffer);
char *array = malloc(length * sizeof(*array)); char *array = malloc(length * sizeof(*array));
if (!array) { if (!array) {

@ -32,9 +32,9 @@
*/ */
#import <Foundation/Foundation.h> #import <Foundation/Foundation.h>
#include <grpc/grpc.h>
struct grpc_metadata;
@interface NSDictionary (GRPC) @interface NSDictionary (GRPC)
+ (instancetype)grpc_dictionaryFromMetadata:(struct grpc_metadata *)entries count:(size_t)count; + (instancetype)grpc_dictionaryFromMetadata:(struct grpc_metadata *)entries count:(size_t)count;
- (grpc_metadata *)grpc_metadataArray;
@end @end

@ -33,7 +33,7 @@
#import "NSDictionary+GRPC.h" #import "NSDictionary+GRPC.h"
#include <grpc.h> #include <grpc/support/alloc.h>
@implementation NSDictionary (GRPC) @implementation NSDictionary (GRPC)
+ (instancetype)grpc_dictionaryFromMetadata:(grpc_metadata *)entries count:(size_t)count { + (instancetype)grpc_dictionaryFromMetadata:(grpc_metadata *)entries count:(size_t)count {
@ -53,4 +53,24 @@
} }
return metadata; return metadata;
} }
- (grpc_metadata *)grpc_metadataArray {
grpc_metadata *metadata = gpr_malloc([self count] * sizeof(grpc_metadata));
int i = 0;
for (id key in self) {
id value = self[key];
grpc_metadata *current = &metadata[i];
current->key = [key UTF8String];
if ([value isKindOfClass:[NSData class]]) {
current->value = [value bytes];
} else if ([value isKindOfClass:[NSString class]]) {
current->value = [value UTF8String];
} else {
[NSException raise:NSInvalidArgumentException
format:@"Metadata values must be NSString or NSData."];
}
i += 1;
}
return metadata;
}
@end @end

@ -58,14 +58,12 @@ typedef NS_ENUM(NSInteger, GRPCErrorCode) {
// TODO(jcanizales): This is conflating trailing metadata with Status details. Fix it once there's // TODO(jcanizales): This is conflating trailing metadata with Status details. Fix it once there's
// a decision on how to codify Status. // a decision on how to codify Status.
#include <grpc/status.h> #include <grpc/grpc.h>
struct grpc_metadata; typedef struct grpc_status {
struct grpc_status {
grpc_status_code status; grpc_status_code status;
const char *details; char *details;
size_t metadata_count; grpc_metadata_array metadata;
struct grpc_metadata *metadata_elements; } grpc_status;
};
@interface NSError (GRPC) @interface NSError (GRPC)
// Returns nil if the status is OK. Otherwise, a NSError whose code is one of // Returns nil if the status is OK. Otherwise, a NSError whose code is one of

@ -106,7 +106,7 @@ CC_basicprof = $(DEFAULT_CC)
CXX_basicprof = $(DEFAULT_CXX) CXX_basicprof = $(DEFAULT_CXX)
LD_basicprof = $(DEFAULT_CC) LD_basicprof = $(DEFAULT_CC)
LDXX_basicprof = $(DEFAULT_CXX) LDXX_basicprof = $(DEFAULT_CXX)
CPPFLAGS_basicprof = -O2 -DGRPC_BASIC_PROFILER CPPFLAGS_basicprof = -O2 -DGRPC_BASIC_PROFILER -DGRPC_TIMERS_RDTSC
LDFLAGS_basicprof = LDFLAGS_basicprof =
DEFINES_basicprof = NDEBUG DEFINES_basicprof = NDEBUG
@ -1191,11 +1191,14 @@ endif
lib_deps = ' $(ZLIB_DEP)' lib_deps = ' $(ZLIB_DEP)'
mingw_libs = '' mingw_libs = ''
mingw_lib_deps = ' $(ZLIB_DEP)' mingw_lib_deps = ' $(ZLIB_DEP)'
if lib.language == 'c++':
lib_deps += ' $(PROTOBUF_DEP)'
mingw_lib_deps += ' $(PROTOBUF_DEP)'
for dep in lib.get('deps', []): for dep in lib.get('deps', []):
libs = libs + ' -l' + dep libs = libs + ' -l' + dep
lib_deps = lib_deps + ' $(LIBDIR)/$(CONFIG)/lib' + dep + '.$(SHARED_EXT)' lib_deps = lib_deps + ' $(LIBDIR)/$(CONFIG)/lib' + dep + '.$(SHARED_EXT)'
mingw_libs = mingw_libs + ' -l' + dep + '-imp' mingw_libs = mingw_libs + ' -l' + dep + '-imp'
mingw_lib_deps = mingw_lib_deps + '$(LIBDIR)/$(CONFIG)/' + dep + '.$(SHARED_EXT)' mingw_lib_deps = mingw_lib_deps + ' $(LIBDIR)/$(CONFIG)/' + dep + '.$(SHARED_EXT)'
if lib.get('secure', 'check') == 'yes': if lib.get('secure', 'check') == 'yes':
common = common + ' $(LDLIBS_SECURE) $(OPENSSL_MERGE_LIBS)' common = common + ' $(LDLIBS_SECURE) $(OPENSSL_MERGE_LIBS)'

@ -48,6 +48,13 @@ RUN cd /var/local/git/grpc/src/php/ext/grpc \
RUN cd /var/local/git/grpc/src/php && composer install RUN cd /var/local/git/grpc/src/php && composer install
# Add a cacerts directory containing the Google root pem file, allowing the
# php client to access the production test instance
ADD cacerts cacerts
# Add a service_account directory containing the auth creds file
ADD service_account service_account
RUN cd /var/local/git/grpc/src/php && protoc-gen-php -i tests/interop/ -o tests/interop/ tests/interop/test.proto RUN cd /var/local/git/grpc/src/php && protoc-gen-php -i tests/interop/ -o tests/interop/ tests/interop/test.proto
RUN cd /var/local/git/grpc/src/php && ./bin/run_tests.sh RUN cd /var/local/git/grpc/src/php && ./bin/run_tests.sh

@ -421,6 +421,10 @@ grpc_dockerfile_install() {
grpc_docker_sync_roots_pem $dockerfile_dir/cacerts || return 1; grpc_docker_sync_roots_pem $dockerfile_dir/cacerts || return 1;
grpc_docker_sync_service_account $dockerfile_dir/service_account || return 1; grpc_docker_sync_service_account $dockerfile_dir/service_account || return 1;
} }
[[ $image_label == "grpc/php" ]] && {
grpc_docker_sync_roots_pem $dockerfile_dir/cacerts || return 1;
grpc_docker_sync_service_account $dockerfile_dir/service_account || return 1;
}
[[ $image_label == "grpc/cxx" ]] && { [[ $image_label == "grpc/cxx" ]] && {
grpc_docker_sync_roots_pem $dockerfile_dir/cacerts || return 1; grpc_docker_sync_roots_pem $dockerfile_dir/cacerts || return 1;
grpc_docker_sync_service_account $dockerfile_dir/service_account || return 1; grpc_docker_sync_service_account $dockerfile_dir/service_account || return 1;

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ImportGroup Label="PropertySheets" />
<PropertyGroup Label="UserMacros" />
<PropertyGroup />
<ItemDefinitionGroup>
<Link>
<AdditionalDependencies>ssleay32.lib;libeay32.lib;%(AdditionalDependencies)</AdditionalDependencies>
<AdditionalLibraryDirectories>$(MSBuildProjectDirectory)\..\packages\grpc.dependencies.openssl.1.0.2.2\build\native\lib\$(PlatformToolset)\$(Platform)\$(Configuration)\static;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
</Link>
</ItemDefinitionGroup>
<ItemGroup />
</Project>

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ImportGroup Label="PropertySheets" />
<PropertyGroup Label="UserMacros" />
<PropertyGroup />
<ItemDefinitionGroup>
<Link>
<AdditionalDependencies>zlib.lib;%(AdditionalDependencies)</AdditionalDependencies>
<AdditionalLibraryDirectories>$(MSBuildProjectDirectory)\..\packages\grpc.dependencies.zlib.1.2.8.9\build\native\lib\$(PlatformToolset)\$(Platform)\$(Configuration)\static\cdecl;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
</Link>
</ItemDefinitionGroup>
<ItemGroup />
</Project>
Loading…
Cancel
Save