diff --git a/Makefile b/Makefile index 6d84fe12574..f4129859b44 100644 --- a/Makefile +++ b/Makefile @@ -1127,11 +1127,7 @@ third_party/protobuf/configure: $(LIBDIR)/$(CONFIG)/protobuf/libprotobuf.a: third_party/protobuf/configure $(E) "[MAKE] Building protobuf" -ifeq ($(HAVE_CXX11),true) - $(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CXXFLAGS="-DLANG_CXX11 -std=c++11" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static) -else - $(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CXXFLAGS="-std=c++0x" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static) -endif + $(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static) $(Q)$(MAKE) -C third_party/protobuf clean $(Q)$(MAKE) -C third_party/protobuf $(Q)mkdir -p $(LIBDIR)/$(CONFIG)/protobuf @@ -2053,6 +2049,10 @@ test_c: buildtests_c test_cxx: buildtests_cxx $(E) "[RUN] Testing async_end2end_test" $(Q) $(BINDIR)/$(CONFIG)/async_end2end_test || ( echo test async_end2end_test failed ; exit 1 ) + $(E) "[RUN] Testing async_streaming_ping_pong_test" + $(Q) $(BINDIR)/$(CONFIG)/async_streaming_ping_pong_test || ( echo test async_streaming_ping_pong_test failed ; exit 1 ) + $(E) "[RUN] Testing async_unary_ping_pong_test" + $(Q) $(BINDIR)/$(CONFIG)/async_unary_ping_pong_test || ( echo test async_unary_ping_pong_test failed ; exit 1 ) $(E) "[RUN] Testing channel_arguments_test" $(Q) $(BINDIR)/$(CONFIG)/channel_arguments_test || ( echo test channel_arguments_test failed ; exit 1 ) $(E) "[RUN] Testing cli_call_test" @@ -2075,6 +2075,10 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/server_crash_test || ( echo test server_crash_test failed ; exit 1 ) $(E) "[RUN] Testing status_test" $(Q) $(BINDIR)/$(CONFIG)/status_test || ( echo test status_test failed ; exit 1 ) + $(E) "[RUN] Testing sync_streaming_ping_pong_test" + $(Q) $(BINDIR)/$(CONFIG)/sync_streaming_ping_pong_test || ( echo test sync_streaming_ping_pong_test failed ; exit 1 ) + $(E) "[RUN] Testing sync_unary_ping_pong_test" + $(Q) $(BINDIR)/$(CONFIG)/sync_unary_ping_pong_test || ( echo test sync_unary_ping_pong_test failed ; exit 1 ) $(E) "[RUN] Testing thread_pool_test" $(Q) $(BINDIR)/$(CONFIG)/thread_pool_test || ( echo test thread_pool_test failed ; exit 1 ) $(E) "[RUN] Testing thread_stress_test" diff --git a/build.json b/build.json index 95bf7a623bb..b893692205c 100644 --- a/build.json +++ b/build.json @@ -1814,7 +1814,6 @@ { "name": "async_streaming_ping_pong_test", "build": "test", - "run": false, "language": "c++", "src": [ "test/cpp/qps/async_streaming_ping_pong_test.cc" @@ -1832,7 +1831,6 @@ { "name": "async_unary_ping_pong_test", "build": "test", - "run": false, "language": "c++", "src": [ "test/cpp/qps/async_unary_ping_pong_test.cc" @@ -2274,7 +2272,6 @@ { "name": "sync_streaming_ping_pong_test", "build": "test", - "run": false, "language": "c++", "src": [ "test/cpp/qps/sync_streaming_ping_pong_test.cc" @@ -2292,7 +2289,6 @@ { "name": "sync_unary_ping_pong_test", "build": "test", - "run": false, "language": "c++", "src": [ "test/cpp/qps/sync_unary_ping_pong_test.cc" diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc index 82dd06bcec4..5dd078b303b 100644 --- a/src/compiler/csharp_generator.cc +++ b/src/compiler/csharp_generator.cc @@ -51,20 +51,49 @@ using grpc_generator::METHODTYPE_NO_STREAMING; using grpc_generator::METHODTYPE_CLIENT_STREAMING; using grpc_generator::METHODTYPE_SERVER_STREAMING; using grpc_generator::METHODTYPE_BIDI_STREAMING; +using grpc_generator::StringReplace; using std::map; using std::vector; namespace grpc_csharp_generator { namespace { -std::string GetCSharpNamespace(const FileDescriptor* file) { - // TODO(jtattermusch): this should be based on csharp_namespace option +// TODO(jtattermusch): make GetFileNamespace part of libprotoc public API. +// NOTE: Implementation needs to match exactly to GetFileNamespace +// defined in csharp_helpers.h in protoc csharp plugin. +// We cannot reference it directly because google3 protobufs +// don't have a csharp protoc plugin. +std::string GetFileNamespace(const FileDescriptor* file) { + if (file->options().has_csharp_namespace()) { + return file->options().csharp_namespace(); + } return file->package(); } -std::string GetMessageType(const Descriptor* message) { - // TODO(jtattermusch): this has to match with C# protobuf generator - return message->name(); +std::string ToCSharpName(const std::string& name, const FileDescriptor* file) { + std::string result = GetFileNamespace(file); + if (result != "") { + result += '.'; + } + std::string classname; + if (file->package().empty()) { + classname = name; + } else { + // Strip the proto package from full_name since we've replaced it with + // the C# namespace. + classname = name.substr(file->package().size() + 1); + } + result += StringReplace(classname, ".", ".Types.", false); + return "global::" + result; +} + +// TODO(jtattermusch): make GetClassName part of libprotoc public API. +// NOTE: Implementation needs to match exactly to GetClassName +// defined in csharp_helpers.h in protoc csharp plugin. +// We cannot reference it directly because google3 protobufs +// don't have a csharp protoc plugin. +std::string GetClassName(const Descriptor* message) { + return ToCSharpName(message->full_name(), message->file()); } std::string GetServiceClassName(const ServiceDescriptor* service) { @@ -114,22 +143,22 @@ std::string GetMethodRequestParamMaybe(const MethodDescriptor *method) { if (method->client_streaming()) { return ""; } - return GetMessageType(method->input_type()) + " request, "; + return GetClassName(method->input_type()) + " request, "; } std::string GetMethodReturnTypeClient(const MethodDescriptor *method) { switch (GetMethodType(method)) { case METHODTYPE_NO_STREAMING: - return "Task<" + GetMessageType(method->output_type()) + ">"; + return "Task<" + GetClassName(method->output_type()) + ">"; case METHODTYPE_CLIENT_STREAMING: - return "AsyncClientStreamingCall<" + GetMessageType(method->input_type()) - + ", " + GetMessageType(method->output_type()) + ">"; + return "AsyncClientStreamingCall<" + GetClassName(method->input_type()) + + ", " + GetClassName(method->output_type()) + ">"; case METHODTYPE_SERVER_STREAMING: - return "AsyncServerStreamingCall<" + GetMessageType(method->output_type()) + return "AsyncServerStreamingCall<" + GetClassName(method->output_type()) + ">"; case METHODTYPE_BIDI_STREAMING: - return "AsyncDuplexStreamingCall<" + GetMessageType(method->input_type()) - + ", " + GetMessageType(method->output_type()) + ">"; + return "AsyncDuplexStreamingCall<" + GetClassName(method->input_type()) + + ", " + GetClassName(method->output_type()) + ">"; } GOOGLE_LOG(FATAL)<< "Can't get here."; return ""; @@ -139,10 +168,10 @@ std::string GetMethodRequestParamServer(const MethodDescriptor *method) { switch (GetMethodType(method)) { case METHODTYPE_NO_STREAMING: case METHODTYPE_SERVER_STREAMING: - return GetMessageType(method->input_type()) + " request"; + return GetClassName(method->input_type()) + " request"; case METHODTYPE_CLIENT_STREAMING: case METHODTYPE_BIDI_STREAMING: - return "IAsyncStreamReader<" + GetMessageType(method->input_type()) + return "IAsyncStreamReader<" + GetClassName(method->input_type()) + "> requestStream"; } GOOGLE_LOG(FATAL)<< "Can't get here."; @@ -153,7 +182,7 @@ std::string GetMethodReturnTypeServer(const MethodDescriptor *method) { switch (GetMethodType(method)) { case METHODTYPE_NO_STREAMING: case METHODTYPE_CLIENT_STREAMING: - return "Task<" + GetMessageType(method->output_type()) + ">"; + return "Task<" + GetClassName(method->output_type()) + ">"; case METHODTYPE_SERVER_STREAMING: case METHODTYPE_BIDI_STREAMING: return "Task"; @@ -169,7 +198,7 @@ std::string GetMethodResponseStreamMaybe(const MethodDescriptor *method) { return ""; case METHODTYPE_SERVER_STREAMING: case METHODTYPE_BIDI_STREAMING: - return ", IServerStreamWriter<" + GetMessageType(method->output_type()) + return ", IServerStreamWriter<" + GetClassName(method->output_type()) + "> responseStream"; } GOOGLE_LOG(FATAL)<< "Can't get here."; @@ -202,7 +231,7 @@ void GenerateMarshallerFields(Printer* out, const ServiceDescriptor *service) { out->Print( "static readonly Marshaller<$type$> $fieldname$ = Marshallers.Create((arg) => arg.ToByteArray(), $type$.ParseFrom);\n", "fieldname", GetMarshallerFieldName(message), "type", - GetMessageType(message)); + GetClassName(message)); } out->Print("\n"); } @@ -211,8 +240,8 @@ void GenerateStaticMethodField(Printer* out, const MethodDescriptor *method) { out->Print( "static readonly Method<$request$, $response$> $fieldname$ = new Method<$request$, $response$>(\n", "fieldname", GetMethodFieldName(method), "request", - GetMessageType(method->input_type()), "response", - GetMessageType(method->output_type())); + GetClassName(method->input_type()), "response", + GetClassName(method->output_type())); out->Indent(); out->Indent(); out->Print("$methodtype$,\n", "methodtype", @@ -242,8 +271,8 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) { out->Print( "$response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken));\n", "methodname", method->name(), "request", - GetMessageType(method->input_type()), "response", - GetMessageType(method->output_type())); + GetClassName(method->input_type()), "response", + GetClassName(method->output_type())); } std::string method_name = method->name(); @@ -310,8 +339,8 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) { out->Print( "public $response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken))\n", "methodname", method->name(), "request", - GetMessageType(method->input_type()), "response", - GetMessageType(method->output_type())); + GetClassName(method->input_type()), "response", + GetClassName(method->output_type())); out->Print("{\n"); out->Indent(); out->Print("var call = CreateCall($servicenamefield$, $methodfield$);\n", @@ -466,7 +495,7 @@ grpc::string GetServices(const FileDescriptor *file) { // TODO(jtattermusch): add using for protobuf message classes out.Print("\n"); - out.Print("namespace $namespace$ {\n", "namespace", GetCSharpNamespace(file)); + out.Print("namespace $namespace$ {\n", "namespace", GetFileNamespace(file)); out.Indent(); for (int i = 0; i < file->service_count(); i++) { GenerateService(&out, file->service(i)); diff --git a/src/compiler/generator_helpers.h b/src/compiler/generator_helpers.h index a232fcc9548..7bdaff1c9b0 100644 --- a/src/compiler/generator_helpers.h +++ b/src/compiler/generator_helpers.h @@ -60,21 +60,26 @@ inline grpc::string StripProto(grpc::string filename) { } inline grpc::string StringReplace(grpc::string str, const grpc::string &from, - const grpc::string &to) { + const grpc::string &to, bool replace_all) { size_t pos = 0; - for (;;) { + do { pos = str.find(from, pos); if (pos == grpc::string::npos) { break; } str.replace(pos, from.length(), to); pos += to.length(); - } + } while(replace_all); return str; } +inline grpc::string StringReplace(grpc::string str, const grpc::string &from, + const grpc::string &to) { + return StringReplace(str, from, to, true); +} + inline std::vector tokenize(const grpc::string &input, const grpc::string &delimiters) { std::vector tokens; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 4d1bcad9e27..ab1af0d4eec 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -258,7 +258,6 @@ static void unary_poll_do_promote(void *args, int success) { grpc_pollset *pollset = up_args->pollset; grpc_fd *fd = up_args->fd; int do_shutdown_cb = 0; - gpr_free(up_args); /* * This is quite tricky. There are a number of cases to keep in mind here: @@ -273,8 +272,12 @@ static void unary_poll_do_promote(void *args, int success) { /* First we need to ensure that nobody is polling concurrently */ while (pollset->counter != 0) { grpc_pollset_kick(pollset); - gpr_cv_wait(&pollset->cv, &pollset->mu, gpr_inf_future); + grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + gpr_mu_unlock(&pollset->mu); + return; } + + gpr_free(up_args); /* At this point the pollset may no longer be a unary poller. In that case * we should just call the right add function and be done. */ /* TODO(klempner): If we're not careful this could cause infinite recursion. diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 8b1ef0f2be0..ca42eddad00 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -401,6 +401,7 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) { static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static int need_more_data(grpc_call *call) { + if (call->read_state == READ_STATE_STREAM_CLOSED) return 0; return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) || is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || @@ -408,8 +409,7 @@ static int need_more_data(grpc_call *call) { is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || - (call->write_state == WRITE_STATE_INITIAL && !call->is_client && - call->read_state < READ_STATE_GOT_INITIAL_METADATA); + (call->write_state == WRITE_STATE_INITIAL && !call->is_client); } static void unlock(grpc_call *call) { @@ -601,6 +601,7 @@ static void call_on_done_send(void *pc, int success) { if (!success) { call->write_state = WRITE_STATE_WRITE_CLOSED; } + call->send_ops.nops = 0; call->last_send_contains = 0; call->sending = 0; unlock(call); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 351ed5b7586..d75af7291bb 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -427,6 +427,8 @@ static void server_on_recv(void *ptr, int success) { grpc_iomgr_add_callback(kill_zombie, elem); } else if (calld->state == PENDING) { call_list_remove(calld, PENDING_START); + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); } gpr_mu_unlock(&chand->server->mu); break; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 678b3610363..9dc5f233899 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1142,6 +1142,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { if (op->recv_ops) { GPR_ASSERT(s->incoming_sopb == NULL); + GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED); s->recv_done_closure.cb = op->on_done_recv; s->recv_done_closure.user_data = op->recv_user_data; s->incoming_sopb = op->recv_ops; diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.csproj b/src/csharp/Grpc.Auth/Grpc.Auth.csproj index f7724ea6433..e6abbbfdf01 100644 --- a/src/csharp/Grpc.Auth/Grpc.Auth.csproj +++ b/src/csharp/Grpc.Auth/Grpc.Auth.csproj @@ -10,6 +10,7 @@ Grpc.Auth Grpc.Auth v4.5 + bin\$(Configuration)\Grpc.Auth.Xml true diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec index 28ec93d3c57..85aee355661 100644 --- a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec +++ b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec @@ -22,5 +22,8 @@ + + + diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index eac8d16fb15..62cb4432725 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -34,6 +34,9 @@ ..\packages\NUnit.2.6.4\lib\nunit.framework.dll + + ..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll + @@ -57,7 +60,5 @@ - - - + \ No newline at end of file diff --git a/src/csharp/Grpc.Core.Tests/packages.config b/src/csharp/Grpc.Core.Tests/packages.config index c714ef3a23e..28af8d78c6c 100644 --- a/src/csharp/Grpc.Core.Tests/packages.config +++ b/src/csharp/Grpc.Core.Tests/packages.config @@ -1,4 +1,5 @@  + \ No newline at end of file diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index b95776f66d4..d66b0d49749 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -40,33 +40,17 @@ namespace Grpc.Core /// /// Return type for client streaming calls. /// - public sealed class AsyncClientStreamingCall - where TRequest : class - where TResponse : class + public sealed class AsyncClientStreamingCall : IDisposable { readonly IClientStreamWriter requestStream; readonly Task result; + readonly Action disposeAction; - public AsyncClientStreamingCall(IClientStreamWriter requestStream, Task result) + public AsyncClientStreamingCall(IClientStreamWriter requestStream, Task result, Action disposeAction) { this.requestStream = requestStream; this.result = result; - } - - /// - /// Writes a request to RequestStream. - /// - public Task Write(TRequest message) - { - return requestStream.Write(message); - } - - /// - /// Closes the RequestStream. - /// - public Task Close() - { - return requestStream.Close(); + this.disposeAction = disposeAction; } /// @@ -99,5 +83,16 @@ namespace Grpc.Core { return result.GetAwaiter(); } + + /// + /// Provides means to provide after the call. + /// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index ee054374167..4c0d5936ac4 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -40,42 +40,17 @@ namespace Grpc.Core /// /// Return type for bidirectional streaming calls. /// - public sealed class AsyncDuplexStreamingCall - where TRequest : class - where TResponse : class + public sealed class AsyncDuplexStreamingCall : IDisposable { readonly IClientStreamWriter requestStream; readonly IAsyncStreamReader responseStream; + readonly Action disposeAction; - public AsyncDuplexStreamingCall(IClientStreamWriter requestStream, IAsyncStreamReader responseStream) + public AsyncDuplexStreamingCall(IClientStreamWriter requestStream, IAsyncStreamReader responseStream, Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; - } - - /// - /// Writes a request to RequestStream. - /// - public Task Write(TRequest message) - { - return requestStream.Write(message); - } - - /// - /// Closes the RequestStream. - /// - public Task Close() - { - return requestStream.Close(); - } - - /// - /// Reads a response from ResponseStream. - /// - /// - public Task ReadNext() - { - return responseStream.ReadNext(); + this.disposeAction = disposeAction; } /// @@ -99,5 +74,16 @@ namespace Grpc.Core return requestStream; } } + + /// + /// Provides means to cleanup after the call. + /// If the call has already finished normally (request stream has been completed and response stream has been fully read), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index 73b96149850..7a479b9a23d 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -40,23 +40,15 @@ namespace Grpc.Core /// /// Return type for server streaming calls. /// - public sealed class AsyncServerStreamingCall - where TResponse : class + public sealed class AsyncServerStreamingCall : IDisposable { readonly IAsyncStreamReader responseStream; + readonly Action disposeAction; - public AsyncServerStreamingCall(IAsyncStreamReader responseStream) + public AsyncServerStreamingCall(IAsyncStreamReader responseStream, Action disposeAction) { this.responseStream = responseStream; - } - - /// - /// Reads the next response from ResponseStream - /// - /// - public Task ReadNext() - { - return responseStream.ReadNext(); + this.disposeAction = disposeAction; } /// @@ -69,5 +61,16 @@ namespace Grpc.Core return responseStream; } } + + /// + /// Provides means to cleanup after the call. + /// If the call has already finished normally (response stream has been fully read), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs index d1ee59ff0a0..37b452f020d 100644 --- a/src/csharp/Grpc.Core/Call.cs +++ b/src/csharp/Grpc.Core/Call.cs @@ -41,8 +41,6 @@ namespace Grpc.Core /// Abstraction of a call to be invoked on a client. /// public class Call - where TRequest : class - where TResponse : class { readonly string name; readonly Marshaller requestMarshaller; diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index ba42a2d4f87..9f8baac6841 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -73,7 +73,7 @@ namespace Grpc.Core asyncCall.StartServerStreamingCall(req, call.Headers); RegisterCancellationCallback(asyncCall, token); var responseStream = new ClientResponseStream(asyncCall); - return new AsyncServerStreamingCall(responseStream); + return new AsyncServerStreamingCall(responseStream, asyncCall.Cancel); } public static AsyncClientStreamingCall AsyncClientStreamingCall(Call call, CancellationToken token) @@ -85,7 +85,7 @@ namespace Grpc.Core var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers); RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream(asyncCall); - return new AsyncClientStreamingCall(requestStream, resultTask); + return new AsyncClientStreamingCall(requestStream, resultTask, asyncCall.Cancel); } public static AsyncDuplexStreamingCall AsyncDuplexStreamingCall(Call call, CancellationToken token) @@ -98,7 +98,7 @@ namespace Grpc.Core RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream(asyncCall); var responseStream = new ClientResponseStream(asyncCall); - return new AsyncDuplexStreamingCall(requestStream, responseStream); + return new AsyncDuplexStreamingCall(requestStream, responseStream, asyncCall.Cancel); } private static void RegisterCancellationCallback(AsyncCall asyncCall, CancellationToken token) diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index f5f2cf5f220..fe2d446a35d 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -13,6 +13,7 @@ Grpc.Core v4.5 8bb563fb + bin\$(Configuration)\Grpc.Core.Xml true @@ -37,6 +38,9 @@ ..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll + + ..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll + diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec index e54908cb8ba..69e8497bb70 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.nuspec +++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec @@ -16,10 +16,14 @@ gRPC RPC Protocol HTTP/2 - + + + + + diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 699741cd054..371fbf27ce5 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -43,13 +43,8 @@ namespace Grpc.Core /// A stream of messages to be read. /// /// - public interface IAsyncStreamReader - where T : class + public interface IAsyncStreamReader : IAsyncEnumerator { - /// - /// Reads a single message. Returns null if the last message was already read. - /// A following read can only be started when the previous one finishes. - /// - Task ReadNext(); + // TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface. } } diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs index 4bd8bfb8df2..2000210252d 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs @@ -44,12 +44,11 @@ namespace Grpc.Core /// /// public interface IAsyncStreamWriter - where T : class { /// - /// Writes a single message. Only one write can be pending at a time. + /// Writes a single asynchronously. Only one write can be pending at a time. /// /// the message to be written. Cannot be null. - Task Write(T message); + Task WriteAsync(T message); } } diff --git a/src/csharp/Grpc.Core/IClientStreamWriter.cs b/src/csharp/Grpc.Core/IClientStreamWriter.cs index 0847a928e6c..a3028bc3741 100644 --- a/src/csharp/Grpc.Core/IClientStreamWriter.cs +++ b/src/csharp/Grpc.Core/IClientStreamWriter.cs @@ -44,11 +44,10 @@ namespace Grpc.Core /// /// public interface IClientStreamWriter : IAsyncStreamWriter - where T : class { /// - /// Closes the stream. Can only be called once there is no pending write. No writes should follow calling this. + /// Completes/closes the stream. Can only be called once there is no pending write. No writes should follow calling this. /// - Task Close(); + Task CompleteAsync(); } } diff --git a/src/csharp/Grpc.Core/IServerStreamWriter.cs b/src/csharp/Grpc.Core/IServerStreamWriter.cs index 199a585a3fe..9f3af59109b 100644 --- a/src/csharp/Grpc.Core/IServerStreamWriter.cs +++ b/src/csharp/Grpc.Core/IServerStreamWriter.cs @@ -43,7 +43,7 @@ namespace Grpc.Core /// A writable stream of messages that is used in server-side handlers. /// public interface IServerStreamWriter : IAsyncStreamWriter - where T : class { + // TODO(jtattermusch): consider just using IAsyncStreamWriter instead of this interface. } } diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 16970587328..58f493463be 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -38,8 +38,6 @@ namespace Grpc.Core.Internal /// Writes requests asynchronously to an underlying AsyncCall object. /// internal class ClientRequestStream : IClientStreamWriter - where TRequest : class - where TResponse : class { readonly AsyncCall call; @@ -48,14 +46,14 @@ namespace Grpc.Core.Internal this.call = call; } - public Task Write(TRequest message) + public Task WriteAsync(TRequest message) { var taskSource = new AsyncCompletionTaskSource(); call.StartSendMessage(message, taskSource.CompletionDelegate); return taskSource.Task; } - public Task Close() + public Task CompleteAsync() { var taskSource = new AsyncCompletionTaskSource(); call.StartSendCloseFromClient(taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index b2378cade66..6c445210381 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCall call; + TResponse current; public ClientResponseStream(AsyncCall call) { this.call = call; } - public Task ReadNext() + public TResponse Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 95d8e978692..f494d9e0ffc 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -71,12 +72,13 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context var result = await handler(context, request); - await responseStream.Write(result); + await responseStream.WriteAsync(result); } catch (Exception e) { @@ -85,7 +87,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -122,9 +124,10 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context await handler(context, request, responseStream); @@ -137,7 +140,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -178,7 +181,7 @@ namespace Grpc.Core.Internal var result = await handler(context, requestStream); try { - await responseStream.Write(result); + await responseStream.WriteAsync(result); } catch (OperationCanceledException) { @@ -193,7 +196,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -240,7 +243,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -263,7 +266,7 @@ namespace Grpc.Core.Internal var requestStream = new ServerRequestStream(asyncCall); var responseStream = new ServerResponseStream(asyncCall); - await responseStream.WriteStatus(new Status(StatusCode.Unimplemented, "No such method.")); + await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method.")); // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed. await requestStream.ToList(); await finishedTask; diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index d9ee0c815b2..3fccb88abba 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCallServer call; + TRequest current; public ServerRequestStream(AsyncCallServer call) { this.call = call; } - public Task ReadNext() + public TRequest Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index da688d504f2..a2d77dd5b7b 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -49,14 +49,14 @@ namespace Grpc.Core.Internal this.call = call; } - public Task Write(TResponse message) + public Task WriteAsync(TResponse message) { var taskSource = new AsyncCompletionTaskSource(); call.StartSendMessage(message, taskSource.CompletionDelegate); return taskSource.Task; } - public Task WriteStatus(Status status) + public Task WriteStatusAsync(Status status) { var taskSource = new AsyncCompletionTaskSource(); call.StartSendStatusFromServer(status, taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 731ea2be81b..7a1c016ae20 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -39,9 +39,6 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { - // TODO: we need to make sure that the delegates are not collected before invoked. - //internal delegate void ServerShutdownCallbackDelegate(bool success); - /// /// grpc_server from grpc/grpc.h /// diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index e873b3e88a7..bc9a499c518 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -42,7 +42,6 @@ namespace Grpc.Core /// public sealed class ServerCallContext { - // TODO(jtattermusch): add cancellationToken // TODO(jtattermusch): add deadline info diff --git a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs index f915155f8a4..8a748b45a81 100644 --- a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs +++ b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs @@ -49,14 +49,9 @@ namespace Grpc.Core.Utils public static async Task ForEach(this IAsyncStreamReader streamReader, Func asyncAction) where T : class { - while (true) + while (await streamReader.MoveNext()) { - var elem = await streamReader.ReadNext(); - if (elem == null) - { - break; - } - await asyncAction(elem); + await asyncAction(streamReader.Current); } } @@ -67,32 +62,27 @@ namespace Grpc.Core.Utils where T : class { var result = new List(); - while (true) + while (await streamReader.MoveNext()) { - var elem = await streamReader.ReadNext(); - if (elem == null) - { - break; - } - result.Add(elem); + result.Add(streamReader.Current); } return result; } /// /// Writes all elements from given enumerable to the stream. - /// Closes the stream afterwards unless close = false. + /// Completes the stream afterwards unless close = false. /// - public static async Task WriteAll(this IClientStreamWriter streamWriter, IEnumerable elements, bool close = true) + public static async Task WriteAll(this IClientStreamWriter streamWriter, IEnumerable elements, bool complete = true) where T : class { foreach (var element in elements) { - await streamWriter.Write(element); + await streamWriter.WriteAsync(element); } - if (close) + if (complete) { - await streamWriter.Close(); + await streamWriter.CompleteAsync(); } } @@ -104,7 +94,7 @@ namespace Grpc.Core.Utils { foreach (var element in elements) { - await streamWriter.Write(element); + await streamWriter.WriteAsync(element); } } } diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config index 71967de56e5..fb7eaaeeda1 100644 --- a/src/csharp/Grpc.Core/packages.config +++ b/src/csharp/Grpc.Core/packages.config @@ -2,5 +2,6 @@ + \ No newline at end of file diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj index 87ccf07dd8b..6e84add42ba 100644 --- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj +++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj @@ -37,6 +37,10 @@ ..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll + + False + ..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll + diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index 4997d3aa42d..5aa6f4162d4 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -96,7 +96,19 @@ namespace math.Tests Assert.AreEqual(0, response.Remainder); } - // TODO(jtattermusch): test division by zero + [Test] + public void DivByZero() + { + try + { + DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build()); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); + } + } [Test] public void DivAsync() @@ -114,11 +126,12 @@ namespace math.Tests { Task.Run(async () => { - var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build()); - - var responses = await call.ResponseStream.ToList(); - CollectionAssert.AreEqual(new List { 1, 1, 2, 3, 5, 8 }, - responses.ConvertAll((n) => n.Num_)); + using (var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build())) + { + var responses = await call.ResponseStream.ToList(); + CollectionAssert.AreEqual(new List { 1, 1, 2, 3, 5, 8 }, + responses.ConvertAll((n) => n.Num_)); + } }).Wait(); } @@ -128,13 +141,15 @@ namespace math.Tests { Task.Run(async () => { - var call = client.Sum(); - var numbers = new List { 10, 20, 30 }.ConvertAll( - n => Num.CreateBuilder().SetNum_(n).Build()); + using (var call = client.Sum()) + { + var numbers = new List { 10, 20, 30 }.ConvertAll( + n => Num.CreateBuilder().SetNum_(n).Build()); - await call.RequestStream.WriteAll(numbers); - var result = await call.Result; - Assert.AreEqual(60, result.Num_); + await call.RequestStream.WriteAll(numbers); + var result = await call.Result; + Assert.AreEqual(60, result.Num_); + } }).Wait(); } @@ -150,12 +165,14 @@ namespace math.Tests new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build() }; - var call = client.DivMany(); - await call.RequestStream.WriteAll(divArgsList); - var result = await call.ResponseStream.ToList(); + using (var call = client.DivMany()) + { + await call.RequestStream.WriteAll(divArgsList); + var result = await call.ResponseStream.ToList(); - CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient)); - CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder)); + CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient)); + CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder)); + } }).Wait(); } } diff --git a/src/csharp/Grpc.Examples.Tests/packages.config b/src/csharp/Grpc.Examples.Tests/packages.config index 4d6ec63b3cd..cc6e9af40f6 100644 --- a/src/csharp/Grpc.Examples.Tests/packages.config +++ b/src/csharp/Grpc.Examples.Tests/packages.config @@ -1,5 +1,6 @@  + \ No newline at end of file diff --git a/src/csharp/Grpc.Examples/Grpc.Examples.csproj b/src/csharp/Grpc.Examples/Grpc.Examples.csproj index 2c5019c214b..5ce490f4034 100644 --- a/src/csharp/Grpc.Examples/Grpc.Examples.csproj +++ b/src/csharp/Grpc.Examples/Grpc.Examples.csproj @@ -35,6 +35,9 @@ ..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll + + ..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll + diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs index ab06a44c0d8..d2cfbee18f7 100644 --- a/src/csharp/Grpc.Examples/MathExamples.cs +++ b/src/csharp/Grpc.Examples/MathExamples.cs @@ -51,18 +51,13 @@ namespace math Console.WriteLine("DivAsync Result: " + result); } - public static async Task DivAsyncWithCancellationExample(Math.IMathClient stub) - { - Task resultTask = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); - DivReply result = await resultTask; - Console.WriteLine(result); - } - public static async Task FibExample(Math.IMathClient stub) { - var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build()); - List result = await call.ResponseStream.ToList(); - Console.WriteLine("Fib Result: " + string.Join("|", result)); + using (var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build())) + { + List result = await call.ResponseStream.ToList(); + Console.WriteLine("Fib Result: " + string.Join("|", result)); + } } public static async Task SumExample(Math.IMathClient stub) @@ -74,9 +69,11 @@ namespace math new Num.Builder { Num_ = 3 }.Build() }; - var call = stub.Sum(); - await call.RequestStream.WriteAll(numbers); - Console.WriteLine("Sum Result: " + await call.Result); + using (var call = stub.Sum()) + { + await call.RequestStream.WriteAll(numbers); + Console.WriteLine("Sum Result: " + await call.Result); + } } public static async Task DivManyExample(Math.IMathClient stub) @@ -87,9 +84,11 @@ namespace math new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(), new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build() }; - var call = stub.DivMany(); - await call.RequestStream.WriteAll(divArgsList); - Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList())); + using (var call = stub.DivMany()) + { + await call.RequestStream.WriteAll(divArgsList); + Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList())); + } } public static async Task DependendRequestsExample(Math.IMathClient stub) @@ -101,9 +100,12 @@ namespace math new Num.Builder { Num_ = 3 }.Build() }; - var sumCall = stub.Sum(); - await sumCall.RequestStream.WriteAll(numbers); - Num sum = await sumCall.Result; + Num sum; + using (var sumCall = stub.Sum()) + { + await sumCall.RequestStream.WriteAll(numbers); + sum = await sumCall.Result; + } DivReply result = await stub.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build()); Console.WriteLine("Avg Result: " + result); diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs index 2546fd220de..b9efc44e8c1 100644 --- a/src/csharp/Grpc.Examples/MathGrpc.cs +++ b/src/csharp/Grpc.Examples/MathGrpc.cs @@ -12,30 +12,30 @@ namespace math { { static readonly string __ServiceName = "math.Math"; - static readonly Marshaller __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom); - static readonly Marshaller __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom); - static readonly Marshaller __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), FibArgs.ParseFrom); - static readonly Marshaller __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom); + static readonly Marshaller __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivArgs.ParseFrom); + static readonly Marshaller __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivReply.ParseFrom); + static readonly Marshaller __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.FibArgs.ParseFrom); + static readonly Marshaller __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), global::math.Num.ParseFrom); - static readonly Method __Method_Div = new Method( + static readonly Method __Method_Div = new Method( MethodType.Unary, "Div", __Marshaller_DivArgs, __Marshaller_DivReply); - static readonly Method __Method_DivMany = new Method( + static readonly Method __Method_DivMany = new Method( MethodType.DuplexStreaming, "DivMany", __Marshaller_DivArgs, __Marshaller_DivReply); - static readonly Method __Method_Fib = new Method( + static readonly Method __Method_Fib = new Method( MethodType.ServerStreaming, "Fib", __Marshaller_FibArgs, __Marshaller_Num); - static readonly Method __Method_Sum = new Method( + static readonly Method __Method_Sum = new Method( MethodType.ClientStreaming, "Sum", __Marshaller_Num, @@ -44,20 +44,20 @@ namespace math { // client-side stub interface public interface IMathClient { - DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken)); - Task DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)); - AsyncDuplexStreamingCall DivMany(CancellationToken token = default(CancellationToken)); - AsyncServerStreamingCall Fib(FibArgs request, CancellationToken token = default(CancellationToken)); - AsyncClientStreamingCall Sum(CancellationToken token = default(CancellationToken)); + global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken)); + Task DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall DivMany(CancellationToken token = default(CancellationToken)); + AsyncServerStreamingCall Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken)); + AsyncClientStreamingCall Sum(CancellationToken token = default(CancellationToken)); } // server-side interface public interface IMath { - Task Div(ServerCallContext context, DivArgs request); - Task DivMany(ServerCallContext context, IAsyncStreamReader requestStream, IServerStreamWriter responseStream); - Task Fib(ServerCallContext context, FibArgs request, IServerStreamWriter responseStream); - Task Sum(ServerCallContext context, IAsyncStreamReader requestStream); + Task Div(ServerCallContext context, global::math.DivArgs request); + Task DivMany(ServerCallContext context, IAsyncStreamReader requestStream, IServerStreamWriter responseStream); + Task Fib(ServerCallContext context, global::math.FibArgs request, IServerStreamWriter responseStream); + Task Sum(ServerCallContext context, IAsyncStreamReader requestStream); } // client stub @@ -69,27 +69,27 @@ namespace math { public MathClient(Channel channel, StubConfiguration config) : base(channel, config) { } - public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken)) + public global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Div); return Calls.BlockingUnaryCall(call, request, token); } - public Task DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)) + public Task DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Div); return Calls.AsyncUnaryCall(call, request, token); } - public AsyncDuplexStreamingCall DivMany(CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall DivMany(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_DivMany); return Calls.AsyncDuplexStreamingCall(call, token); } - public AsyncServerStreamingCall Fib(FibArgs request, CancellationToken token = default(CancellationToken)) + public AsyncServerStreamingCall Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Fib); return Calls.AsyncServerStreamingCall(call, request, token); } - public AsyncClientStreamingCall Sum(CancellationToken token = default(CancellationToken)) + public AsyncClientStreamingCall Sum(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Sum); return Calls.AsyncClientStreamingCall(call, token); diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs index 3b33b09bbda..e247ac9d735 100644 --- a/src/csharp/Grpc.Examples/MathServiceImpl.cs +++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs @@ -62,7 +62,7 @@ namespace math { foreach (var num in FibInternal(request.Limit)) { - await responseStream.Write(num); + await responseStream.WriteAsync(num); } } } @@ -81,7 +81,7 @@ namespace math { await requestStream.ForEach(async divArgs => { - await responseStream.Write(DivInternal(divArgs)); + await responseStream.WriteAsync(DivInternal(divArgs)); }); } diff --git a/src/csharp/Grpc.Examples/packages.config b/src/csharp/Grpc.Examples/packages.config index 51c17bcd5e7..4c8d60fa62a 100644 --- a/src/csharp/Grpc.Examples/packages.config +++ b/src/csharp/Grpc.Examples/packages.config @@ -1,5 +1,6 @@  + \ No newline at end of file diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 1ca3dd24e1c..b3a0a2917be 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -54,6 +54,9 @@ ..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll + + ..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll + diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 02f8a369def..dfaf18cae13 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -213,11 +213,13 @@ namespace Grpc.IntegrationTesting var bodySizes = new List { 27182, 8, 1828, 45904 }.ConvertAll((size) => StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build()); - var call = client.StreamingInputCall(); - await call.RequestStream.WriteAll(bodySizes); + using (var call = client.StreamingInputCall()) + { + await call.RequestStream.WriteAll(bodySizes); - var response = await call.Result; - Assert.AreEqual(74922, response.AggregatedPayloadSize); + var response = await call.Result; + Assert.AreEqual(74922, response.AggregatedPayloadSize); + } Console.WriteLine("Passed!"); }).Wait(); } @@ -236,14 +238,15 @@ namespace Grpc.IntegrationTesting (size) => ResponseParameters.CreateBuilder().SetSize(size).Build())) .Build(); - var call = client.StreamingOutputCall(request); - - var responseList = await call.ResponseStream.ToList(); - foreach (var res in responseList) + using (var call = client.StreamingOutputCall(request)) { - Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type); + var responseList = await call.ResponseStream.ToList(); + foreach (var res in responseList) + { + Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type); + } + CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length)); } - CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length)); Console.WriteLine("Passed!"); }).Wait(); } @@ -254,51 +257,48 @@ namespace Grpc.IntegrationTesting { Console.WriteLine("running ping_pong"); - var call = client.FullDuplexCall(); - - StreamingOutputCallResponse response; - - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) - .SetPayload(CreateZerosPayload(27182)).Build()); - - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(31415, response.Payload.Body.Length); + using (var call = client.FullDuplexCall()) + { + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) + .SetPayload(CreateZerosPayload(27182)).Build()); - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9)) - .SetPayload(CreateZerosPayload(8)).Build()); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(9, response.Payload.Body.Length); + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9)) + .SetPayload(CreateZerosPayload(8)).Build()); - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653)) - .SetPayload(CreateZerosPayload(1828)).Build()); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(2653, response.Payload.Body.Length); + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653)) + .SetPayload(CreateZerosPayload(1828)).Build()); - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979)) - .SetPayload(CreateZerosPayload(45904)).Build()); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(58979, response.Payload.Body.Length); + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979)) + .SetPayload(CreateZerosPayload(45904)).Build()); - await call.RequestStream.Close(); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(null, response); + await call.RequestStream.CompleteAsync(); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + } Console.WriteLine("Passed!"); }).Wait(); } @@ -308,12 +308,13 @@ namespace Grpc.IntegrationTesting Task.Run(async () => { Console.WriteLine("running empty_stream"); - var call = client.FullDuplexCall(); - await call.Close(); - - var responseList = await call.ResponseStream.ToList(); - Assert.AreEqual(0, responseList.Count); + using (var call = client.FullDuplexCall()) + { + await call.RequestStream.CompleteAsync(); + var responseList = await call.ResponseStream.ToList(); + Assert.AreEqual(0, responseList.Count); + } Console.WriteLine("Passed!"); }).Wait(); } @@ -365,19 +366,21 @@ namespace Grpc.IntegrationTesting Console.WriteLine("running cancel_after_begin"); var cts = new CancellationTokenSource(); - var call = client.StreamingInputCall(cts.Token); - // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. - await Task.Delay(1000); - cts.Cancel(); - - try + using (var call = client.StreamingInputCall(cts.Token)) { - var response = await call.Result; - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. + await Task.Delay(1000); + cts.Cancel(); + + try + { + var response = await call.Result; + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + } } Console.WriteLine("Passed!"); }).Wait(); @@ -390,29 +393,28 @@ namespace Grpc.IntegrationTesting Console.WriteLine("running cancel_after_first_response"); var cts = new CancellationTokenSource(); - var call = client.FullDuplexCall(cts.Token); - - StreamingOutputCallResponse response; - - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) - .SetPayload(CreateZerosPayload(27182)).Build()); + using (var call = client.FullDuplexCall(cts.Token)) + { + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) + .SetPayload(CreateZerosPayload(27182)).Build()); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(31415, response.Payload.Body.Length); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length); - cts.Cancel(); + cts.Cancel(); - try - { - response = await call.ResponseStream.ReadNext(); - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + try + { + await call.ResponseStream.MoveNext(); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + } } Console.WriteLine("Passed!"); }).Wait(); diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs index 679aafb57a9..ee077f9f56f 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs @@ -12,45 +12,45 @@ namespace grpc.testing { { static readonly string __ServiceName = "grpc.testing.TestService"; - static readonly Marshaller __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), Empty.ParseFrom); - static readonly Marshaller __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), SimpleRequest.ParseFrom); - static readonly Marshaller __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), SimpleResponse.ParseFrom); - static readonly Marshaller __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallRequest.ParseFrom); - static readonly Marshaller __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallResponse.ParseFrom); - static readonly Marshaller __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallRequest.ParseFrom); - static readonly Marshaller __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallResponse.ParseFrom); + static readonly Marshaller __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.Empty.ParseFrom); + static readonly Marshaller __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleRequest.ParseFrom); + static readonly Marshaller __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleResponse.ParseFrom); + static readonly Marshaller __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallRequest.ParseFrom); + static readonly Marshaller __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallResponse.ParseFrom); + static readonly Marshaller __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallRequest.ParseFrom); + static readonly Marshaller __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallResponse.ParseFrom); - static readonly Method __Method_EmptyCall = new Method( + static readonly Method __Method_EmptyCall = new Method( MethodType.Unary, "EmptyCall", __Marshaller_Empty, __Marshaller_Empty); - static readonly Method __Method_UnaryCall = new Method( + static readonly Method __Method_UnaryCall = new Method( MethodType.Unary, "UnaryCall", __Marshaller_SimpleRequest, __Marshaller_SimpleResponse); - static readonly Method __Method_StreamingOutputCall = new Method( + static readonly Method __Method_StreamingOutputCall = new Method( MethodType.ServerStreaming, "StreamingOutputCall", __Marshaller_StreamingOutputCallRequest, __Marshaller_StreamingOutputCallResponse); - static readonly Method __Method_StreamingInputCall = new Method( + static readonly Method __Method_StreamingInputCall = new Method( MethodType.ClientStreaming, "StreamingInputCall", __Marshaller_StreamingInputCallRequest, __Marshaller_StreamingInputCallResponse); - static readonly Method __Method_FullDuplexCall = new Method( + static readonly Method __Method_FullDuplexCall = new Method( MethodType.DuplexStreaming, "FullDuplexCall", __Marshaller_StreamingOutputCallRequest, __Marshaller_StreamingOutputCallResponse); - static readonly Method __Method_HalfDuplexCall = new Method( + static readonly Method __Method_HalfDuplexCall = new Method( MethodType.DuplexStreaming, "HalfDuplexCall", __Marshaller_StreamingOutputCallRequest, @@ -59,25 +59,25 @@ namespace grpc.testing { // client-side stub interface public interface ITestServiceClient { - Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken)); - Task EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken)); - SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken)); - Task UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)); - AsyncServerStreamingCall StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)); - AsyncClientStreamingCall StreamingInputCall(CancellationToken token = default(CancellationToken)); - AsyncDuplexStreamingCall FullDuplexCall(CancellationToken token = default(CancellationToken)); - AsyncDuplexStreamingCall HalfDuplexCall(CancellationToken token = default(CancellationToken)); + global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)); + Task EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)); + global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)); + Task UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)); + AsyncServerStreamingCall StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)); + AsyncClientStreamingCall StreamingInputCall(CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall FullDuplexCall(CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall HalfDuplexCall(CancellationToken token = default(CancellationToken)); } // server-side interface public interface ITestService { - Task EmptyCall(ServerCallContext context, Empty request); - Task UnaryCall(ServerCallContext context, SimpleRequest request); - Task StreamingOutputCall(ServerCallContext context, StreamingOutputCallRequest request, IServerStreamWriter responseStream); - Task StreamingInputCall(ServerCallContext context, IAsyncStreamReader requestStream); - Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader requestStream, IServerStreamWriter responseStream); - Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader requestStream, IServerStreamWriter responseStream); + Task EmptyCall(ServerCallContext context, global::grpc.testing.Empty request); + Task UnaryCall(ServerCallContext context, global::grpc.testing.SimpleRequest request); + Task StreamingOutputCall(ServerCallContext context, global::grpc.testing.StreamingOutputCallRequest request, IServerStreamWriter responseStream); + Task StreamingInputCall(ServerCallContext context, IAsyncStreamReader requestStream); + Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader requestStream, IServerStreamWriter responseStream); + Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader requestStream, IServerStreamWriter responseStream); } // client stub @@ -89,42 +89,42 @@ namespace grpc.testing { public TestServiceClient(Channel channel, StubConfiguration config) : base(channel, config) { } - public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken)) + public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_EmptyCall); return Calls.BlockingUnaryCall(call, request, token); } - public Task EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken)) + public Task EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_EmptyCall); return Calls.AsyncUnaryCall(call, request, token); } - public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken)) + public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_UnaryCall); return Calls.BlockingUnaryCall(call, request, token); } - public Task UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)) + public Task UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_UnaryCall); return Calls.AsyncUnaryCall(call, request, token); } - public AsyncServerStreamingCall StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)) + public AsyncServerStreamingCall StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_StreamingOutputCall); return Calls.AsyncServerStreamingCall(call, request, token); } - public AsyncClientStreamingCall StreamingInputCall(CancellationToken token = default(CancellationToken)) + public AsyncClientStreamingCall StreamingInputCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_StreamingInputCall); return Calls.AsyncClientStreamingCall(call, token); } - public AsyncDuplexStreamingCall FullDuplexCall(CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall FullDuplexCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_FullDuplexCall); return Calls.AsyncDuplexStreamingCall(call, token); } - public AsyncDuplexStreamingCall HalfDuplexCall(CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall HalfDuplexCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_HalfDuplexCall); return Calls.AsyncDuplexStreamingCall(call, token); diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs index d6ba61ef82a..6bd997d1f46 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs @@ -64,7 +64,7 @@ namespace grpc.testing { var response = StreamingOutputCallResponse.CreateBuilder() .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); - await responseStream.Write(response); + await responseStream.WriteAsync(response); } } @@ -86,7 +86,7 @@ namespace grpc.testing { var response = StreamingOutputCallResponse.CreateBuilder() .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); - await responseStream.Write(response); + await responseStream.WriteAsync(response); } }); } diff --git a/src/csharp/Grpc.IntegrationTesting/packages.config b/src/csharp/Grpc.IntegrationTesting/packages.config index e33b6e3e464..291b7b8599a 100644 --- a/src/csharp/Grpc.IntegrationTesting/packages.config +++ b/src/csharp/Grpc.IntegrationTesting/packages.config @@ -3,6 +3,7 @@ + diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat index fe7e0a495f5..7cb78bddf40 100644 --- a/src/csharp/build_packages.bat +++ b/src/csharp/build_packages.bat @@ -11,8 +11,8 @@ endlocal @call buildall.bat || goto :error %NUGET% pack ..\..\vsprojects\nuget_package\grpc.native.csharp_ext.nuspec || goto :error -%NUGET% pack Grpc.Core\Grpc.Core.nuspec || goto :error -%NUGET% pack Grpc.Auth\Grpc.Auth.nuspec || goto :error +%NUGET% pack Grpc.Core\Grpc.Core.nuspec -Symbols || goto :error +%NUGET% pack Grpc.Auth\Grpc.Auth.nuspec -Symbols || goto :error %NUGET% pack Grpc.nuspec || goto :error goto :EOF diff --git a/src/node/bin/README.md b/src/node/bin/README.md new file mode 100644 index 00000000000..2f856e428ef --- /dev/null +++ b/src/node/bin/README.md @@ -0,0 +1,16 @@ +# Command Line Tools + +# Service Packager + +The command line tool `bin/service_packager`, when called with the following command line: + +```bash +service_packager proto_file -o output_path -n name -v version [-i input_path...] +``` + +Populates `output_path` with a node package consisting of a `package.json` populated with `name` and `version`, an `index.js`, a `LICENSE` file copied from gRPC, and a `service.json`, which is compiled from `proto_file` and the given `input_path`s. `require('output_path')` returns an object that is equivalent to + +```js +{ client: require('grpc').load('service.json'), + auth: require('google-auth-library') } +``` diff --git a/src/node/bin/service_packager b/src/node/bin/service_packager new file mode 100755 index 00000000000..c7f24609977 --- /dev/null +++ b/src/node/bin/service_packager @@ -0,0 +1,2 @@ +#!/usr/bin/env node +require(__dirname+'/../cli/service_packager.js').main(process.argv.slice(2)); \ No newline at end of file diff --git a/src/node/cli/service_packager.js b/src/node/cli/service_packager.js new file mode 100644 index 00000000000..f29180b2526 --- /dev/null +++ b/src/node/cli/service_packager.js @@ -0,0 +1,142 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +'use strict'; + +var fs = require('fs'); +var path = require('path'); + +var _ = require('underscore'); +var async = require('async'); +var pbjs = require('protobufjs/cli/pbjs'); +var parseArgs = require('minimist'); +var Mustache = require('mustache'); + +var package_json = require('../package.json'); + +var template_path = path.resolve(__dirname, 'service_packager'); + +var package_tpl_path = path.join(template_path, 'package.json.template'); + +var arg_format = { + string: ['include', 'out', 'name', 'version'], + alias: { + include: 'i', + out: 'o', + name: 'n', + version: 'v' + } +}; + +// TODO(mlumish): autogenerate README.md from proto file + +/** + * Render package.json file from template using provided parameters. + * @param {Object} params Map of parameter names to values + * @param {function(Error, string)} callback Callback to pass rendered template + * text to + */ +function generatePackage(params, callback) { + fs.readFile(package_tpl_path, {encoding: 'utf-8'}, function(err, template) { + if (err) { + callback(err); + } else { + var rendered = Mustache.render(template, params); + callback(null, rendered); + } + }); +} + +/** + * Copy a file + * @param {string} src_path The filepath to copy from + * @param {string} dest_path The filepath to copy to + */ +function copyFile(src_path, dest_path) { + fs.createReadStream(src_path).pipe(fs.createWriteStream(dest_path)); +} + +/** + * Run the script. Copies the index.js and LICENSE files to the output path, + * renders the package.json template to the output path, and generates a + * service.json file from the input proto files using pbjs. The arguments are + * taken directly from the command line, and handled as follows: + * -i (--include) : An include path for pbjs (can be dpulicated) + * -o (--output): The output path + * -n (--name): The name of the package + * -v (--version): The package version + * @param {Array} argv The argument vector + */ +function main(argv) { + var args = parseArgs(argv, arg_format); + var out_path = path.resolve(args.out); + var include_dirs = []; + if (args.include) { + include_dirs = _.map(_.flatten([args.include]), function(p) { + return path.resolve(p); + }); + } + args.grpc_version = package_json.version; + generatePackage(args, function(err, rendered) { + if (err) throw err; + fs.writeFile(path.join(out_path, 'package.json'), rendered, function(err) { + if (err) throw err; + }); + }); + copyFile(path.join(template_path, 'index.js'), + path.join(out_path, 'index.js')); + copyFile(path.join(__dirname, '..', 'LICENSE'), + path.join(out_path, 'LICENSE')); + + var service_stream = fs.createWriteStream(path.join(out_path, + 'service.json')); + var pbjs_args = _.flatten(['node', 'pbjs', + args._[0], + '-legacy', + _.map(include_dirs, function(dir) { + return "-path=" + dir; + })]); + var old_stdout = process.stdout; + process.__defineGetter__('stdout', function() { + return service_stream; + }); + var pbjs_status = pbjs.main(pbjs_args); + process.__defineGetter__('stdout', function() { + return old_stdout; + }); + if (pbjs_status !== pbjs.STATUS_OK) { + throw new Error('pbjs failed with status code ' + pbjs_status); + } +} + +exports.main = main; diff --git a/src/node/cli/service_packager/index.js b/src/node/cli/service_packager/index.js new file mode 100644 index 00000000000..811e08b89ac --- /dev/null +++ b/src/node/cli/service_packager/index.js @@ -0,0 +1,36 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +var grpc = require('grpc'); +exports.client = grpc.load(__dirname + '/service.json', 'json'); +exports.auth = require('google-auth-library'); diff --git a/src/node/cli/service_packager/package.json.template b/src/node/cli/service_packager/package.json.template new file mode 100644 index 00000000000..9f9019172e0 --- /dev/null +++ b/src/node/cli/service_packager/package.json.template @@ -0,0 +1,17 @@ +{ + "name": "{{{name}}}", + "version": "{{{version}}}", + "author": "Google Inc.", + "description": "Client library for {{{name}}} built on gRPC", + "license": "Apache-2.0", + "dependencies": { + "grpc": "{{{grpc_version}}}", + "google-auth-library": "^0.9.2" + }, + "main": "index.js", + "files": [ + "LICENSE", + "index.js", + "service.json" + ] +} diff --git a/src/node/package.json b/src/node/package.json index 8d413c3ffaa..3f31ba49ffe 100644 --- a/src/node/package.json +++ b/src/node/package.json @@ -36,6 +36,7 @@ "jshint": "^2.5.0", "minimist": "^1.1.0", "mocha": "~1.21.0", + "mustache": "^2.0.0", "strftime": "^0.8.2" }, "engines": { @@ -46,6 +47,8 @@ "README.md", "index.js", "binding.gyp", + "bin", + "cli", "examples", "ext", "interop", diff --git a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m index 7e063fddb4e..8e0e11d23da 100644 --- a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m +++ b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m @@ -269,4 +269,37 @@ [self waitForExpectationsWithTimeout:1 handler:nil]; } +- (void)testCancelAfterFirstResponseRPC { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterFirstResponse"]; + + // A buffered pipe to which we write a single value but never close + GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init]; + + __block BOOL receivedResponse = NO; + + id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782 + requestedResponseSize:@31415]; + + [requestsBuffer writeValue:request]; + + __block ProtoRPC *call = [_service RPCToFullDuplexCallWithRequestsWriter:requestsBuffer + handler:^(BOOL done, + RMTStreamingOutputCallResponse *response, + NSError *error) { + if (receivedResponse) { + XCTAssert(done, @"Unexpected extra response %@", response); + XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED); + [expectation fulfill]; + } else { + XCTAssertNil(error, @"Finished with unexpected error: %@", error); + XCTAssertFalse(done, @"Finished without response"); + XCTAssertNotNil(response); + receivedResponse = YES; + [call cancel]; + } + }]; + [call start]; + [self waitForExpectationsWithTimeout:4 handler:nil]; +} + @end diff --git a/src/python/src/grpc/framework/face/_calls.py b/src/python/src/grpc/framework/face/_calls.py index 75a550e3c7d..87edeb0f0e5 100644 --- a/src/python/src/grpc/framework/face/_calls.py +++ b/src/python/src/grpc/framework/face/_calls.py @@ -248,7 +248,7 @@ class _OperationFuture(future.Future): """See future.Future.add_done_callback for specification.""" with self._condition: if self._callbacks is not None: - self._callbacks.add(fn) + self._callbacks.append(fn) return callable_util.call_logging_exceptions(fn, _DONE_CALLBACK_LOG_MESSAGE, self) diff --git a/src/ruby/bin/apis/pubsub_demo.rb b/src/ruby/bin/apis/pubsub_demo.rb index 6d69b0f21e5..a039d036ace 100755 --- a/src/ruby/bin/apis/pubsub_demo.rb +++ b/src/ruby/bin/apis/pubsub_demo.rb @@ -79,7 +79,7 @@ end def publisher_stub(opts) address = "#{opts.host}:#{opts.port}" stub_clz = Tech::Pubsub::PublisherService::Stub # shorter - logger.info("... access PublisherService at #{address}") + GRPC.logger.info("... access PublisherService at #{address}") stub_clz.new(address, creds: ssl_creds, update_metadata: auth_proc(opts), GRPC::Core::Channel::SSL_TARGET => opts.host) @@ -89,7 +89,7 @@ end def subscriber_stub(opts) address = "#{opts.host}:#{opts.port}" stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter - logger.info("... access SubscriberService at #{address}") + GRPC.logger.info("... access SubscriberService at #{address}") stub_clz.new(address, creds: ssl_creds, update_metadata: auth_proc(opts), GRPC::Core::Channel::SSL_TARGET => opts.host) diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index a3889247225..8df03ffb3cd 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -70,7 +70,7 @@ end # loads the certificates used to access the test server securely. def load_prod_cert fail 'could not find a production cert' if ENV['SSL_CERT_FILE'].nil? - logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}") + GRPC.logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}") File.open(ENV['SSL_CERT_FILE']).read end @@ -115,10 +115,10 @@ def create_stub(opts) stub_opts[:update_metadata] = auth_creds.updater_proc end - logger.info("... connecting securely to #{address}") + GRPC.logger.info("... connecting securely to #{address}") Grpc::Testing::TestService::Stub.new(address, **stub_opts) else - logger.info("... connecting insecurely to #{address}") + GRPC.logger.info("... connecting insecurely to #{address}") Grpc::Testing::TestService::Stub.new(address) end end diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb index 72570d92f3c..78cb8dd8364 100755 --- a/src/ruby/bin/interop/interop_server.rb +++ b/src/ruby/bin/interop/interop_server.rb @@ -129,13 +129,13 @@ class TestTarget < Grpc::Testing::TestService::Service Thread.new do begin reqs.each do |req| - logger.info("read #{req.inspect}") + GRPC.logger.info("read #{req.inspect}") resp_size = req.response_parameters[0].size resp = cls.new(payload: Payload.new(type: req.response_type, body: nulls(resp_size))) q.push(resp) end - logger.info('finished reads') + GRPC.logger.info('finished reads') q.push(self) rescue StandardError => e q.push(e) # share the exception with the enumerator @@ -179,10 +179,10 @@ def main s = GRPC::RpcServer.new if opts['secure'] s.add_http2_port(host, test_server_creds) - logger.info("... running securely on #{host}") + GRPC.logger.info("... running securely on #{host}") else s.add_http2_port(host) - logger.info("... running insecurely on #{host}") + GRPC.logger.info("... running insecurely on #{host}") end s.handle(TestTarget) s.run_till_terminated diff --git a/src/ruby/bin/math_client.rb b/src/ruby/bin/math_client.rb index db254efb002..6319cda3091 100755 --- a/src/ruby/bin/math_client.rb +++ b/src/ruby/bin/math_client.rb @@ -46,51 +46,51 @@ require 'optparse' include GRPC::Core::TimeConsts def do_div(stub) - logger.info('request_response') - logger.info('----------------') + GRPC.logger.info('request_response') + GRPC.logger.info('----------------') req = Math::DivArgs.new(dividend: 7, divisor: 3) - logger.info("div(7/3): req=#{req.inspect}") + GRPC.logger.info("div(7/3): req=#{req.inspect}") resp = stub.div(req, INFINITE_FUTURE) - logger.info("Answer: #{resp.inspect}") - logger.info('----------------') + GRPC.logger.info("Answer: #{resp.inspect}") + GRPC.logger.info('----------------') end def do_sum(stub) # to make client streaming requests, pass an enumerable of the inputs - logger.info('client_streamer') - logger.info('---------------') + GRPC.logger.info('client_streamer') + GRPC.logger.info('---------------') reqs = [1, 2, 3, 4, 5].map { |x| Math::Num.new(num: x) } - logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}") + GRPC.logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}") resp = stub.sum(reqs) # reqs.is_a?(Enumerable) - logger.info("Answer: #{resp.inspect}") - logger.info('---------------') + GRPC.logger.info("Answer: #{resp.inspect}") + GRPC.logger.info('---------------') end def do_fib(stub) - logger.info('server_streamer') - logger.info('----------------') + GRPC.logger.info('server_streamer') + GRPC.logger.info('----------------') req = Math::FibArgs.new(limit: 11) - logger.info("fib(11): req=#{req.inspect}") + GRPC.logger.info("fib(11): req=#{req.inspect}") resp = stub.fib(req, INFINITE_FUTURE) resp.each do |r| - logger.info("Answer: #{r.inspect}") + GRPC.logger.info("Answer: #{r.inspect}") end - logger.info('----------------') + GRPC.logger.info('----------------') end def do_div_many(stub) - logger.info('bidi_streamer') - logger.info('-------------') + GRPC.logger.info('bidi_streamer') + GRPC.logger.info('-------------') reqs = [] reqs << Math::DivArgs.new(dividend: 7, divisor: 3) reqs << Math::DivArgs.new(dividend: 5, divisor: 2) reqs << Math::DivArgs.new(dividend: 7, divisor: 2) - logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}") + GRPC.logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}") resp = stub.div_many(reqs, 10) resp.each do |r| - logger.info("Answer: #{r.inspect}") + GRPC.logger.info("Answer: #{r.inspect}") end - logger.info('----------------') + GRPC.logger.info('----------------') end def load_test_certs @@ -132,10 +132,10 @@ def main p stub_opts p options['host'] stub = Math::Math::Stub.new(options['host'], **stub_opts) - logger.info("... connecting securely on #{options['host']}") + GRPC.logger.info("... connecting securely on #{options['host']}") else stub = Math::Math::Stub.new(options['host']) - logger.info("... connecting insecurely on #{options['host']}") + GRPC.logger.info("... connecting insecurely on #{options['host']}") end do_div(stub) diff --git a/src/ruby/bin/math_server.rb b/src/ruby/bin/math_server.rb index e46d9c671f9..b41ccf6ce16 100755 --- a/src/ruby/bin/math_server.rb +++ b/src/ruby/bin/math_server.rb @@ -128,13 +128,13 @@ class Calculator < Math::Math::Service t = Thread.new do begin requests.each do |req| - logger.info("read #{req.inspect}") + GRPC.logger.info("read #{req.inspect}") resp = Math::DivReply.new(quotient: req.dividend / req.divisor, remainder: req.dividend % req.divisor) q.push(resp) Thread.pass # let the internal Bidi threads run end - logger.info('finished reads') + GRPC.logger.info('finished reads') q.push(self) rescue StandardError => e q.push(e) # share the exception with the enumerator @@ -176,10 +176,10 @@ def main s = GRPC::RpcServer.new if options['secure'] s.add_http2_port(options['host'], test_server_creds) - logger.info("... running securely on #{options['host']}") + GRPC.logger.info("... running securely on #{options['host']}") else s.add_http2_port(options['host']) - logger.info("... running insecurely on #{options['host']}") + GRPC.logger.info("... running insecurely on #{options['host']}") end s.handle(Calculator) diff --git a/src/ruby/bin/noproto_client.rb b/src/ruby/bin/noproto_client.rb index f3fd1103478..390a9c59c3b 100755 --- a/src/ruby/bin/noproto_client.rb +++ b/src/ruby/bin/noproto_client.rb @@ -94,15 +94,15 @@ def main p stub_opts p options['host'] stub = NoProtoStub.new(options['host'], **stub_opts) - logger.info("... connecting securely on #{options['host']}") + GRPC.logger.info("... connecting securely on #{options['host']}") else stub = NoProtoStub.new(options['host']) - logger.info("... connecting insecurely on #{options['host']}") + GRPC.logger.info("... connecting insecurely on #{options['host']}") end - logger.info('sending a NoProto rpc') + GRPC.logger.info('sending a NoProto rpc') resp = stub.an_rpc(NoProtoMsg.new) - logger.info("got a response: #{resp}") + GRPC.logger.info("got a response: #{resp}") end main diff --git a/src/ruby/bin/noproto_server.rb b/src/ruby/bin/noproto_server.rb index f71daeadb37..90baaf9a2e8 100755 --- a/src/ruby/bin/noproto_server.rb +++ b/src/ruby/bin/noproto_server.rb @@ -63,7 +63,7 @@ class NoProto < NoProtoService end def an_rpc(req, _call) - logger.info('echo service received a request') + GRPC.logger.info('echo service received a request') req end end @@ -98,10 +98,10 @@ def main s = GRPC::RpcServer.new if options['secure'] s.add_http2_port(options['host'], test_server_creds) - logger.info("... running securely on #{options['host']}") + GRPC.logger.info("... running securely on #{options['host']}") else s.add_http2_port(options['host']) - logger.info("... running insecurely on #{options['host']}") + GRPC.logger.info("... running insecurely on #{options['host']}") end s.handle(NoProto) diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 947c39cd226..5f7beb5ab1e 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -188,7 +188,7 @@ module GRPC # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - logger.debug("sending #{req}, marshalled? #{marshalled}") + GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}") if marshalled payload = req else @@ -230,14 +230,14 @@ module GRPC @call.metadata = batch_result.metadata @metadata_tag = nil end - logger.debug("received req: #{batch_result}") + GRPC.logger.debug("received req: #{batch_result}") unless batch_result.nil? || batch_result.message.nil? - logger.debug("received req.to_s: #{batch_result.message}") + GRPC.logger.debug("received req.to_s: #{batch_result.message}") res = @unmarshal.call(batch_result.message) - logger.debug("received_req (unmarshalled): #{res.inspect}") + GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}") return res end - logger.debug('found nil; the final response has been sent') + GRPC.logger.debug('found nil; the final response has been sent') nil end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 4ca3004d6f0..67143d40cf5 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -115,10 +115,10 @@ module GRPC return enum_for(:each_queued_msg) unless block_given? count = 0 loop do - logger.debug("each_queued_msg: msg##{count}") + GRPC.logger.debug("each_queued_msg: msg##{count}") count += 1 req = @readq.pop - logger.debug("each_queued_msg: req = #{req}") + GRPC.logger.debug("each_queued_msg: req = #{req}") throw req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req @@ -134,22 +134,22 @@ module GRPC begin count = 0 requests.each do |req| - logger.debug("bidi-write_loop: #{count}") + GRPC.logger.debug("bidi-write_loop: #{count}") count += 1 payload = @marshal.call(req) @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_MESSAGE => payload) end if is_client - logger.debug("bidi-write-loop: sent #{count}, waiting to finish") + GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting") batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_CLOSE_FROM_CLIENT => nil, RECV_STATUS_ON_CLIENT => nil) batch_result.check_status end rescue StandardError => e - logger.warn('bidi-write_loop: failed') - logger.warn(e) + GRPC.logger.warn('bidi-write_loop: failed') + GRPC.logger.warn(e) raise e end end @@ -164,7 +164,7 @@ module GRPC # queue the initial read before beginning the loop loop do - logger.debug("bidi-read_loop: #{count}") + GRPC.logger.debug("bidi-read_loop: #{count}") count += 1 # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, @@ -172,19 +172,19 @@ module GRPC # handle the next message if batch_result.message.nil? @readq.push(END_OF_READS) - logger.debug('bidi-read-loop: done reading!') + GRPC.logger.debug('bidi-read-loop: done reading!') break end # push the latest read onto the queue and continue reading - logger.debug("received req: #{batch_result.message}") + GRPC.logger.debug("received req: #{batch_result.message}") res = @unmarshal.call(batch_result.message) @readq.push(res) end rescue StandardError => e - logger.warn('bidi: read_loop failed') - logger.warn(e) + GRPC.logger.warn('bidi: read_loop failed') + GRPC.logger.warn(e) @readq.push(e) # let each_queued_msg terminate with this error end end diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 10211ae2397..2fd61c5f7e7 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -84,22 +84,22 @@ module GRPC rescue BadStatus => e # this is raised by handlers that want GRPC to send an application error # code and detail message and some additional app-specific metadata. - logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}") + GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}") send_status(active_call, e.code, e.details, **e.metadata) rescue Core::CallError => e # This is raised by GRPC internals but should rarely, if ever happen. # Log it, but don't notify the other endpoint.. - logger.warn("failed call: #{active_call}\n#{e}") + GRPC.logger.warn("failed call: #{active_call}\n#{e}") rescue Core::OutOfTime # This is raised when active_call#method.call exceeeds the deadline # event. Send a status of deadline exceeded - logger.warn("late call: #{active_call}") + GRPC.logger.warn("late call: #{active_call}") send_status(active_call, DEADLINE_EXCEEDED, 'late') rescue StandardError => e # This will usuaally be an unhandled error in the handling code. # Send back a UNKNOWN status to the client - logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") - logger.warn(e) + GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") + GRPC.logger.warn(e) send_status(active_call, UNKNOWN, 'no reason given') end @@ -139,8 +139,8 @@ module GRPC details = 'Not sure why' if details.nil? active_client.send_status(code, details, code == OK, **kw) rescue StandardError => e - logger.warn("Could not send status #{code}:#{details}") - logger.warn(e) + GRPC.logger.warn("Could not send status #{code}:#{details}") + GRPC.logger.warn(e) end end end diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index de224660894..665c1444324 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -94,7 +94,7 @@ module GRPC def schedule(*args, &blk) fail 'already stopped' if @stopped return if blk.nil? - logger.info('schedule another job') + GRPC.logger.info('schedule another job') @jobs << [blk, args] end @@ -114,14 +114,14 @@ module GRPC # Stops the jobs in the pool def stop - logger.info('stopping, will wait for all the workers to exit') + GRPC.logger.info('stopping, will wait for all the workers to exit') @workers.size.times { schedule { throw :exit } } @stopped = true @stop_mutex.synchronize do # wait @keep_alive for works to stop @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 end forcibly_stop_workers - logger.info('stopped, all workers are shutdown') + GRPC.logger.info('stopped, all workers are shutdown') end protected @@ -129,14 +129,14 @@ module GRPC # Forcibly shutdown any threads that are still alive. def forcibly_stop_workers return unless @workers.size > 0 - logger.info("forcibly terminating #{@workers.size} worker(s)") + GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)") @workers.each do |t| next unless t.alive? begin t.exit rescue StandardError => e - logger.warn('error while terminating a worker') - logger.warn(e) + GRPC.logger.warn('error while terminating a worker') + GRPC.logger.warn(e) end end end @@ -156,8 +156,8 @@ module GRPC blk, args = @jobs.pop blk.call(*args) rescue StandardError => e - logger.warn('Error in worker thread') - logger.warn(e) + GRPC.logger.warn('Error in worker thread') + GRPC.logger.warn(e) end end end @@ -365,7 +365,7 @@ module GRPC # the server to stop. def run if rpc_descs.size.zero? - logger.warn('did not run as no services were present') + GRPC.logger.warn('did not run as no services were present') return end @run_mutex.synchronize do @@ -381,9 +381,9 @@ module GRPC # Sends UNAVAILABLE if there are too many unprocessed jobs def available?(an_rpc) jobs_count, max = @pool.jobs_waiting, @max_waiting_requests - logger.info("waiting: #{jobs_count}, max: #{max}") + GRPC.logger.info("waiting: #{jobs_count}, max: #{max}") return an_rpc if @pool.jobs_waiting <= @max_waiting_requests - logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") + GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") noop = proc { |x| x } c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) c.send_status(StatusCodes::UNAVAILABLE, '') @@ -394,7 +394,7 @@ module GRPC def found?(an_rpc) mth = an_rpc.method.to_sym return an_rpc if rpc_descs.key?(mth) - logger.warn("NOT_FOUND: #{an_rpc}") + GRPC.logger.warn("NOT_FOUND: #{an_rpc}") noop = proc { |x| x } c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) c.send_status(StatusCodes::NOT_FOUND, '') @@ -434,7 +434,7 @@ module GRPC return nil unless found?(an_rpc) # Create the ActiveCall - logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") + GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") rpc_desc = rpc_descs[an_rpc.method.to_sym] ActiveCall.new(an_rpc.call, @cq, rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), @@ -474,7 +474,7 @@ module GRPC else handlers[route] = service.method(rpc_name) end - logger.info("handling #{route} with #{handlers[route]}") + GRPC.logger.info("handling #{route} with #{handlers[route]}") end end end diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb index 8ea2c82f171..3b9743ea668 100644 --- a/src/ruby/lib/grpc/generic/service.rb +++ b/src/ruby/lib/grpc/generic/service.rb @@ -175,23 +175,23 @@ module GRPC route = "/#{route_prefix}/#{name}" if desc.request_response? define_method(mth_name) do |req, deadline = nil, **kw| - logger.debug("calling #{@host}:#{route}") + GRPC.logger.debug("calling #{@host}:#{route}") request_response(route, req, marshal, unmarshal, deadline, **kw) end elsif desc.client_streamer? define_method(mth_name) do |reqs, deadline = nil, **kw| - logger.debug("calling #{@host}:#{route}") + GRPC.logger.debug("calling #{@host}:#{route}") client_streamer(route, reqs, marshal, unmarshal, deadline, **kw) end elsif desc.server_streamer? define_method(mth_name) do |req, deadline = nil, **kw, &blk| - logger.debug("calling #{@host}:#{route}") + GRPC.logger.debug("calling #{@host}:#{route}") server_streamer(route, req, marshal, unmarshal, deadline, **kw, &blk) end else # is a bidi_stream define_method(mth_name) do |reqs, deadline = nil, **kw, &blk| - logger.debug("calling #{@host}:#{route}") + GRPC.logger.debug("calling #{@host}:#{route}") bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw, &blk) end diff --git a/src/ruby/lib/grpc/logconfig.rb b/src/ruby/lib/grpc/logconfig.rb index f36906fc45f..96812170ba8 100644 --- a/src/ruby/lib/grpc/logconfig.rb +++ b/src/ruby/lib/grpc/logconfig.rb @@ -29,7 +29,10 @@ require 'logging' -include Logging.globally # logger is accessible everywhere +# GRPC contains the General RPC module. +module GRPC + extend Logging.globally +end Logging.logger.root.appenders = Logging.appenders.stdout Logging.logger.root.level = :info diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 2cd21a15e34..640b0f656c6 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -69,7 +69,7 @@ class EchoService end def an_rpc(req, call) - logger.info('echo service received a request') + GRPC.logger.info('echo service received a request') call.output_metadata.update(@trailing_metadata) @received_md << call.metadata unless call.metadata.nil? req @@ -109,7 +109,7 @@ class SlowService end def an_rpc(req, call) - logger.info("starting a slow #{@delay} rpc") + GRPC.logger.info("starting a slow #{@delay} rpc") sleep @delay @received_md << call.metadata unless call.metadata.nil? req # send back the req as the response diff --git a/templates/Makefile.template b/templates/Makefile.template index 66703f812b1..2da884e262f 100644 --- a/templates/Makefile.template +++ b/templates/Makefile.template @@ -673,11 +673,7 @@ third_party/protobuf/configure: $(LIBDIR)/$(CONFIG)/protobuf/libprotobuf.a: third_party/protobuf/configure $(E) "[MAKE] Building protobuf" -ifeq ($(HAVE_CXX11),true) - $(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CXXFLAGS="-DLANG_CXX11 -std=c++11" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static) -else - $(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CXXFLAGS="-std=c++0x" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static) -endif + $(Q)(cd third_party/protobuf ; CC="$(CC)" CXX="$(CXX)" LDFLAGS="$(LDFLAGS_$(CONFIG)) -g" CPPFLAGS="$(PIC_CPPFLAGS) $(CPPFLAGS_$(CONFIG)) -g" ./configure --disable-shared --enable-static) $(Q)$(MAKE) -C third_party/protobuf clean $(Q)$(MAKE) -C third_party/protobuf $(Q)mkdir -p $(LIBDIR)/$(CONFIG)/protobuf diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index be3c7ca17f7..06614a93e77 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -211,6 +211,9 @@ void test_connect(const char *server_host, const char *client_host, int port, grpc_completion_queue_shutdown(server_cq); drain_cq(server_cq); grpc_completion_queue_destroy(server_cq); + + grpc_call_details_destroy(&call_details); + gpr_free(details); } int main(int argc, char **argv) { diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c index cee8306213c..78502e4bb95 100644 --- a/test/core/end2end/tests/max_message_length.c +++ b/test/core/end2end/tests/max_message_length.c @@ -188,6 +188,7 @@ static void test_max_message_length(grpc_end2end_test_config config) { grpc_metadata_array_destroy(&trailing_metadata_recv); grpc_metadata_array_destroy(&request_metadata_recv); grpc_call_details_destroy(&call_details); + grpc_byte_buffer_destroy(request_payload); grpc_call_destroy(c); grpc_call_destroy(s); diff --git a/test/core/end2end/tests/request_response_with_payload_and_call_creds.c b/test/core/end2end/tests/request_response_with_payload_and_call_creds.c index 0ebea438c69..437345960ac 100644 --- a/test/core/end2end/tests/request_response_with_payload_and_call_creds.c +++ b/test/core/end2end/tests/request_response_with_payload_and_call_creds.c @@ -125,6 +125,8 @@ static void test_call_creds_failure(grpc_end2end_test_config config) { GPR_ASSERT(grpc_call_set_credentials(c, creds) != GRPC_CALL_OK); grpc_credentials_release(creds); + grpc_call_destroy(c); + end_test(&f); config.tear_down_data(&f); } diff --git a/test/cpp/qps/async_streaming_ping_pong_test.cc b/test/cpp/qps/async_streaming_ping_pong_test.cc index a1822b7e156..d4871c0ba11 100644 --- a/test/cpp/qps/async_streaming_ping_pong_test.cc +++ b/test/cpp/qps/async_streaming_ping_pong_test.cc @@ -64,8 +64,8 @@ static void RunAsyncStreamingPingPong() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); - ReportQPS(result); - ReportLatency(result); + ReportQPS(*result); + ReportLatency(*result); } } // namespace testing diff --git a/test/cpp/qps/async_unary_ping_pong_test.cc b/test/cpp/qps/async_unary_ping_pong_test.cc index 8b037a86562..35f188c9862 100644 --- a/test/cpp/qps/async_unary_ping_pong_test.cc +++ b/test/cpp/qps/async_unary_ping_pong_test.cc @@ -64,8 +64,8 @@ static void RunAsyncUnaryPingPong() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); - ReportQPS(result); - ReportLatency(result); + ReportQPS(*result); + ReportLatency(*result); } } // namespace testing diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 9f7d3b56a44..682d515ebea 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -44,9 +44,11 @@ #include #include #include +#include #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/qps_worker.h" #include "test/core/util/port.h" +#include "test/core/util/test_config.h" using std::list; using std::thread; @@ -75,13 +77,10 @@ static deque get_hosts(const string& name) { } } -ScenarioResult RunScenario(const ClientConfig& initial_client_config, - size_t num_clients, - const ServerConfig& server_config, - size_t num_servers, - int warmup_seconds, - int benchmark_seconds, - int spawn_local_worker_count) { +std::unique_ptr RunScenario( + const ClientConfig& initial_client_config, size_t num_clients, + const ServerConfig& server_config, size_t num_servers, int warmup_seconds, + int benchmark_seconds, int spawn_local_worker_count) { // ClientContext allocator (all are destroyed at scope exit) list contexts; auto alloc_context = [&contexts]() { @@ -89,6 +88,11 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, return &contexts.back(); }; + // To be added to the result, containing the final configuration used for + // client and config (incluiding host, etc.) + ClientConfig result_client_config; + ServerConfig result_server_config; + // Get client, server lists auto workers = get_hosts("QPS_WORKERS"); ClientConfig client_config = initial_client_config; @@ -96,6 +100,16 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, // Spawn some local workers if desired vector> local_workers; for (int i = 0; i < abs(spawn_local_worker_count); i++) { + // act as if we're a new test -- gets a good rng seed + static bool called_init = false; + if (!called_init) { + char args_buf[100]; + strcpy(args_buf, "some-benchmark"); + char *args[] = {args_buf}; + grpc_test_init(1, args); + called_init = true; + } + int driver_port = grpc_pick_unused_port_or_die(); int benchmark_port = grpc_pick_unused_port_or_die(); local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port)); @@ -127,6 +141,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, sd.stub = std::move(Worker::NewStub( CreateChannel(workers[i], InsecureCredentials(), ChannelArguments()))); ServerArgs args; + result_server_config = server_config; + result_server_config.set_host(workers[i]); *args.mutable_setup() = server_config; sd.stream = std::move(sd.stub->RunServer(alloc_context())); GPR_ASSERT(sd.stream->Write(args)); @@ -156,6 +172,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, cd.stub = std::move(Worker::NewStub(CreateChannel( workers[i + num_servers], InsecureCredentials(), ChannelArguments()))); ClientArgs args; + result_client_config = client_config; + result_client_config.set_host(workers[i + num_servers]); *args.mutable_setup() = client_config; cd.stream = std::move(cd.stub->RunTest(alloc_context())); GPR_ASSERT(cd.stream->Write(args)); @@ -196,7 +214,9 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, gpr_sleep_until(gpr_time_add(start, gpr_time_from_seconds(benchmark_seconds))); // Finish a run - ScenarioResult result; + std::unique_ptr result(new ScenarioResult); + result->client_config = result_client_config; + result->server_config = result_server_config; gpr_log(GPR_INFO, "Finishing"); for (auto server = servers.begin(); server != servers.end(); server++) { GPR_ASSERT(server->stream->Write(server_mark)); @@ -207,14 +227,14 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, for (auto server = servers.begin(); server != servers.end(); server++) { GPR_ASSERT(server->stream->Read(&server_status)); const auto& stats = server_status.stats(); - result.server_resources.push_back(ResourceUsage{ + result->server_resources.push_back(ResourceUsage{ stats.time_elapsed(), stats.time_user(), stats.time_system()}); } for (auto client = clients.begin(); client != clients.end(); client++) { GPR_ASSERT(client->stream->Read(&client_status)); const auto& stats = client_status.stats(); - result.latencies.MergeProto(stats.latencies()); - result.client_resources.push_back(ResourceUsage{ + result->latencies.MergeProto(stats.latencies()); + result->client_resources.push_back(ResourceUsage{ stats.time_elapsed(), stats.time_user(), stats.time_system()}); } diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index eb7119a89dd..5e9d4b3cb92 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -34,6 +34,8 @@ #ifndef TEST_QPS_DRIVER_H #define TEST_QPS_DRIVER_H +#include + #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/qpstest.grpc.pb.h" @@ -49,15 +51,14 @@ struct ScenarioResult { Histogram latencies; std::vector client_resources; std::vector server_resources; + ClientConfig client_config; + ServerConfig server_config; }; -ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config, - size_t num_clients, - const grpc::testing::ServerConfig& server_config, - size_t num_servers, - int warmup_seconds, - int benchmark_seconds, - int spawn_local_worker_count); +std::unique_ptr RunScenario( + const grpc::testing::ClientConfig& client_config, size_t num_clients, + const grpc::testing::ServerConfig& server_config, size_t num_servers, + int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index fc8e04201cd..f49cd38b339 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -103,14 +103,13 @@ int main(int argc, char** argv) { FLAGS_server_threads < FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel)); - auto result = RunScenario(client_config, FLAGS_num_clients, - server_config, FLAGS_num_servers, - FLAGS_warmup_seconds, FLAGS_benchmark_seconds, - FLAGS_local_workers); - - ReportQPSPerCore(result, server_config); - ReportLatency(result); - ReportTimes(result); + const auto result = RunScenario( + client_config, FLAGS_num_clients, server_config, FLAGS_num_servers, + FLAGS_warmup_seconds, FLAGS_benchmark_seconds, FLAGS_local_workers); + + ReportQPSPerCore(*result, server_config); + ReportLatency(*result); + ReportTimes(*result); return 0; } diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc index f567e4cf061..9a81d0fc90c 100644 --- a/test/cpp/qps/qps_test.cc +++ b/test/cpp/qps/qps_test.cc @@ -64,8 +64,8 @@ static void RunQPS() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); - ReportQPSPerCore(result, server_config); - ReportLatency(result); + ReportQPSPerCore(*result, server_config); + ReportLatency(*result); } } // namespace testing diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto index 1553ef5f07e..122a7df1ac9 100644 --- a/test/cpp/qps/qpstest.proto +++ b/test/cpp/qps/qpstest.proto @@ -102,6 +102,7 @@ message ClientConfig { // only for async client: optional int32 async_client_threads = 7; optional RpcType rpc_type = 8 [default=UNARY]; + optional string host = 9; } // Request current stats @@ -129,6 +130,7 @@ message ServerConfig { required ServerType server_type = 1; optional int32 threads = 2 [default=1]; optional bool enable_ssl = 3 [default=false]; + optional string host = 4; } message ServerArgs { diff --git a/test/cpp/qps/sync_streaming_ping_pong_test.cc b/test/cpp/qps/sync_streaming_ping_pong_test.cc index 48c7ff63e03..218306846b5 100644 --- a/test/cpp/qps/sync_streaming_ping_pong_test.cc +++ b/test/cpp/qps/sync_streaming_ping_pong_test.cc @@ -63,8 +63,8 @@ static void RunSynchronousStreamingPingPong() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); - ReportQPS(result); - ReportLatency(result); + ReportQPS(*result); + ReportLatency(*result); } } // namespace testing diff --git a/test/cpp/qps/sync_unary_ping_pong_test.cc b/test/cpp/qps/sync_unary_ping_pong_test.cc index 4c4de6377b8..137ef79f2f8 100644 --- a/test/cpp/qps/sync_unary_ping_pong_test.cc +++ b/test/cpp/qps/sync_unary_ping_pong_test.cc @@ -63,8 +63,8 @@ static void RunSynchronousUnaryPingPong() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); - ReportQPS(result); - ReportLatency(result); + ReportQPS(*result); + ReportLatency(*result); } } // namespace testing diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 346fe08ab25..5dae3e8eb55 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -596,6 +596,24 @@ "posix" ] }, + { + "flaky": false, + "language": "c++", + "name": "async_streaming_ping_pong_test", + "platforms": [ + "windows", + "posix" + ] + }, + { + "flaky": false, + "language": "c++", + "name": "async_unary_ping_pong_test", + "platforms": [ + "windows", + "posix" + ] + }, { "flaky": false, "language": "c++", @@ -695,6 +713,24 @@ "posix" ] }, + { + "flaky": false, + "language": "c++", + "name": "sync_streaming_ping_pong_test", + "platforms": [ + "windows", + "posix" + ] + }, + { + "flaky": false, + "language": "c++", + "name": "sync_unary_ping_pong_test", + "platforms": [ + "windows", + "posix" + ] + }, { "flaky": false, "language": "c++", diff --git a/vsprojects/nuget_package/grpc.native.csharp_ext.nuspec b/vsprojects/nuget_package/grpc.native.csharp_ext.nuspec index 91740ac8fd4..dfaa867db0e 100644 --- a/vsprojects/nuget_package/grpc.native.csharp_ext.nuspec +++ b/vsprojects/nuget_package/grpc.native.csharp_ext.nuspec @@ -2,14 +2,14 @@ grpc.native.csharp_ext - 0.8.0.0 + 0.9.0.0 Google Inc. Jan Tattermusch https://github.com/grpc/grpc/blob/master/LICENSE http://github.com/grpc/grpc false Native extension needed by gRPC C# library. This is not the package you are looking for, it is only meant to be used as a dependency. - Release of gRPC C core 0.8.0 libraries. + Release of gRPC C core 0.9.0 libraries. Copyright 2015 gRPC C# Native Extension Native library required by gRPC C#