diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index dadc9ab76cf..6f37b059f75 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -181,6 +181,7 @@ namespace Grpc.Core.Internal { started = true; halfcloseRequested = true; + halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. this.readObserver = readObserver; @@ -544,6 +545,8 @@ namespace Grpc.Core.Internal } observer = readObserver; status = finishedStatus; + + ReleaseResourcesIfPossible(); } // TODO: wrap deserialization... diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs index 462fab4454f..76a08ce5186 100644 --- a/src/csharp/Grpc.Examples/MathServiceImpl.cs +++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs @@ -127,8 +127,7 @@ namespace math public void OnCompleted() { - Task.Factory.StartNew(() => - responseObserver.OnCompleted()); + responseObserver.OnCompleted(); } public void OnError(Exception error) @@ -138,13 +137,7 @@ namespace math public void OnNext(DivArgs value) { - // TODO: currently we need this indirection because - // responseObserver waits for write to finish, this - // callback is called from grpc threadpool which - // currently only has one thread. - // Same story for OnCompleted(). - Task.Factory.StartNew(() => - responseObserver.OnNext(DivInternal(value))); + responseObserver.OnNext(DivInternal(value)); } } } diff --git a/src/csharp/Grpc.IntegrationTesting/Client.cs b/src/csharp/Grpc.IntegrationTesting/Client.cs index 0c70744cea5..fa1c7cd051b 100644 --- a/src/csharp/Grpc.IntegrationTesting/Client.cs +++ b/src/csharp/Grpc.IntegrationTesting/Client.cs @@ -138,7 +138,7 @@ namespace Grpc.IntegrationTesting } } - private void RunEmptyUnary(TestServiceGrpc.ITestServiceClient client) + public static void RunEmptyUnary(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running empty_unary"); var response = client.EmptyCall(Empty.DefaultInstance); @@ -146,7 +146,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } - private void RunLargeUnary(TestServiceGrpc.ITestServiceClient client) + public static void RunLargeUnary(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running large_unary"); var request = SimpleRequest.CreateBuilder() @@ -162,7 +162,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } - private void RunClientStreaming(TestServiceGrpc.ITestServiceClient client) + public static void RunClientStreaming(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running client_streaming"); @@ -181,7 +181,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } - private void RunServerStreaming(TestServiceGrpc.ITestServiceClient client) + public static void RunServerStreaming(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running server_streaming"); @@ -206,7 +206,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } - private void RunPingPong(TestServiceGrpc.ITestServiceClient client) + public static void RunPingPong(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running ping_pong"); @@ -235,7 +235,7 @@ namespace Grpc.IntegrationTesting inputs.OnNext(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2635)) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653)) .SetPayload(CreateZerosPayload(1828)).Build()); response = recorder.Queue.Take(); @@ -252,13 +252,15 @@ namespace Grpc.IntegrationTesting Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); Assert.AreEqual(58979, response.Payload.Body.Length); + inputs.OnCompleted(); + recorder.Finished.Wait(); Assert.AreEqual(0, recorder.Queue.Count); Console.WriteLine("Passed!"); } - private void RunEmptyStream(TestServiceGrpc.ITestServiceClient client) + public static void RunEmptyStream(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running empty_stream"); @@ -273,13 +275,13 @@ namespace Grpc.IntegrationTesting } // This is not an official interop test, but it's useful. - private void RunBenchmarkEmptyUnary(TestServiceGrpc.ITestServiceClient client) + public static void RunBenchmarkEmptyUnary(TestServiceGrpc.ITestServiceClient client) { BenchmarkUtil.RunBenchmark(10000, 10000, () => { client.EmptyCall(Empty.DefaultInstance);}); } - private Payload CreateZerosPayload(int size) { + private static Payload CreateZerosPayload(int size) { return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build(); } diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 9b46a644bc1..e66f708a945 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -47,6 +47,8 @@ + + diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs new file mode 100644 index 00000000000..87d25b0a98c --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs @@ -0,0 +1,119 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Utils; +using NUnit.Framework; +using grpc.testing; + +namespace Grpc.IntegrationTesting +{ + /// + /// Runs interop tests in-process. + /// + public class InteropClientServerTest + { + string host = "localhost"; + Server server; + Channel channel; + TestServiceGrpc.ITestServiceClient client; + + [TestFixtureSetUp] + public void Init() + { + GrpcEnvironment.Initialize(); + + server = new Server(); + server.AddServiceDefinition(TestServiceGrpc.BindService(new TestServiceImpl())); + int port = server.AddPort(host + ":0"); + server.Start(); + channel = new Channel(host + ":" + port); + client = TestServiceGrpc.NewStub(channel); + } + + [TestFixtureTearDown] + public void Cleanup() + { + channel.Dispose(); + + server.ShutdownAsync().Wait(); + GrpcEnvironment.Shutdown(); + } + + [Test] + public void EmptyUnary() + { + Client.RunEmptyUnary(client); + } + + [Test] + public void LargeUnary() + { + Client.RunEmptyUnary(client); + } + + [Test] + public void ClientStreaming() + { + Client.RunClientStreaming(client); + } + + [Test] + public void ServerStreaming() + { + Client.RunServerStreaming(client); + } + + [Test] + public void PingPong() + { + Client.RunPingPong(client); + } + + [Test] + public void EmptyStream() + { + Client.RunEmptyStream(client); + } + + // TODO: add cancel_after_begin + + // TODO: add cancel_after_first_response + + } +} + diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs new file mode 100644 index 00000000000..176843b1305 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs @@ -0,0 +1,140 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Google.ProtocolBuffers; +using Grpc.Core.Utils; + +namespace grpc.testing +{ + /// + /// Implementation of TestService server + /// + public class TestServiceImpl : TestServiceGrpc.ITestService + { + public void EmptyCall(Empty request, IObserver responseObserver) + { + responseObserver.OnNext(Empty.DefaultInstance); + responseObserver.OnCompleted(); + } + + public void UnaryCall(SimpleRequest request, IObserver responseObserver) + { + var response = SimpleResponse.CreateBuilder() + .SetPayload(CreateZerosPayload(request.ResponseSize)).Build(); + //TODO: check we support ReponseType + responseObserver.OnNext(response); + responseObserver.OnCompleted(); + } + + public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver responseObserver) + { + foreach(var responseParam in request.ResponseParametersList) + { + var response = StreamingOutputCallResponse.CreateBuilder() + .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); + responseObserver.OnNext(response); + } + responseObserver.OnCompleted(); + } + + public IObserver StreamingInputCall(IObserver responseObserver) + { + var recorder = new RecordingObserver(); + Task.Run(() => { + int sum = 0; + foreach(var req in recorder.ToList().Result) + { + sum += req.Payload.Body.Length; + } + var response = StreamingInputCallResponse.CreateBuilder() + .SetAggregatedPayloadSize(sum).Build(); + responseObserver.OnNext(response); + responseObserver.OnCompleted(); + }); + return recorder; + } + + public IObserver FullDuplexCall(IObserver responseObserver) + { + return new FullDuplexObserver(responseObserver); + } + + public IObserver HalfDuplexCall(IObserver responseObserver) + { + throw new NotImplementedException(); + } + + private class FullDuplexObserver : IObserver { + + readonly IObserver responseObserver; + + public FullDuplexObserver(IObserver responseObserver) + { + this.responseObserver = responseObserver; + } + + public void OnCompleted() + { + responseObserver.OnCompleted(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnNext(StreamingOutputCallRequest value) + { + // TODO: this is not in order!!! + //Task.Factory.StartNew(() => { + + foreach(var responseParam in value.ResponseParametersList) + { + var response = StreamingOutputCallResponse.CreateBuilder() + .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); + responseObserver.OnNext(response); + } + //}); + } + } + + private static Payload CreateZerosPayload(int size) { + return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build(); + } + } +} +