From 15111f5c6b25970c8b5a6413cd147a61ec35fb27 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 5 Feb 2015 18:15:14 -0800 Subject: [PATCH] Polishing C# math service implementation and added an in-process mathclient mathserver test --- src/csharp/Grpc.sln | 6 + src/csharp/GrpcApi/DummyMathServiceClient.cs | 74 ----------- src/csharp/GrpcApi/Examples.cs | 15 ++- src/csharp/GrpcApi/GrpcApi.csproj | 6 +- src/csharp/GrpcApi/IMathServiceClient.cs | 26 ---- src/csharp/GrpcApi/MathGrpc.cs | 124 ++++++++++++++++++ src/csharp/GrpcApi/MathServiceClientStub.cs | 75 ----------- src/csharp/GrpcApi/MathServiceImpl.cs | 119 +++++++++++++++++ src/csharp/GrpcApiTests/.gitignore | 2 + src/csharp/GrpcApiTests/GrpcApiTests.csproj | 56 ++++++++ .../GrpcApiTests/MathClientServerTests.cs | 115 ++++++++++++++++ .../GrpcApiTests/Properties/AssemblyInfo.cs | 22 ++++ src/csharp/GrpcCore/Call.cs | 4 +- src/csharp/GrpcCore/GrpcCore.csproj | 6 +- src/csharp/GrpcCore/IMarshaller.cs | 31 ----- src/csharp/GrpcCore/Marshaller.cs | 54 ++++++++ src/csharp/GrpcCore/Method.cs | 10 +- src/csharp/GrpcCore/Server.cs | 17 ++- src/csharp/GrpcCore/ServerCallHandler.cs | 13 +- .../GrpcCore/ServerServiceDefinition.cs | 65 +++++++++ .../Utils.cs => GrpcCore/Utils/PortPicker.cs} | 9 +- .../Utils}/RecordingObserver.cs | 2 +- src/csharp/GrpcCoreTests/ClientServerTest.cs | 23 ++-- src/csharp/GrpcCoreTests/GrpcCoreTests.csproj | 1 - src/csharp/GrpcCoreTests/ServerTest.cs | 4 +- src/csharp/GrpcDemo/Program.cs | 3 +- 26 files changed, 622 insertions(+), 260 deletions(-) delete mode 100644 src/csharp/GrpcApi/DummyMathServiceClient.cs delete mode 100644 src/csharp/GrpcApi/IMathServiceClient.cs create mode 100644 src/csharp/GrpcApi/MathGrpc.cs delete mode 100644 src/csharp/GrpcApi/MathServiceClientStub.cs create mode 100644 src/csharp/GrpcApi/MathServiceImpl.cs create mode 100644 src/csharp/GrpcApiTests/.gitignore create mode 100644 src/csharp/GrpcApiTests/GrpcApiTests.csproj create mode 100644 src/csharp/GrpcApiTests/MathClientServerTests.cs create mode 100644 src/csharp/GrpcApiTests/Properties/AssemblyInfo.cs delete mode 100644 src/csharp/GrpcCore/IMarshaller.cs create mode 100644 src/csharp/GrpcCore/Marshaller.cs create mode 100644 src/csharp/GrpcCore/ServerServiceDefinition.cs rename src/csharp/{GrpcCoreTests/Utils.cs => GrpcCore/Utils/PortPicker.cs} (90%) rename src/csharp/{GrpcApi => GrpcCore/Utils}/RecordingObserver.cs (94%) diff --git a/src/csharp/Grpc.sln b/src/csharp/Grpc.sln index 5890617acf5..c46c4d67620 100644 --- a/src/csharp/Grpc.sln +++ b/src/csharp/Grpc.sln @@ -9,12 +9,18 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GrpcCore", "GrpcCore\GrpcCo EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GrpcCoreTests", "GrpcCoreTests\GrpcCoreTests.csproj", "{86EC5CB4-4EA2-40A2-8057-86542A0353BB}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GrpcApiTests", "GrpcApiTests\GrpcApiTests.csproj", "{143B1C29-C442-4BE0-BF3F-A8F92288AC9F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|x86 = Debug|x86 Release|x86 = Release|x86 EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution + {143B1C29-C442-4BE0-BF3F-A8F92288AC9F}.Debug|x86.ActiveCfg = Debug|Any CPU + {143B1C29-C442-4BE0-BF3F-A8F92288AC9F}.Debug|x86.Build.0 = Debug|Any CPU + {143B1C29-C442-4BE0-BF3F-A8F92288AC9F}.Release|x86.ActiveCfg = Release|Any CPU + {143B1C29-C442-4BE0-BF3F-A8F92288AC9F}.Release|x86.Build.0 = Release|Any CPU {61ECB8EE-0C96-4F8E-B187-8E4D227417C0}.Debug|x86.ActiveCfg = Debug|x86 {61ECB8EE-0C96-4F8E-B187-8E4D227417C0}.Debug|x86.Build.0 = Debug|x86 {61ECB8EE-0C96-4F8E-B187-8E4D227417C0}.Release|x86.ActiveCfg = Release|x86 diff --git a/src/csharp/GrpcApi/DummyMathServiceClient.cs b/src/csharp/GrpcApi/DummyMathServiceClient.cs deleted file mode 100644 index 6799109be42..00000000000 --- a/src/csharp/GrpcApi/DummyMathServiceClient.cs +++ /dev/null @@ -1,74 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; -using System.Reactive.Linq; - -namespace math -{ -// /// -// /// Dummy local implementation of math service. -// /// -// public class DummyMathServiceClient : IMathServiceClient -// { -// public DivReply Div(DivArgs args, CancellationToken token = default(CancellationToken)) -// { -// // TODO: cancellation... -// return DivInternal(args); -// } -// -// public Task DivAsync(DivArgs args, CancellationToken token = default(CancellationToken)) -// { -// return Task.Factory.StartNew(() => DivInternal(args), token); -// } -// -// public IObservable Fib(FibArgs args, CancellationToken token = default(CancellationToken)) -// { -// if (args.Limit > 0) -// { -// // TODO: cancellation -// return FibInternal(args.Limit).ToObservable(); -// } -// -// throw new NotImplementedException("Not implemented yet"); -// } -// -// public Task Sum(IObservable inputs, CancellationToken token = default(CancellationToken)) -// { -// // TODO: implement -// inputs = null; -// return Task.Factory.StartNew(() => Num.CreateBuilder().Build(), token); -// } -// -// public IObservable DivMany(IObservable inputs, CancellationToken token = default(CancellationToken)) -// { -// // TODO: implement -// inputs = null; -// return new List { }.ToObservable (); -// } -// -// -// DivReply DivInternal(DivArgs args) -// { -// long quotient = args.Dividend / args.Divisor; -// long remainder = args.Dividend % args.Divisor; -// return new DivReply.Builder{ Quotient = quotient, Remainder = remainder }.Build(); -// } -// -// IEnumerable FibInternal(long n) -// { -// long a = 0; -// yield return new Num.Builder{Num_=a}.Build(); -// -// long b = 1; -// for (long i = 0; i < n - 1; i++) -// { -// long temp = a; -// a = b; -// b = temp + b; -// yield return new Num.Builder{Num_=a}.Build(); -// } -// } -// } -} - diff --git a/src/csharp/GrpcApi/Examples.cs b/src/csharp/GrpcApi/Examples.cs index d45b702708e..d2a6cc01fdb 100644 --- a/src/csharp/GrpcApi/Examples.cs +++ b/src/csharp/GrpcApi/Examples.cs @@ -2,32 +2,33 @@ using System; using System.Threading.Tasks; using System.Collections.Generic; using System.Reactive.Linq; +using Google.GRPC.Core.Utils; namespace math { public class Examples { - public static void DivExample(IMathServiceClient stub) + public static void DivExample(MathGrpc.IMathServiceClient stub) { DivReply result = stub.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build()); Console.WriteLine("Div Result: " + result); } - public static void DivAsyncExample(IMathServiceClient stub) + public static void DivAsyncExample(MathGrpc.IMathServiceClient stub) { Task call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); DivReply result = call.Result; Console.WriteLine(result); } - public static void DivAsyncWithCancellationExample(IMathServiceClient stub) + public static void DivAsyncWithCancellationExample(MathGrpc.IMathServiceClient stub) { Task call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); DivReply result = call.Result; Console.WriteLine(result); } - public static void FibExample(IMathServiceClient stub) + public static void FibExample(MathGrpc.IMathServiceClient stub) { var recorder = new RecordingObserver(); stub.Fib(new FibArgs.Builder { Limit = 5 }.Build(), recorder); @@ -36,7 +37,7 @@ namespace math Console.WriteLine("Fib Result: " + string.Join("|", recorder.ToList().Result)); } - public static void SumExample(IMathServiceClient stub) + public static void SumExample(MathGrpc.IMathServiceClient stub) { List numbers = new List{new Num.Builder { Num_ = 1 }.Build(), new Num.Builder { Num_ = 2 }.Build(), @@ -51,7 +52,7 @@ namespace math Console.WriteLine("Sum Result: " + res.Task.Result); } - public static void DivManyExample(IMathServiceClient stub) + public static void DivManyExample(MathGrpc.IMathServiceClient stub) { List divArgsList = new List{ new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(), @@ -71,7 +72,7 @@ namespace math Console.WriteLine("DivMany Result: " + string.Join("|", recorder.ToList().Result)); } - public static void DependendRequestsExample(IMathServiceClient stub) + public static void DependendRequestsExample(MathGrpc.IMathServiceClient stub) { var numberList = new List { new Num.Builder{ Num_ = 1 }.Build(), diff --git a/src/csharp/GrpcApi/GrpcApi.csproj b/src/csharp/GrpcApi/GrpcApi.csproj index d0377828b54..07bd6f6efbe 100644 --- a/src/csharp/GrpcApi/GrpcApi.csproj +++ b/src/csharp/GrpcApi/GrpcApi.csproj @@ -48,11 +48,9 @@ - - - - + + diff --git a/src/csharp/GrpcApi/IMathServiceClient.cs b/src/csharp/GrpcApi/IMathServiceClient.cs deleted file mode 100644 index 51385a328f4..00000000000 --- a/src/csharp/GrpcApi/IMathServiceClient.cs +++ /dev/null @@ -1,26 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; -using System.Reactive.Linq; -using Google.GRPC.Core; - -namespace math -{ - /// - /// Hand-written stub for MathService defined in math.proto. - /// This code will be generated by gRPC codegen in the future. - /// - public interface IMathServiceClient - { - DivReply Div(DivArgs args, CancellationToken token = default(CancellationToken)); - - Task DivAsync(DivArgs args, CancellationToken token = default(CancellationToken)); - - Task Fib(FibArgs args, IObserver outputs, CancellationToken token = default(CancellationToken)); - - ClientStreamingAsyncResult Sum(CancellationToken token = default(CancellationToken)); - - IObserver DivMany(IObserver outputs, CancellationToken token = default(CancellationToken)); - } -} \ No newline at end of file diff --git a/src/csharp/GrpcApi/MathGrpc.cs b/src/csharp/GrpcApi/MathGrpc.cs new file mode 100644 index 00000000000..520fec437ab --- /dev/null +++ b/src/csharp/GrpcApi/MathGrpc.cs @@ -0,0 +1,124 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Reactive.Linq; +using Google.GRPC.Core; + +namespace math +{ + /// + /// Math service definitions (this is handwritten version of code that will normally be generated). + /// + public class MathGrpc + { + readonly static Marshaller divArgsMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom); + readonly static Marshaller divReplyMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom); + readonly static Marshaller numMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom); + readonly static Marshaller fibArgsMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), FibArgs.ParseFrom); + + readonly static Method divMethod = new Method( + MethodType.Unary, + "/math.Math/Div", + divArgsMarshaller, + divReplyMarshaller + ); + readonly static Method fibMethod = new Method( + MethodType.ServerStreaming, + "/math.Math/Fib", + fibArgsMarshaller, + numMarshaller + ); + readonly static Method sumMethod = new Method( + MethodType.ClientStreaming, + "/math.Math/Sum", + numMarshaller, + numMarshaller + ); + readonly static Method divManyMethod = new Method( + MethodType.DuplexStreaming, + "/math.Math/DivMany", + divArgsMarshaller, + divReplyMarshaller + ); + + public interface IMathServiceClient + { + DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken)); + + Task DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)); + + Task Fib(FibArgs request, IObserver responseObserver, CancellationToken token = default(CancellationToken)); + + ClientStreamingAsyncResult Sum(CancellationToken token = default(CancellationToken)); + + IObserver DivMany(IObserver responseObserver, CancellationToken token = default(CancellationToken)); + } + + public class MathServiceClientStub : IMathServiceClient + { + readonly Channel channel; + + public MathServiceClientStub(Channel channel) + { + this.channel = channel; + } + + public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken)) + { + var call = new Google.GRPC.Core.Call(divMethod, channel); + return Calls.BlockingUnaryCall(call, request, token); + } + + public Task DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)) + { + var call = new Google.GRPC.Core.Call(divMethod, channel); + return Calls.AsyncUnaryCall(call, request, token); + } + + public Task Fib(FibArgs request, IObserver responseObserver, CancellationToken token = default(CancellationToken)) + { + var call = new Google.GRPC.Core.Call(fibMethod, channel); + return Calls.AsyncServerStreamingCall(call, request, responseObserver, token); + } + + public ClientStreamingAsyncResult Sum(CancellationToken token = default(CancellationToken)) + { + var call = new Google.GRPC.Core.Call(sumMethod, channel); + return Calls.AsyncClientStreamingCall(call, token); + } + + public IObserver DivMany(IObserver responseObserver, CancellationToken token = default(CancellationToken)) + { + var call = new Google.GRPC.Core.Call(divManyMethod, channel); + return Calls.DuplexStreamingCall(call, responseObserver, token); + } + } + + // server-side interface + public interface IMathService + { + void Div(DivArgs request, IObserver responseObserver); + + void Fib(FibArgs request, IObserver responseObserver); + + IObserver Sum(IObserver responseObserver); + + IObserver DivMany(IObserver responseObserver); + } + + public static ServerServiceDefinition BindService(IMathService serviceImpl) + { + return ServerServiceDefinition.CreateBuilder("/math.Math/") + .AddMethod(divMethod, serviceImpl.Div) + .AddMethod(fibMethod, serviceImpl.Fib) + .AddMethod(sumMethod, serviceImpl.Sum) + .AddMethod(divManyMethod, serviceImpl.DivMany).Build(); + } + + public static IMathServiceClient NewStub(Channel channel) + { + return new MathServiceClientStub(channel); + } + } +} \ No newline at end of file diff --git a/src/csharp/GrpcApi/MathServiceClientStub.cs b/src/csharp/GrpcApi/MathServiceClientStub.cs deleted file mode 100644 index 493c186b8e5..00000000000 --- a/src/csharp/GrpcApi/MathServiceClientStub.cs +++ /dev/null @@ -1,75 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; -using System.Reactive.Linq; -using Google.GRPC.Core; - -namespace math -{ - /// - /// Implementation of math service stub (this is handwritten version of code - /// that will normally be generated). - /// - public class MathServiceClientStub : IMathServiceClient - { - readonly Channel channel; - readonly TimeSpan methodTimeout; - - public MathServiceClientStub(Channel channel, TimeSpan methodTimeout) - { - this.channel = channel; - this.methodTimeout = methodTimeout; - } - - public DivReply Div(DivArgs args, CancellationToken token = default(CancellationToken)) - { - var call = new Google.GRPC.Core.Call("/math.Math/Div", Serialize_DivArgs, Deserialize_DivReply, methodTimeout, channel); - return Calls.BlockingUnaryCall(call, args, token); - } - - public Task DivAsync(DivArgs args, CancellationToken token = default(CancellationToken)) - { - var call = new Google.GRPC.Core.Call("/math.Math/Div", Serialize_DivArgs, Deserialize_DivReply, methodTimeout, channel); - return Calls.AsyncUnaryCall(call, args, token); - } - - public Task Fib(FibArgs args, IObserver outputs, CancellationToken token = default(CancellationToken)) - { - var call = new Google.GRPC.Core.Call("/math.Math/Fib", Serialize_FibArgs, Deserialize_Num, methodTimeout, channel); - return Calls.AsyncServerStreamingCall(call, args, outputs, token); - } - - public ClientStreamingAsyncResult Sum(CancellationToken token = default(CancellationToken)) - { - var call = new Google.GRPC.Core.Call("/math.Math/Sum", Serialize_Num, Deserialize_Num, methodTimeout, channel); - return Calls.AsyncClientStreamingCall(call, token); - } - - public IObserver DivMany(IObserver outputs, CancellationToken token = default(CancellationToken)) - { - var call = new Google.GRPC.Core.Call("/math.Math/DivMany", Serialize_DivArgs, Deserialize_DivReply, methodTimeout, channel); - return Calls.DuplexStreamingCall(call, outputs, token); - } - - private static byte[] Serialize_DivArgs(DivArgs arg) { - return arg.ToByteArray(); - } - - private static byte[] Serialize_FibArgs(FibArgs arg) { - return arg.ToByteArray(); - } - - private static byte[] Serialize_Num(Num arg) { - return arg.ToByteArray(); - } - - private static DivReply Deserialize_DivReply(byte[] payload) { - return DivReply.CreateBuilder().MergeFrom(payload).Build(); - } - - private static Num Deserialize_Num(byte[] payload) { - return Num.CreateBuilder().MergeFrom(payload).Build(); - } - } -} \ No newline at end of file diff --git a/src/csharp/GrpcApi/MathServiceImpl.cs b/src/csharp/GrpcApi/MathServiceImpl.cs new file mode 100644 index 00000000000..27abc4ce17c --- /dev/null +++ b/src/csharp/GrpcApi/MathServiceImpl.cs @@ -0,0 +1,119 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Reactive.Linq; +using Google.GRPC.Core.Utils; + +namespace math +{ + /// + /// Implementation of MathService server + /// + public class MathServiceImpl : MathGrpc.IMathService + { + public void Div(DivArgs request, IObserver responseObserver) + { + var response = DivInternal(request); + responseObserver.OnNext(response); + responseObserver.OnCompleted(); + } + + public void Fib(FibArgs request, IObserver responseObserver) + { + if (request.Limit <= 0) + { + // TODO: support cancellation.... + throw new NotImplementedException("Not implemented yet"); + } + + if (request.Limit > 0) + { + foreach (var num in FibInternal(request.Limit)) + { + responseObserver.OnNext(num); + } + responseObserver.OnCompleted(); + } + } + + public IObserver Sum(IObserver responseObserver) + { + var recorder = new RecordingObserver(); + Task.Factory.StartNew(() => { + + List inputs = recorder.ToList().Result; + + long sum = 0; + foreach (Num num in inputs) + { + sum += num.Num_; + } + + responseObserver.OnNext(Num.CreateBuilder().SetNum_(sum).Build()); + responseObserver.OnCompleted(); + }); + return recorder; + } + + public IObserver DivMany(IObserver responseObserver) + { + return new DivObserver(responseObserver); + } + + static DivReply DivInternal(DivArgs args) + { + long quotient = args.Dividend / args.Divisor; + long remainder = args.Dividend % args.Divisor; + return new DivReply.Builder { Quotient = quotient, Remainder = remainder }.Build(); + } + + static IEnumerable FibInternal(long n) + { + long a = 1; + yield return new Num.Builder { Num_=a }.Build(); + + long b = 1; + for (long i = 0; i < n - 1; i++) + { + long temp = a; + a = b; + b = temp + b; + yield return new Num.Builder { Num_=a }.Build(); + } + } + + private class DivObserver : IObserver { + + readonly IObserver responseObserver; + + public DivObserver(IObserver responseObserver) + { + this.responseObserver = responseObserver; + } + + public void OnCompleted() + { + Task.Factory.StartNew(() => + responseObserver.OnCompleted()); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnNext(DivArgs value) + { + // TODO: currently we need this indirection because + // responseObserver waits for write to finish, this + // callback is called from grpc threadpool which + // currently only has one thread. + // Same story for OnCompleted(). + Task.Factory.StartNew(() => + responseObserver.OnNext(DivInternal(value))); + } + } + } +} + diff --git a/src/csharp/GrpcApiTests/.gitignore b/src/csharp/GrpcApiTests/.gitignore new file mode 100644 index 00000000000..2cc8cca52d0 --- /dev/null +++ b/src/csharp/GrpcApiTests/.gitignore @@ -0,0 +1,2 @@ +test-results +bin diff --git a/src/csharp/GrpcApiTests/GrpcApiTests.csproj b/src/csharp/GrpcApiTests/GrpcApiTests.csproj new file mode 100644 index 00000000000..d0aac2b7533 --- /dev/null +++ b/src/csharp/GrpcApiTests/GrpcApiTests.csproj @@ -0,0 +1,56 @@ + + + + Debug + AnyCPU + 10.0.0 + 2.0 + {143B1C29-C442-4BE0-BF3F-A8F92288AC9F} + Library + GrpcApiTests + GrpcApiTests + v4.5 + + + true + full + false + bin\Debug + DEBUG; + prompt + 4 + false + + + full + true + bin\Release + prompt + 4 + false + + + + + False + + + ..\lib\Google.ProtocolBuffers.dll + + + + + + + + + + {7DC1433E-3225-42C7-B7EA-546D56E27A4B} + GrpcApi + + + {CCC4440E-49F7-4790-B0AF-FEABB0837AE7} + GrpcCore + + + \ No newline at end of file diff --git a/src/csharp/GrpcApiTests/MathClientServerTests.cs b/src/csharp/GrpcApiTests/MathClientServerTests.cs new file mode 100644 index 00000000000..aa78b698e85 --- /dev/null +++ b/src/csharp/GrpcApiTests/MathClientServerTests.cs @@ -0,0 +1,115 @@ +using System; +using NUnit.Framework; +using Google.GRPC.Core; +using System.Threading; +using System.Threading.Tasks; +using Google.GRPC.Core.Utils; +using System.Collections.Generic; + +namespace math.Tests +{ + /// + /// Math client talks to local math server. + /// + public class MathClientServerTest + { + string serverAddr = "localhost:" + PortPicker.PickUnusedPort(); + Server server; + Channel channel; + MathGrpc.IMathServiceClient client; + + [TestFixtureSetUp] + public void Init() + { + server = new Server(); + server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl())); + server.AddPort(serverAddr); + server.Start(); + channel = new Channel(serverAddr); + client = MathGrpc.NewStub(channel); + } + + [Test] + public void Div1() + { + DivReply response = client.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build()); + Assert.AreEqual(3, response.Quotient); + Assert.AreEqual(1, response.Remainder); + } + + [Test] + public void Div2() + { + DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 1 }.Build()); + Assert.AreEqual(0, response.Quotient); + Assert.AreEqual(0, response.Remainder); + } + + // TODO: test division by zero + + [Test] + public void DivAsync() + { + DivReply response = client.DivAsync(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build()).Result; + Assert.AreEqual(3, response.Quotient); + Assert.AreEqual(1, response.Remainder); + } + + [Test] + public void Fib() + { + var recorder = new RecordingObserver(); + client.Fib(new FibArgs.Builder { Limit = 6 }.Build(), recorder); + + CollectionAssert.AreEqual(new List{1, 1, 2, 3, 5, 8}, + recorder.ToList().Result.ConvertAll((n) => n.Num_)); + } + + // TODO: test Fib with limit=0 and cancellation + [Test] + public void Sum() + { + var res = client.Sum(); + foreach (var num in new long[] { 10, 20, 30 }) { + res.Inputs.OnNext(Num.CreateBuilder().SetNum_(num).Build()); + } + res.Inputs.OnCompleted(); + + Assert.AreEqual(60, res.Task.Result.Num_); + } + + [Test] + public void DivMany() + { + List divArgsList = new List{ + new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(), + new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(), + new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build() + }; + + var recorder = new RecordingObserver(); + var requestObserver = client.DivMany(recorder); + + foreach (var arg in divArgsList) + { + requestObserver.OnNext(arg); + } + requestObserver.OnCompleted(); + + var result = recorder.ToList().Result; + + CollectionAssert.AreEqual(new long[] {3, 4, 3}, result.ConvertAll((divReply) => divReply.Quotient)); + CollectionAssert.AreEqual(new long[] {1, 16, 1}, result.ConvertAll((divReply) => divReply.Remainder)); + } + + [TestFixtureTearDown] + public void Cleanup() + { + channel.Dispose(); + + server.ShutdownAsync().Wait(); + GrpcEnvironment.Shutdown(); + } + } +} + diff --git a/src/csharp/GrpcApiTests/Properties/AssemblyInfo.cs b/src/csharp/GrpcApiTests/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..0928404429f --- /dev/null +++ b/src/csharp/GrpcApiTests/Properties/AssemblyInfo.cs @@ -0,0 +1,22 @@ +using System.Reflection; +using System.Runtime.CompilerServices; + +// Information about this assembly is defined by the following attributes. +// Change them to the values specific to your project. +[assembly: AssemblyTitle("GrpcApiTests")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("")] +[assembly: AssemblyCopyright("jtattermusch")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] +// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}". +// The form "{Major}.{Minor}.*" will automatically update the build and revision, +// and "{Major}.{Minor}.{Build}.*" will update just the revision. +[assembly: AssemblyVersion("1.0.*")] +// The following attributes are used to specify the signing key for the assembly, +// if desired. See the Mono documentation for more information about signing. +//[assembly: AssemblyDelaySign(false)] +//[assembly: AssemblyKeyFile("")] + diff --git a/src/csharp/GrpcCore/Call.cs b/src/csharp/GrpcCore/Call.cs index d3847a80091..66e70041805 100644 --- a/src/csharp/GrpcCore/Call.cs +++ b/src/csharp/GrpcCore/Call.cs @@ -24,8 +24,8 @@ namespace Google.GRPC.Core public Call(Method method, Channel channel) { this.methodName = method.Name; - this.requestSerializer = method.RequestMarshaller.Serialize; - this.responseDeserializer = method.ResponseMarshaller.Deserialize; + this.requestSerializer = method.RequestMarshaller.Serializer; + this.responseDeserializer = method.ResponseMarshaller.Deserializer; this.channel = channel; } diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj index 2ad0f9154cc..fbfe50e4d88 100644 --- a/src/csharp/GrpcCore/GrpcCore.csproj +++ b/src/csharp/GrpcCore/GrpcCore.csproj @@ -55,13 +55,17 @@ - + + + + + \ No newline at end of file diff --git a/src/csharp/GrpcCore/IMarshaller.cs b/src/csharp/GrpcCore/IMarshaller.cs deleted file mode 100644 index eb08d8d3860..00000000000 --- a/src/csharp/GrpcCore/IMarshaller.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System; - -namespace Google.GRPC.Core -{ - /// - /// For serializing and deserializing messages. - /// - public interface IMarshaller - { - byte[] Serialize(T value); - - T Deserialize(byte[] payload); - } - - /// - /// UTF-8 Marshalling for string. Useful for testing. - /// - internal class StringMarshaller : IMarshaller { - - public byte[] Serialize(string value) - { - return System.Text.Encoding.UTF8.GetBytes(value); - } - - public string Deserialize(byte[] payload) - { - return System.Text.Encoding.UTF8.GetString(payload); - } - } -} - diff --git a/src/csharp/GrpcCore/Marshaller.cs b/src/csharp/GrpcCore/Marshaller.cs new file mode 100644 index 00000000000..242524063c7 --- /dev/null +++ b/src/csharp/GrpcCore/Marshaller.cs @@ -0,0 +1,54 @@ +using System; + +namespace Google.GRPC.Core +{ + /// + /// For serializing and deserializing messages. + /// + public struct Marshaller + { + readonly Func serializer; + readonly Func deserializer; + + public Marshaller(Func serializer, Func deserializer) + { + this.serializer = serializer; + this.deserializer = deserializer; + } + + public Func Serializer + { + get + { + return this.serializer; + } + } + + public Func Deserializer + { + get + { + return this.deserializer; + } + } + } + + public static class Marshallers { + + public static Marshaller Create(Func serializer, Func deserializer) + { + return new Marshaller(serializer, deserializer); + } + + public static Marshaller StringMarshaller + { + get + { + return new Marshaller(System.Text.Encoding.UTF8.GetBytes, + System.Text.Encoding.UTF8.GetString); + } + } + + } +} + diff --git a/src/csharp/GrpcCore/Method.cs b/src/csharp/GrpcCore/Method.cs index 27901156950..9067ae8c947 100644 --- a/src/csharp/GrpcCore/Method.cs +++ b/src/csharp/GrpcCore/Method.cs @@ -17,10 +17,10 @@ namespace Google.GRPC.Core { readonly MethodType type; readonly string name; - readonly IMarshaller requestMarshaller; - readonly IMarshaller responseMarshaller; + readonly Marshaller requestMarshaller; + readonly Marshaller responseMarshaller; - public Method(MethodType type, string name, IMarshaller requestMarshaller, IMarshaller responseMarshaller) + public Method(MethodType type, string name, Marshaller requestMarshaller, Marshaller responseMarshaller) { this.type = type; this.name = name; @@ -44,7 +44,7 @@ namespace Google.GRPC.Core } } - public IMarshaller RequestMarshaller + public Marshaller RequestMarshaller { get { @@ -52,7 +52,7 @@ namespace Google.GRPC.Core } } - public IMarshaller ResponseMarshaller + public Marshaller ResponseMarshaller { get { diff --git a/src/csharp/GrpcCore/Server.cs b/src/csharp/GrpcCore/Server.cs index 4e9d114f850..d3bc81e5744 100644 --- a/src/csharp/GrpcCore/Server.cs +++ b/src/csharp/GrpcCore/Server.cs @@ -38,10 +38,14 @@ namespace Google.GRPC.Core this.serverShutdownHandler = HandleServerShutdown; } - // only call before Start(), this will be in server builder in the future. - internal void AddCallHandler(string methodName, IServerCallHandler handler) { - callHandlers.Add(methodName, handler); + // only call this before Start() + public void AddServiceDefinition(ServerServiceDefinition serviceDefinition) { + foreach(var entry in serviceDefinition.CallHandlers) + { + callHandlers.Add(entry.Key, entry.Value); + } } + // only call before Start() public int AddPort(string addr) { return handle.AddPort(addr); @@ -113,7 +117,12 @@ namespace Google.GRPC.Core try { var ev = new EventSafeHandleNotOwned(eventPtr); - newRpcQueue.Add(new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod())); + var rpcInfo = new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod()); + + // after server shutdown, the callback returns with null call + if (!rpcInfo.Call.IsInvalid) { + newRpcQueue.Add(rpcInfo); + } } catch (Exception e) { diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs index 08d527a019f..67103791b43 100644 --- a/src/csharp/GrpcCore/ServerCallHandler.cs +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -22,8 +22,8 @@ namespace Google.GRPC.Core public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCall( - (msg) => method.ResponseMarshaller.Serialize(msg), - (payload) => method.RequestMarshaller.Deserialize(payload)); + method.ResponseMarshaller.Serializer, + method.RequestMarshaller.Deserializer); asyncCall.InitializeServer(call); asyncCall.Accept(cq); @@ -34,9 +34,6 @@ namespace Google.GRPC.Core handler(request, responseObserver); asyncCall.Halfclosed.Wait(); - // TODO: wait until writing is finished - - asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); asyncCall.Finished.Wait(); } } @@ -55,8 +52,8 @@ namespace Google.GRPC.Core public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCall( - (msg) => method.ResponseMarshaller.Serialize(msg), - (payload) => method.RequestMarshaller.Deserialize(payload)); + method.ResponseMarshaller.Serializer, + method.RequestMarshaller.Deserializer); asyncCall.InitializeServer(call); asyncCall.Accept(cq); @@ -68,8 +65,6 @@ namespace Google.GRPC.Core asyncCall.StartReadingToStream(requestObserver); asyncCall.Halfclosed.Wait(); - - asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); asyncCall.Finished.Wait(); } } diff --git a/src/csharp/GrpcCore/ServerServiceDefinition.cs b/src/csharp/GrpcCore/ServerServiceDefinition.cs new file mode 100644 index 00000000000..7f1cc6284e4 --- /dev/null +++ b/src/csharp/GrpcCore/ServerServiceDefinition.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; + +namespace Google.GRPC.Core +{ + public class ServerServiceDefinition + { + readonly string serviceName; + // TODO: we would need an immutable dictionary here... + readonly Dictionary callHandlers; + + private ServerServiceDefinition(string serviceName, Dictionary callHandlers) + { + this.serviceName = serviceName; + this.callHandlers = new Dictionary(callHandlers); + } + + internal Dictionary CallHandlers + { + get + { + return this.callHandlers; + } + } + + + public static Builder CreateBuilder(String serviceName) + { + return new Builder(serviceName); + } + + public class Builder + { + readonly string serviceName; + readonly Dictionary callHandlers = new Dictionary(); + + public Builder(string serviceName) + { + this.serviceName = serviceName; + } + + public Builder AddMethod( + Method method, + UnaryRequestServerMethod handler) + { + callHandlers.Add(method.Name, ServerCalls.UnaryRequestCall(method, handler)); + return this; + } + + public Builder AddMethod( + Method method, + StreamingRequestServerMethod handler) + { + callHandlers.Add(method.Name, ServerCalls.StreamingRequestCall(method, handler)); + return this; + } + + public ServerServiceDefinition Build() + { + return new ServerServiceDefinition(serviceName, callHandlers); + } + } + } +} + diff --git a/src/csharp/GrpcCoreTests/Utils.cs b/src/csharp/GrpcCore/Utils/PortPicker.cs similarity index 90% rename from src/csharp/GrpcCoreTests/Utils.cs rename to src/csharp/GrpcCore/Utils/PortPicker.cs index b0c0a7b6205..7c83bf3886d 100644 --- a/src/csharp/GrpcCoreTests/Utils.cs +++ b/src/csharp/GrpcCore/Utils/PortPicker.cs @@ -2,14 +2,12 @@ using System; using System.Net; using System.Net.Sockets; -namespace Google.GRPC.Core.Tests +namespace Google.GRPC.Core.Utils { - /// - /// Testing utils. - /// - public class Utils + public class PortPicker { static Random random = new Random(); + // TODO: cleanup this code a bit public static int PickUnusedPort() { @@ -21,6 +19,7 @@ namespace Google.GRPC.Core.Tests } while(!IsPortAvailable(port)); return port; } + // TODO: cleanup this code a bit public static bool IsPortAvailable(int port) { diff --git a/src/csharp/GrpcApi/RecordingObserver.cs b/src/csharp/GrpcCore/Utils/RecordingObserver.cs similarity index 94% rename from src/csharp/GrpcApi/RecordingObserver.cs rename to src/csharp/GrpcCore/Utils/RecordingObserver.cs index 8ba3787905a..ca11cc4aa23 100644 --- a/src/csharp/GrpcApi/RecordingObserver.cs +++ b/src/csharp/GrpcCore/Utils/RecordingObserver.cs @@ -2,7 +2,7 @@ using System; using System.Threading.Tasks; using System.Collections.Generic; -namespace math +namespace Google.GRPC.Core.Utils { public class RecordingObserver : IObserver { diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index 511683b0038..c700ffbe7ba 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -1,35 +1,37 @@ using System; using NUnit.Framework; +using Google.GRPC.Core; using Google.GRPC.Core.Internal; using System.Threading; using System.Threading.Tasks; +using Google.GRPC.Core.Utils; namespace Google.GRPC.Core.Tests { public class ClientServerTest { - string serverAddr = "localhost:" + Utils.PickUnusedPort(); + string serverAddr = "localhost:" + PortPicker.PickUnusedPort(); - private Method unaryEchoStringMethod = new Method( + Method unaryEchoStringMethod = new Method( MethodType.Unary, "/tests.Test/UnaryEchoString", - new StringMarshaller(), - new StringMarshaller()); + Marshallers.StringMarshaller, + Marshallers.StringMarshaller); [Test] public void EmptyCall() { Server server = new Server(); - - server.AddCallHandler(unaryEchoStringMethod.Name, - ServerCalls.UnaryRequestCall(unaryEchoStringMethod, HandleUnaryEchoString)); + server.AddServiceDefinition( + ServerServiceDefinition.CreateBuilder("someService") + .AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build()); server.AddPort(serverAddr); server.Start(); using (Channel channel = new Channel(serverAddr)) { - var call = CreateUnaryEchoStringCall(channel); + var call = new Call(unaryEchoStringMethod, channel); Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken))); Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken))); @@ -40,11 +42,6 @@ namespace Google.GRPC.Core.Tests GrpcEnvironment.Shutdown(); } - private Call CreateUnaryEchoStringCall(Channel channel) - { - return new Call(unaryEchoStringMethod, channel); - } - private void HandleUnaryEchoString(string request, IObserver responseObserver) { responseObserver.OnNext(request); responseObserver.OnCompleted(); diff --git a/src/csharp/GrpcCoreTests/GrpcCoreTests.csproj b/src/csharp/GrpcCoreTests/GrpcCoreTests.csproj index 3de0f585cda..111f0883db0 100644 --- a/src/csharp/GrpcCoreTests/GrpcCoreTests.csproj +++ b/src/csharp/GrpcCoreTests/GrpcCoreTests.csproj @@ -39,7 +39,6 @@ - diff --git a/src/csharp/GrpcCoreTests/ServerTest.cs b/src/csharp/GrpcCoreTests/ServerTest.cs index e6de95c3363..6e13bc735f6 100644 --- a/src/csharp/GrpcCoreTests/ServerTest.cs +++ b/src/csharp/GrpcCoreTests/ServerTest.cs @@ -1,6 +1,8 @@ using System; using NUnit.Framework; using Google.GRPC.Core.Internal; +using Google.GRPC.Core; +using Google.GRPC.Core.Utils; namespace Google.GRPC.Core.Tests { @@ -10,7 +12,7 @@ namespace Google.GRPC.Core.Tests public void StartAndShutdownServer() { Server server = new Server(); - server.AddPort("localhost:" + Utils.PickUnusedPort()); + server.AddPort("localhost:" + PortPicker.PickUnusedPort()); server.Start(); server.ShutdownAsync().Wait(); diff --git a/src/csharp/GrpcDemo/Program.cs b/src/csharp/GrpcDemo/Program.cs index 258762dbb99..c442c32193e 100644 --- a/src/csharp/GrpcDemo/Program.cs +++ b/src/csharp/GrpcDemo/Program.cs @@ -12,7 +12,8 @@ namespace Google.GRPC.Demo { using (Channel channel = new Channel("127.0.0.1:23456")) { - IMathServiceClient stub = new MathServiceClientStub(channel, Timeout.InfiniteTimeSpan); + + MathGrpc.IMathServiceClient stub = new MathGrpc.MathServiceClientStub(channel); Examples.DivExample(stub); Examples.FibExample(stub);