Merge branch 'master' into node_route_guide_sample

pull/601/head
murgatroid99 10 years ago
commit a6793f2dd6
  1. 5
      INSTALL
  2. 175
      Makefile
  3. 33
      README.md
  4. 44
      build.json
  5. 4
      include/grpc++/async_unary_call.h
  6. 9
      include/grpc++/completion_queue.h
  7. 4
      include/grpc++/impl/call.h
  8. 11
      include/grpc++/server_context.h
  9. 8
      include/grpc++/stream.h
  10. 2
      src/compiler/cpp_generator.cc
  11. 332
      src/compiler/python_generator.cc
  12. 51
      src/compiler/python_generator.h
  13. 87
      src/compiler/python_plugin.cc
  14. 9
      src/core/README.md
  15. 29
      src/core/channel/client_setup.c
  16. 6
      src/core/channel/client_setup.h
  17. 4
      src/core/iomgr/fd_posix.c
  18. 6
      src/core/security/credentials.c
  19. 9
      src/core/security/json_token.c
  20. 2
      src/core/support/cpu_posix.c
  21. 5
      src/core/surface/channel_create.c
  22. 5
      src/core/surface/secure_channel_create.c
  23. 9
      src/cpp/README.md
  24. 1
      src/cpp/client/client_unary_call.cc
  25. 3
      src/cpp/common/call.cc
  26. 42
      src/cpp/common/completion_queue.cc
  27. 173
      src/cpp/server/server.cc
  28. 71
      src/cpp/server/server_context.cc
  29. 4
      src/csharp/GrpcApi/MathExamples.cs
  30. 16
      src/csharp/GrpcApi/MathGrpc.cs
  31. 6
      src/csharp/GrpcApi/MathServiceImpl.cs
  32. 4
      src/csharp/GrpcApi/Properties/AssemblyInfo.cs
  33. 23
      src/csharp/GrpcApi/TestServiceGrpc.cs
  34. 8
      src/csharp/GrpcApiTests/MathClientServerTests.cs
  35. 4
      src/csharp/GrpcApiTests/Properties/AssemblyInfo.cs
  36. 4
      src/csharp/GrpcCore/Call.cs
  37. 4
      src/csharp/GrpcCore/Calls.cs
  38. 4
      src/csharp/GrpcCore/Channel.cs
  39. 2
      src/csharp/GrpcCore/ClientStreamingAsyncResult.cs
  40. 4
      src/csharp/GrpcCore/GrpcEnvironment.cs
  41. 14
      src/csharp/GrpcCore/Internal/AsyncCall.cs
  42. 4
      src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs
  43. 6
      src/csharp/GrpcCore/Internal/CallSafeHandle.cs
  44. 2
      src/csharp/GrpcCore/Internal/ChannelSafeHandle.cs
  45. 4
      src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs
  46. 2
      src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs
  47. 2
      src/csharp/GrpcCore/Internal/Enums.cs
  48. 6
      src/csharp/GrpcCore/Internal/GrpcThreadPool.cs
  49. 2
      src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs
  50. 6
      src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
  51. 6
      src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs
  52. 2
      src/csharp/GrpcCore/Internal/Timespec.cs
  53. 2
      src/csharp/GrpcCore/Marshaller.cs
  54. 2
      src/csharp/GrpcCore/Method.cs
  55. 4
      src/csharp/GrpcCore/Properties/AssemblyInfo.cs
  56. 2
      src/csharp/GrpcCore/RpcException.cs
  57. 10
      src/csharp/GrpcCore/Server.cs
  58. 6
      src/csharp/GrpcCore/ServerCallHandler.cs
  59. 2
      src/csharp/GrpcCore/ServerCalls.cs
  60. 2
      src/csharp/GrpcCore/ServerServiceDefinition.cs
  61. 2
      src/csharp/GrpcCore/Status.cs
  62. 40
      src/csharp/GrpcCore/StatusCode.cs
  63. 4
      src/csharp/GrpcCore/Utils/RecordingObserver.cs
  64. 2
      src/csharp/GrpcCore/Utils/RecordingQueue.cs
  65. 14
      src/csharp/GrpcCoreTests/ClientServerTest.cs
  66. 6
      src/csharp/GrpcCoreTests/GrpcEnvironmentTest.cs
  67. 4
      src/csharp/GrpcCoreTests/Properties/AssemblyInfo.cs
  68. 10
      src/csharp/GrpcCoreTests/ServerTest.cs
  69. 6
      src/csharp/GrpcCoreTests/TestResult.xml
  70. 6
      src/csharp/GrpcCoreTests/TimespecTest.cs
  71. 8
      src/csharp/InteropClient/Client.cs
  72. 9
      src/csharp/InteropClient/InteropClient.csproj
  73. 4
      src/csharp/InteropClient/Properties/AssemblyInfo.cs
  74. 2
      src/csharp/MathClient/MathClient.cs
  75. 4
      src/csharp/MathClient/Properties/AssemblyInfo.cs
  76. 4
      src/csharp/README.md
  77. 28
      src/node/.jshintrc
  78. 4
      src/node/README.md
  79. 5
      src/node/examples/math_server.js
  80. 33
      src/node/examples/perf_test.js
  81. 2
      src/node/examples/stock_server.js
  82. 1
      src/node/ext/node_grpc.cc
  83. 35
      src/node/index.js
  84. 67
      src/node/interop/interop_client.js
  85. 2
      src/node/interop/interop_server.js
  86. 10
      src/node/interop/messages.proto
  87. 17
      src/node/package.json
  88. 212
      src/node/src/client.js
  89. 2
      src/node/src/common.js
  90. 12
      src/node/src/server.js
  91. 2
      src/node/test/call_test.js
  92. 2
      src/node/test/channel_test.js
  93. 2
      src/node/test/constant_test.js
  94. 4
      src/node/test/end_to_end_test.js
  95. 23
      src/node/test/interop_sanity_test.js
  96. 4
      src/node/test/math_client_test.js
  97. 4
      src/node/test/surface_test.js
  98. 18
      src/objective-c/.gitignore
  99. 90
      src/objective-c/GRPCClient/GRPCCall.h
  100. 406
      src/objective-c/GRPCClient/GRPCCall.m
  101. Some files were not shown because too many files have changed in this diff Show More

@ -23,6 +23,11 @@ Building the python wrapper requires the following:
# apt-get install python-all-dev python-virtualenv
If you want to install in a different directory than the default /usr/lib, you can
override it on the command line:
# make install prefix=/opt
*******************************
* More detailled instructions *

@ -332,9 +332,9 @@ endif
.SECONDARY = %.pb.h %.pb.cc
PROTOC_PLUGINS= $(BINDIR)/$(CONFIG)/cpp_plugin $(BINDIR)/$(CONFIG)/ruby_plugin
PROTOC_PLUGINS = $(BINDIR)/$(CONFIG)/grpc_cpp_plugin $(BINDIR)/$(CONFIG)/grpc_ruby_plugin $(BINDIR)/$(CONFIG)/grpc_python_plugin
ifeq ($(DEP_MISSING),)
all: static shared
all: static shared plugins
dep_error:
@echo "You shouldn't see this message - all of your dependencies are correct."
else
@ -498,7 +498,9 @@ timeout_encoding_test: $(BINDIR)/$(CONFIG)/timeout_encoding_test
transport_metadata_test: $(BINDIR)/$(CONFIG)/transport_metadata_test
async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test
channel_arguments_test: $(BINDIR)/$(CONFIG)/channel_arguments_test
cpp_plugin: $(BINDIR)/$(CONFIG)/cpp_plugin
grpc_cpp_plugin: $(BINDIR)/$(CONFIG)/grpc_cpp_plugin
grpc_ruby_plugin: $(BINDIR)/$(CONFIG)/grpc_ruby_plugin
grpc_python_plugin: $(BINDIR)/$(CONFIG)/grpc_python_plugin
credentials_test: $(BINDIR)/$(CONFIG)/credentials_test
end2end_test: $(BINDIR)/$(CONFIG)/end2end_test
interop_client: $(BINDIR)/$(CONFIG)/interop_client
@ -508,7 +510,6 @@ pubsub_publisher_test: $(BINDIR)/$(CONFIG)/pubsub_publisher_test
pubsub_subscriber_test: $(BINDIR)/$(CONFIG)/pubsub_subscriber_test
qps_client: $(BINDIR)/$(CONFIG)/qps_client
qps_server: $(BINDIR)/$(CONFIG)/qps_server
ruby_plugin: $(BINDIR)/$(CONFIG)/ruby_plugin
status_test: $(BINDIR)/$(CONFIG)/status_test
thread_pool_test: $(BINDIR)/$(CONFIG)/thread_pool_test
chttp2_fake_security_cancel_after_accept_test: $(BINDIR)/$(CONFIG)/chttp2_fake_security_cancel_after_accept_test
@ -905,6 +906,8 @@ shared_cxx: $(LIBDIR)/$(CONFIG)/libgrpc++.$(SHARED_EXT)
shared_csharp: shared_c $(LIBDIR)/$(CONFIG)/libgrpc_csharp_ext.$(SHARED_EXT)
grpc_csharp_ext: shared_csharp
plugins: $(PROTOC_PLUGINS)
privatelibs: privatelibs_c privatelibs_cxx
privatelibs_c: $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_fake_security.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_fullstack.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_fullstack_uds.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_simple_ssl_fullstack.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_simple_ssl_with_oauth2_fullstack.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_socket_pair.a $(LIBDIR)/$(CONFIG)/libend2end_fixture_chttp2_socket_pair_one_byte_at_a_time.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_accept.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_accept_and_writes_closed.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_invoke.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_before_invoke.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_in_a_vacuum.a $(LIBDIR)/$(CONFIG)/libend2end_test_census_simple_request.a $(LIBDIR)/$(CONFIG)/libend2end_test_disappearing_server.a $(LIBDIR)/$(CONFIG)/libend2end_test_early_server_shutdown_finishes_inflight_calls.a $(LIBDIR)/$(CONFIG)/libend2end_test_early_server_shutdown_finishes_tags.a $(LIBDIR)/$(CONFIG)/libend2end_test_empty_batch.a $(LIBDIR)/$(CONFIG)/libend2end_test_graceful_server_shutdown.a $(LIBDIR)/$(CONFIG)/libend2end_test_invoke_large_request.a $(LIBDIR)/$(CONFIG)/libend2end_test_max_concurrent_streams.a $(LIBDIR)/$(CONFIG)/libend2end_test_no_op.a $(LIBDIR)/$(CONFIG)/libend2end_test_ping_pong_streaming.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_binary_metadata_and_payload.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_metadata_and_payload.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_payload.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_with_large_metadata.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_with_payload.a $(LIBDIR)/$(CONFIG)/libend2end_test_simple_delayed_request.a $(LIBDIR)/$(CONFIG)/libend2end_test_simple_request.a $(LIBDIR)/$(CONFIG)/libend2end_test_thread_stress.a $(LIBDIR)/$(CONFIG)/libend2end_test_writes_done_hangs_with_pending_read.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_accept_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_accept_and_writes_closed_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_after_invoke_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_before_invoke_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_cancel_in_a_vacuum_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_census_simple_request_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_disappearing_server_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_early_server_shutdown_finishes_inflight_calls_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_early_server_shutdown_finishes_tags_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_graceful_server_shutdown_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_invoke_large_request_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_max_concurrent_streams_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_no_op_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_ping_pong_streaming_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_binary_metadata_and_payload_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_metadata_and_payload_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_payload_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_response_with_trailing_metadata_and_payload_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_with_large_metadata_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_request_with_payload_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_simple_delayed_request_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_simple_request_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_thread_stress_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_test_writes_done_hangs_with_pending_read_legacy.a $(LIBDIR)/$(CONFIG)/libend2end_certs.a
@ -1806,7 +1809,7 @@ else
$(GENDIR)/examples/pubsub/empty.pb.cc: examples/pubsub/empty.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/cpp_plugin $<
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $<
endif
ifeq ($(NO_PROTOC),true)
@ -1815,7 +1818,7 @@ else
$(GENDIR)/examples/pubsub/label.pb.cc: examples/pubsub/label.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/cpp_plugin $<
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $<
endif
ifeq ($(NO_PROTOC),true)
@ -1824,7 +1827,7 @@ else
$(GENDIR)/examples/pubsub/pubsub.pb.cc: examples/pubsub/pubsub.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/cpp_plugin $<
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $<
endif
ifeq ($(NO_PROTOC),true)
@ -1833,7 +1836,7 @@ else
$(GENDIR)/test/cpp/interop/empty.pb.cc: test/cpp/interop/empty.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/cpp_plugin $<
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $<
endif
ifeq ($(NO_PROTOC),true)
@ -1842,7 +1845,7 @@ else
$(GENDIR)/test/cpp/interop/messages.pb.cc: test/cpp/interop/messages.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/cpp_plugin $<
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $<
endif
ifeq ($(NO_PROTOC),true)
@ -1851,7 +1854,7 @@ else
$(GENDIR)/test/cpp/interop/test.pb.cc: test/cpp/interop/test.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/cpp_plugin $<
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $<
endif
ifeq ($(NO_PROTOC),true)
@ -1860,7 +1863,7 @@ else
$(GENDIR)/test/cpp/qps/qpstest.pb.cc: test/cpp/qps/qpstest.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/cpp_plugin $<
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $<
endif
ifeq ($(NO_PROTOC),true)
@ -1869,7 +1872,7 @@ else
$(GENDIR)/test/cpp/util/echo.pb.cc: test/cpp/util/echo.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/cpp_plugin $<
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $<
endif
ifeq ($(NO_PROTOC),true)
@ -1878,7 +1881,7 @@ else
$(GENDIR)/test/cpp/util/echo_duplicate.pb.cc: test/cpp/util/echo_duplicate.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/cpp_plugin $<
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $<
endif
ifeq ($(NO_PROTOC),true)
@ -1887,7 +1890,7 @@ else
$(GENDIR)/test/cpp/util/messages.pb.cc: test/cpp/util/messages.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/cpp_plugin $<
$(Q) $(PROTOC) --cpp_out=$(GENDIR) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $<
endif
@ -1912,7 +1915,7 @@ $(OBJDIR)/$(CONFIG)/%.o : %.cc
$(Q) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -MMD -MF $(addsuffix .dep, $(basename $@)) -c -o $@ $<
install: install_c install_cxx
install: install_c install_cxx install-protobuf install-plugins
install_c: install-headers_c install-static_c install-shared_c
@ -1946,6 +1949,8 @@ install-static_cxx: static_cxx strip-static_cxx
$(E) "[INSTALL] Installing libgrpc++.a"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(prefix)/lib/libgrpc++.a
install-shared_c: shared_c strip-shared_c
ifeq ($(SYSTEM),MINGW32)
$(E) "[INSTALL] Installing gpr.$(SHARED_EXT)"
@ -1986,7 +1991,8 @@ ifneq ($(SYSTEM),Darwin)
endif
endif
install-shared_cxx: shared_cxx strip-shared_cxx
install-shared_cxx: shared_cxx strip-shared_cxx install-shared_c
ifeq ($(SYSTEM),MINGW32)
$(E) "[INSTALL] Installing grpc++.$(SHARED_EXT)"
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/grpc++.$(SHARED_EXT) $(prefix)/lib/grpc++.$(SHARED_EXT)
@ -2004,6 +2010,7 @@ ifneq ($(SYSTEM),Darwin)
endif
endif
install-shared_csharp: shared_csharp strip-shared_csharp
ifeq ($(SYSTEM),MINGW32)
$(E) "[INSTALL] Installing grpc_csharp_ext.$(SHARED_EXT)"
@ -2022,7 +2029,30 @@ ifneq ($(SYSTEM),Darwin)
endif
endif
install-protobuf: $(PROTOBUF_DEP)
ifneq ($(PROTOBUF_DEP),)
$(E) "[INSTALL] Installing embedded protobufs"
$(Q) $(MAKE) -C third_party/protobuf install prefix=$(prefix)
ifneq ($(SYSTEM),MINGW32)
ifneq ($(SYSTEM),Darwin)
$(Q) ldconfig
endif
endif
endif
install-plugins: $(PROTOC_PLUGINS)
ifeq ($(SYSTEM),MINGW32)
$(Q) false
else
$(E) "[INSTALL] Installing grpc protoc plugins"
$(Q) $(INSTALL) $(BINDIR)/$(CONFIG)/grpc_cpp_plugin $(prefix)/bin/grpc_cpp_plugin
$(Q) $(INSTALL) $(BINDIR)/$(CONFIG)/grpc_ruby_plugin $(prefix)/bin/grpc_ruby_plugin
$(Q) $(INSTALL) $(BINDIR)/$(CONFIG)/grpc_python_plugin $(prefix)/bin/grpc_python_plugin
endif
clean:
$(E) "[CLEAN] Cleaning build directories."
$(Q) $(RM) -rf $(OBJDIR) $(LIBDIR) $(BINDIR) $(GENDIR)
@ -3036,12 +3066,15 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/internal_stub.h \
include/grpc++/impl/rpc_method.h \
include/grpc++/impl/rpc_service_method.h \
include/grpc++/impl/service_type.h \
include/grpc++/server.h \
include/grpc++/server_builder.h \
include/grpc++/server_context.h \
include/grpc++/server_credentials.h \
include/grpc++/status.h \
include/grpc++/status_code_enum.h \
include/grpc++/stream.h \
include/grpc++/thread_pool_interface.h \
LIBGRPC++_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_SRC))))
@ -7350,35 +7383,99 @@ endif
endif
CPP_PLUGIN_SRC = \
GRPC_CPP_PLUGIN_SRC = \
src/compiler/cpp_generator.cc \
src/compiler/cpp_plugin.cc \
CPP_PLUGIN_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CPP_PLUGIN_SRC))))
GRPC_CPP_PLUGIN_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_CPP_PLUGIN_SRC))))
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/cpp_plugin: protobuf_dep_error
$(BINDIR)/$(CONFIG)/grpc_cpp_plugin: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/cpp_plugin: $(PROTOBUF_DEP) $(CPP_PLUGIN_OBJS)
$(BINDIR)/$(CONFIG)/grpc_cpp_plugin: $(PROTOBUF_DEP) $(GRPC_CPP_PLUGIN_OBJS)
$(E) "[HOSTLD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(CPP_PLUGIN_OBJS) $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/cpp_plugin
$(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(GRPC_CPP_PLUGIN_OBJS) $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/grpc_cpp_plugin
endif
$(OBJDIR)/$(CONFIG)/src/compiler/cpp_generator.o:
$(OBJDIR)/$(CONFIG)/src/compiler/cpp_plugin.o:
deps_cpp_plugin: $(CPP_PLUGIN_OBJS:.o=.dep)
deps_grpc_cpp_plugin: $(GRPC_CPP_PLUGIN_OBJS:.o=.dep)
ifneq ($(NO_DEPS),true)
-include $(GRPC_CPP_PLUGIN_OBJS:.o=.dep)
endif
GRPC_RUBY_PLUGIN_SRC = \
src/compiler/ruby_generator.cc \
src/compiler/ruby_plugin.cc \
GRPC_RUBY_PLUGIN_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_RUBY_PLUGIN_SRC))))
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/grpc_ruby_plugin: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/grpc_ruby_plugin: $(PROTOBUF_DEP) $(GRPC_RUBY_PLUGIN_OBJS)
$(E) "[HOSTLD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(GRPC_RUBY_PLUGIN_OBJS) $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/grpc_ruby_plugin
endif
$(OBJDIR)/$(CONFIG)/src/compiler/ruby_generator.o:
$(OBJDIR)/$(CONFIG)/src/compiler/ruby_plugin.o:
deps_grpc_ruby_plugin: $(GRPC_RUBY_PLUGIN_OBJS:.o=.dep)
ifneq ($(NO_DEPS),true)
-include $(GRPC_RUBY_PLUGIN_OBJS:.o=.dep)
endif
GRPC_PYTHON_PLUGIN_SRC = \
src/compiler/python_generator.cc \
src/compiler/python_plugin.cc \
GRPC_PYTHON_PLUGIN_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_PYTHON_PLUGIN_SRC))))
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/grpc_python_plugin: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/grpc_python_plugin: $(PROTOBUF_DEP) $(GRPC_PYTHON_PLUGIN_OBJS)
$(E) "[HOSTLD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(GRPC_PYTHON_PLUGIN_OBJS) $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/grpc_python_plugin
endif
$(OBJDIR)/$(CONFIG)/src/compiler/python_generator.o:
$(OBJDIR)/$(CONFIG)/src/compiler/python_plugin.o:
deps_grpc_python_plugin: $(GRPC_PYTHON_PLUGIN_OBJS:.o=.dep)
ifneq ($(NO_DEPS),true)
-include $(CPP_PLUGIN_OBJS:.o=.dep)
-include $(GRPC_PYTHON_PLUGIN_OBJS:.o=.dep)
endif
@ -7677,38 +7774,6 @@ endif
endif
RUBY_PLUGIN_SRC = \
src/compiler/ruby_generator.cc \
src/compiler/ruby_plugin.cc \
RUBY_PLUGIN_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(RUBY_PLUGIN_SRC))))
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/ruby_plugin: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/ruby_plugin: $(PROTOBUF_DEP) $(RUBY_PLUGIN_OBJS)
$(E) "[HOSTLD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(RUBY_PLUGIN_OBJS) $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/ruby_plugin
endif
$(OBJDIR)/$(CONFIG)/src/compiler/ruby_generator.o:
$(OBJDIR)/$(CONFIG)/src/compiler/ruby_plugin.o:
deps_ruby_plugin: $(RUBY_PLUGIN_OBJS:.o=.dep)
ifneq ($(NO_DEPS),true)
-include $(RUBY_PLUGIN_OBJS:.o=.dep)
endif
STATUS_TEST_SRC = \
test/cpp/util/status_test.cc \

@ -7,6 +7,39 @@ Copyright 2015 Google Inc.
See grpc/INSTALL for installation instructions for various platforms.
#Repository Structure
This repository contains source code for gRPC libraries for multiple lanugages written on top
of shared C core library [src/core] (src/core).
* C++ source code: [src/cpp] (src/cpp)
* Python source code: [src/python] (src/python)
* Ruby source code: [src/ruby] (src/ruby)
* NodeJS source code: [src/node] (src/node)
* PHP source code: [src/php] (src/php)
* C# source code: [src/csharp] (src/csharp)
* Objective-C source code: [src/objective-c] (src/objective-c)
Java source code is in [grpc-java] (http://github.com/grpc/grpc-java) repository.
Go source code is in [grpc-go] (http://github.com/grpc/grpc-go) repository.
#Documentation
You can find more detailed documentation and examples in the [grpc-common repository](http://github.com/grpc/grpc-common).
#Current Status of libraries
Libraries in different languages are in different state of development. We are seeking contributions for all of these libraries.
* shared C core library [src/core] (src/core) : Early adopter ready - Alpha.
* C++ Library: [src/cpp] (src/cpp) : Early adopter ready - Alpha.
* Python Library: [src/python] (src/python) : Early adopter ready - Alpha.
* Ruby Library: [src/ruby] (src/ruby) : Early adopter ready - Alpha.
* NodeJS Library: [src/node] (src/node) : Early adopter ready - Alpha.
* PHP Library: [src/php] (src/php) : Pre-Alpha.
* C# Library: [src/csharp] (src/csharp) : Pre-Alpha.
* Objective-C Library: [src/objective-c] (src/objective-c): Pre-Alpha.
#Overview

@ -239,7 +239,6 @@
"include/grpc/support/useful.h"
],
"headers": [
"src/core/support/cpu.h",
"src/core/support/env.h",
"src/core/support/file.h",
"src/core/support/murmur_hash.h",
@ -411,12 +410,15 @@
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
"include/grpc++/impl/service_type.h",
"include/grpc++/server.h",
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/status.h",
"include/grpc++/stream.h"
"include/grpc++/status_code_enum.h",
"include/grpc++/stream.h",
"include/grpc++/thread_pool_interface.h"
],
"headers": [
"src/cpp/client/channel.h",
@ -1575,7 +1577,7 @@
]
},
{
"name": "cpp_plugin",
"name": "grpc_cpp_plugin",
"build": "protoc",
"language": "c++",
"headers": [
@ -1589,6 +1591,31 @@
"deps": [],
"secure": false
},
{
"name": "grpc_ruby_plugin",
"build": "protoc",
"language": "c++",
"src": [
"src/compiler/ruby_generator.cc",
"src/compiler/ruby_plugin.cc"
],
"deps": [],
"secure": false
},
{
"name": "grpc_python_plugin",
"build": "protoc",
"language": "c++",
"headers": [
"src/compiler/python_generator.h"
],
"src": [
"src/compiler/python_generator.cc",
"src/compiler/python_plugin.cc"
],
"deps": [],
"secure": false
},
{
"name": "credentials_test",
"build": "test",
@ -1746,17 +1773,6 @@
"gpr"
]
},
{
"name": "ruby_plugin",
"build": "protoc",
"language": "c++",
"src": [
"src/compiler/ruby_generator.cc",
"src/compiler/ruby_plugin.cc"
],
"deps": [],
"secure": false
},
{
"name": "status_test",
"build": "test",

@ -111,8 +111,6 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -124,8 +122,6 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}

@ -55,6 +55,7 @@ class ServerReaderWriter;
class CompletionQueue;
class Server;
class ServerContext;
class CompletionQueueTag {
public:
@ -62,7 +63,9 @@ class CompletionQueueTag {
// Called prior to returning from Next(), return value
// is the status of the operation (return status is the default thing
// to do)
virtual void FinalizeResult(void **tag, bool *status) = 0;
// If this function returns false, the tag is dropped and not returned
// from the completion queue
virtual bool FinalizeResult(void **tag, bool *status) = 0;
};
// grpc_completion_queue wrapper class
@ -99,6 +102,7 @@ class CompletionQueue {
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
friend Status BlockingUnaryCall(ChannelInterface *channel,
const RpcMethod &method,
ClientContext *context,
@ -109,6 +113,9 @@ class CompletionQueue {
// Cannot be mixed with calls to Next().
bool Pluck(CompletionQueueTag *tag);
// Does a single polling pluck on tag
void TryPluck(CompletionQueueTag *tag);
grpc_completion_queue *cq_; // owned
};

@ -65,7 +65,7 @@ class CallOpBuffer : public CompletionQueueTag {
void AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddSendInitialMetadata(ClientContext *ctx);
void AddRecvInitialMetadata(ClientContext* ctx);
void AddRecvInitialMetadata(ClientContext *ctx);
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose();
@ -80,7 +80,7 @@ class CallOpBuffer : public CompletionQueueTag {
void FillOps(grpc_op *ops, size_t *nops);
// Called by completion queue just prior to returning from Next() or Pluck()
void FinalizeResult(void **tag, bool *status) override;
bool FinalizeResult(void **tag, bool *status) override;
bool got_message = false;

@ -60,7 +60,9 @@ class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
class Call;
class CallOpBuffer;
class CompletionQueue;
class Server;
// Interface of server side rpc context.
@ -76,6 +78,8 @@ class ServerContext final {
void AddInitialMetadata(const grpc::string& key, const grpc::string& value);
void AddTrailingMetadata(const grpc::string& key, const grpc::string& value);
bool IsCancelled();
const std::multimap<grpc::string, grpc::string>& client_metadata() {
return client_metadata_;
}
@ -97,11 +101,18 @@ class ServerContext final {
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
class CompletionOp;
void BeginCompletionOp(Call* call);
ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count);
CompletionOp* completion_op_ = nullptr;
std::chrono::system_clock::time_point deadline_;
grpc_call* call_ = nullptr;
CompletionQueue* cq_ = nullptr;
bool sent_initial_metadata_ = false;
std::multimap<grpc::string, grpc::string> client_metadata_;
std::multimap<grpc::string, grpc::string> initial_metadata_;

@ -582,8 +582,6 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface,
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -595,8 +593,6 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface,
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -643,8 +639,6 @@ class ServerAsyncWriter : public ServerAsyncStreamingInterface,
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -699,8 +693,6 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}

@ -386,7 +386,7 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return new ClientAsyncResponseReader< $Response$>("
" return new ::grpc::ClientAsyncResponseReader< $Response$>("
"channel(), cq, "
"::grpc::RpcMethod($Service$_method_names[$Idx$]), "
"context, request, tag);\n"

@ -0,0 +1,332 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <cassert>
#include <cctype>
#include <map>
#include <ostream>
#include <sstream>
#include "src/compiler/python_generator.h"
#include <google/protobuf/io/printer.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/descriptor.pb.h>
#include <google/protobuf/descriptor.h>
using google::protobuf::FileDescriptor;
using google::protobuf::ServiceDescriptor;
using google::protobuf::MethodDescriptor;
using google::protobuf::io::Printer;
using google::protobuf::io::StringOutputStream;
using std::initializer_list;
using std::map;
using std::string;
namespace grpc_python_generator {
namespace {
//////////////////////////////////
// BEGIN FORMATTING BOILERPLATE //
//////////////////////////////////
// Converts an initializer list of the form { key0, value0, key1, value1, ... }
// into a map of key* to value*. Is merely a readability helper for later code.
map<string, string> ListToDict(const initializer_list<string>& values) {
assert(values.size() % 2 == 0);
map<string, string> value_map;
auto value_iter = values.begin();
for (unsigned i = 0; i < values.size()/2; ++i) {
string key = *value_iter;
++value_iter;
string value = *value_iter;
value_map[key] = value;
++value_iter;
}
return value_map;
}
// Provides RAII indentation handling. Use as:
// {
// IndentScope raii_my_indent_var_name_here(my_py_printer);
// // constructor indented my_py_printer
// ...
// // destructor called at end of scope, un-indenting my_py_printer
// }
class IndentScope {
public:
explicit IndentScope(Printer* printer) : printer_(printer) {
printer_->Indent();
}
~IndentScope() {
printer_->Outdent();
}
private:
Printer* printer_;
};
////////////////////////////////
// END FORMATTING BOILERPLATE //
////////////////////////////////
void PrintService(const ServiceDescriptor* service,
Printer* out) {
string doc = "<fill me in later!>";
map<string, string> dict = ListToDict({
"Service", service->name(),
"Documentation", doc,
});
out->Print(dict, "class $Service$Service(object):\n");
{
IndentScope raii_class_indent(out);
out->Print(dict, "\"\"\"$Documentation$\"\"\"\n");
out->Print("def __init__(self):\n");
{
IndentScope raii_method_indent(out);
out->Print("pass\n");
}
}
}
void PrintServicer(const ServiceDescriptor* service,
Printer* out) {
string doc = "<fill me in later!>";
map<string, string> dict = ListToDict({
"Service", service->name(),
"Documentation", doc,
});
out->Print(dict, "class $Service$Servicer(object):\n");
{
IndentScope raii_class_indent(out);
out->Print(dict, "\"\"\"$Documentation$\"\"\"\n");
for (int i = 0; i < service->method_count(); ++i) {
auto meth = service->method(i);
out->Print("def $Method$(self, arg):\n", "Method", meth->name());
{
IndentScope raii_method_indent(out);
out->Print("raise NotImplementedError()\n");
}
}
}
}
void PrintStub(const ServiceDescriptor* service,
Printer* out) {
string doc = "<fill me in later!>";
map<string, string> dict = ListToDict({
"Service", service->name(),
"Documentation", doc,
});
out->Print(dict, "class $Service$Stub(object):\n");
{
IndentScope raii_class_indent(out);
out->Print(dict, "\"\"\"$Documentation$\"\"\"\n");
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* meth = service->method(i);
auto methdict = ListToDict({"Method", meth->name()});
out->Print(methdict, "def $Method$(self, arg):\n");
{
IndentScope raii_method_indent(out);
out->Print("raise NotImplementedError()\n");
}
out->Print(methdict, "$Method$.async = None\n");
}
}
}
void PrintStubImpl(const ServiceDescriptor* service,
Printer* out) {
map<string, string> dict = ListToDict({
"Service", service->name(),
});
out->Print(dict, "class _$Service$Stub($Service$Stub):\n");
{
IndentScope raii_class_indent(out);
out->Print("def __init__(self, face_stub, default_timeout):\n");
{
IndentScope raii_method_indent(out);
out->Print("self._face_stub = face_stub\n"
"self._default_timeout = default_timeout\n"
"stub_self = self\n");
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* meth = service->method(i);
bool server_streaming = meth->server_streaming();
bool client_streaming = meth->client_streaming();
std::string blocking_call, future_call;
if (server_streaming) {
if (client_streaming) {
blocking_call = "stub_self._face_stub.inline_stream_in_stream_out";
future_call = blocking_call;
} else {
blocking_call = "stub_self._face_stub.inline_value_in_stream_out";
future_call = blocking_call;
}
} else {
if (client_streaming) {
blocking_call = "stub_self._face_stub.blocking_stream_in_value_out";
future_call = "stub_self._face_stub.future_stream_in_value_out";
} else {
blocking_call = "stub_self._face_stub.blocking_value_in_value_out";
future_call = "stub_self._face_stub.future_value_in_value_out";
}
}
// TODO(atash): use the solution described at
// http://stackoverflow.com/a/2982 to bind 'async' attribute
// functions to def'd functions instead of using callable attributes.
auto methdict = ListToDict({
"Method", meth->name(),
"BlockingCall", blocking_call,
"FutureCall", future_call
});
out->Print(methdict, "class $Method$(object):\n");
{
IndentScope raii_callable_indent(out);
out->Print("def __call__(self, arg):\n");
{
IndentScope raii_callable_call_indent(out);
out->Print(methdict,
"return $BlockingCall$(\"$Method$\", arg, "
"stub_self._default_timeout)\n");
}
out->Print("def async(self, arg):\n");
{
IndentScope raii_callable_async_indent(out);
out->Print(methdict,
"return $FutureCall$(\"$Method$\", arg, "
"stub_self._default_timeout)\n");
}
}
out->Print(methdict, "self.$Method$ = $Method$()\n");
}
}
}
}
void PrintStubGenerators(const ServiceDescriptor* service, Printer* out) {
map<string, string> dict = ListToDict({
"Service", service->name(),
});
// Write out a generator of linked pairs of Server/Stub
out->Print(dict, "def mock_$Service$(servicer, default_timeout):\n");
{
IndentScope raii_mock_indent(out);
out->Print("value_in_value_out = {}\n"
"value_in_stream_out = {}\n"
"stream_in_value_out = {}\n"
"stream_in_stream_out = {}\n");
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* meth = service->method(i);
std::string super_interface, meth_dict;
bool server_streaming = meth->server_streaming();
bool client_streaming = meth->client_streaming();
if (server_streaming) {
if (client_streaming) {
super_interface = "InlineStreamInStreamOutMethod";
meth_dict = "stream_in_stream_out";
} else {
super_interface = "InlineValueInStreamOutMethod";
meth_dict = "value_in_stream_out";
}
} else {
if (client_streaming) {
super_interface = "InlineStreamInValueOutMethod";
meth_dict = "stream_in_value_out";
} else {
super_interface = "InlineValueInValueOutMethod";
meth_dict = "value_in_value_out";
}
}
map<string, string> methdict = ListToDict({
"Method", meth->name(),
"SuperInterface", super_interface,
"MethodDict", meth_dict
});
out->Print(
methdict, "class $Method$(_face_interfaces.$SuperInterface$):\n");
{
IndentScope raii_inline_class_indent(out);
out->Print("def service(self, request, context):\n");
{
IndentScope raii_inline_class_fn_indent(out);
out->Print(methdict, "return servicer.$Method$(request)\n");
}
}
out->Print(methdict, "$MethodDict$['$Method$'] = $Method$()\n");
}
out->Print(
"face_linked_pair = _face_testing.server_and_stub(default_timeout,"
"inline_value_in_value_out_methods=value_in_value_out,"
"inline_value_in_stream_out_methods=value_in_stream_out,"
"inline_stream_in_value_out_methods=stream_in_value_out,"
"inline_stream_in_stream_out_methods=stream_in_stream_out)\n");
out->Print("class LinkedPair(object):\n");
{
IndentScope raii_linked_pair(out);
out->Print("def __init__(self, server, stub):\n");
{
IndentScope raii_linked_pair_init(out);
out->Print("self.server = server\n"
"self.stub = stub\n");
}
}
out->Print(
dict,
"stub = _$Service$Stub(face_linked_pair.stub, default_timeout)\n");
out->Print("return LinkedPair(None, stub)\n");
}
}
} // namespace
string GetServices(const FileDescriptor* file) {
string output;
StringOutputStream output_stream(&output);
Printer out(&output_stream, '$');
out.Print("import abc\n");
out.Print("import google3\n");
out.Print("from grpc.framework.face import demonstration as _face_testing\n");
out.Print("from grpc.framework.face import interfaces as _face_interfaces\n");
for (int i = 0; i < file->service_count(); ++i) {
auto service = file->service(i);
PrintService(service, &out);
PrintServicer(service, &out);
PrintStub(service, &out);
PrintStubImpl(service, &out);
PrintStubGenerators(service, &out);
}
return output;
}
} // namespace grpc_python_generator

@ -0,0 +1,51 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPC_COMPILER_PYTHON_GENERATOR_H__
#define __GRPC_COMPILER_PYTHON_GENERATOR_H__
#include <string>
namespace google {
namespace protobuf {
class FileDescriptor;
} // namespace protobuf
} // namespace google
namespace grpc_python_generator {
std::string GetServices(const google::protobuf::FileDescriptor* file);
} // namespace grpc_python_generator
#endif // __GRPC_COMPILER_PYTHON_GENERATOR_H__

@ -0,0 +1,87 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
// Generates a Python gRPC service interface out of Protobuf IDL.
#include <memory>
#include <string>
#include "src/compiler/python_generator.h"
#include <google/protobuf/compiler/code_generator.h>
#include <google/protobuf/compiler/plugin.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/descriptor.h>
using google::protobuf::FileDescriptor;
using google::protobuf::compiler::CodeGenerator;
using google::protobuf::compiler::GeneratorContext;
using google::protobuf::compiler::PluginMain;
using google::protobuf::io::CodedOutputStream;
using google::protobuf::io::ZeroCopyOutputStream;
using std::string;
class PythonGrpcGenerator : public CodeGenerator {
public:
PythonGrpcGenerator() {}
~PythonGrpcGenerator() override {}
bool Generate(const FileDescriptor* file,
const string& parameter,
GeneratorContext* context,
string* error) const override {
// Get output file name.
string file_name;
static const int proto_suffix_length = 6; // length of ".proto"
if (file->name().size() > proto_suffix_length &&
file->name().find_last_of(".proto") == file->name().size() - 1) {
file_name = file->name().substr(
0, file->name().size() - proto_suffix_length) + "_pb2.py";
} else {
*error = "Invalid proto file name. Proto file must end with .proto";
return false;
}
std::unique_ptr<ZeroCopyOutputStream> output(
context->OpenForInsert(file_name, "module_scope"));
CodedOutputStream coded_out(output.get());
string code = grpc_python_generator::GetServices(file);
coded_out.WriteRaw(code.data(), code.size());
return true;
}
};
int main(int argc, char* argv[]) {
PythonGrpcGenerator generator;
return PluginMain(argc, argv, &generator);
}

@ -0,0 +1,9 @@
#Overview
This directory contains source code for shared C library. Libraries in other languages in this repository (C++, Ruby,
Python, PHP, NodeJS, Objective-C) are layered on top of this library.
#Status
Alpha : Ready for early adopters

@ -49,8 +49,11 @@ struct grpc_client_setup {
grpc_alarm backoff_alarm;
gpr_timespec current_backoff_interval;
int in_alarm;
int in_cb;
int cancelled;
gpr_mu mu;
gpr_cv cv;
grpc_client_setup_request *active_request;
int refs;
};
@ -67,6 +70,7 @@ gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) {
static void destroy_setup(grpc_client_setup *s) {
gpr_mu_destroy(&s->mu);
gpr_cv_destroy(&s->cv);
s->done(s->user_data);
grpc_channel_args_destroy(s->args);
gpr_free(s);
@ -111,6 +115,10 @@ static void setup_cancel(grpc_transport_setup *sp) {
int cancel_alarm = 0;
gpr_mu_lock(&s->mu);
s->cancelled = 1;
while (s->in_cb) {
gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
}
GPR_ASSERT(s->refs > 0);
/* effectively cancels the current request (if any) */
@ -129,6 +137,24 @@ static void setup_cancel(grpc_transport_setup *sp) {
}
}
int grpc_client_setup_cb_begin(grpc_client_setup_request *r) {
gpr_mu_lock(&r->setup->mu);
if (r->setup->cancelled) {
gpr_mu_unlock(&r->setup->mu);
return 0;
}
r->setup->in_cb++;
gpr_mu_unlock(&r->setup->mu);
return 1;
}
void grpc_client_setup_cb_end(grpc_client_setup_request *r) {
gpr_mu_lock(&r->setup->mu);
r->setup->in_cb--;
if (r->setup->cancelled) gpr_cv_signal(&r->setup->cv);
gpr_mu_unlock(&r->setup->mu);
}
/* vtable for transport setup */
static const grpc_transport_setup_vtable setup_vtable = {setup_initiate,
setup_cancel};
@ -142,6 +168,7 @@ void grpc_client_setup_create_and_attach(
s->base.vtable = &setup_vtable;
gpr_mu_init(&s->mu);
gpr_cv_init(&s->cv);
s->refs = 1;
s->mdctx = mdctx;
s->initiate = initiate;
@ -151,6 +178,8 @@ void grpc_client_setup_create_and_attach(
s->args = grpc_channel_args_copy(args);
s->current_backoff_interval = gpr_time_from_micros(1000000);
s->in_alarm = 0;
s->in_cb = 0;
s->cancelled = 0;
grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base);
}

@ -58,6 +58,12 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r,
const grpc_channel_args *grpc_client_setup_get_channel_args(
grpc_client_setup_request *r);
/* Call before calling back into the setup listener, and call only if
this function returns 1. If it returns 1, also promise to call
grpc_client_setup_cb_end */
int grpc_client_setup_cb_begin(grpc_client_setup_request *r);
void grpc_client_setup_cb_end(grpc_client_setup_request *r);
/* Get the deadline for a request passed in to initiate. Implementations should
make a best effort to honor this deadline. */
gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r);

@ -295,6 +295,8 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
grpc_fd_watcher *watcher) {
/* keep track of pollers that have requested our events, in case they change
*/
grpc_fd_ref(fd);
gpr_mu_lock(&fd->watcher_mu);
watcher->next = &fd->watcher_root;
watcher->prev = watcher->next->prev;
@ -312,6 +314,8 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
watcher->next->prev = watcher->prev;
watcher->prev->next = watcher->next;
gpr_mu_unlock(&watcher->fd->watcher_mu);
grpc_fd_unref(watcher->fd);
}
void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {

@ -336,6 +336,12 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response(
grpc_credentials_status status = GRPC_CREDENTIALS_OK;
grpc_json *json = NULL;
if (response == NULL) {
gpr_log(GPR_ERROR, "Received NULL response.");
status = GRPC_CREDENTIALS_ERROR;
goto end;
}
if (response->body_length > 0) {
null_terminated_body = gpr_malloc(response->body_length + 1);
null_terminated_body[response->body_length] = '\0';

@ -206,15 +206,14 @@ static char *encoded_jwt_claim(const grpc_auth_json_key *json_key,
char *result = NULL;
gpr_timespec now = gpr_now();
gpr_timespec expiration = gpr_time_add(now, token_lifetime);
/* log10(2^64) ~= 20 */
char now_str[24];
char expiration_str[24];
char now_str[GPR_LTOA_MIN_BUFSIZE];
char expiration_str[GPR_LTOA_MIN_BUFSIZE];
if (gpr_time_cmp(token_lifetime, grpc_max_auth_token_lifetime) > 0) {
gpr_log(GPR_INFO, "Cropping token lifetime to maximum allowed value.");
expiration = gpr_time_add(now, grpc_max_auth_token_lifetime);
}
sprintf(now_str, "%ld", now.tv_sec);
sprintf(expiration_str, "%ld", expiration.tv_sec);
gpr_ltoa(now.tv_sec, now_str);
gpr_ltoa(expiration.tv_sec, expiration_str);
child = create_child(NULL, json, "iss", json_key->client_email,
GRPC_JSON_STRING);

@ -35,8 +35,6 @@
#ifdef GPR_CPU_POSIX
#include "src/core/support/cpu.h"
#include <errno.h>
#include <unistd.h>
#include <string.h>

@ -107,13 +107,16 @@ static void on_connect(void *rp, grpc_endpoint *tcp) {
} else {
return;
}
} else {
} else if (grpc_client_setup_cb_begin(r->cs_request)) {
grpc_create_chttp2_transport(
r->setup->setup_callback, r->setup->setup_user_data,
grpc_client_setup_get_channel_args(r->cs_request), tcp, NULL, 0,
grpc_client_setup_get_mdctx(r->cs_request), 1);
grpc_client_setup_cb_end(r->cs_request);
done(r, 1);
return;
} else {
done(r, 0);
}
}

@ -97,12 +97,15 @@ static void on_secure_transport_setup_done(void *rp,
if (status != GRPC_SECURITY_OK) {
gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
done(r, 0);
} else {
} else if (grpc_client_setup_cb_begin(r->cs_request)) {
grpc_create_chttp2_transport(
r->setup->setup_callback, r->setup->setup_user_data,
grpc_client_setup_get_channel_args(r->cs_request), secure_endpoint,
NULL, 0, grpc_client_setup_get_mdctx(r->cs_request), 1);
grpc_client_setup_cb_end(r->cs_request);
done(r, 1);
} else {
done(r, 0);
}
}

@ -0,0 +1,9 @@
#Overview
This directory contains source code for C++ implementation of gRPC.
#Status
Alpha : Ready for early adopters

@ -60,4 +60,5 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk());
return status;
}
} // namespace grpc

@ -231,7 +231,7 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
}
}
void CallOpBuffer::FinalizeResult(void** tag, bool* status) {
bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
// Release send buffers.
if (send_message_buf_) {
grpc_byte_buffer_destroy(send_message_buf_);
@ -274,6 +274,7 @@ void CallOpBuffer::FinalizeResult(void** tag, bool* status) {
if (recv_closed_) {
*recv_closed_ = cancelled_buf_ != 0;
}
return true;
}
Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)

@ -43,7 +43,7 @@ namespace grpc {
CompletionQueue::CompletionQueue() { cq_ = grpc_completion_queue_create(); }
CompletionQueue::CompletionQueue(grpc_completion_queue *take) : cq_(take) {}
CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
@ -52,34 +52,48 @@ void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
// Helper class so we can declare a unique_ptr with grpc_event
class EventDeleter {
public:
void operator()(grpc_event *ev) {
void operator()(grpc_event* ev) {
if (ev) grpc_event_finish(ev);
}
};
bool CompletionQueue::Next(void **tag, bool *ok) {
bool CompletionQueue::Next(void** tag, bool* ok) {
std::unique_ptr<grpc_event, EventDeleter> ev;
ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
if (ev->type == GRPC_QUEUE_SHUTDOWN) {
return false;
for (;;) {
ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
if (ev->type == GRPC_QUEUE_SHUTDOWN) {
return false;
}
auto cq_tag = static_cast<CompletionQueueTag*>(ev->tag);
*ok = ev->data.op_complete == GRPC_OP_OK;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
return true;
}
}
auto cq_tag = static_cast<CompletionQueueTag *>(ev->tag);
*ok = ev->data.op_complete == GRPC_OP_OK;
*tag = cq_tag;
cq_tag->FinalizeResult(tag, ok);
return true;
}
bool CompletionQueue::Pluck(CompletionQueueTag *tag) {
bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
std::unique_ptr<grpc_event, EventDeleter> ev;
ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_future));
bool ok = ev->data.op_complete == GRPC_OP_OK;
void *ignored = tag;
tag->FinalizeResult(&ignored, &ok);
void* ignored = tag;
GPR_ASSERT(tag->FinalizeResult(&ignored, &ok));
GPR_ASSERT(ignored == tag);
return ok;
}
void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
std::unique_ptr<grpc_event, EventDeleter> ev;
ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_past));
if (!ev) return;
bool ok = ev->data.op_complete == GRPC_OP_OK;
void* ignored = tag;
// the tag must be swallowed if using TryPluck
GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok));
}
} // namespace grpc

@ -49,84 +49,6 @@
namespace grpc {
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
ServerCredentials* creds)
: started_(false),
shutdown_(false),
num_running_cb_(0),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned),
secure_(creds != nullptr) {
if (creds) {
server_ =
grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr);
} else {
server_ = grpc_server_create(cq_.cq(), nullptr);
}
}
Server::Server() {
// Should not be called.
GPR_ASSERT(false);
}
Server::~Server() {
std::unique_lock<std::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
} else {
lock.unlock();
}
grpc_server_destroy(server_);
if (thread_pool_owned_) {
delete thread_pool_;
}
}
bool Server::RegisterService(RpcService* service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod* method = service->GetMethod(i);
void* tag =
grpc_server_register_method(server_, method->name(), nullptr, cq_.cq());
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
sync_methods_.emplace_back(method, tag);
}
return true;
}
bool Server::RegisterAsyncService(AsynchronousService* service) {
GPR_ASSERT(service->dispatch_impl_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->dispatch_impl_ = this;
service->request_args_ = new void* [service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag =
grpc_server_register_method(server_, service->method_names_[i], nullptr,
service->completion_queue()->cq());
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
service->method_names_[i]);
return false;
}
service->request_args_[i] = tag;
}
return true;
}
int Server::AddPort(const grpc::string& addr) {
GPR_ASSERT(!started_);
if (secure_) {
return grpc_server_add_secure_http2_port(server_, addr.c_str());
} else {
return grpc_server_add_http2_port(server_, addr.c_str());
}
}
class Server::SyncRequest final : public CompletionQueueTag {
public:
SyncRequest(RpcServiceMethod* method, void* tag)
@ -163,10 +85,11 @@ class Server::SyncRequest final : public CompletionQueueTag {
this));
}
void FinalizeResult(void** tag, bool* status) override {
bool FinalizeResult(void** tag, bool* status) override {
if (!*status) {
grpc_completion_queue_destroy(cq_);
}
return true;
}
class CallData final {
@ -204,6 +127,7 @@ class Server::SyncRequest final : public CompletionQueueTag {
if (has_response_payload_) {
res.reset(method_->AllocateResponseProto());
}
ctx_.BeginCompletionOp(&call_);
auto status = method_->handler()->RunHandler(
MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
CallOpBuffer buf;
@ -214,10 +138,12 @@ class Server::SyncRequest final : public CompletionQueueTag {
buf.AddSendMessage(*res);
}
buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
bool cancelled;
buf.AddServerRecvClose(&cancelled);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
}
private:
@ -243,6 +169,84 @@ class Server::SyncRequest final : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
ServerCredentials* creds)
: started_(false),
shutdown_(false),
num_running_cb_(0),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned),
secure_(creds != nullptr) {
if (creds) {
server_ =
grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr);
} else {
server_ = grpc_server_create(cq_.cq(), nullptr);
}
}
Server::Server() {
// Should not be called.
GPR_ASSERT(false);
}
Server::~Server() {
std::unique_lock<std::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
} else {
lock.unlock();
}
grpc_server_destroy(server_);
if (thread_pool_owned_) {
delete thread_pool_;
}
}
bool Server::RegisterService(RpcService* service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod* method = service->GetMethod(i);
void* tag =
grpc_server_register_method(server_, method->name(), nullptr, cq_.cq());
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
sync_methods_.emplace_back(method, tag);
}
return true;
}
bool Server::RegisterAsyncService(AsynchronousService* service) {
GPR_ASSERT(service->dispatch_impl_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->dispatch_impl_ = this;
service->request_args_ = new void* [service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag =
grpc_server_register_method(server_, service->method_names_[i], nullptr,
service->completion_queue()->cq());
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
service->method_names_[i]);
return false;
}
service->request_args_[i] = tag;
}
return true;
}
int Server::AddPort(const grpc::string& addr) {
GPR_ASSERT(!started_);
if (secure_) {
return grpc_server_add_secure_http2_port(server_, addr.c_str());
} else {
return grpc_server_add_http2_port(server_, addr.c_str());
}
}
bool Server::Start() {
GPR_ASSERT(!started_);
started_ = true;
@ -310,11 +314,11 @@ class Server::AsyncRequest final : public CompletionQueueTag {
grpc_metadata_array_destroy(&array_);
}
void FinalizeResult(void** tag, bool* status) override {
bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
if (*status && request_) {
if (payload_) {
*status = *status && DeserializeProto(payload_, request_);
*status = DeserializeProto(payload_, request_);
} else {
*status = false;
}
@ -331,8 +335,11 @@ class Server::AsyncRequest final : public CompletionQueueTag {
}
ctx_->call_ = call_;
Call call(call_, server_, cq_);
ctx_->BeginCompletionOp(&call);
// just the pointers inside call are copied here
stream_->BindCall(&call);
delete this;
return true;
}
private:

@ -32,15 +32,67 @@
*/
#include <grpc++/server_context.h>
#include <mutex>
#include <grpc++/impl/call.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/cpp/util/time.h"
namespace grpc {
// CompletionOp
class ServerContext::CompletionOp final : public CallOpBuffer {
public:
CompletionOp();
bool FinalizeResult(void** tag, bool* status) override;
bool CheckCancelled(CompletionQueue* cq);
void Unref();
private:
std::mutex mu_;
int refs_ = 2; // initial refs: one in the server context, one in the cq
bool finalized_ = false;
bool cancelled_ = false;
};
ServerContext::CompletionOp::CompletionOp() { AddServerRecvClose(&cancelled_); }
void ServerContext::CompletionOp::Unref() {
std::unique_lock<std::mutex> lock(mu_);
if (--refs_ == 0) {
lock.unlock();
delete this;
}
}
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
std::lock_guard<std::mutex> g(mu_);
return finalized_ ? cancelled_ : false;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
std::unique_lock<std::mutex> lock(mu_);
finalized_ = true;
if (!*status) cancelled_ = true;
if (--refs_ == 0) {
lock.unlock();
delete this;
}
return false;
}
// ServerContext body
ServerContext::ServerContext() {}
ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata *metadata,
ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count)
: deadline_(Timespec2Timepoint(deadline)) {
for (size_t i = 0; i < metadata_count; i++) {
@ -55,16 +107,29 @@ ServerContext::~ServerContext() {
if (call_) {
grpc_call_destroy(call_);
}
if (completion_op_) {
completion_op_->Unref();
}
}
void ServerContext::BeginCompletionOp(Call* call) {
GPR_ASSERT(!completion_op_);
completion_op_ = new CompletionOp();
call->PerformOps(completion_op_);
}
void ServerContext::AddInitialMetadata(const grpc::string& key,
const grpc::string& value) {
const grpc::string& value) {
initial_metadata_.insert(std::make_pair(key, value));
}
void ServerContext::AddTrailingMetadata(const grpc::string& key,
const grpc::string& value) {
const grpc::string& value) {
trailing_metadata_.insert(std::make_pair(key, value));
}
bool ServerContext::IsCancelled() {
return completion_op_ && completion_op_->CheckCancelled(cq_);
}
} // namespace grpc

@ -32,10 +32,10 @@
#endregion
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core.Utils;
using System.Threading.Tasks;
using Grpc.Core.Utils;
namespace math
{

@ -32,11 +32,11 @@
#endregion
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
namespace math
{
@ -99,31 +99,31 @@ namespace math
public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>(divMethod, channel);
var call = new Grpc.Core.Call<DivArgs, DivReply>(divMethod, channel);
return Calls.BlockingUnaryCall(call, request, token);
}
public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>(divMethod, channel);
var call = new Grpc.Core.Call<DivArgs, DivReply>(divMethod, channel);
return Calls.AsyncUnaryCall(call, request, token);
}
public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<FibArgs, Num>(fibMethod, channel);
var call = new Grpc.Core.Call<FibArgs, Num>(fibMethod, channel);
Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
}
public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<Num, Num>(sumMethod, channel);
var call = new Grpc.Core.Call<Num, Num>(sumMethod, channel);
return Calls.AsyncClientStreamingCall(call, token);
}
public IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>(divManyMethod, channel);
var call = new Grpc.Core.Call<DivArgs, DivReply>(divManyMethod, channel);
return Calls.DuplexStreamingCall(call, responseObserver, token);
}
}

@ -32,11 +32,11 @@
#endregion
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core.Utils;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Utils;
namespace math
{

@ -8,13 +8,13 @@ using System.Runtime.CompilerServices;
[assembly: AssemblyConfiguration ("")]
[assembly: AssemblyCompany ("")]
[assembly: AssemblyProduct ("")]
[assembly: AssemblyCopyright ("jtattermusch")]
[assembly: AssemblyCopyright ("Google Inc. All rights reserved.")]
[assembly: AssemblyTrademark ("")]
[assembly: AssemblyCulture ("")]
// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}".
// The form "{Major}.{Minor}.*" will automatically update the build and revision,
// and "{Major}.{Minor}.{Build}.*" will update just the revision.
[assembly: AssemblyVersion ("1.0.*")]
[assembly: AssemblyVersion ("0.9.*")]
// The following attributes are used to specify the signing key for the assembly,
// if desired. See the Mono documentation for more information about signing.
//[assembly: AssemblyDelaySign(false)]

@ -30,12 +30,13 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
namespace grpc.testing
{
@ -119,49 +120,49 @@ namespace grpc.testing
public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<Empty, Empty>(emptyCallMethod, channel);
var call = new Grpc.Core.Call<Empty, Empty>(emptyCallMethod, channel);
return Calls.BlockingUnaryCall(call, request, token);
}
public Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<Empty, Empty>(emptyCallMethod, channel);
var call = new Grpc.Core.Call<Empty, Empty>(emptyCallMethod, channel);
return Calls.AsyncUnaryCall(call, request, token);
}
public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<SimpleRequest, SimpleResponse>(unaryCallMethod, channel);
var call = new Grpc.Core.Call<SimpleRequest, SimpleResponse>(unaryCallMethod, channel);
return Calls.BlockingUnaryCall(call, request, token);
}
public Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<SimpleRequest, SimpleResponse>(unaryCallMethod, channel);
var call = new Grpc.Core.Call<SimpleRequest, SimpleResponse>(unaryCallMethod, channel);
return Calls.AsyncUnaryCall(call, request, token);
}
public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) {
var call = new Google.GRPC.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(streamingOutputCallMethod, channel);
var call = new Grpc.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(streamingOutputCallMethod, channel);
Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
}
public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<StreamingInputCallRequest, StreamingInputCallResponse>(streamingInputCallMethod, channel);
var call = new Grpc.Core.Call<StreamingInputCallRequest, StreamingInputCallResponse>(streamingInputCallMethod, channel);
return Calls.AsyncClientStreamingCall(call, token);
}
public IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(fullDuplexCallMethod, channel);
var call = new Grpc.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(fullDuplexCallMethod, channel);
return Calls.DuplexStreamingCall(call, responseObserver, token);
}
public IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(halfDuplexCallMethod, channel);
var call = new Grpc.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(halfDuplexCallMethod, channel);
return Calls.DuplexStreamingCall(call, responseObserver, token);
}
}

@ -32,12 +32,12 @@
#endregion
using System;
using NUnit.Framework;
using Google.GRPC.Core;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Google.GRPC.Core.Utils;
using System.Collections.Generic;
using Grpc.Core;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace math.Tests
{

@ -8,13 +8,13 @@ using System.Runtime.CompilerServices;
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("")]
[assembly: AssemblyCopyright("jtattermusch")]
[assembly: AssemblyCopyright("Google Inc. All rights reserved.")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}".
// The form "{Major}.{Minor}.*" will automatically update the build and revision,
// and "{Major}.{Minor}.{Build}.*" will update just the revision.
[assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("0.9.*")]
// The following attributes are used to specify the signing key for the assembly,
// if desired. See the Mono documentation for more information about signing.
//[assembly: AssemblyDelaySign(false)]

@ -32,9 +32,9 @@
#endregion
using System;
using Google.GRPC.Core.Internal;
using Grpc.Core.Internal;
namespace Google.GRPC.Core
namespace Grpc.Core
{
public class Call<TRequest, TResponse>
{

@ -34,9 +34,9 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Google.GRPC.Core.Internal;
using Grpc.Core.Internal;
namespace Google.GRPC.Core
namespace Grpc.Core
{
// NOTE: this class is work-in-progress

@ -35,9 +35,9 @@ using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Google.GRPC.Core.Internal;
using Grpc.Core.Internal;
namespace Google.GRPC.Core
namespace Grpc.Core
{
public class Channel : IDisposable
{

@ -34,7 +34,7 @@
using System;
using System.Threading.Tasks;
namespace Google.GRPC.Core
namespace Grpc.Core
{
/// <summary>
/// Return type for client streaming async method.

@ -32,10 +32,10 @@
#endregion
using System;
using Google.GRPC.Core.Internal;
using System.Runtime.InteropServices;
using Grpc.Core.Internal;
namespace Google.GRPC.Core
namespace Grpc.Core
{
/// <summary>
/// Encapsulates initialization and shutdown of gRPC library.

@ -32,14 +32,14 @@
#endregion
using System;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using Google.GRPC.Core.Internal;
using Grpc.Core.Internal;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
/// <summary>
/// Handles native call lifecycle and provides convenience methods.
@ -381,7 +381,7 @@ namespace Google.GRPC.Core.Internal
private void CompleteStreamObserver(Status status)
{
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
if (status.StatusCode != StatusCode.OK)
{
// TODO: wrap to handle exceptions;
readObserver.OnError(new RpcException(status));
@ -413,13 +413,13 @@ namespace Google.GRPC.Core.Internal
if (error != GRPCOpError.GRPC_OP_OK)
{
tcs.SetException(new RpcException(
new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.")
new Status(StatusCode.Internal, "Internal error occured.")
));
return;
}
var status = ctx.GetReceivedStatus();
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
if (status.StatusCode != StatusCode.OK)
{
tcs.SetException(new RpcException(status));
return;

@ -33,9 +33,9 @@
using System;
using System.Runtime.InteropServices;
using Google.GRPC.Core;
using Grpc.Core;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
/// <summary>
/// Not owned version of

@ -32,11 +32,11 @@
#endregion
using System;
using System.Runtime.InteropServices;
using System.Diagnostics;
using Google.GRPC.Core;
using System.Runtime.InteropServices;
using Grpc.Core;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
//TODO: rename the delegate
internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr);

@ -36,7 +36,7 @@ using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
/// <summary>
/// grpc_channel from <grpc/grpc.h>

@ -32,9 +32,9 @@
#endregion
using System;
using Google.GRPC.Core.Internal;
using Grpc.Core.Internal;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
internal class ClientStreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
{

@ -35,7 +35,7 @@ using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
/// <summary>
/// grpc_completion_queue from <grpc/grpc.h>

@ -34,7 +34,7 @@
using System;
using System.Runtime.InteropServices;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
/// <summary>
/// from grpc/grpc.h

@ -32,13 +32,13 @@
#endregion
using System;
using Google.GRPC.Core.Internal;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using Grpc.Core.Internal;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
/// <summary>
/// Pool of threads polling on the same completion queue.

@ -34,7 +34,7 @@
using System;
using System.Runtime.InteropServices;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
/// <summary>
/// Safe handle to wrap native objects.

@ -32,11 +32,11 @@
#endregion
using System;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
// TODO: we need to make sure that the delegates are not collected before invoked.
internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr);

@ -32,9 +32,9 @@
#endregion
using System;
using Google.GRPC.Core.Internal;
using Grpc.Core.Internal;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
/// <summary>
/// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
@ -52,7 +52,7 @@ namespace Google.GRPC.Core.Internal
public void OnCompleted()
{
// TODO: how bad is the Wait here?
call.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
call.SendStatusFromServerAsync(new Status(StatusCode.OK, "")).Wait();
}
public void OnError(Exception error)

@ -35,7 +35,7 @@ using System;
using System.Runtime.InteropServices;
using System.Threading;
namespace Google.GRPC.Core.Internal
namespace Grpc.Core.Internal
{
/// <summary>
/// gpr_timespec from grpc/support/time.h

@ -33,7 +33,7 @@
using System;
namespace Google.GRPC.Core
namespace Grpc.Core
{
/// <summary>
/// For serializing and deserializing messages.

@ -33,7 +33,7 @@
using System;
namespace Google.GRPC.Core
namespace Grpc.Core
{
public enum MethodType
{

@ -8,13 +8,13 @@ using System.Runtime.CompilerServices;
[assembly: AssemblyConfiguration ("")]
[assembly: AssemblyCompany ("")]
[assembly: AssemblyProduct ("")]
[assembly: AssemblyCopyright ("jtattermusch")]
[assembly: AssemblyCopyright ("Google Inc. All rights reserved.")]
[assembly: AssemblyTrademark ("")]
[assembly: AssemblyCulture ("")]
// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}".
// The form "{Major}.{Minor}.*" will automatically update the build and revision,
// and "{Major}.{Minor}.{Build}.*" will update just the revision.
[assembly: AssemblyVersion ("1.0.*")]
[assembly: AssemblyVersion ("0.9.*")]
// The following attributes are used to specify the signing key for the assembly,
// if desired. See the Mono documentation for more information about signing.
//[assembly: AssemblyDelaySign(false)]

@ -33,7 +33,7 @@
using System;
namespace Google.GRPC.Core
namespace Grpc.Core
{
public class RpcException : Exception
{

@ -32,14 +32,14 @@
#endregion
using System;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Google.GRPC.Core.Internal;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Google.GRPC.Core
namespace Grpc.Core
{
/// <summary>
/// Server is implemented only to be able to do

@ -32,9 +32,9 @@
#endregion
using System;
using Google.GRPC.Core.Internal;
using Grpc.Core.Internal;
namespace Google.GRPC.Core
namespace Grpc.Core
{
internal interface IServerCallHandler
{
@ -111,7 +111,7 @@ namespace Google.GRPC.Core
var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>());
asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait();
finishedTask.Wait();
}

@ -33,7 +33,7 @@
using System;
namespace Google.GRPC.Core
namespace Grpc.Core
{
// TODO: perhaps add also serverSideStreaming and clientSideStreaming

@ -34,7 +34,7 @@
using System;
using System.Collections.Generic;
namespace Google.GRPC.Core
namespace Grpc.Core
{
public class ServerServiceDefinition
{

@ -34,7 +34,7 @@
using System;
using System.Runtime.InteropServices;
namespace Google.GRPC.Core
namespace Grpc.Core
{
/// <summary>
/// Represents RPC result.

@ -33,22 +33,22 @@
using System;
namespace Google.GRPC.Core
namespace Grpc.Core
{
// TODO: element names should changed to comply with C# naming conventions.
/// <summary>
/// grpc_status_code from grpc/status.h
/// based on grpc_status_code from grpc/status.h
/// </summary>
public enum StatusCode
{
/* Not an error; returned on success
HTTP Mapping: 200 OK */
GRPC_STATUS_OK = 0,
OK = 0,
/* The operation was cancelled (typically by the caller).
HTTP Mapping: 499 Client Closed Request */
GRPC_STATUS_CANCELLED = 1,
Cancelled = 1,
/* Unknown error. An example of where this error may be returned is
if a Status value received from another address space belongs to
an error-space that is not known in this address space. Also
@ -56,14 +56,14 @@ namespace Google.GRPC.Core
may be converted to this error.
HTTP Mapping: 500 Internal Server Error */
GRPC_STATUS_UNKNOWN = 2,
Unknown = 2,
/* Client specified an invalid argument. Note that this differs
from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments
that are problematic regardless of the state of the system
(e.g., a malformed file name).
HTTP Mapping: 400 Bad Request */
GRPC_STATUS_INVALID_ARGUMENT = 3,
InvalidArgument = 3,
/* Deadline expired before operation could complete. For operations
that change the state of the system, this error may be returned
even if the operation has completed successfully. For example, a
@ -71,16 +71,16 @@ namespace Google.GRPC.Core
enough for the deadline to expire.
HTTP Mapping: 504 Gateway Timeout */
GRPC_STATUS_DEADLINE_EXCEEDED = 4,
DeadlineExceeded = 4,
/* Some requested entity (e.g., file or directory) was not found.
HTTP Mapping: 404 Not Found */
GRPC_STATUS_NOT_FOUND = 5,
NotFound = 5,
/* Some entity that we attempted to create (e.g., file or directory)
already exists.
HTTP Mapping: 409 Conflict */
GRPC_STATUS_ALREADY_EXISTS = 6,
AlreadyExists = 6,
/* The caller does not have permission to execute the specified
operation. PERMISSION_DENIED must not be used for rejections
caused by exhausting some resource (use RESOURCE_EXHAUSTED
@ -89,17 +89,17 @@ namespace Google.GRPC.Core
instead for those errors).
HTTP Mapping: 403 Forbidden */
GRPC_STATUS_PERMISSION_DENIED = 7,
PermissionDenied = 7,
/* The request does not have valid authentication credentials for the
operation.
HTTP Mapping: 401 Unauthorized */
GRPC_STATUS_UNAUTHENTICATED = 16,
Unauthenticated = 16,
/* Some resource has been exhausted, perhaps a per-user quota, or
perhaps the entire file system is out of space.
HTTP Mapping: 429 Too Many Requests */
GRPC_STATUS_RESOURCE_EXHAUSTED = 8,
ResourceExhausted = 8,
/* Operation was rejected because the system is not in a state
required for the operation's execution. For example, directory
to be deleted may be non-empty, an rmdir operation is applied to
@ -126,7 +126,7 @@ namespace Google.GRPC.Core
the request contains Etag related headers. So if the server does see
Etag related headers in the request, it may choose to return 412
instead of 400 for this error code. */
GRPC_STATUS_FAILED_PRECONDITION = 9,
FailedPrecondition = 9,
/* The operation was aborted, typically due to a concurrency issue
like sequencer check failures, transaction aborts, etc.
@ -134,7 +134,7 @@ namespace Google.GRPC.Core
ABORTED, and UNAVAILABLE.
HTTP Mapping: 409 Conflict */
GRPC_STATUS_ABORTED = 10,
Aborted = 10,
/* Operation was attempted past the valid range. E.g., seeking or
reading past end of file.
@ -152,17 +152,17 @@ namespace Google.GRPC.Core
they are done.
HTTP Mapping: 400 Bad Request */
GRPC_STATUS_OUT_OF_RANGE = 11,
OutOfRange = 11,
/* Operation is not implemented or not supported/enabled in this service.
HTTP Mapping: 501 Not Implemented */
GRPC_STATUS_UNIMPLEMENTED = 12,
Unimplemented = 12,
/* Internal errors. Means some invariants expected by underlying
system has been broken. If you see one of these errors,
something is very broken.
HTTP Mapping: 500 Internal Server Error */
GRPC_STATUS_INTERNAL = 13,
Internal = 13,
/* The service is currently unavailable. This is a most likely a
transient condition and may be corrected by retrying with
a backoff.
@ -171,13 +171,11 @@ namespace Google.GRPC.Core
ABORTED, and UNAVAILABLE.
HTTP Mapping: 503 Service Unavailable */
GRPC_STATUS_UNAVAILABLE = 14,
Unavailable = 14,
/* Unrecoverable data loss or corruption.
HTTP Mapping: 500 Internal Server Error */
GRPC_STATUS_DATA_LOSS = 15,
/* Force users to include a default branch: */
GRPC_STATUS__DO_NOT_USE = -1
DataLoss = 15
}
}

@ -32,10 +32,10 @@
#endregion
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Google.GRPC.Core.Utils
namespace Grpc.Core.Utils
{
public class RecordingObserver<T> : IObserver<T>
{

@ -36,7 +36,7 @@ using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;
namespace Google.GRPC.Core.Utils
namespace Grpc.Core.Utils
{
// TODO: replace this by something that implements IAsyncEnumerator.
/// <summary>

@ -32,15 +32,15 @@
#endregion
using System;
using NUnit.Framework;
using Google.GRPC.Core;
using Google.GRPC.Core.Internal;
using System.Threading;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Google.GRPC.Core.Utils;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Google.GRPC.Core.Tests
namespace Grpc.Core.Tests
{
public class ClientServerTest
{
@ -133,7 +133,7 @@ namespace Google.GRPC.Core.Tests
Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
Assert.Fail();
} catch(RpcException e) {
Assert.AreEqual(StatusCode.GRPC_STATUS_UNIMPLEMENTED, e.Status.StatusCode);
Assert.AreEqual(StatusCode.Unimplemented, e.Status.StatusCode);
}
}

@ -32,11 +32,11 @@
#endregion
using System;
using NUnit.Framework;
using Google.GRPC.Core;
using System.Threading;
using Grpc.Core;
using NUnit.Framework;
namespace Google.GRPC.Core.Tests
namespace Grpc.Core.Tests
{
public class GrpcEnvironmentTest
{

@ -8,13 +8,13 @@ using System.Runtime.CompilerServices;
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("")]
[assembly: AssemblyCopyright("jtattermusch")]
[assembly: AssemblyCopyright("Google Inc. All rights reserved.")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}".
// The form "{Major}.{Minor}.*" will automatically update the build and revision,
// and "{Major}.{Minor}.{Build}.*" will update just the revision.
[assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("0.9.*")]
// The following attributes are used to specify the signing key for the assembly,
// if desired. See the Mono documentation for more information about signing.
//[assembly: AssemblyDelaySign(false)]

@ -32,12 +32,12 @@
#endregion
using System;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
using Google.GRPC.Core.Internal;
using Google.GRPC.Core;
using Google.GRPC.Core.Utils;
namespace Google.GRPC.Core.Tests
namespace Grpc.Core.Tests
{
public class ServerTest
{
@ -47,7 +47,7 @@ namespace Google.GRPC.Core.Tests
GrpcEnvironment.Initialize();
Server server = new Server();
int port = server.AddPort("localhost:0");
server.AddPort("localhost:0");
server.Start();
server.ShutdownAsync().Wait();

@ -15,17 +15,17 @@
<results>
<test-suite type="TestFixture" name="CallsTest" executed="True" result="Success" success="True" time="0.009" asserts="0">
<results>
<test-case name="Google.GRPC.Core.Tests.CallsTest.Test1" executed="True" result="Success" success="True" time="0.004" asserts="0" />
<test-case name="Grpc.Core.Tests.CallsTest.Test1" executed="True" result="Success" success="True" time="0.004" asserts="0" />
</results>
</test-suite>
<test-suite type="TestFixture" name="ClientServerTest" executed="True" result="Success" success="True" time="0.149" asserts="0">
<results>
<test-case name="Google.GRPC.Core.Tests.ClientServerTest.EmptyCall" executed="True" result="Success" success="True" time="0.111" asserts="0" />
<test-case name="Grpc.Core.Tests.ClientServerTest.EmptyCall" executed="True" result="Success" success="True" time="0.111" asserts="0" />
</results>
</test-suite>
<test-suite type="TestFixture" name="ServerTest" executed="True" result="Success" success="True" time="0.001" asserts="0">
<results>
<test-case name="Google.GRPC.Core.Tests.ServerTest.StartAndShutdownServer" executed="True" result="Success" success="True" time="0.001" asserts="0" />
<test-case name="Grpc.Core.Tests.ServerTest.StartAndShutdownServer" executed="True" result="Success" success="True" time="0.001" asserts="0" />
</results>
</test-suite>
</results>

@ -32,11 +32,11 @@
#endregion
using System;
using NUnit.Framework;
using System.Runtime.InteropServices;
using Google.GRPC.Core.Internal;
using Grpc.Core.Internal;
using NUnit.Framework;
namespace Google.GRPC.Core.Internal.Tests
namespace Grpc.Core.Internal.Tests
{
public class TimespecTest
{

@ -33,14 +33,14 @@
using System;
using System.Collections.Generic;
using NUnit.Framework;
using System.Text.RegularExpressions;
using Google.GRPC.Core;
using Google.GRPC.Core.Utils;
using Google.ProtocolBuffers;
using Grpc.Core;
using Grpc.Core.Utils;
using NUnit.Framework;
using grpc.testing;
namespace Google.GRPC.Interop
namespace Grpc.Interop
{
class Client
{

@ -9,7 +9,7 @@
<OutputType>Exe</OutputType>
<RootNamespace>InteropClient</RootNamespace>
<AssemblyName>InteropClient</AssemblyName>
<StartupObject>Google.GRPC.Interop.Client</StartupObject>
<StartupObject>Grpc.Interop.Client</StartupObject>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">
@ -33,14 +33,13 @@
<PlatformTarget>x86</PlatformTarget>
</PropertyGroup>
<ItemGroup>
<Reference Include="Google.ProtocolBuffers, Version=2.4.1.521, Culture=neutral, PublicKeyToken=55f7125234beb589, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
<Reference Include="nunit.framework">
<HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />

@ -8,13 +8,13 @@ using System.Runtime.CompilerServices;
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("")]
[assembly: AssemblyCopyright("jtattermusch")]
[assembly: AssemblyCopyright("Google Inc. All rights reserved.")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}".
// The form "{Major}.{Minor}.*" will automatically update the build and revision,
// and "{Major}.{Minor}.{Build}.*" will update just the revision.
[assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("0.9.*")]
// The following attributes are used to specify the signing key for the assembly,
// if desired. See the Mono documentation for more information about signing.
//[assembly: AssemblyDelaySign(false)]

@ -33,8 +33,8 @@
using System;
using System.Runtime.InteropServices;
using Google.GRPC.Core;
using System.Threading;
using Grpc.Core;
namespace math
{

@ -8,13 +8,13 @@ using System.Runtime.CompilerServices;
[assembly: AssemblyConfiguration ("")]
[assembly: AssemblyCompany ("")]
[assembly: AssemblyProduct ("")]
[assembly: AssemblyCopyright ("jtattermusch")]
[assembly: AssemblyCopyright ("Google Inc. All rights reserved.")]
[assembly: AssemblyTrademark ("")]
[assembly: AssemblyCulture ("")]
// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}".
// The form "{Major}.{Minor}.*" will automatically update the build and revision,
// and "{Major}.{Minor}.{Build}.*" will update just the revision.
[assembly: AssemblyVersion ("1.0.*")]
[assembly: AssemblyVersion ("0.9.*")]
// The following attributes are used to specify the signing key for the assembly,
// if desired. See the Mono documentation for more information about signing.
//[assembly: AssemblyDelaySign(false)]

@ -1,9 +1,9 @@
gRPC C#
=======
A C# implementation of gRPC, Google's RPC library.
A C# implementation of gRPC.
EXPERIMENTAL ONLY
Status
-----------------
**This gRPC C# implementation is work-in-progress and is not expected to work yet.**

@ -0,0 +1,28 @@
{
"bitwise": true,
"curly": true,
"eqeqeq": true,
"esnext": true,
"freeze": true,
"immed": true,
"indent": 2,
"latedef": "nofunc",
"maxlen": 80,
"newcap": true,
"node": true,
"noarg": true,
"quotmark": "single",
"strict": true,
"trailing": true,
"undef": true,
"unused": "vars",
"globals": {
/* Mocha-provided globals */
"describe": false,
"it": false,
"before": false,
"beforeEach": false,
"after": false,
"afterEach": false
}
}

@ -1,5 +1,9 @@
# Node.js gRPC Library
## Status
Alpha : Ready for early adopters
## Installation
First, clone this repository (NPM package coming soon). Then follow the instructions in the `INSTALL` file in the root of the repository to install the C core library that this package depends on.

@ -31,9 +31,8 @@
*
*/
var _ = require('underscore');
var ProtoBuf = require('protobufjs');
var fs = require('fs');
'use strict';
var util = require('util');
var Transform = require('stream').Transform;

@ -31,6 +31,8 @@
*
*/
'use strict';
var grpc = require('..');
var testProto = grpc.load(__dirname + '/../interop/test.proto').grpc.testing;
var _ = require('underscore');
@ -44,7 +46,6 @@ function runTest(iterations, callback) {
function runIterations(finish) {
var start = process.hrtime();
var intervals = [];
var pending = iterations;
function next(i) {
if (i >= iterations) {
testServer.server.shutdown();
@ -69,28 +70,30 @@ function runTest(iterations, callback) {
function warmUp(num) {
var pending = num;
function startCall() {
client.emptyCall({}, function(err, resp) {
pending--;
if (pending === 0) {
runIterations(callback);
}
});
}
for (var i = 0; i < num; i++) {
(function(i) {
client.emptyCall({}, function(err, resp) {
pending--;
if (pending === 0) {
runIterations(callback);
}
});
})(i);
startCall();
}
}
warmUp(100);
}
function percentile(arr, percentile) {
if (percentile > 99) {
percentile = 99;
function percentile(arr, pct) {
if (pct > 99) {
pct = 99;
}
if (percentile < 0) {
percentile = 0;
if (pct < 0) {
pct = 0;
}
return arr[(arr.length * percentile / 100)|0];
var index = Math.floor(arr.length * pct / 100);
return arr[index];
}
if (require.main === module) {

@ -31,6 +31,8 @@
*
*/
'use strict';
var _ = require('underscore');
var grpc = require('..');
var examples = grpc.load(__dirname + '/stock.proto').examples;

@ -38,7 +38,6 @@
#include "call.h"
#include "channel.h"
#include "event.h"
#include "server.h"
#include "completion_queue_async_worker.h"
#include "credentials.h"

@ -31,6 +31,8 @@
*
*/
'use strict';
var _ = require('underscore');
var ProtoBuf = require('protobufjs');
@ -73,6 +75,37 @@ function load(filename) {
return loadObject(builder.ns);
}
/**
* Get a function that a client can use to update metadata with authentication
* information from a Google Auth credential object, which comes from the
* googleauth library.
* @param {Object} credential The credential object to use
* @return {function(Object, callback)} Metadata updater function
*/
function getGoogleAuthDelegate(credential) {
/**
* Update a metadata object with authentication information.
* @param {Object} metadata Metadata object
* @param {function(Error, Object)} callback
*/
return function updateMetadata(metadata, callback) {
metadata = _.clone(metadata);
if (metadata.Authorization) {
metadata.Authorization = _.clone(metadata.Authorization);
} else {
metadata.Authorization = [];
}
credential.getAccessToken(function(err, token) {
if (err) {
callback(err);
return;
}
metadata.Authorization.push('Bearer ' + token);
callback(null, metadata);
});
};
}
/**
* See docs for loadObject
*/
@ -106,3 +139,5 @@ exports.Credentials = grpc.Credentials;
* ServerCredentials factories
*/
exports.ServerCredentials = grpc.ServerCredentials;
exports.getGoogleAuthDelegate = getGoogleAuthDelegate;

@ -31,13 +31,21 @@
*
*/
'use strict';
var fs = require('fs');
var path = require('path');
var grpc = require('..');
var testProto = grpc.load(__dirname + '/test.proto').grpc.testing;
var GoogleAuth = require('googleauth');
var assert = require('assert');
var AUTH_SCOPE = 'https://www.googleapis.com/auth/xapi.zoo';
var AUTH_SCOPE_RESPONSE = 'xapi.zoo';
var AUTH_USER = ('155450119199-3psnrh1sdr3d8cpj1v46naggf81mhdnk' +
'@developer.gserviceaccount.com');
/**
* Create a buffer filled with size zeroes
* @param {number} size The length of the buffer
@ -255,6 +263,45 @@ function cancelAfterFirstResponse(client, done) {
});
}
/**
* Run one of the authentication tests.
* @param {Client} client The client to test against
* @param {function} done Callback to call when the test is completed. Included
* primarily for use with mocha
*/
function authTest(client, done) {
(new GoogleAuth()).getApplicationDefault(function(err, credential) {
assert.ifError(err);
if (credential.createScopedRequired()) {
credential = credential.createScoped(AUTH_SCOPE);
}
client.updateMetadata = grpc.getGoogleAuthDelegate(credential);
var arg = {
response_type: testProto.PayloadType.COMPRESSABLE,
response_size: 314159,
payload: {
body: zeroBuffer(271828)
},
fill_username: true,
fill_oauth_scope: true
};
var call = client.unaryCall(arg, function(err, resp) {
assert.ifError(err);
assert.strictEqual(resp.payload.type, testProto.PayloadType.COMPRESSABLE);
assert.strictEqual(resp.payload.body.limit - resp.payload.body.offset,
314159);
assert.strictEqual(resp.username, AUTH_USER);
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
});
});
}
/**
* Map from test case names to test functions
*/
@ -266,13 +313,15 @@ var test_cases = {
ping_pong: pingPong,
empty_stream: emptyStream,
cancel_after_begin: cancelAfterBegin,
cancel_after_first_response: cancelAfterFirstResponse
cancel_after_first_response: cancelAfterFirstResponse,
compute_engine_creds: authTest,
service_account_creds: authTest
};
/**
* Execute a single test case.
* @param {string} address The address of the server to connect to, in the
* format "hostname:port"
* format 'hostname:port'
* @param {string} host_overrirde The hostname of the server to use as an SSL
* override
* @param {string} test_case The name of the test case to run
@ -280,11 +329,16 @@ var test_cases = {
* @param {function} done Callback to call when the test is completed. Included
* primarily for use with mocha
*/
function runTest(address, host_override, test_case, tls, done) {
function runTest(address, host_override, test_case, tls, test_ca, done) {
// TODO(mlumish): enable TLS functionality
var options = {};
if (tls) {
var ca_path = path.join(__dirname, '../test/data/ca.pem');
var ca_path;
if (test_ca) {
ca_path = path.join(__dirname, '../test/data/ca.pem');
} else {
ca_path = process.env.SSL_CERT_FILE;
}
var ca_data = fs.readFileSync(ca_path);
var creds = grpc.Credentials.createSsl(ca_data);
options.credentials = creds;
@ -304,7 +358,10 @@ if (require.main === module) {
'use_tls', 'use_test_ca']
});
runTest(argv.server_host + ':' + argv.server_port, argv.server_host_override,
argv.test_case, argv.use_tls === 'true');
argv.test_case, argv.use_tls === 'true', argv.use_test_ca === 'true',
function () {
console.log('OK:', argv.test_case);
});
}
/**

@ -31,6 +31,8 @@
*
*/
'use strict';
var fs = require('fs');
var path = require('path');
var _ = require('underscore');

@ -66,6 +66,12 @@ message SimpleRequest {
// Optional input payload sent along with the request.
optional Payload payload = 3;
// Whether SimpleResponse should include username.
optional bool fill_username = 4;
// Whether SimpleResponse should include OAuth scope.
optional bool fill_oauth_scope = 5;
}
// Unary response, as configured by the request.
@ -74,7 +80,9 @@ message SimpleResponse {
optional Payload payload = 1;
// The user the request came from, for verifying authentication was
// successful when the client expected it.
optional int64 effective_gaia_user_id = 2;
optional string username = 2;
// OAuth scope.
optional string oauth_scope = 3;
}
// Client-streaming request.

@ -3,10 +3,12 @@
"version": "0.2.0",
"description": "gRPC Library for Node",
"scripts": {
"test": "./node_modules/mocha/bin/mocha"
"lint": "jshint src test examples interop index.js",
"test": "./node_modules/mocha/bin/mocha && npm run-script lint"
},
"dependencies": {
"bindings": "^1.2.1",
"jshint": "^2.5.5",
"nan": "~1.3.0",
"protobufjs": "murgatroid99/ProtoBuf.js",
"underscore": "^1.7.0",
@ -14,7 +16,18 @@
},
"devDependencies": {
"mocha": "~1.21.0",
"minimist": "^1.1.0"
"minimist": "^1.1.0",
"googleauth": "google/google-auth-library-nodejs"
},
"files": [
"README.md",
"index.js",
"binding.gyp",
"examples",
"ext",
"interop",
"src",
"test"
],
"main": "index.js"
}

@ -31,6 +31,8 @@
*
*/
'use strict';
var _ = require('underscore');
var capitalize = require('underscore.string/capitalize');
@ -77,6 +79,7 @@ function ClientWritableStream(call, serialize) {
* @param {function(Error=)} callback Called when the write is complete
*/
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
this.call.startBatch(batch, function(err, event) {
@ -85,7 +88,7 @@ function _write(chunk, encoding, callback) {
}
callback();
});
};
}
ClientWritableStream.prototype._write = _write;
@ -111,6 +114,7 @@ function ClientReadableStream(call, deserialize) {
* @param {*} size Ignored because we use objectMode=true
*/
function _read(size) {
/* jshint validthis: true */
var self = this;
/**
* Callback to be called when a READ event is received. Pushes the data onto
@ -126,7 +130,7 @@ function _read(size) {
return;
}
var data = event.read;
if (self.push(self.deserialize(data)) && data != null) {
if (self.push(self.deserialize(data)) && data !== null) {
var read_batch = {};
read_batch[grpc.opType.RECV_MESSAGE] = true;
self.call.startBatch(read_batch, readCallback);
@ -144,7 +148,7 @@ function _read(size) {
self.call.startBatch(read_batch, readCallback);
}
}
};
}
ClientReadableStream.prototype._read = _read;
@ -163,10 +167,6 @@ function ClientDuplexStream(call, serialize, deserialize) {
Duplex.call(this, {objectMode: true});
this.serialize = common.wrapIgnoreNull(serialize);
this.deserialize = common.wrapIgnoreNull(deserialize);
var self = this;
var finished = false;
// Indicates that a read is currently pending
var reading = false;
this.call = call;
this.on('finish', function() {
var batch = {};
@ -182,6 +182,7 @@ ClientDuplexStream.prototype._write = _write;
* Cancel the ongoing call
*/
function cancel() {
/* jshint validthis: true */
this.call.cancel();
}
@ -213,6 +214,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
* @return {EventEmitter} An event emitter for stream related events
*/
function makeUnaryRequest(argument, callback, metadata, deadline) {
/* jshint validthis: true */
if (deadline === undefined) {
deadline = Infinity;
}
@ -224,25 +226,32 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
emitter.cancel = function cancel() {
call.cancel();
};
var client_batch = {};
client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
if (err) {
callback(err);
this.updateMetadata(metadata, function(error, metadata) {
if (error) {
call.cancel();
callback(error);
return;
}
if (response.status.code != grpc.status.OK) {
callback(response.status);
return;
}
emitter.emit('status', response.status);
emitter.emit('metadata', response.metadata);
callback(null, deserialize(response.read));
var client_batch = {};
client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
if (err) {
callback(err);
return;
}
if (response.status.code !== grpc.status.OK) {
callback(response.status);
return;
}
emitter.emit('status', response.status);
emitter.emit('metadata', response.metadata);
callback(null, deserialize(response.read));
});
});
return emitter;
}
@ -271,6 +280,7 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
* @return {EventEmitter} An event emitter for stream related events
*/
function makeClientStreamRequest(callback, metadata, deadline) {
/* jshint validthis: true */
if (deadline === undefined) {
deadline = Infinity;
}
@ -279,30 +289,37 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
metadata = {};
}
var stream = new ClientWritableStream(call, serialize);
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) {
if (err) {
callback(err);
this.updateMetadata(metadata, function(error, metadata) {
if (error) {
call.cancel();
callback(error);
return;
}
stream.emit('metadata', response.metadata);
});
var client_batch = {};
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
if (err) {
callback(err);
return;
}
if (response.status.code != grpc.status.OK) {
callback(response.status);
return;
}
stream.emit('status', response.status);
callback(null, deserialize(response.read));
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) {
if (err) {
callback(err);
return;
}
stream.emit('metadata', response.metadata);
});
var client_batch = {};
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
if (err) {
callback(err);
return;
}
if (response.status.code !== grpc.status.OK) {
callback(response.status);
return;
}
stream.emit('status', response.status);
callback(null, deserialize(response.read));
});
});
return stream;
}
@ -331,6 +348,7 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
* @return {EventEmitter} An event emitter for stream related events
*/
function makeServerStreamRequest(argument, metadata, deadline) {
/* jshint validthis: true */
if (deadline === undefined) {
deadline = Infinity;
}
@ -339,24 +357,31 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
metadata = {};
}
var stream = new ClientReadableStream(call, deserialize);
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
throw err;
}
stream.emit('metadata', response.metadata);
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
if (err) {
throw err;
this.updateMetadata(metadata, function(error, metadata) {
if (error) {
call.cancel();
stream.emit('error', error);
return;
}
stream.emit('status', response.status);
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
throw err;
}
stream.emit('metadata', response.metadata);
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
if (err) {
throw err;
}
stream.emit('status', response.status);
});
});
return stream;
}
@ -383,6 +408,7 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
* @return {EventEmitter} An event emitter for stream related events
*/
function makeBidiStreamRequest(metadata, deadline) {
/* jshint validthis: true */
if (deadline === undefined) {
deadline = Infinity;
}
@ -391,22 +417,29 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
metadata = {};
}
var stream = new ClientDuplexStream(call, serialize, deserialize);
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
throw err;
}
stream.emit('metadata', response.metadata);
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
if (err) {
throw err;
this.updateMetadata(metadata, function(error, metadata) {
if (error) {
call.cancel();
stream.emit('error', error);
return;
}
stream.emit('status', response.status);
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
throw err;
}
stream.emit('metadata', response.metadata);
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
if (err) {
throw err;
}
stream.emit('status', response.status);
});
});
return stream;
}
@ -438,8 +471,17 @@ function makeClientConstructor(service) {
* @constructor
* @param {string} address The address of the server to connect to
* @param {Object} options Options to pass to the underlying channel
* @param {function(Object, function)=} updateMetadata function to update the
* metadata for each request
*/
function Client(address, options) {
function Client(address, options, updateMetadata) {
if (updateMetadata) {
this.updateMetadata = updateMetadata;
} else {
this.updateMetadata = function(metadata, callback) {
callback(null, metadata);
};
}
this.channel = new grpc.Channel(address, options);
}
@ -458,11 +500,13 @@ function makeClientConstructor(service) {
method_type = 'unary';
}
}
Client.prototype[decapitalize(method.name)] =
requester_makers[method_type](
prefix + capitalize(method.name),
common.serializeCls(method.resolvedRequestType.build()),
common.deserializeCls(method.resolvedResponseType.build()));
var serialize = common.serializeCls(method.resolvedRequestType.build());
var deserialize = common.deserializeCls(
method.resolvedResponseType.build());
Client.prototype[decapitalize(method.name)] = requester_makers[method_type](
prefix + capitalize(method.name), serialize, deserialize);
Client.prototype[decapitalize(method.name)].serialize = serialize;
Client.prototype[decapitalize(method.name)].deserialize = deserialize;
});
Client.service = service;

@ -31,6 +31,8 @@
*
*/
'use strict';
var _ = require('underscore');
var capitalize = require('underscore.string/capitalize');

@ -31,6 +31,8 @@
*
*/
'use strict';
var _ = require('underscore');
var capitalize = require('underscore.string/capitalize');
@ -217,6 +219,7 @@ function ServerWritableStream(call, serialize) {
* complete
*/
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
this.call.startBatch(batch, function(err, value) {
@ -251,6 +254,7 @@ function ServerReadableStream(call, deserialize) {
* @param {number} size Ignored
*/
function _read(size) {
/* jshint validthis: true */
var self = this;
/**
* Callback to be called when a READ event is received. Pushes the data onto
@ -267,7 +271,7 @@ function _read(size) {
return;
}
var data = event.read;
if (self.push(self.deserialize(data)) && data != null) {
if (self.push(self.deserialize(data)) && data !== null) {
var read_batch = {};
read_batch[grpc.opType.RECV_MESSAGE] = true;
self.call.startBatch(read_batch, readCallback);
@ -424,7 +428,6 @@ function Server(getMetadata, options) {
var handlers = this.handlers;
var server = new grpc.Server(options);
this._server = server;
var started = false;
/**
* Start the server and begin handling requests
* @this Server
@ -456,8 +459,7 @@ function Server(getMetadata, options) {
return;
}
server.requestCall(handleNewCall);
var handler = undefined;
var deadline = details.deadline;
var handler;
if (handlers.hasOwnProperty(method)) {
handler = handlers[method];
} else {
@ -465,7 +467,7 @@ function Server(getMetadata, options) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
code: grpc.status.UNIMPLEMENTED,
details: "This method is not available on this server.",
details: 'This method is not available on this server.',
metadata: {}
};
batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;

@ -31,6 +31,8 @@
*
*/
'use strict';
var assert = require('assert');
var grpc = require('bindings')('grpc.node');

@ -31,6 +31,8 @@
*
*/
'use strict';
var assert = require('assert');
var grpc = require('bindings')('grpc.node');

@ -31,6 +31,8 @@
*
*/
'use strict';
var assert = require('assert');
var grpc = require('bindings')('grpc.node');

@ -31,6 +31,8 @@
*
*/
'use strict';
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
@ -227,7 +229,7 @@ describe('end-to-end', function() {
response_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
server_call.startBatch(response_batch, function(err, response) {
assert(response['send status']);
assert(!response['cancelled']);
assert(!response.cancelled);
done();
});
});

@ -31,6 +31,8 @@
*
*/
'use strict';
var interop_server = require('../interop/interop_server.js');
var interop_client = require('../interop/interop_client.js');
@ -53,30 +55,35 @@ describe('Interop tests', function() {
});
// This depends on not using a binary stream
it('should pass empty_unary', function(done) {
interop_client.runTest(port, name_override, 'empty_unary', true, done);
interop_client.runTest(port, name_override, 'empty_unary', true, true,
done);
});
// This fails due to an unknown bug
it('should pass large_unary', function(done) {
interop_client.runTest(port, name_override, 'large_unary', true, done);
interop_client.runTest(port, name_override, 'large_unary', true, true,
done);
});
it('should pass client_streaming', function(done) {
interop_client.runTest(port, name_override, 'client_streaming', true, done);
interop_client.runTest(port, name_override, 'client_streaming', true, true,
done);
});
it('should pass server_streaming', function(done) {
interop_client.runTest(port, name_override, 'server_streaming', true, done);
interop_client.runTest(port, name_override, 'server_streaming', true, true,
done);
});
it('should pass ping_pong', function(done) {
interop_client.runTest(port, name_override, 'ping_pong', true, done);
interop_client.runTest(port, name_override, 'ping_pong', true, true, done);
});
it('should pass empty_stream', function(done) {
interop_client.runTest(port, name_override, 'empty_stream', true, done);
interop_client.runTest(port, name_override, 'empty_stream', true, true,
done);
});
it('should pass cancel_after_begin', function(done) {
interop_client.runTest(port, name_override, 'cancel_after_begin', true,
done);
true, done);
});
it('should pass cancel_after_first_response', function(done) {
interop_client.runTest(port, name_override, 'cancel_after_first_response',
true, done);
true, true, done);
});
});

@ -31,6 +31,8 @@
*
*/
'use strict';
var assert = require('assert');
var grpc = require('..');
@ -59,7 +61,7 @@ describe('Math client', function() {
});
it('should handle a single request', function(done) {
var arg = {dividend: 7, divisor: 4};
var call = math_client.div(arg, function handleDivResult(err, value) {
math_client.div(arg, function handleDivResult(err, value) {
assert.ifError(err);
assert.equal(value.quotient, 1);
assert.equal(value.remainder, 3);

@ -31,9 +31,9 @@
*
*/
var assert = require('assert');
'use strict';
var surface_server = require('../src/server.js');
var assert = require('assert');
var surface_client = require('../src/client.js');

@ -0,0 +1,18 @@
# Xcode
#
build/
*.pbxuser
!default.pbxuser
*.mode1v3
!default.mode1v3
*.mode2v3
!default.mode2v3
*.perspectivev3
!default.perspectivev3
xcuserdata
*.xccheckout
*.moved-aside
DerivedData
*.hmap
*.ipa
*.xcuserstate

@ -0,0 +1,90 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#import <Foundation/Foundation.h>
#import <RxLibrary/GRXWriter.h>
@class GRPCMethodName;
@class GRPCCall;
// The gRPC protocol is an RPC protocol on top of HTTP2.
//
// While the most common type of RPC receives only one request message and
// returns only one response message, the protocol also supports RPCs that
// return multiple individual messages in a streaming fashion, RPCs that
// accept a stream of request messages, or RPCs with both streaming requests
// and responses.
//
// Conceptually, each gRPC call consists of a bidirectional stream of binary
// messages, with RPCs of the "non-streaming type" sending only one message in
// the corresponding direction (the protocol doesn't make any distinction).
//
// Each RPC uses a different HTTP2 stream, and thus multiple simultaneous RPCs
// can be multiplexed transparently on the same TCP connection.
@interface GRPCCall : NSObject<GRXWriter>
// These HTTP2 headers will be passed to the server as part of this call. Each
// HTTP2 header is a name-value pair with string names and either string or binary values.
// The passed dictionary has to use NSString keys, corresponding to the header names. The
// value associated to each can be a NSString object or a NSData object. E.g.:
//
// call.requestMetadata = @{
// @"Authorization": @"Bearer ...",
// @"SomeBinaryHeader": someData
// };
//
// After the call is started, modifying this won't have any effect.
@property(nonatomic, readwrite) NSMutableDictionary *requestMetadata;
// This isn't populated until the first event is delivered to the handler.
@property(atomic, readonly) NSDictionary *responseMetadata;
// The request writer has to write NSData objects into the provided Writeable. The server will
// receive each of those separately and in order.
// A gRPC call might not complete until the request writer finishes. On the other hand, the
// request finishing doesn't necessarily make the call to finish, as the server might continue
// sending messages to the response side of the call indefinitely (depending on the semantics of
// the specific remote method called).
// To finish a call right away, invoke cancel.
- (instancetype)initWithHost:(NSString *)host
method:(GRPCMethodName *)method
requestsWriter:(id<GRXWriter>)requestsWriter NS_DESIGNATED_INITIALIZER;
// Finishes the request side of this call, notifies the server that the RPC
// should be cancelled, and finishes the response side of the call with an error
// of code CANCELED.
- (void)cancel;
// TODO(jcanizales): Let specify a deadline. As a category of GRXWriter?
@end

@ -0,0 +1,406 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#import "GRPCCall.h"
#include <grpc.h>
#include <support/time.h>
#import "GRPCMethodName.h"
#import "private/GRPCChannel.h"
#import "private/GRPCCompletionQueue.h"
#import "private/GRPCDelegateWrapper.h"
#import "private/GRPCMethodName+HTTP2Encoding.h"
#import "private/NSData+GRPC.h"
#import "private/NSDictionary+GRPC.h"
#import "private/NSError+GRPC.h"
// A grpc_call_error represents a precondition failure when invoking the
// grpc_call_* functions. If one ever happens, it's a bug in this library.
//
// TODO(jcanizales): Can an application shut down gracefully when a thread other
// than the main one throws an exception?
static void AssertNoErrorInCall(grpc_call_error error) {
if (error != GRPC_CALL_OK) {
@throw [NSException exceptionWithName:NSInternalInconsistencyException
reason:@"Precondition of grpc_call_* not met."
userInfo:nil];
}
}
@interface GRPCCall () <GRXWriteable>
// Makes it readwrite.
@property(atomic, strong) NSDictionary *responseMetadata;
@end
// The following methods of a C gRPC call object aren't reentrant, and thus
// calls to them must be serialized:
// - add_metadata
// - invoke
// - start_write
// - writes_done
// - start_read
// - destroy
// The first four are called as part of responding to client commands, but
// start_read we want to call as soon as we're notified that the RPC was
// successfully established (which happens concurrently in the network queue).
// Serialization is achieved by using a private serial queue to operate the
// call object.
// Because add_metadata and invoke are called and return successfully before
// any of the other methods is called, they don't need to use the queue.
//
// Furthermore, start_write and writes_done can only be called after the
// WRITE_ACCEPTED event for any previous write is received. This is achieved by
// pausing the requests writer immediately every time it writes a value, and
// resuming it again when WRITE_ACCEPTED is received.
//
// Similarly, start_read can only be called after the READ event for any
// previous read is received. This is easier to enforce, as we're writing the
// received messages into the writeable: start_read is enqueued once upon receiving
// the CLIENT_METADATA_READ event, and then once after receiving each READ
// event.
@implementation GRPCCall {
dispatch_queue_t _callQueue;
grpc_call *_gRPCCall;
dispatch_once_t _callAlreadyInvoked;
GRPCChannel *_channel;
GRPCCompletionQueue *_completionQueue;
// The C gRPC library has less guarantees on the ordering of events than we
// do. Particularly, in the face of errors, there's no ordering guarantee at
// all. This wrapper over our actual writeable ensures thread-safety and
// correct ordering.
GRPCDelegateWrapper *_responseWriteable;
id<GRXWriter> _requestWriter;
}
@synthesize state = _state;
- (instancetype)init {
return [self initWithHost:nil method:nil requestsWriter:nil];
}
// Designated initializer
- (instancetype)initWithHost:(NSString *)host
method:(GRPCMethodName *)method
requestsWriter:(id<GRXWriter>)requestWriter {
if (!host || !method) {
[NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."];
}
// TODO(jcanizales): Throw if the requestWriter was already started.
if ((self = [super init])) {
static dispatch_once_t initialization;
dispatch_once(&initialization, ^{
grpc_init();
});
_completionQueue = [GRPCCompletionQueue completionQueue];
_channel = [GRPCChannel channelToHost:host];
_gRPCCall = grpc_channel_create_call_old(_channel.unmanagedChannel,
method.HTTP2Path.UTF8String,
host.UTF8String,
gpr_inf_future);
// Serial queue to invoke the non-reentrant methods of the grpc_call object.
_callQueue = dispatch_queue_create("org.grpc.call", NULL);
_requestWriter = requestWriter;
}
return self;
}
#pragma mark Finish
- (void)finishWithError:(NSError *)errorOrNil {
_requestWriter.state = GRXWriterStateFinished;
_requestWriter = nil;
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
} else {
[_responseWriteable enqueueSuccessfulCompletion];
}
}
- (void)cancelCall {
// Can be called from any thread, any number of times.
AssertNoErrorInCall(grpc_call_cancel(_gRPCCall));
}
- (void)cancel {
[self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeCancelled
userInfo:nil]];
[self cancelCall];
}
- (void)dealloc {
grpc_call *gRPCCall = _gRPCCall;
dispatch_async(_callQueue, ^{
grpc_call_destroy(gRPCCall);
});
}
#pragma mark Read messages
// Only called from the call queue.
// The handler will be called from the network queue.
- (void)startReadWithHandler:(GRPCEventHandler)handler {
AssertNoErrorInCall(grpc_call_start_read_old(_gRPCCall, (__bridge_retained void *)handler));
}
// Called initially from the network queue once response headers are received,
// then "recursively" from the responseWriteable queue after each response from the
// server has been written.
// If the call is currently paused, this is a noop. Restarting the call will invoke this
// method.
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
if (self.state == GRXWriterStatePaused) {
return;
}
__weak GRPCCall *weakSelf = self;
__weak GRPCDelegateWrapper *weakWriteable = _responseWriteable;
dispatch_async(_callQueue, ^{
[weakSelf startReadWithHandler:^(grpc_event *event) {
if (!event->data.read) {
// No more responses from the server.
return;
}
NSData *data = [NSData grpc_dataWithByteBuffer:event->data.read];
if (!data) {
// The app doesn't have enough memory to hold the server response. We
// don't want to throw, because the app shouldn't crash for a behavior
// that's on the hands of any server to have. Instead we finish and ask
// the server to cancel.
//
// TODO(jcanizales): No canonical code is appropriate for this situation
// (because it's just a client problem). Use another domain and an
// appropriately-documented code.
[weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeInternal
userInfo:nil]];
[weakSelf cancelCall];
return;
}
[weakWriteable enqueueMessage:data completionHandler:^{
[weakSelf startNextRead];
}];
}];
});
}
#pragma mark Send headers
- (void)addHeaderWithName:(NSString *)name binaryValue:(NSData *)value {
grpc_metadata metadata;
// Safe to discard const qualifiers; we're not going to modify the contents.
metadata.key = (char *)name.UTF8String;
metadata.value = (char *)value.bytes;
metadata.value_length = value.length;
grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
}
- (void)addHeaderWithName:(NSString *)name ASCIIValue:(NSString *)value {
grpc_metadata metadata;
// Safe to discard const qualifiers; we're not going to modify the contents.
metadata.key = (char *)name.UTF8String;
metadata.value = (char *)value.UTF8String;
// The trailing \0 isn't encoded in HTTP2.
metadata.value_length = value.length;
grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
}
// TODO(jcanizales): Rename to commitHeaders.
- (void)sendHeaders:(NSDictionary *)metadata {
for (NSString *name in metadata) {
id value = metadata[name];
if ([value isKindOfClass:[NSData class]]) {
[self addHeaderWithName:name binaryValue:value];
} else if ([value isKindOfClass:[NSString class]]) {
[self addHeaderWithName:name ASCIIValue:value];
}
}
}
#pragma mark GRXWriteable implementation
// Only called from the call queue. The error handler will be called from the
// network queue if the write didn't succeed.
- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
__weak GRPCCall *weakSelf = self;
GRPCEventHandler resumingHandler = ^(grpc_event *event) {
if (event->data.write_accepted != GRPC_OP_OK) {
errorHandler();
}
// Resume the request writer (even in the case of error).
// TODO(jcanizales): No need to do it in the case of errors anymore?
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
};
grpc_byte_buffer *buffer = message.grpc_byteBuffer;
AssertNoErrorInCall(grpc_call_start_write_old(_gRPCCall,
buffer,
(__bridge_retained void *)resumingHandler,
0));
grpc_byte_buffer_destroy(buffer);
}
- (void)didReceiveValue:(id)value {
// TODO(jcanizales): Throw/assert if value isn't NSData.
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
_requestWriter.state = GRXWriterStatePaused;
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
[weakSelf writeMessage:value withErrorHandler:^{
[weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeInternal
userInfo:nil]];
}];
});
}
// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
GRPCEventHandler handler = ^(grpc_event *event) {
if (event->data.finish_accepted != GRPC_OP_OK) {
errorHandler();
}
};
AssertNoErrorInCall(grpc_call_writes_done_old(_gRPCCall, (__bridge_retained void *)handler));
}
- (void)didFinishWithError:(NSError *)errorOrNil {
if (errorOrNil) {
[self cancel];
} else {
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
[weakSelf finishRequestWithErrorHandler:^{
[weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeInternal
userInfo:nil]];
}];
});
}
}
#pragma mark Invoke
// Both handlers will eventually be called, from the network queue. Writes can start immediately
// after this.
// The first one (metadataHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithMetadataHandler:(GRPCEventHandler)metadataHandler
completionHandler:(GRPCEventHandler)completionHandler {
AssertNoErrorInCall(grpc_call_invoke_old(_gRPCCall,
_completionQueue.unmanagedQueue,
(__bridge_retained void *)metadataHandler,
(__bridge_retained void *)completionHandler,
0));
}
- (void)invokeCall {
__weak GRPCCall *weakSelf = self;
[self invokeCallWithMetadataHandler:^(grpc_event *event) {
// Response metadata received.
// TODO(jcanizales): Name the type of event->data.client_metadata_read
// in the C library so one can actually pass the object to a method.
grpc_metadata *entries = event->data.client_metadata_read.elements;
size_t count = event->data.client_metadata_read.count;
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseMetadata = [NSDictionary grpc_dictionaryFromMetadata:entries
count:count];
[strongSelf startNextRead];
}
} completionHandler:^(grpc_event *event) {
// TODO(jcanizales): Merge HTTP2 trailers into response metadata.
[weakSelf finishWithError:[NSError grpc_errorFromStatus:&event->data.finished]];
}];
// Now that the RPC has been initiated, request writes can start.
[_requestWriter startWithWriteable:self];
}
#pragma mark GRXWriter implementation
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
// The following produces a retain cycle self:_responseWriteable:self, which is only
// broken when didFinishWithError: is sent to the wrapped writeable.
// Care is taken not to retain self strongly in any of the blocks used in
// the implementation of GRPCCall, so that the life of the instance is
// determined by this retain cycle.
_responseWriteable = [[GRPCDelegateWrapper alloc] initWithWriteable:writeable writer:self];
[self sendHeaders:_requestMetadata];
[self invokeCall];
}
- (void)setState:(GRXWriterState)newState {
// Manual transitions are only allowed from the started or paused states.
if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
}
switch (newState) {
case GRXWriterStateFinished:
_state = newState;
// Per GRXWriter's contract, setting the state to Finished manually
// means one doesn't wish the writeable to be messaged anymore.
[_responseWriteable cancelSilently];
_responseWriteable = nil;
return;
case GRXWriterStatePaused:
_state = newState;
return;
case GRXWriterStateStarted:
if (_state == GRXWriterStatePaused) {
_state = newState;
[self startNextRead];
}
return;
case GRXWriterStateNotStarted:
return;
}
}
@end

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save