diff --git a/src/csharp/Grpc.sln b/src/csharp/Grpc.sln index 2fd10cb94af..a7b2c9b5804 100644 --- a/src/csharp/Grpc.sln +++ b/src/csharp/Grpc.sln @@ -45,6 +45,6 @@ Global {CCC4440E-49F7-4790-B0AF-FEABB0837AE7}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(MonoDevelopProperties) = preSolution - StartupItem = GrpcApi\GrpcApi.csproj + StartupItem = InteropClient\InteropClient.csproj EndGlobalSection EndGlobal diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj index fbfe50e4d88..95df8909173 100644 --- a/src/csharp/GrpcCore/GrpcCore.csproj +++ b/src/csharp/GrpcCore/GrpcCore.csproj @@ -62,6 +62,7 @@ + diff --git a/src/csharp/GrpcCore/Utils/RecordingQueue.cs b/src/csharp/GrpcCore/Utils/RecordingQueue.cs new file mode 100644 index 00000000000..8e2f8a496d4 --- /dev/null +++ b/src/csharp/GrpcCore/Utils/RecordingQueue.cs @@ -0,0 +1,45 @@ +using System; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Collections.Concurrent; + +namespace Google.GRPC.Core.Utils +{ + public class RecordingQueue : IObserver + { + readonly BlockingCollection queue = new BlockingCollection(); + TaskCompletionSource tcs = new TaskCompletionSource(); + + public void OnCompleted() + { + tcs.SetResult(null); + } + + public void OnError(Exception error) + { + tcs.SetException(error); + } + + public void OnNext(T value) + { + queue.Add(value); + } + + public BlockingCollection Queue + { + get + { + return queue; + } + } + + public Task Finished + { + get + { + return tcs.Task; + } + } + } +} + diff --git a/src/csharp/InteropClient/InteropClient.cs b/src/csharp/InteropClient/InteropClient.cs index 62091c5fdc3..40c4781304d 100644 --- a/src/csharp/InteropClient/InteropClient.cs +++ b/src/csharp/InteropClient/InteropClient.cs @@ -1,7 +1,9 @@ using System; +using System.Collections.Generic; using NUnit.Framework; using System.Text.RegularExpressions; using Google.GRPC.Core; +using Google.GRPC.Core.Utils; using Google.ProtocolBuffers; using grpc.testing; @@ -78,6 +80,18 @@ namespace InteropClient case "large_unary": RunLargeUnary(client); break; + case "client_streaming": + RunClientStreaming(client); + break; + case "server_streaming": + RunServerStreaming(client); + break; + case "ping_pong": + RunPingPong(client); + break; + case "empty_stream": + RunEmptyStream(client); + break; default: throw new ArgumentException("Unknown test case " + testCase); } @@ -88,6 +102,7 @@ namespace InteropClient Console.WriteLine("running empty_unary"); var response = client.EmptyCall(Empty.DefaultInstance); Assert.IsNotNull(response); + Console.WriteLine("Passed!"); } private void RunLargeUnary(TestServiceGrpc.ITestServiceClient client) @@ -96,14 +111,129 @@ namespace InteropClient var request = SimpleRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .SetResponseSize(314159) - .SetPayload(Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[271828]))) + .SetPayload(CreateZerosPayload(271828)) .Build(); var response = client.UnaryCall(request); Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); Assert.AreEqual(314159, response.Payload.Body.Length); - // TODO: assert that the response is all zeros... + Console.WriteLine("Passed!"); + } + + private void RunClientStreaming(TestServiceGrpc.ITestServiceClient client) + { + Console.WriteLine("running client_streaming"); + + var bodySizes = new List{27182, 8, 1828, 45904}; + + var context = client.StreamingInputCall(); + foreach (var size in bodySizes) + { + context.Inputs.OnNext( + StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build()); + } + context.Inputs.OnCompleted(); + + var response = context.Task.Result; + Assert.AreEqual(74922, response.AggregatedPayloadSize); + Console.WriteLine("Passed!"); + } + + private void RunServerStreaming(TestServiceGrpc.ITestServiceClient client) + { + Console.WriteLine("running server_streaming"); + + var bodySizes = new List{31415, 9, 2653, 58979}; + + var request = StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddRangeResponseParameters(bodySizes.ConvertAll( + (size) => ResponseParameters.CreateBuilder().SetSize(size).Build())) + .Build(); + + var recorder = new RecordingObserver(); + client.StreamingOutputCall(request, recorder); + + var responseList = recorder.ToList().Result; + + foreach (var res in responseList) + { + Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type); + } + CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length)); + Console.WriteLine("Passed!"); + } + + private void RunPingPong(TestServiceGrpc.ITestServiceClient client) + { + Console.WriteLine("running ping_pong"); + + var recorder = new RecordingQueue(); + var inputs = client.FullDuplexCall(recorder); + + StreamingOutputCallResponse response; + + inputs.OnNext(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) + .SetPayload(CreateZerosPayload(27182)).Build()); + + response = recorder.Queue.Take(); + Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(31415, response.Payload.Body.Length); + + inputs.OnNext(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9)) + .SetPayload(CreateZerosPayload(8)).Build()); + + response = recorder.Queue.Take(); + Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(9, response.Payload.Body.Length); + + inputs.OnNext(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2635)) + .SetPayload(CreateZerosPayload(1828)).Build()); + + response = recorder.Queue.Take(); + Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(2653, response.Payload.Body.Length); + + + inputs.OnNext(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979)) + .SetPayload(CreateZerosPayload(45904)).Build()); + + response = recorder.Queue.Take(); + Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(58979, response.Payload.Body.Length); + + recorder.Finished.Wait(); + Assert.AreEqual(0, recorder.Queue.Count); + + Console.WriteLine("Passed!"); + } + + private void RunEmptyStream(TestServiceGrpc.ITestServiceClient client) + { + Console.WriteLine("running empty_stream"); + + var recorder = new RecordingObserver(); + var inputs = client.FullDuplexCall(recorder); + inputs.OnCompleted(); + + var responseList = recorder.ToList().Result; + Assert.AreEqual(0, responseList.Count); + + Console.WriteLine("Passed!"); + } + + + private Payload CreateZerosPayload(int size) { + return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build(); } private static ClientOptions ParseArguments(string[] args)