Merge github.com:grpc/grpc into mmm-mmm-mmm-mmm

pull/1625/head
Craig Tiller 10 years ago
commit 3d49461845
  1. 14
      Makefile
  2. 4
      build.json
  3. 77
      src/compiler/csharp_generator.cc
  4. 11
      src/compiler/generator_helpers.h
  5. 7
      src/core/iomgr/pollset_posix.c
  6. 5
      src/core/surface/call.c
  7. 2
      src/core/surface/server.c
  8. 1
      src/core/transport/chttp2_transport.c
  9. 1
      src/csharp/Grpc.Auth/Grpc.Auth.csproj
  10. 3
      src/csharp/Grpc.Auth/Grpc.Auth.nuspec
  11. 7
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  12. 1
      src/csharp/Grpc.Core.Tests/packages.config
  13. 35
      src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
  14. 44
      src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
  15. 27
      src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
  16. 2
      src/csharp/Grpc.Core/Call.cs
  17. 6
      src/csharp/Grpc.Core/Calls.cs
  18. 4
      src/csharp/Grpc.Core/Grpc.Core.csproj
  19. 6
      src/csharp/Grpc.Core/Grpc.Core.nuspec
  20. 9
      src/csharp/Grpc.Core/IAsyncStreamReader.cs
  21. 5
      src/csharp/Grpc.Core/IAsyncStreamWriter.cs
  22. 5
      src/csharp/Grpc.Core/IClientStreamWriter.cs
  23. 2
      src/csharp/Grpc.Core/IServerStreamWriter.cs
  24. 6
      src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
  25. 29
      src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
  26. 25
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  27. 29
      src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
  28. 4
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
  29. 3
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  30. 1
      src/csharp/Grpc.Core/ServerCallContext.cs
  31. 30
      src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
  32. 1
      src/csharp/Grpc.Core/packages.config
  33. 4
      src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
  34. 51
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
  35. 1
      src/csharp/Grpc.Examples.Tests/packages.config
  36. 3
      src/csharp/Grpc.Examples/Grpc.Examples.csproj
  37. 40
      src/csharp/Grpc.Examples/MathExamples.cs
  38. 44
      src/csharp/Grpc.Examples/MathGrpc.cs
  39. 4
      src/csharp/Grpc.Examples/MathServiceImpl.cs
  40. 1
      src/csharp/Grpc.Examples/packages.config
  41. 3
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
  42. 168
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  43. 70
      src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
  44. 4
      src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
  45. 1
      src/csharp/Grpc.IntegrationTesting/packages.config
  46. 4
      src/csharp/build_packages.bat
  47. 16
      src/node/bin/README.md
  48. 2
      src/node/bin/service_packager
  49. 142
      src/node/cli/service_packager.js
  50. 36
      src/node/cli/service_packager/index.js
  51. 17
      src/node/cli/service_packager/package.json.template
  52. 3
      src/node/package.json
  53. 33
      src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
  54. 2
      src/python/src/grpc/framework/face/_calls.py
  55. 4
      src/ruby/bin/apis/pubsub_demo.rb
  56. 6
      src/ruby/bin/interop/interop_client.rb
  57. 8
      src/ruby/bin/interop/interop_server.rb
  58. 44
      src/ruby/bin/math_client.rb
  59. 8
      src/ruby/bin/math_server.rb
  60. 8
      src/ruby/bin/noproto_client.rb
  61. 6
      src/ruby/bin/noproto_server.rb
  62. 10
      src/ruby/lib/grpc/generic/active_call.rb
  63. 22
      src/ruby/lib/grpc/generic/bidi_call.rb
  64. 14
      src/ruby/lib/grpc/generic/rpc_desc.rb
  65. 28
      src/ruby/lib/grpc/generic/rpc_server.rb
  66. 8
      src/ruby/lib/grpc/generic/service.rb
  67. 5
      src/ruby/lib/grpc/logconfig.rb
  68. 4
      src/ruby/spec/generic/rpc_server_spec.rb
  69. 6
      templates/Makefile.template
  70. 3
      test/core/end2end/dualstack_socket_test.c
  71. 1
      test/core/end2end/tests/max_message_length.c
  72. 2
      test/core/end2end/tests/request_response_with_payload_and_call_creds.c
  73. 4
      test/cpp/qps/async_streaming_ping_pong_test.cc
  74. 4
      test/cpp/qps/async_unary_ping_pong_test.cc
  75. 42
      test/cpp/qps/driver.cc
  76. 15
      test/cpp/qps/driver.h
  77. 15
      test/cpp/qps/qps_driver.cc
  78. 4
      test/cpp/qps/qps_test.cc
  79. 2
      test/cpp/qps/qpstest.proto
  80. 4
      test/cpp/qps/sync_streaming_ping_pong_test.cc
  81. 4
      test/cpp/qps/sync_unary_ping_pong_test.cc
  82. 36
      tools/run_tests/tests.json
  83. 4
      vsprojects/nuget_package/grpc.native.csharp_ext.nuspec

@ -1127,11 +1127,7 @@ third_party/protobuf/configure:
$(LIBDIR)/$(CONFIG)/protobuf/libprotobuf.a: third_party/protobuf/configure
$(E) "[MAKE] Building protobuf"
ifeq ($(HAVE_CXX11),true)
$(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CXXFLAGS="-DLANG_CXX11 -std=c++11" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static)
else
$(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CXXFLAGS="-std=c++0x" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static)
endif
$(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static)
$(Q)$(MAKE) -C third_party/protobuf clean
$(Q)$(MAKE) -C third_party/protobuf
$(Q)mkdir -p $(LIBDIR)/$(CONFIG)/protobuf
@ -2053,6 +2049,10 @@ test_c: buildtests_c
test_cxx: buildtests_cxx
$(E) "[RUN] Testing async_end2end_test"
$(Q) $(BINDIR)/$(CONFIG)/async_end2end_test || ( echo test async_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing async_streaming_ping_pong_test"
$(Q) $(BINDIR)/$(CONFIG)/async_streaming_ping_pong_test || ( echo test async_streaming_ping_pong_test failed ; exit 1 )
$(E) "[RUN] Testing async_unary_ping_pong_test"
$(Q) $(BINDIR)/$(CONFIG)/async_unary_ping_pong_test || ( echo test async_unary_ping_pong_test failed ; exit 1 )
$(E) "[RUN] Testing channel_arguments_test"
$(Q) $(BINDIR)/$(CONFIG)/channel_arguments_test || ( echo test channel_arguments_test failed ; exit 1 )
$(E) "[RUN] Testing cli_call_test"
@ -2075,6 +2075,10 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/server_crash_test || ( echo test server_crash_test failed ; exit 1 )
$(E) "[RUN] Testing status_test"
$(Q) $(BINDIR)/$(CONFIG)/status_test || ( echo test status_test failed ; exit 1 )
$(E) "[RUN] Testing sync_streaming_ping_pong_test"
$(Q) $(BINDIR)/$(CONFIG)/sync_streaming_ping_pong_test || ( echo test sync_streaming_ping_pong_test failed ; exit 1 )
$(E) "[RUN] Testing sync_unary_ping_pong_test"
$(Q) $(BINDIR)/$(CONFIG)/sync_unary_ping_pong_test || ( echo test sync_unary_ping_pong_test failed ; exit 1 )
$(E) "[RUN] Testing thread_pool_test"
$(Q) $(BINDIR)/$(CONFIG)/thread_pool_test || ( echo test thread_pool_test failed ; exit 1 )
$(E) "[RUN] Testing thread_stress_test"

@ -1814,7 +1814,6 @@
{
"name": "async_streaming_ping_pong_test",
"build": "test",
"run": false,
"language": "c++",
"src": [
"test/cpp/qps/async_streaming_ping_pong_test.cc"
@ -1832,7 +1831,6 @@
{
"name": "async_unary_ping_pong_test",
"build": "test",
"run": false,
"language": "c++",
"src": [
"test/cpp/qps/async_unary_ping_pong_test.cc"
@ -2274,7 +2272,6 @@
{
"name": "sync_streaming_ping_pong_test",
"build": "test",
"run": false,
"language": "c++",
"src": [
"test/cpp/qps/sync_streaming_ping_pong_test.cc"
@ -2292,7 +2289,6 @@
{
"name": "sync_unary_ping_pong_test",
"build": "test",
"run": false,
"language": "c++",
"src": [
"test/cpp/qps/sync_unary_ping_pong_test.cc"

@ -51,20 +51,49 @@ using grpc_generator::METHODTYPE_NO_STREAMING;
using grpc_generator::METHODTYPE_CLIENT_STREAMING;
using grpc_generator::METHODTYPE_SERVER_STREAMING;
using grpc_generator::METHODTYPE_BIDI_STREAMING;
using grpc_generator::StringReplace;
using std::map;
using std::vector;
namespace grpc_csharp_generator {
namespace {
std::string GetCSharpNamespace(const FileDescriptor* file) {
// TODO(jtattermusch): this should be based on csharp_namespace option
// TODO(jtattermusch): make GetFileNamespace part of libprotoc public API.
// NOTE: Implementation needs to match exactly to GetFileNamespace
// defined in csharp_helpers.h in protoc csharp plugin.
// We cannot reference it directly because google3 protobufs
// don't have a csharp protoc plugin.
std::string GetFileNamespace(const FileDescriptor* file) {
if (file->options().has_csharp_namespace()) {
return file->options().csharp_namespace();
}
return file->package();
}
std::string GetMessageType(const Descriptor* message) {
// TODO(jtattermusch): this has to match with C# protobuf generator
return message->name();
std::string ToCSharpName(const std::string& name, const FileDescriptor* file) {
std::string result = GetFileNamespace(file);
if (result != "") {
result += '.';
}
std::string classname;
if (file->package().empty()) {
classname = name;
} else {
// Strip the proto package from full_name since we've replaced it with
// the C# namespace.
classname = name.substr(file->package().size() + 1);
}
result += StringReplace(classname, ".", ".Types.", false);
return "global::" + result;
}
// TODO(jtattermusch): make GetClassName part of libprotoc public API.
// NOTE: Implementation needs to match exactly to GetClassName
// defined in csharp_helpers.h in protoc csharp plugin.
// We cannot reference it directly because google3 protobufs
// don't have a csharp protoc plugin.
std::string GetClassName(const Descriptor* message) {
return ToCSharpName(message->full_name(), message->file());
}
std::string GetServiceClassName(const ServiceDescriptor* service) {
@ -114,22 +143,22 @@ std::string GetMethodRequestParamMaybe(const MethodDescriptor *method) {
if (method->client_streaming()) {
return "";
}
return GetMessageType(method->input_type()) + " request, ";
return GetClassName(method->input_type()) + " request, ";
}
std::string GetMethodReturnTypeClient(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
return "Task<" + GetMessageType(method->output_type()) + ">";
return "Task<" + GetClassName(method->output_type()) + ">";
case METHODTYPE_CLIENT_STREAMING:
return "AsyncClientStreamingCall<" + GetMessageType(method->input_type())
+ ", " + GetMessageType(method->output_type()) + ">";
return "AsyncClientStreamingCall<" + GetClassName(method->input_type())
+ ", " + GetClassName(method->output_type()) + ">";
case METHODTYPE_SERVER_STREAMING:
return "AsyncServerStreamingCall<" + GetMessageType(method->output_type())
return "AsyncServerStreamingCall<" + GetClassName(method->output_type())
+ ">";
case METHODTYPE_BIDI_STREAMING:
return "AsyncDuplexStreamingCall<" + GetMessageType(method->input_type())
+ ", " + GetMessageType(method->output_type()) + ">";
return "AsyncDuplexStreamingCall<" + GetClassName(method->input_type())
+ ", " + GetClassName(method->output_type()) + ">";
}
GOOGLE_LOG(FATAL)<< "Can't get here.";
return "";
@ -139,10 +168,10 @@ std::string GetMethodRequestParamServer(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
case METHODTYPE_SERVER_STREAMING:
return GetMessageType(method->input_type()) + " request";
return GetClassName(method->input_type()) + " request";
case METHODTYPE_CLIENT_STREAMING:
case METHODTYPE_BIDI_STREAMING:
return "IAsyncStreamReader<" + GetMessageType(method->input_type())
return "IAsyncStreamReader<" + GetClassName(method->input_type())
+ "> requestStream";
}
GOOGLE_LOG(FATAL)<< "Can't get here.";
@ -153,7 +182,7 @@ std::string GetMethodReturnTypeServer(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
case METHODTYPE_CLIENT_STREAMING:
return "Task<" + GetMessageType(method->output_type()) + ">";
return "Task<" + GetClassName(method->output_type()) + ">";
case METHODTYPE_SERVER_STREAMING:
case METHODTYPE_BIDI_STREAMING:
return "Task";
@ -169,7 +198,7 @@ std::string GetMethodResponseStreamMaybe(const MethodDescriptor *method) {
return "";
case METHODTYPE_SERVER_STREAMING:
case METHODTYPE_BIDI_STREAMING:
return ", IServerStreamWriter<" + GetMessageType(method->output_type())
return ", IServerStreamWriter<" + GetClassName(method->output_type())
+ "> responseStream";
}
GOOGLE_LOG(FATAL)<< "Can't get here.";
@ -202,7 +231,7 @@ void GenerateMarshallerFields(Printer* out, const ServiceDescriptor *service) {
out->Print(
"static readonly Marshaller<$type$> $fieldname$ = Marshallers.Create((arg) => arg.ToByteArray(), $type$.ParseFrom);\n",
"fieldname", GetMarshallerFieldName(message), "type",
GetMessageType(message));
GetClassName(message));
}
out->Print("\n");
}
@ -211,8 +240,8 @@ void GenerateStaticMethodField(Printer* out, const MethodDescriptor *method) {
out->Print(
"static readonly Method<$request$, $response$> $fieldname$ = new Method<$request$, $response$>(\n",
"fieldname", GetMethodFieldName(method), "request",
GetMessageType(method->input_type()), "response",
GetMessageType(method->output_type()));
GetClassName(method->input_type()), "response",
GetClassName(method->output_type()));
out->Indent();
out->Indent();
out->Print("$methodtype$,\n", "methodtype",
@ -242,8 +271,8 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) {
out->Print(
"$response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken));\n",
"methodname", method->name(), "request",
GetMessageType(method->input_type()), "response",
GetMessageType(method->output_type()));
GetClassName(method->input_type()), "response",
GetClassName(method->output_type()));
}
std::string method_name = method->name();
@ -310,8 +339,8 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
out->Print(
"public $response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken))\n",
"methodname", method->name(), "request",
GetMessageType(method->input_type()), "response",
GetMessageType(method->output_type()));
GetClassName(method->input_type()), "response",
GetClassName(method->output_type()));
out->Print("{\n");
out->Indent();
out->Print("var call = CreateCall($servicenamefield$, $methodfield$);\n",
@ -466,7 +495,7 @@ grpc::string GetServices(const FileDescriptor *file) {
// TODO(jtattermusch): add using for protobuf message classes
out.Print("\n");
out.Print("namespace $namespace$ {\n", "namespace", GetCSharpNamespace(file));
out.Print("namespace $namespace$ {\n", "namespace", GetFileNamespace(file));
out.Indent();
for (int i = 0; i < file->service_count(); i++) {
GenerateService(&out, file->service(i));

@ -60,21 +60,26 @@ inline grpc::string StripProto(grpc::string filename) {
}
inline grpc::string StringReplace(grpc::string str, const grpc::string &from,
const grpc::string &to) {
const grpc::string &to, bool replace_all) {
size_t pos = 0;
for (;;) {
do {
pos = str.find(from, pos);
if (pos == grpc::string::npos) {
break;
}
str.replace(pos, from.length(), to);
pos += to.length();
}
} while(replace_all);
return str;
}
inline grpc::string StringReplace(grpc::string str, const grpc::string &from,
const grpc::string &to) {
return StringReplace(str, from, to, true);
}
inline std::vector<grpc::string> tokenize(const grpc::string &input,
const grpc::string &delimiters) {
std::vector<grpc::string> tokens;

@ -258,7 +258,6 @@ static void unary_poll_do_promote(void *args, int success) {
grpc_pollset *pollset = up_args->pollset;
grpc_fd *fd = up_args->fd;
int do_shutdown_cb = 0;
gpr_free(up_args);
/*
* This is quite tricky. There are a number of cases to keep in mind here:
@ -273,8 +272,12 @@ static void unary_poll_do_promote(void *args, int success) {
/* First we need to ensure that nobody is polling concurrently */
while (pollset->counter != 0) {
grpc_pollset_kick(pollset);
gpr_cv_wait(&pollset->cv, &pollset->mu, gpr_inf_future);
grpc_iomgr_add_callback(unary_poll_do_promote, up_args);
gpr_mu_unlock(&pollset->mu);
return;
}
gpr_free(up_args);
/* At this point the pollset may no longer be a unary poller. In that case
* we should just call the right add function and be done. */
/* TODO(klempner): If we're not careful this could cause infinite recursion.

@ -401,6 +401,7 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) {
if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
(is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
@ -408,8 +409,7 @@ static int need_more_data(grpc_call *call) {
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
call->read_state < READ_STATE_GOT_INITIAL_METADATA);
(call->write_state == WRITE_STATE_INITIAL && !call->is_client);
}
static void unlock(grpc_call *call) {
@ -601,6 +601,7 @@ static void call_on_done_send(void *pc, int success) {
if (!success) {
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
call->send_ops.nops = 0;
call->last_send_contains = 0;
call->sending = 0;
unlock(call);

@ -427,6 +427,8 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_add_callback(kill_zombie, elem);
} else if (calld->state == PENDING) {
call_list_remove(calld, PENDING_START);
calld->state = ZOMBIED;
grpc_iomgr_add_callback(kill_zombie, elem);
}
gpr_mu_unlock(&chand->server->mu);
break;

@ -1142,6 +1142,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
if (op->recv_ops) {
GPR_ASSERT(s->incoming_sopb == NULL);
GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED);
s->recv_done_closure.cb = op->on_done_recv;
s->recv_done_closure.user_data = op->recv_user_data;
s->incoming_sopb = op->recv_ops;

@ -10,6 +10,7 @@
<RootNamespace>Grpc.Auth</RootNamespace>
<AssemblyName>Grpc.Auth</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<DocumentationFile>bin\$(Configuration)\Grpc.Auth.Xml</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>

@ -22,5 +22,8 @@
</metadata>
<files>
<file src="bin/Release/Grpc.Auth.dll" target="lib/net45" />
<file src="bin/Release/Grpc.Auth.pdb" target="lib/net45" />
<file src="bin/Release/Grpc.Auth.xml" target="lib/net45" />
<file src="**\*.cs" target="src" />
</files>
</package>

@ -34,6 +34,9 @@
<HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
@ -57,7 +60,5 @@
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<ItemGroup>
<Folder Include="Internal\" />
</ItemGroup>
<ItemGroup />
</Project>

@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages>

@ -40,33 +40,17 @@ namespace Grpc.Core
/// <summary>
/// Return type for client streaming calls.
/// </summary>
public sealed class AsyncClientStreamingCall<TRequest, TResponse>
where TRequest : class
where TResponse : class
public sealed class AsyncClientStreamingCall<TRequest, TResponse> : IDisposable
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> result;
readonly Action disposeAction;
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result)
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction)
{
this.requestStream = requestStream;
this.result = result;
}
/// <summary>
/// Writes a request to RequestStream.
/// </summary>
public Task Write(TRequest message)
{
return requestStream.Write(message);
}
/// <summary>
/// Closes the RequestStream.
/// </summary>
public Task Close()
{
return requestStream.Close();
this.disposeAction = disposeAction;
}
/// <summary>
@ -99,5 +83,16 @@ namespace Grpc.Core
{
return result.GetAwaiter();
}
/// <summary>
/// Provides means to provide after the call.
/// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything.
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
public void Dispose()
{
disposeAction.Invoke();
}
}
}

@ -40,42 +40,17 @@ namespace Grpc.Core
/// <summary>
/// Return type for bidirectional streaming calls.
/// </summary>
public sealed class AsyncDuplexStreamingCall<TRequest, TResponse>
where TRequest : class
where TResponse : class
public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> : IDisposable
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Action disposeAction;
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream)
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
}
/// <summary>
/// Writes a request to RequestStream.
/// </summary>
public Task Write(TRequest message)
{
return requestStream.Write(message);
}
/// <summary>
/// Closes the RequestStream.
/// </summary>
public Task Close()
{
return requestStream.Close();
}
/// <summary>
/// Reads a response from ResponseStream.
/// </summary>
/// <returns></returns>
public Task<TResponse> ReadNext()
{
return responseStream.ReadNext();
this.disposeAction = disposeAction;
}
/// <summary>
@ -99,5 +74,16 @@ namespace Grpc.Core
return requestStream;
}
}
/// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (request stream has been completed and response stream has been fully read), doesn't do anything.
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
public void Dispose()
{
disposeAction.Invoke();
}
}
}

@ -40,23 +40,15 @@ namespace Grpc.Core
/// <summary>
/// Return type for server streaming calls.
/// </summary>
public sealed class AsyncServerStreamingCall<TResponse>
where TResponse : class
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Action disposeAction;
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream)
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
{
this.responseStream = responseStream;
}
/// <summary>
/// Reads the next response from ResponseStream
/// </summary>
/// <returns></returns>
public Task<TResponse> ReadNext()
{
return responseStream.ReadNext();
this.disposeAction = disposeAction;
}
/// <summary>
@ -69,5 +61,16 @@ namespace Grpc.Core
return responseStream;
}
}
/// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (response stream has been fully read), doesn't do anything.
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
public void Dispose()
{
disposeAction.Invoke();
}
}
}

@ -41,8 +41,6 @@ namespace Grpc.Core
/// Abstraction of a call to be invoked on a client.
/// </summary>
public class Call<TRequest, TResponse>
where TRequest : class
where TResponse : class
{
readonly string name;
readonly Marshaller<TRequest> requestMarshaller;

@ -73,7 +73,7 @@ namespace Grpc.Core
asyncCall.StartServerStreamingCall(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel);
}
public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
@ -85,7 +85,7 @@ namespace Grpc.Core
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel);
}
public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
@ -98,7 +98,7 @@ namespace Grpc.Core
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel);
}
private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token)

@ -13,6 +13,7 @@
<AssemblyName>Grpc.Core</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<NuGetPackageImportStamp>8bb563fb</NuGetPackageImportStamp>
<DocumentationFile>bin\$(Configuration)\Grpc.Core.Xml</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@ -37,6 +38,9 @@
<Reference Include="System.Collections.Immutable">
<HintPath>..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference>
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncDuplexStreamingCall.cs" />

@ -16,10 +16,14 @@
<tags>gRPC RPC Protocol HTTP/2</tags>
<dependencies>
<dependency id="Microsoft.Bcl.Immutable" version="1.0.34" />
<dependency id="grpc.native.csharp_ext" version="0.8.0.0" />
<dependency id="Ix-Async" version="1.2.3" />
<dependency id="grpc.native.csharp_ext" version="0.9.0.0" />
</dependencies>
</metadata>
<files>
<file src="bin/Release/Grpc.Core.dll" target="lib/net45" />
<file src="bin/Release/Grpc.Core.pdb" target="lib/net45" />
<file src="bin/Release/Grpc.Core.xml" target="lib/net45" />
<file src="**\*.cs" target="src" />
</files>
</package>

@ -43,13 +43,8 @@ namespace Grpc.Core
/// A stream of messages to be read.
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IAsyncStreamReader<T>
where T : class
public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse>
{
/// <summary>
/// Reads a single message. Returns null if the last message was already read.
/// A following read can only be started when the previous one finishes.
/// </summary>
Task<T> ReadNext();
// TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface.
}
}

@ -44,12 +44,11 @@ namespace Grpc.Core
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IAsyncStreamWriter<T>
where T : class
{
/// <summary>
/// Writes a single message. Only one write can be pending at a time.
/// Writes a single asynchronously. Only one write can be pending at a time.
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
Task Write(T message);
Task WriteAsync(T message);
}
}

@ -44,11 +44,10 @@ namespace Grpc.Core
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IClientStreamWriter<T> : IAsyncStreamWriter<T>
where T : class
{
/// <summary>
/// Closes the stream. Can only be called once there is no pending write. No writes should follow calling this.
/// Completes/closes the stream. Can only be called once there is no pending write. No writes should follow calling this.
/// </summary>
Task Close();
Task CompleteAsync();
}
}

@ -43,7 +43,7 @@ namespace Grpc.Core
/// A writable stream of messages that is used in server-side handlers.
/// </summary>
public interface IServerStreamWriter<T> : IAsyncStreamWriter<T>
where T : class
{
// TODO(jtattermusch): consider just using IAsyncStreamWriter instead of this interface.
}
}

@ -38,8 +38,6 @@ namespace Grpc.Core.Internal
/// Writes requests asynchronously to an underlying AsyncCall object.
/// </summary>
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
where TRequest : class
where TResponse : class
{
readonly AsyncCall<TRequest, TResponse> call;
@ -48,14 +46,14 @@ namespace Grpc.Core.Internal
this.call = call;
}
public Task Write(TRequest message)
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task Close()
public Task CompleteAsync()
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendCloseFromClient(taskSource.CompletionDelegate);

@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCall<TRequest, TResponse> call;
TResponse current;
public ClientResponseStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
}
public Task<TResponse> ReadNext()
public TResponse Current
{
get
{
if (current == null)
{
throw new InvalidOperationException("No current element is available.");
}
return current;
}
}
public async Task<bool> MoveNext(CancellationToken token)
{
if (token != CancellationToken.None)
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
var taskSource = new AsyncCompletionTaskSource<TResponse>();
call.StartReadMessage(taskSource.CompletionDelegate);
return taskSource.Task;
var result = await taskSource.Task;
this.current = result;
return result != null;
}
public void Dispose()
{
// TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}

@ -32,6 +32,7 @@
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@ -71,12 +72,13 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
var request = await requestStream.ReadNext();
Preconditions.CheckArgument(await requestStream.MoveNext());
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(await requestStream.ReadNext() == null);
Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
var result = await handler(context, request);
await responseStream.Write(result);
await responseStream.WriteAsync(result);
}
catch (Exception e)
{
@ -85,7 +87,7 @@ namespace Grpc.Core.Internal
}
try
{
await responseStream.WriteStatus(status);
await responseStream.WriteStatusAsync(status);
}
catch (OperationCanceledException)
{
@ -122,9 +124,10 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
var request = await requestStream.ReadNext();
Preconditions.CheckArgument(await requestStream.MoveNext());
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(await requestStream.ReadNext() == null);
Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
await handler(context, request, responseStream);
@ -137,7 +140,7 @@ namespace Grpc.Core.Internal
try
{
await responseStream.WriteStatus(status);
await responseStream.WriteStatusAsync(status);
}
catch (OperationCanceledException)
{
@ -178,7 +181,7 @@ namespace Grpc.Core.Internal
var result = await handler(context, requestStream);
try
{
await responseStream.Write(result);
await responseStream.WriteAsync(result);
}
catch (OperationCanceledException)
{
@ -193,7 +196,7 @@ namespace Grpc.Core.Internal
try
{
await responseStream.WriteStatus(status);
await responseStream.WriteStatusAsync(status);
}
catch (OperationCanceledException)
{
@ -240,7 +243,7 @@ namespace Grpc.Core.Internal
}
try
{
await responseStream.WriteStatus(status);
await responseStream.WriteStatusAsync(status);
}
catch (OperationCanceledException)
{
@ -263,7 +266,7 @@ namespace Grpc.Core.Internal
var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall);
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
await responseStream.WriteStatus(new Status(StatusCode.Unimplemented, "No such method."));
await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."));
// TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed.
await requestStream.ToList();
await finishedTask;

@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
TRequest current;
public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call)
{
this.call = call;
}
public Task<TRequest> ReadNext()
public TRequest Current
{
get
{
if (current == null)
{
throw new InvalidOperationException("No current element is available.");
}
return current;
}
}
public async Task<bool> MoveNext(CancellationToken token)
{
if (token != CancellationToken.None)
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
var taskSource = new AsyncCompletionTaskSource<TRequest>();
call.StartReadMessage(taskSource.CompletionDelegate);
return taskSource.Task;
var result = await taskSource.Task;
this.current = result;
return result != null;
}
public void Dispose()
{
// TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}

@ -49,14 +49,14 @@ namespace Grpc.Core.Internal
this.call = call;
}
public Task Write(TResponse message)
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task WriteStatus(Status status)
public Task WriteStatusAsync(Status status)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendStatusFromServer(status, taskSource.CompletionDelegate);

@ -39,9 +39,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
// TODO: we need to make sure that the delegates are not collected before invoked.
//internal delegate void ServerShutdownCallbackDelegate(bool success);
/// <summary>
/// grpc_server from grpc/grpc.h
/// </summary>

@ -42,7 +42,6 @@ namespace Grpc.Core
/// </summary>
public sealed class ServerCallContext
{
// TODO(jtattermusch): add cancellationToken
// TODO(jtattermusch): add deadline info

@ -49,14 +49,9 @@ namespace Grpc.Core.Utils
public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
where T : class
{
while (true)
while (await streamReader.MoveNext())
{
var elem = await streamReader.ReadNext();
if (elem == null)
{
break;
}
await asyncAction(elem);
await asyncAction(streamReader.Current);
}
}
@ -67,32 +62,27 @@ namespace Grpc.Core.Utils
where T : class
{
var result = new List<T>();
while (true)
while (await streamReader.MoveNext())
{
var elem = await streamReader.ReadNext();
if (elem == null)
{
break;
}
result.Add(elem);
result.Add(streamReader.Current);
}
return result;
}
/// <summary>
/// Writes all elements from given enumerable to the stream.
/// Closes the stream afterwards unless close = false.
/// Completes the stream afterwards unless close = false.
/// </summary>
public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool close = true)
public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
where T : class
{
foreach (var element in elements)
{
await streamWriter.Write(element);
await streamWriter.WriteAsync(element);
}
if (close)
if (complete)
{
await streamWriter.Close();
await streamWriter.CompleteAsync();
}
}
@ -104,7 +94,7 @@ namespace Grpc.Core.Utils
{
foreach (var element in elements)
{
await streamWriter.Write(element);
await streamWriter.WriteAsync(element);
}
}
}

@ -2,5 +2,6 @@
<packages>
<package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" />
<package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl.Immutable" version="1.0.34" targetFramework="net45" />
</packages>

@ -37,6 +37,10 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
<Reference Include="System.Interactive.Async, Version=1.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />

@ -96,7 +96,19 @@ namespace math.Tests
Assert.AreEqual(0, response.Remainder);
}
// TODO(jtattermusch): test division by zero
[Test]
public void DivByZero()
{
try
{
DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build());
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
}
}
[Test]
public void DivAsync()
@ -114,11 +126,12 @@ namespace math.Tests
{
Task.Run(async () =>
{
var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build());
var responses = await call.ResponseStream.ToList();
CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 },
responses.ConvertAll((n) => n.Num_));
using (var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build()))
{
var responses = await call.ResponseStream.ToList();
CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 },
responses.ConvertAll((n) => n.Num_));
}
}).Wait();
}
@ -128,13 +141,15 @@ namespace math.Tests
{
Task.Run(async () =>
{
var call = client.Sum();
var numbers = new List<long> { 10, 20, 30 }.ConvertAll(
n => Num.CreateBuilder().SetNum_(n).Build());
using (var call = client.Sum())
{
var numbers = new List<long> { 10, 20, 30 }.ConvertAll(
n => Num.CreateBuilder().SetNum_(n).Build());
await call.RequestStream.WriteAll(numbers);
var result = await call.Result;
Assert.AreEqual(60, result.Num_);
await call.RequestStream.WriteAll(numbers);
var result = await call.Result;
Assert.AreEqual(60, result.Num_);
}
}).Wait();
}
@ -150,12 +165,14 @@ namespace math.Tests
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};
var call = client.DivMany();
await call.RequestStream.WriteAll(divArgsList);
var result = await call.ResponseStream.ToList();
using (var call = client.DivMany())
{
await call.RequestStream.WriteAll(divArgsList);
var result = await call.ResponseStream.ToList();
CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient));
CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder));
CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient));
CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder));
}
}).Wait();
}
}

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages>

@ -35,6 +35,9 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />

@ -51,18 +51,13 @@ namespace math
Console.WriteLine("DivAsync Result: " + result);
}
public static async Task DivAsyncWithCancellationExample(Math.IMathClient stub)
{
Task<DivReply> resultTask = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = await resultTask;
Console.WriteLine(result);
}
public static async Task FibExample(Math.IMathClient stub)
{
var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build());
List<Num> result = await call.ResponseStream.ToList();
Console.WriteLine("Fib Result: " + string.Join("|", result));
using (var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build()))
{
List<Num> result = await call.ResponseStream.ToList();
Console.WriteLine("Fib Result: " + string.Join("|", result));
}
}
public static async Task SumExample(Math.IMathClient stub)
@ -74,9 +69,11 @@ namespace math
new Num.Builder { Num_ = 3 }.Build()
};
var call = stub.Sum();
await call.RequestStream.WriteAll(numbers);
Console.WriteLine("Sum Result: " + await call.Result);
using (var call = stub.Sum())
{
await call.RequestStream.WriteAll(numbers);
Console.WriteLine("Sum Result: " + await call.Result);
}
}
public static async Task DivManyExample(Math.IMathClient stub)
@ -87,9 +84,11 @@ namespace math
new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(),
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};
var call = stub.DivMany();
await call.RequestStream.WriteAll(divArgsList);
Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList()));
using (var call = stub.DivMany())
{
await call.RequestStream.WriteAll(divArgsList);
Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList()));
}
}
public static async Task DependendRequestsExample(Math.IMathClient stub)
@ -101,9 +100,12 @@ namespace math
new Num.Builder { Num_ = 3 }.Build()
};
var sumCall = stub.Sum();
await sumCall.RequestStream.WriteAll(numbers);
Num sum = await sumCall.Result;
Num sum;
using (var sumCall = stub.Sum())
{
await sumCall.RequestStream.WriteAll(numbers);
sum = await sumCall.Result;
}
DivReply result = await stub.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build());
Console.WriteLine("Avg Result: " + result);

@ -12,30 +12,30 @@ namespace math {
{
static readonly string __ServiceName = "math.Math";
static readonly Marshaller<DivArgs> __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom);
static readonly Marshaller<DivReply> __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom);
static readonly Marshaller<FibArgs> __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), FibArgs.ParseFrom);
static readonly Marshaller<Num> __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom);
static readonly Marshaller<global::math.DivArgs> __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivArgs.ParseFrom);
static readonly Marshaller<global::math.DivReply> __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivReply.ParseFrom);
static readonly Marshaller<global::math.FibArgs> __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.FibArgs.ParseFrom);
static readonly Marshaller<global::math.Num> __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), global::math.Num.ParseFrom);
static readonly Method<DivArgs, DivReply> __Method_Div = new Method<DivArgs, DivReply>(
static readonly Method<global::math.DivArgs, global::math.DivReply> __Method_Div = new Method<global::math.DivArgs, global::math.DivReply>(
MethodType.Unary,
"Div",
__Marshaller_DivArgs,
__Marshaller_DivReply);
static readonly Method<DivArgs, DivReply> __Method_DivMany = new Method<DivArgs, DivReply>(
static readonly Method<global::math.DivArgs, global::math.DivReply> __Method_DivMany = new Method<global::math.DivArgs, global::math.DivReply>(
MethodType.DuplexStreaming,
"DivMany",
__Marshaller_DivArgs,
__Marshaller_DivReply);
static readonly Method<FibArgs, Num> __Method_Fib = new Method<FibArgs, Num>(
static readonly Method<global::math.FibArgs, global::math.Num> __Method_Fib = new Method<global::math.FibArgs, global::math.Num>(
MethodType.ServerStreaming,
"Fib",
__Marshaller_FibArgs,
__Marshaller_Num);
static readonly Method<Num, Num> __Method_Sum = new Method<Num, Num>(
static readonly Method<global::math.Num, global::math.Num> __Method_Sum = new Method<global::math.Num, global::math.Num>(
MethodType.ClientStreaming,
"Sum",
__Marshaller_Num,
@ -44,20 +44,20 @@ namespace math {
// client-side stub interface
public interface IMathClient
{
DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken));
Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken));
AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken));
AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken));
AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken));
global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken));
Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken));
AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken));
AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken));
AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken));
}
// server-side interface
public interface IMath
{
Task<DivReply> Div(ServerCallContext context, DivArgs request);
Task DivMany(ServerCallContext context, IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream);
Task Fib(ServerCallContext context, FibArgs request, IServerStreamWriter<Num> responseStream);
Task<Num> Sum(ServerCallContext context, IAsyncStreamReader<Num> requestStream);
Task<global::math.DivReply> Div(ServerCallContext context, global::math.DivArgs request);
Task DivMany(ServerCallContext context, IAsyncStreamReader<global::math.DivArgs> requestStream, IServerStreamWriter<global::math.DivReply> responseStream);
Task Fib(ServerCallContext context, global::math.FibArgs request, IServerStreamWriter<global::math.Num> responseStream);
Task<global::math.Num> Sum(ServerCallContext context, IAsyncStreamReader<global::math.Num> requestStream);
}
// client stub
@ -69,27 +69,27 @@ namespace math {
public MathClient(Channel channel, StubConfiguration config) : base(channel, config)
{
}
public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken))
public global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Div);
return Calls.BlockingUnaryCall(call, request, token);
}
public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken))
public Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Div);
return Calls.AsyncUnaryCall(call, request, token);
}
public AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken))
public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_DivMany);
return Calls.AsyncDuplexStreamingCall(call, token);
}
public AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken))
public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Fib);
return Calls.AsyncServerStreamingCall(call, request, token);
}
public AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken))
public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Sum);
return Calls.AsyncClientStreamingCall(call, token);

@ -62,7 +62,7 @@ namespace math
{
foreach (var num in FibInternal(request.Limit))
{
await responseStream.Write(num);
await responseStream.WriteAsync(num);
}
}
}
@ -81,7 +81,7 @@ namespace math
{
await requestStream.ForEach(async divArgs =>
{
await responseStream.Write(DivInternal(divArgs));
await responseStream.WriteAsync(DivInternal(divArgs));
});
}

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages>

@ -54,6 +54,9 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
<Reference Include="System.Net" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Net.Http.Extensions">

@ -213,11 +213,13 @@ namespace Grpc.IntegrationTesting
var bodySizes = new List<int> { 27182, 8, 1828, 45904 }.ConvertAll((size) => StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build());
var call = client.StreamingInputCall();
await call.RequestStream.WriteAll(bodySizes);
using (var call = client.StreamingInputCall())
{
await call.RequestStream.WriteAll(bodySizes);
var response = await call.Result;
Assert.AreEqual(74922, response.AggregatedPayloadSize);
var response = await call.Result;
Assert.AreEqual(74922, response.AggregatedPayloadSize);
}
Console.WriteLine("Passed!");
}).Wait();
}
@ -236,14 +238,15 @@ namespace Grpc.IntegrationTesting
(size) => ResponseParameters.CreateBuilder().SetSize(size).Build()))
.Build();
var call = client.StreamingOutputCall(request);
var responseList = await call.ResponseStream.ToList();
foreach (var res in responseList)
using (var call = client.StreamingOutputCall(request))
{
Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type);
var responseList = await call.ResponseStream.ToList();
foreach (var res in responseList)
{
Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type);
}
CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length));
}
CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length));
Console.WriteLine("Passed!");
}).Wait();
}
@ -254,51 +257,48 @@ namespace Grpc.IntegrationTesting
{
Console.WriteLine("running ping_pong");
var call = client.FullDuplexCall();
StreamingOutputCallResponse response;
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(31415, response.Payload.Body.Length);
using (var call = client.FullDuplexCall())
{
await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9))
.SetPayload(CreateZerosPayload(8)).Build());
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(9, response.Payload.Body.Length);
await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9))
.SetPayload(CreateZerosPayload(8)).Build());
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653))
.SetPayload(CreateZerosPayload(1828)).Build());
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length);
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(2653, response.Payload.Body.Length);
await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653))
.SetPayload(CreateZerosPayload(1828)).Build());
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979))
.SetPayload(CreateZerosPayload(45904)).Build());
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length);
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(58979, response.Payload.Body.Length);
await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979))
.SetPayload(CreateZerosPayload(45904)).Build());
await call.RequestStream.Close();
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length);
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(null, response);
await call.RequestStream.CompleteAsync();
Assert.IsFalse(await call.ResponseStream.MoveNext());
}
Console.WriteLine("Passed!");
}).Wait();
}
@ -308,12 +308,13 @@ namespace Grpc.IntegrationTesting
Task.Run(async () =>
{
Console.WriteLine("running empty_stream");
var call = client.FullDuplexCall();
await call.Close();
var responseList = await call.ResponseStream.ToList();
Assert.AreEqual(0, responseList.Count);
using (var call = client.FullDuplexCall())
{
await call.RequestStream.CompleteAsync();
var responseList = await call.ResponseStream.ToList();
Assert.AreEqual(0, responseList.Count);
}
Console.WriteLine("Passed!");
}).Wait();
}
@ -365,19 +366,21 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("running cancel_after_begin");
var cts = new CancellationTokenSource();
var call = client.StreamingInputCall(cts.Token);
// TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
await Task.Delay(1000);
cts.Cancel();
try
using (var call = client.StreamingInputCall(cts.Token))
{
var response = await call.Result;
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
// TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
await Task.Delay(1000);
cts.Cancel();
try
{
var response = await call.Result;
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
}
}
Console.WriteLine("Passed!");
}).Wait();
@ -390,29 +393,28 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("running cancel_after_first_response");
var cts = new CancellationTokenSource();
var call = client.FullDuplexCall(cts.Token);
StreamingOutputCallResponse response;
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
using (var call = client.FullDuplexCall(cts.Token))
{
await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
response = await call.ResponseStream.ReadNext();
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(31415, response.Payload.Body.Length);
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
cts.Cancel();
cts.Cancel();
try
{
response = await call.ResponseStream.ReadNext();
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
try
{
await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
}
}
Console.WriteLine("Passed!");
}).Wait();

@ -12,45 +12,45 @@ namespace grpc.testing {
{
static readonly string __ServiceName = "grpc.testing.TestService";
static readonly Marshaller<Empty> __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), Empty.ParseFrom);
static readonly Marshaller<SimpleRequest> __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), SimpleRequest.ParseFrom);
static readonly Marshaller<SimpleResponse> __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), SimpleResponse.ParseFrom);
static readonly Marshaller<StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallRequest.ParseFrom);
static readonly Marshaller<StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallResponse.ParseFrom);
static readonly Marshaller<StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallRequest.ParseFrom);
static readonly Marshaller<StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallResponse.ParseFrom);
static readonly Marshaller<global::grpc.testing.Empty> __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.Empty.ParseFrom);
static readonly Marshaller<global::grpc.testing.SimpleRequest> __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleRequest.ParseFrom);
static readonly Marshaller<global::grpc.testing.SimpleResponse> __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleResponse.ParseFrom);
static readonly Marshaller<global::grpc.testing.StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallRequest.ParseFrom);
static readonly Marshaller<global::grpc.testing.StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallResponse.ParseFrom);
static readonly Marshaller<global::grpc.testing.StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallRequest.ParseFrom);
static readonly Marshaller<global::grpc.testing.StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallResponse.ParseFrom);
static readonly Method<Empty, Empty> __Method_EmptyCall = new Method<Empty, Empty>(
static readonly Method<global::grpc.testing.Empty, global::grpc.testing.Empty> __Method_EmptyCall = new Method<global::grpc.testing.Empty, global::grpc.testing.Empty>(
MethodType.Unary,
"EmptyCall",
__Marshaller_Empty,
__Marshaller_Empty);
static readonly Method<SimpleRequest, SimpleResponse> __Method_UnaryCall = new Method<SimpleRequest, SimpleResponse>(
static readonly Method<global::grpc.testing.SimpleRequest, global::grpc.testing.SimpleResponse> __Method_UnaryCall = new Method<global::grpc.testing.SimpleRequest, global::grpc.testing.SimpleResponse>(
MethodType.Unary,
"UnaryCall",
__Marshaller_SimpleRequest,
__Marshaller_SimpleResponse);
static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>(
MethodType.ServerStreaming,
"StreamingOutputCall",
__Marshaller_StreamingOutputCallRequest,
__Marshaller_StreamingOutputCallResponse);
static readonly Method<StreamingInputCallRequest, StreamingInputCallResponse> __Method_StreamingInputCall = new Method<StreamingInputCallRequest, StreamingInputCallResponse>(
static readonly Method<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> __Method_StreamingInputCall = new Method<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse>(
MethodType.ClientStreaming,
"StreamingInputCall",
__Marshaller_StreamingInputCallRequest,
__Marshaller_StreamingInputCallResponse);
static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_FullDuplexCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_FullDuplexCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>(
MethodType.DuplexStreaming,
"FullDuplexCall",
__Marshaller_StreamingOutputCallRequest,
__Marshaller_StreamingOutputCallResponse);
static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_HalfDuplexCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_HalfDuplexCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>(
MethodType.DuplexStreaming,
"HalfDuplexCall",
__Marshaller_StreamingOutputCallRequest,
@ -59,25 +59,25 @@ namespace grpc.testing {
// client-side stub interface
public interface ITestServiceClient
{
Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken));
Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken));
SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken));
Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken));
AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken));
AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken));
AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken));
global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken));
Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken));
global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken));
Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken));
AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken));
AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken));
AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken));
}
// server-side interface
public interface ITestService
{
Task<Empty> EmptyCall(ServerCallContext context, Empty request);
Task<SimpleResponse> UnaryCall(ServerCallContext context, SimpleRequest request);
Task StreamingOutputCall(ServerCallContext context, StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
Task<StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<StreamingInputCallRequest> requestStream);
Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
Task<global::grpc.testing.Empty> EmptyCall(ServerCallContext context, global::grpc.testing.Empty request);
Task<global::grpc.testing.SimpleResponse> UnaryCall(ServerCallContext context, global::grpc.testing.SimpleRequest request);
Task StreamingOutputCall(ServerCallContext context, global::grpc.testing.StreamingOutputCallRequest request, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream);
Task<global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingInputCallRequest> requestStream);
Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream);
Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream);
}
// client stub
@ -89,42 +89,42 @@ namespace grpc.testing {
public TestServiceClient(Channel channel, StubConfiguration config) : base(channel, config)
{
}
public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken))
public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_EmptyCall);
return Calls.BlockingUnaryCall(call, request, token);
}
public Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken))
public Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_EmptyCall);
return Calls.AsyncUnaryCall(call, request, token);
}
public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken))
public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_UnaryCall);
return Calls.BlockingUnaryCall(call, request, token);
}
public Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken))
public Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_UnaryCall);
return Calls.AsyncUnaryCall(call, request, token);
}
public AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken))
public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_StreamingOutputCall);
return Calls.AsyncServerStreamingCall(call, request, token);
}
public AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_StreamingInputCall);
return Calls.AsyncClientStreamingCall(call, token);
}
public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken))
public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_FullDuplexCall);
return Calls.AsyncDuplexStreamingCall(call, token);
}
public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken))
public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_HalfDuplexCall);
return Calls.AsyncDuplexStreamingCall(call, token);

@ -64,7 +64,7 @@ namespace grpc.testing
{
var response = StreamingOutputCallResponse.CreateBuilder()
.SetPayload(CreateZerosPayload(responseParam.Size)).Build();
await responseStream.Write(response);
await responseStream.WriteAsync(response);
}
}
@ -86,7 +86,7 @@ namespace grpc.testing
{
var response = StreamingOutputCallResponse.CreateBuilder()
.SetPayload(CreateZerosPayload(responseParam.Size)).Build();
await responseStream.Write(response);
await responseStream.WriteAsync(response);
}
});
}

@ -3,6 +3,7 @@
<package id="Google.Apis.Auth" version="1.9.1" targetFramework="net45" />
<package id="Google.Apis.Core" version="1.9.1" targetFramework="net45" />
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
<package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl" version="1.1.9" targetFramework="net45" />
<package id="Microsoft.Bcl.Async" version="1.0.168" targetFramework="net45" />
<package id="Microsoft.Bcl.Build" version="1.0.14" targetFramework="net45" />

@ -11,8 +11,8 @@ endlocal
@call buildall.bat || goto :error
%NUGET% pack ..\..\vsprojects\nuget_package\grpc.native.csharp_ext.nuspec || goto :error
%NUGET% pack Grpc.Core\Grpc.Core.nuspec || goto :error
%NUGET% pack Grpc.Auth\Grpc.Auth.nuspec || goto :error
%NUGET% pack Grpc.Core\Grpc.Core.nuspec -Symbols || goto :error
%NUGET% pack Grpc.Auth\Grpc.Auth.nuspec -Symbols || goto :error
%NUGET% pack Grpc.nuspec || goto :error
goto :EOF

@ -0,0 +1,16 @@
# Command Line Tools
# Service Packager
The command line tool `bin/service_packager`, when called with the following command line:
```bash
service_packager proto_file -o output_path -n name -v version [-i input_path...]
```
Populates `output_path` with a node package consisting of a `package.json` populated with `name` and `version`, an `index.js`, a `LICENSE` file copied from gRPC, and a `service.json`, which is compiled from `proto_file` and the given `input_path`s. `require('output_path')` returns an object that is equivalent to
```js
{ client: require('grpc').load('service.json'),
auth: require('google-auth-library') }
```

@ -0,0 +1,2 @@
#!/usr/bin/env node
require(__dirname+'/../cli/service_packager.js').main(process.argv.slice(2));

@ -0,0 +1,142 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
'use strict';
var fs = require('fs');
var path = require('path');
var _ = require('underscore');
var async = require('async');
var pbjs = require('protobufjs/cli/pbjs');
var parseArgs = require('minimist');
var Mustache = require('mustache');
var package_json = require('../package.json');
var template_path = path.resolve(__dirname, 'service_packager');
var package_tpl_path = path.join(template_path, 'package.json.template');
var arg_format = {
string: ['include', 'out', 'name', 'version'],
alias: {
include: 'i',
out: 'o',
name: 'n',
version: 'v'
}
};
// TODO(mlumish): autogenerate README.md from proto file
/**
* Render package.json file from template using provided parameters.
* @param {Object} params Map of parameter names to values
* @param {function(Error, string)} callback Callback to pass rendered template
* text to
*/
function generatePackage(params, callback) {
fs.readFile(package_tpl_path, {encoding: 'utf-8'}, function(err, template) {
if (err) {
callback(err);
} else {
var rendered = Mustache.render(template, params);
callback(null, rendered);
}
});
}
/**
* Copy a file
* @param {string} src_path The filepath to copy from
* @param {string} dest_path The filepath to copy to
*/
function copyFile(src_path, dest_path) {
fs.createReadStream(src_path).pipe(fs.createWriteStream(dest_path));
}
/**
* Run the script. Copies the index.js and LICENSE files to the output path,
* renders the package.json template to the output path, and generates a
* service.json file from the input proto files using pbjs. The arguments are
* taken directly from the command line, and handled as follows:
* -i (--include) : An include path for pbjs (can be dpulicated)
* -o (--output): The output path
* -n (--name): The name of the package
* -v (--version): The package version
* @param {Array} argv The argument vector
*/
function main(argv) {
var args = parseArgs(argv, arg_format);
var out_path = path.resolve(args.out);
var include_dirs = [];
if (args.include) {
include_dirs = _.map(_.flatten([args.include]), function(p) {
return path.resolve(p);
});
}
args.grpc_version = package_json.version;
generatePackage(args, function(err, rendered) {
if (err) throw err;
fs.writeFile(path.join(out_path, 'package.json'), rendered, function(err) {
if (err) throw err;
});
});
copyFile(path.join(template_path, 'index.js'),
path.join(out_path, 'index.js'));
copyFile(path.join(__dirname, '..', 'LICENSE'),
path.join(out_path, 'LICENSE'));
var service_stream = fs.createWriteStream(path.join(out_path,
'service.json'));
var pbjs_args = _.flatten(['node', 'pbjs',
args._[0],
'-legacy',
_.map(include_dirs, function(dir) {
return "-path=" + dir;
})]);
var old_stdout = process.stdout;
process.__defineGetter__('stdout', function() {
return service_stream;
});
var pbjs_status = pbjs.main(pbjs_args);
process.__defineGetter__('stdout', function() {
return old_stdout;
});
if (pbjs_status !== pbjs.STATUS_OK) {
throw new Error('pbjs failed with status code ' + pbjs_status);
}
}
exports.main = main;

@ -0,0 +1,36 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
var grpc = require('grpc');
exports.client = grpc.load(__dirname + '/service.json', 'json');
exports.auth = require('google-auth-library');

@ -0,0 +1,17 @@
{
"name": "{{{name}}}",
"version": "{{{version}}}",
"author": "Google Inc.",
"description": "Client library for {{{name}}} built on gRPC",
"license": "Apache-2.0",
"dependencies": {
"grpc": "{{{grpc_version}}}",
"google-auth-library": "^0.9.2"
},
"main": "index.js",
"files": [
"LICENSE",
"index.js",
"service.json"
]
}

@ -36,6 +36,7 @@
"jshint": "^2.5.0",
"minimist": "^1.1.0",
"mocha": "~1.21.0",
"mustache": "^2.0.0",
"strftime": "^0.8.2"
},
"engines": {
@ -46,6 +47,8 @@
"README.md",
"index.js",
"binding.gyp",
"bin",
"cli",
"examples",
"ext",
"interop",

@ -269,4 +269,37 @@
[self waitForExpectationsWithTimeout:1 handler:nil];
}
- (void)testCancelAfterFirstResponseRPC {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterFirstResponse"];
// A buffered pipe to which we write a single value but never close
GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
__block BOOL receivedResponse = NO;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
requestedResponseSize:@31415];
[requestsBuffer writeValue:request];
__block ProtoRPC *call = [_service RPCToFullDuplexCallWithRequestsWriter:requestsBuffer
handler:^(BOOL done,
RMTStreamingOutputCallResponse *response,
NSError *error) {
if (receivedResponse) {
XCTAssert(done, @"Unexpected extra response %@", response);
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
[expectation fulfill];
} else {
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
XCTAssertFalse(done, @"Finished without response");
XCTAssertNotNil(response);
receivedResponse = YES;
[call cancel];
}
}];
[call start];
[self waitForExpectationsWithTimeout:4 handler:nil];
}
@end

@ -248,7 +248,7 @@ class _OperationFuture(future.Future):
"""See future.Future.add_done_callback for specification."""
with self._condition:
if self._callbacks is not None:
self._callbacks.add(fn)
self._callbacks.append(fn)
return
callable_util.call_logging_exceptions(fn, _DONE_CALLBACK_LOG_MESSAGE, self)

@ -79,7 +79,7 @@ end
def publisher_stub(opts)
address = "#{opts.host}:#{opts.port}"
stub_clz = Tech::Pubsub::PublisherService::Stub # shorter
logger.info("... access PublisherService at #{address}")
GRPC.logger.info("... access PublisherService at #{address}")
stub_clz.new(address,
creds: ssl_creds, update_metadata: auth_proc(opts),
GRPC::Core::Channel::SSL_TARGET => opts.host)
@ -89,7 +89,7 @@ end
def subscriber_stub(opts)
address = "#{opts.host}:#{opts.port}"
stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter
logger.info("... access SubscriberService at #{address}")
GRPC.logger.info("... access SubscriberService at #{address}")
stub_clz.new(address,
creds: ssl_creds, update_metadata: auth_proc(opts),
GRPC::Core::Channel::SSL_TARGET => opts.host)

@ -70,7 +70,7 @@ end
# loads the certificates used to access the test server securely.
def load_prod_cert
fail 'could not find a production cert' if ENV['SSL_CERT_FILE'].nil?
logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}")
GRPC.logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}")
File.open(ENV['SSL_CERT_FILE']).read
end
@ -115,10 +115,10 @@ def create_stub(opts)
stub_opts[:update_metadata] = auth_creds.updater_proc
end
logger.info("... connecting securely to #{address}")
GRPC.logger.info("... connecting securely to #{address}")
Grpc::Testing::TestService::Stub.new(address, **stub_opts)
else
logger.info("... connecting insecurely to #{address}")
GRPC.logger.info("... connecting insecurely to #{address}")
Grpc::Testing::TestService::Stub.new(address)
end
end

@ -129,13 +129,13 @@ class TestTarget < Grpc::Testing::TestService::Service
Thread.new do
begin
reqs.each do |req|
logger.info("read #{req.inspect}")
GRPC.logger.info("read #{req.inspect}")
resp_size = req.response_parameters[0].size
resp = cls.new(payload: Payload.new(type: req.response_type,
body: nulls(resp_size)))
q.push(resp)
end
logger.info('finished reads')
GRPC.logger.info('finished reads')
q.push(self)
rescue StandardError => e
q.push(e) # share the exception with the enumerator
@ -179,10 +179,10 @@ def main
s = GRPC::RpcServer.new
if opts['secure']
s.add_http2_port(host, test_server_creds)
logger.info("... running securely on #{host}")
GRPC.logger.info("... running securely on #{host}")
else
s.add_http2_port(host)
logger.info("... running insecurely on #{host}")
GRPC.logger.info("... running insecurely on #{host}")
end
s.handle(TestTarget)
s.run_till_terminated

@ -46,51 +46,51 @@ require 'optparse'
include GRPC::Core::TimeConsts
def do_div(stub)
logger.info('request_response')
logger.info('----------------')
GRPC.logger.info('request_response')
GRPC.logger.info('----------------')
req = Math::DivArgs.new(dividend: 7, divisor: 3)
logger.info("div(7/3): req=#{req.inspect}")
GRPC.logger.info("div(7/3): req=#{req.inspect}")
resp = stub.div(req, INFINITE_FUTURE)
logger.info("Answer: #{resp.inspect}")
logger.info('----------------')
GRPC.logger.info("Answer: #{resp.inspect}")
GRPC.logger.info('----------------')
end
def do_sum(stub)
# to make client streaming requests, pass an enumerable of the inputs
logger.info('client_streamer')
logger.info('---------------')
GRPC.logger.info('client_streamer')
GRPC.logger.info('---------------')
reqs = [1, 2, 3, 4, 5].map { |x| Math::Num.new(num: x) }
logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}")
GRPC.logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}")
resp = stub.sum(reqs) # reqs.is_a?(Enumerable)
logger.info("Answer: #{resp.inspect}")
logger.info('---------------')
GRPC.logger.info("Answer: #{resp.inspect}")
GRPC.logger.info('---------------')
end
def do_fib(stub)
logger.info('server_streamer')
logger.info('----------------')
GRPC.logger.info('server_streamer')
GRPC.logger.info('----------------')
req = Math::FibArgs.new(limit: 11)
logger.info("fib(11): req=#{req.inspect}")
GRPC.logger.info("fib(11): req=#{req.inspect}")
resp = stub.fib(req, INFINITE_FUTURE)
resp.each do |r|
logger.info("Answer: #{r.inspect}")
GRPC.logger.info("Answer: #{r.inspect}")
end
logger.info('----------------')
GRPC.logger.info('----------------')
end
def do_div_many(stub)
logger.info('bidi_streamer')
logger.info('-------------')
GRPC.logger.info('bidi_streamer')
GRPC.logger.info('-------------')
reqs = []
reqs << Math::DivArgs.new(dividend: 7, divisor: 3)
reqs << Math::DivArgs.new(dividend: 5, divisor: 2)
reqs << Math::DivArgs.new(dividend: 7, divisor: 2)
logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}")
GRPC.logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}")
resp = stub.div_many(reqs, 10)
resp.each do |r|
logger.info("Answer: #{r.inspect}")
GRPC.logger.info("Answer: #{r.inspect}")
end
logger.info('----------------')
GRPC.logger.info('----------------')
end
def load_test_certs
@ -132,10 +132,10 @@ def main
p stub_opts
p options['host']
stub = Math::Math::Stub.new(options['host'], **stub_opts)
logger.info("... connecting securely on #{options['host']}")
GRPC.logger.info("... connecting securely on #{options['host']}")
else
stub = Math::Math::Stub.new(options['host'])
logger.info("... connecting insecurely on #{options['host']}")
GRPC.logger.info("... connecting insecurely on #{options['host']}")
end
do_div(stub)

@ -128,13 +128,13 @@ class Calculator < Math::Math::Service
t = Thread.new do
begin
requests.each do |req|
logger.info("read #{req.inspect}")
GRPC.logger.info("read #{req.inspect}")
resp = Math::DivReply.new(quotient: req.dividend / req.divisor,
remainder: req.dividend % req.divisor)
q.push(resp)
Thread.pass # let the internal Bidi threads run
end
logger.info('finished reads')
GRPC.logger.info('finished reads')
q.push(self)
rescue StandardError => e
q.push(e) # share the exception with the enumerator
@ -176,10 +176,10 @@ def main
s = GRPC::RpcServer.new
if options['secure']
s.add_http2_port(options['host'], test_server_creds)
logger.info("... running securely on #{options['host']}")
GRPC.logger.info("... running securely on #{options['host']}")
else
s.add_http2_port(options['host'])
logger.info("... running insecurely on #{options['host']}")
GRPC.logger.info("... running insecurely on #{options['host']}")
end
s.handle(Calculator)

@ -94,15 +94,15 @@ def main
p stub_opts
p options['host']
stub = NoProtoStub.new(options['host'], **stub_opts)
logger.info("... connecting securely on #{options['host']}")
GRPC.logger.info("... connecting securely on #{options['host']}")
else
stub = NoProtoStub.new(options['host'])
logger.info("... connecting insecurely on #{options['host']}")
GRPC.logger.info("... connecting insecurely on #{options['host']}")
end
logger.info('sending a NoProto rpc')
GRPC.logger.info('sending a NoProto rpc')
resp = stub.an_rpc(NoProtoMsg.new)
logger.info("got a response: #{resp}")
GRPC.logger.info("got a response: #{resp}")
end
main

@ -63,7 +63,7 @@ class NoProto < NoProtoService
end
def an_rpc(req, _call)
logger.info('echo service received a request')
GRPC.logger.info('echo service received a request')
req
end
end
@ -98,10 +98,10 @@ def main
s = GRPC::RpcServer.new
if options['secure']
s.add_http2_port(options['host'], test_server_creds)
logger.info("... running securely on #{options['host']}")
GRPC.logger.info("... running securely on #{options['host']}")
else
s.add_http2_port(options['host'])
logger.info("... running insecurely on #{options['host']}")
GRPC.logger.info("... running insecurely on #{options['host']}")
end
s.handle(NoProto)

@ -188,7 +188,7 @@ module GRPC
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
logger.debug("sending #{req}, marshalled? #{marshalled}")
GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
if marshalled
payload = req
else
@ -230,14 +230,14 @@ module GRPC
@call.metadata = batch_result.metadata
@metadata_tag = nil
end
logger.debug("received req: #{batch_result}")
GRPC.logger.debug("received req: #{batch_result}")
unless batch_result.nil? || batch_result.message.nil?
logger.debug("received req.to_s: #{batch_result.message}")
GRPC.logger.debug("received req.to_s: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
logger.debug("received_req (unmarshalled): #{res.inspect}")
GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
end
logger.debug('found nil; the final response has been sent')
GRPC.logger.debug('found nil; the final response has been sent')
nil
end

@ -115,10 +115,10 @@ module GRPC
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
logger.debug("each_queued_msg: msg##{count}")
GRPC.logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
logger.debug("each_queued_msg: req = #{req}")
GRPC.logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
@ -134,22 +134,22 @@ module GRPC
begin
count = 0
requests.each do |req|
logger.debug("bidi-write_loop: #{count}")
GRPC.logger.debug("bidi-write_loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
if is_client
logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting")
batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil,
RECV_STATUS_ON_CLIENT => nil)
batch_result.check_status
end
rescue StandardError => e
logger.warn('bidi-write_loop: failed')
logger.warn(e)
GRPC.logger.warn('bidi-write_loop: failed')
GRPC.logger.warn(e)
raise e
end
end
@ -164,7 +164,7 @@ module GRPC
# queue the initial read before beginning the loop
loop do
logger.debug("bidi-read_loop: #{count}")
GRPC.logger.debug("bidi-read_loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
@ -172,19 +172,19 @@ module GRPC
# handle the next message
if batch_result.message.nil?
@readq.push(END_OF_READS)
logger.debug('bidi-read-loop: done reading!')
GRPC.logger.debug('bidi-read-loop: done reading!')
break
end
# push the latest read onto the queue and continue reading
logger.debug("received req: #{batch_result.message}")
GRPC.logger.debug("received req: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
@readq.push(res)
end
rescue StandardError => e
logger.warn('bidi: read_loop failed')
logger.warn(e)
GRPC.logger.warn('bidi: read_loop failed')
GRPC.logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error
end
end

@ -84,22 +84,22 @@ module GRPC
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application error
# code and detail message and some additional app-specific metadata.
logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}")
send_status(active_call, e.code, e.details, **e.metadata)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
GRPC.logger.warn("failed call: #{active_call}\n#{e}")
rescue Core::OutOfTime
# This is raised when active_call#method.call exceeeds the deadline
# event. Send a status of deadline exceeded
logger.warn("late call: #{active_call}")
GRPC.logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue StandardError => e
# This will usuaally be an unhandled error in the handling code.
# Send back a UNKNOWN status to the client
logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
logger.warn(e)
GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
GRPC.logger.warn(e)
send_status(active_call, UNKNOWN, 'no reason given')
end
@ -139,8 +139,8 @@ module GRPC
details = 'Not sure why' if details.nil?
active_client.send_status(code, details, code == OK, **kw)
rescue StandardError => e
logger.warn("Could not send status #{code}:#{details}")
logger.warn(e)
GRPC.logger.warn("Could not send status #{code}:#{details}")
GRPC.logger.warn(e)
end
end
end

@ -94,7 +94,7 @@ module GRPC
def schedule(*args, &blk)
fail 'already stopped' if @stopped
return if blk.nil?
logger.info('schedule another job')
GRPC.logger.info('schedule another job')
@jobs << [blk, args]
end
@ -114,14 +114,14 @@ module GRPC
# Stops the jobs in the pool
def stop
logger.info('stopping, will wait for all the workers to exit')
GRPC.logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
@stopped = true
@stop_mutex.synchronize do # wait @keep_alive for works to stop
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
logger.info('stopped, all workers are shutdown')
GRPC.logger.info('stopped, all workers are shutdown')
end
protected
@ -129,14 +129,14 @@ module GRPC
# Forcibly shutdown any threads that are still alive.
def forcibly_stop_workers
return unless @workers.size > 0
logger.info("forcibly terminating #{@workers.size} worker(s)")
GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
t.exit
rescue StandardError => e
logger.warn('error while terminating a worker')
logger.warn(e)
GRPC.logger.warn('error while terminating a worker')
GRPC.logger.warn(e)
end
end
end
@ -156,8 +156,8 @@ module GRPC
blk, args = @jobs.pop
blk.call(*args)
rescue StandardError => e
logger.warn('Error in worker thread')
logger.warn(e)
GRPC.logger.warn('Error in worker thread')
GRPC.logger.warn(e)
end
end
end
@ -365,7 +365,7 @@ module GRPC
# the server to stop.
def run
if rpc_descs.size.zero?
logger.warn('did not run as no services were present')
GRPC.logger.warn('did not run as no services were present')
return
end
@run_mutex.synchronize do
@ -381,9 +381,9 @@ module GRPC
# Sends UNAVAILABLE if there are too many unprocessed jobs
def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
logger.info("waiting: #{jobs_count}, max: #{max}")
GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::UNAVAILABLE, '')
@ -394,7 +394,7 @@ module GRPC
def found?(an_rpc)
mth = an_rpc.method.to_sym
return an_rpc if rpc_descs.key?(mth)
logger.warn("NOT_FOUND: #{an_rpc}")
GRPC.logger.warn("NOT_FOUND: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::NOT_FOUND, '')
@ -434,7 +434,7 @@ module GRPC
return nil unless found?(an_rpc)
# Create the ActiveCall
logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
rpc_desc = rpc_descs[an_rpc.method.to_sym]
ActiveCall.new(an_rpc.call, @cq,
rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
@ -474,7 +474,7 @@ module GRPC
else
handlers[route] = service.method(rpc_name)
end
logger.info("handling #{route} with #{handlers[route]}")
GRPC.logger.info("handling #{route} with #{handlers[route]}")
end
end
end

@ -175,23 +175,23 @@ module GRPC
route = "/#{route_prefix}/#{name}"
if desc.request_response?
define_method(mth_name) do |req, deadline = nil, **kw|
logger.debug("calling #{@host}:#{route}")
GRPC.logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, deadline, **kw)
end
elsif desc.client_streamer?
define_method(mth_name) do |reqs, deadline = nil, **kw|
logger.debug("calling #{@host}:#{route}")
GRPC.logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, deadline, **kw)
end
elsif desc.server_streamer?
define_method(mth_name) do |req, deadline = nil, **kw, &blk|
logger.debug("calling #{@host}:#{route}")
GRPC.logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, deadline, **kw,
&blk)
end
else # is a bidi_stream
define_method(mth_name) do |reqs, deadline = nil, **kw, &blk|
logger.debug("calling #{@host}:#{route}")
GRPC.logger.debug("calling #{@host}:#{route}")
bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw,
&blk)
end

@ -29,7 +29,10 @@
require 'logging'
include Logging.globally # logger is accessible everywhere
# GRPC contains the General RPC module.
module GRPC
extend Logging.globally
end
Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :info

@ -69,7 +69,7 @@ class EchoService
end
def an_rpc(req, call)
logger.info('echo service received a request')
GRPC.logger.info('echo service received a request')
call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req
@ -109,7 +109,7 @@ class SlowService
end
def an_rpc(req, call)
logger.info("starting a slow #{@delay} rpc")
GRPC.logger.info("starting a slow #{@delay} rpc")
sleep @delay
@received_md << call.metadata unless call.metadata.nil?
req # send back the req as the response

@ -673,11 +673,7 @@ third_party/protobuf/configure:
$(LIBDIR)/$(CONFIG)/protobuf/libprotobuf.a: third_party/protobuf/configure
$(E) "[MAKE] Building protobuf"
ifeq ($(HAVE_CXX11),true)
$(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CXXFLAGS="-DLANG_CXX11 -std=c++11" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static)
else
$(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CXXFLAGS="-std=c++0x" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static)
endif
$(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static)
$(Q)$(MAKE) -C third_party/protobuf clean
$(Q)$(MAKE) -C third_party/protobuf
$(Q)mkdir -p $(LIBDIR)/$(CONFIG)/protobuf

@ -211,6 +211,9 @@ void test_connect(const char *server_host, const char *client_host, int port,
grpc_completion_queue_shutdown(server_cq);
drain_cq(server_cq);
grpc_completion_queue_destroy(server_cq);
grpc_call_details_destroy(&call_details);
gpr_free(details);
}
int main(int argc, char **argv) {

@ -188,6 +188,7 @@ static void test_max_message_length(grpc_end2end_test_config config) {
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(request_payload);
grpc_call_destroy(c);
grpc_call_destroy(s);

@ -125,6 +125,8 @@ static void test_call_creds_failure(grpc_end2end_test_config config) {
GPR_ASSERT(grpc_call_set_credentials(c, creds) != GRPC_CALL_OK);
grpc_credentials_release(creds);
grpc_call_destroy(c);
end_test(&f);
config.tear_down_data(&f);
}

@ -64,8 +64,8 @@ static void RunAsyncStreamingPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
ReportQPS(result);
ReportLatency(result);
ReportQPS(*result);
ReportLatency(*result);
}
} // namespace testing

@ -64,8 +64,8 @@ static void RunAsyncUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
ReportQPS(result);
ReportLatency(result);
ReportQPS(*result);
ReportLatency(*result);
}
} // namespace testing

@ -44,9 +44,11 @@
#include <thread>
#include <deque>
#include <vector>
#include <unistd.h>
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
using std::list;
using std::thread;
@ -75,13 +77,10 @@ static deque<string> get_hosts(const string& name) {
}
}
ScenarioResult RunScenario(const ClientConfig& initial_client_config,
size_t num_clients,
const ServerConfig& server_config,
size_t num_servers,
int warmup_seconds,
int benchmark_seconds,
int spawn_local_worker_count) {
std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& server_config, size_t num_servers, int warmup_seconds,
int benchmark_seconds, int spawn_local_worker_count) {
// ClientContext allocator (all are destroyed at scope exit)
list<ClientContext> contexts;
auto alloc_context = [&contexts]() {
@ -89,6 +88,11 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
return &contexts.back();
};
// To be added to the result, containing the final configuration used for
// client and config (incluiding host, etc.)
ClientConfig result_client_config;
ServerConfig result_server_config;
// Get client, server lists
auto workers = get_hosts("QPS_WORKERS");
ClientConfig client_config = initial_client_config;
@ -96,6 +100,16 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
// Spawn some local workers if desired
vector<unique_ptr<QpsWorker>> local_workers;
for (int i = 0; i < abs(spawn_local_worker_count); i++) {
// act as if we're a new test -- gets a good rng seed
static bool called_init = false;
if (!called_init) {
char args_buf[100];
strcpy(args_buf, "some-benchmark");
char *args[] = {args_buf};
grpc_test_init(1, args);
called_init = true;
}
int driver_port = grpc_pick_unused_port_or_die();
int benchmark_port = grpc_pick_unused_port_or_die();
local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
@ -127,6 +141,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
sd.stub = std::move(Worker::NewStub(
CreateChannel(workers[i], InsecureCredentials(), ChannelArguments())));
ServerArgs args;
result_server_config = server_config;
result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config;
sd.stream = std::move(sd.stub->RunServer(alloc_context()));
GPR_ASSERT(sd.stream->Write(args));
@ -156,6 +172,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
cd.stub = std::move(Worker::NewStub(CreateChannel(
workers[i + num_servers], InsecureCredentials(), ChannelArguments())));
ClientArgs args;
result_client_config = client_config;
result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config;
cd.stream = std::move(cd.stub->RunTest(alloc_context()));
GPR_ASSERT(cd.stream->Write(args));
@ -196,7 +214,9 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(benchmark_seconds)));
// Finish a run
ScenarioResult result;
std::unique_ptr<ScenarioResult> result(new ScenarioResult);
result->client_config = result_client_config;
result->server_config = result_server_config;
gpr_log(GPR_INFO, "Finishing");
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->Write(server_mark));
@ -207,14 +227,14 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
result.server_resources.push_back(ResourceUsage{
result->server_resources.push_back(ResourceUsage{
stats.time_elapsed(), stats.time_user(), stats.time_system()});
}
for (auto client = clients.begin(); client != clients.end(); client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
result.latencies.MergeProto(stats.latencies());
result.client_resources.push_back(ResourceUsage{
result->latencies.MergeProto(stats.latencies());
result->client_resources.push_back(ResourceUsage{
stats.time_elapsed(), stats.time_user(), stats.time_system()});
}

@ -34,6 +34,8 @@
#ifndef TEST_QPS_DRIVER_H
#define TEST_QPS_DRIVER_H
#include <memory>
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qpstest.grpc.pb.h"
@ -49,15 +51,14 @@ struct ScenarioResult {
Histogram latencies;
std::vector<ResourceUsage> client_resources;
std::vector<ResourceUsage> server_resources;
ClientConfig client_config;
ServerConfig server_config;
};
ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config,
size_t num_clients,
const grpc::testing::ServerConfig& server_config,
size_t num_servers,
int warmup_seconds,
int benchmark_seconds,
int spawn_local_worker_count);
std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ClientConfig& client_config, size_t num_clients,
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
} // namespace testing
} // namespace grpc

@ -103,14 +103,13 @@ int main(int argc, char** argv) {
FLAGS_server_threads < FLAGS_client_channels *
FLAGS_outstanding_rpcs_per_channel));
auto result = RunScenario(client_config, FLAGS_num_clients,
server_config, FLAGS_num_servers,
FLAGS_warmup_seconds, FLAGS_benchmark_seconds,
FLAGS_local_workers);
ReportQPSPerCore(result, server_config);
ReportLatency(result);
ReportTimes(result);
const auto result = RunScenario(
client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,
FLAGS_warmup_seconds, FLAGS_benchmark_seconds, FLAGS_local_workers);
ReportQPSPerCore(*result, server_config);
ReportLatency(*result);
ReportTimes(*result);
return 0;
}

@ -64,8 +64,8 @@ static void RunQPS() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
ReportQPSPerCore(result, server_config);
ReportLatency(result);
ReportQPSPerCore(*result, server_config);
ReportLatency(*result);
}
} // namespace testing

@ -102,6 +102,7 @@ message ClientConfig {
// only for async client:
optional int32 async_client_threads = 7;
optional RpcType rpc_type = 8 [default=UNARY];
optional string host = 9;
}
// Request current stats
@ -129,6 +130,7 @@ message ServerConfig {
required ServerType server_type = 1;
optional int32 threads = 2 [default=1];
optional bool enable_ssl = 3 [default=false];
optional string host = 4;
}
message ServerArgs {

@ -63,8 +63,8 @@ static void RunSynchronousStreamingPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
ReportQPS(result);
ReportLatency(result);
ReportQPS(*result);
ReportLatency(*result);
}
} // namespace testing

@ -63,8 +63,8 @@ static void RunSynchronousUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
ReportQPS(result);
ReportLatency(result);
ReportQPS(*result);
ReportLatency(*result);
}
} // namespace testing

@ -596,6 +596,24 @@
"posix"
]
},
{
"flaky": false,
"language": "c++",
"name": "async_streaming_ping_pong_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c++",
"name": "async_unary_ping_pong_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c++",
@ -695,6 +713,24 @@
"posix"
]
},
{
"flaky": false,
"language": "c++",
"name": "sync_streaming_ping_pong_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c++",
"name": "sync_unary_ping_pong_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c++",

@ -2,14 +2,14 @@
<package>
<metadata>
<id>grpc.native.csharp_ext</id>
<version>0.8.0.0</version>
<version>0.9.0.0</version>
<authors>Google Inc.</authors>
<owners>Jan Tattermusch</owners>
<licenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</licenseUrl>
<projectUrl>http://github.com/grpc/grpc</projectUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Native extension needed by gRPC C# library. This is not the package you are looking for, it is only meant to be used as a dependency.</description>
<releaseNotes>Release of gRPC C core 0.8.0 libraries.</releaseNotes>
<releaseNotes>Release of gRPC C core 0.9.0 libraries.</releaseNotes>
<copyright>Copyright 2015</copyright>
<title>gRPC C# Native Extension</title>
<summary>Native library required by gRPC C#</summary>

Loading…
Cancel
Save