Merge pull request #415 from jtattermusch/csharp_progress

Polishing C# math service implementation and adding inprocess tests
changes/78/217578/1
Tim Emiola 10 years ago
commit f2d5e409d3
  1. 6
      src/csharp/Grpc.sln
  2. 74
      src/csharp/GrpcApi/DummyMathServiceClient.cs
  3. 15
      src/csharp/GrpcApi/Examples.cs
  4. 6
      src/csharp/GrpcApi/GrpcApi.csproj
  5. 26
      src/csharp/GrpcApi/IMathServiceClient.cs
  6. 124
      src/csharp/GrpcApi/MathGrpc.cs
  7. 75
      src/csharp/GrpcApi/MathServiceClientStub.cs
  8. 119
      src/csharp/GrpcApi/MathServiceImpl.cs
  9. 2
      src/csharp/GrpcApiTests/.gitignore
  10. 56
      src/csharp/GrpcApiTests/GrpcApiTests.csproj
  11. 115
      src/csharp/GrpcApiTests/MathClientServerTests.cs
  12. 22
      src/csharp/GrpcApiTests/Properties/AssemblyInfo.cs
  13. 4
      src/csharp/GrpcCore/Call.cs
  14. 6
      src/csharp/GrpcCore/GrpcCore.csproj
  15. 31
      src/csharp/GrpcCore/IMarshaller.cs
  16. 54
      src/csharp/GrpcCore/Marshaller.cs
  17. 10
      src/csharp/GrpcCore/Method.cs
  18. 17
      src/csharp/GrpcCore/Server.cs
  19. 13
      src/csharp/GrpcCore/ServerCallHandler.cs
  20. 65
      src/csharp/GrpcCore/ServerServiceDefinition.cs
  21. 9
      src/csharp/GrpcCore/Utils/PortPicker.cs
  22. 2
      src/csharp/GrpcCore/Utils/RecordingObserver.cs
  23. 23
      src/csharp/GrpcCoreTests/ClientServerTest.cs
  24. 1
      src/csharp/GrpcCoreTests/GrpcCoreTests.csproj
  25. 4
      src/csharp/GrpcCoreTests/ServerTest.cs
  26. 3
      src/csharp/GrpcDemo/Program.cs

@ -9,12 +9,18 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GrpcCore", "GrpcCore\GrpcCo
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GrpcCoreTests", "GrpcCoreTests\GrpcCoreTests.csproj", "{86EC5CB4-4EA2-40A2-8057-86542A0353BB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GrpcApiTests", "GrpcApiTests\GrpcApiTests.csproj", "{143B1C29-C442-4BE0-BF3F-A8F92288AC9F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x86 = Debug|x86
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{143B1C29-C442-4BE0-BF3F-A8F92288AC9F}.Debug|x86.ActiveCfg = Debug|Any CPU
{143B1C29-C442-4BE0-BF3F-A8F92288AC9F}.Debug|x86.Build.0 = Debug|Any CPU
{143B1C29-C442-4BE0-BF3F-A8F92288AC9F}.Release|x86.ActiveCfg = Release|Any CPU
{143B1C29-C442-4BE0-BF3F-A8F92288AC9F}.Release|x86.Build.0 = Release|Any CPU
{61ECB8EE-0C96-4F8E-B187-8E4D227417C0}.Debug|x86.ActiveCfg = Debug|x86
{61ECB8EE-0C96-4F8E-B187-8E4D227417C0}.Debug|x86.Build.0 = Debug|x86
{61ECB8EE-0C96-4F8E-B187-8E4D227417C0}.Release|x86.ActiveCfg = Release|x86

@ -1,74 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
namespace math
{
// /// <summary>
// /// Dummy local implementation of math service.
// /// </summary>
// public class DummyMathServiceClient : IMathServiceClient
// {
// public DivReply Div(DivArgs args, CancellationToken token = default(CancellationToken))
// {
// // TODO: cancellation...
// return DivInternal(args);
// }
//
// public Task<DivReply> DivAsync(DivArgs args, CancellationToken token = default(CancellationToken))
// {
// return Task.Factory.StartNew(() => DivInternal(args), token);
// }
//
// public IObservable<Num> Fib(FibArgs args, CancellationToken token = default(CancellationToken))
// {
// if (args.Limit > 0)
// {
// // TODO: cancellation
// return FibInternal(args.Limit).ToObservable();
// }
//
// throw new NotImplementedException("Not implemented yet");
// }
//
// public Task<Num> Sum(IObservable<Num> inputs, CancellationToken token = default(CancellationToken))
// {
// // TODO: implement
// inputs = null;
// return Task.Factory.StartNew(() => Num.CreateBuilder().Build(), token);
// }
//
// public IObservable<DivReply> DivMany(IObservable<DivArgs> inputs, CancellationToken token = default(CancellationToken))
// {
// // TODO: implement
// inputs = null;
// return new List<DivReply> { }.ToObservable ();
// }
//
//
// DivReply DivInternal(DivArgs args)
// {
// long quotient = args.Dividend / args.Divisor;
// long remainder = args.Dividend % args.Divisor;
// return new DivReply.Builder{ Quotient = quotient, Remainder = remainder }.Build();
// }
//
// IEnumerable<Num> FibInternal(long n)
// {
// long a = 0;
// yield return new Num.Builder{Num_=a}.Build();
//
// long b = 1;
// for (long i = 0; i < n - 1; i++)
// {
// long temp = a;
// a = b;
// b = temp + b;
// yield return new Num.Builder{Num_=a}.Build();
// }
// }
// }
}

@ -2,32 +2,33 @@ using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core.Utils;
namespace math
{
public class Examples
{
public static void DivExample(IMathServiceClient stub)
public static void DivExample(MathGrpc.IMathServiceClient stub)
{
DivReply result = stub.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build());
Console.WriteLine("Div Result: " + result);
}
public static void DivAsyncExample(IMathServiceClient stub)
public static void DivAsyncExample(MathGrpc.IMathServiceClient stub)
{
Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = call.Result;
Console.WriteLine(result);
}
public static void DivAsyncWithCancellationExample(IMathServiceClient stub)
public static void DivAsyncWithCancellationExample(MathGrpc.IMathServiceClient stub)
{
Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = call.Result;
Console.WriteLine(result);
}
public static void FibExample(IMathServiceClient stub)
public static void FibExample(MathGrpc.IMathServiceClient stub)
{
var recorder = new RecordingObserver<Num>();
stub.Fib(new FibArgs.Builder { Limit = 5 }.Build(), recorder);
@ -36,7 +37,7 @@ namespace math
Console.WriteLine("Fib Result: " + string.Join("|", recorder.ToList().Result));
}
public static void SumExample(IMathServiceClient stub)
public static void SumExample(MathGrpc.IMathServiceClient stub)
{
List<Num> numbers = new List<Num>{new Num.Builder { Num_ = 1 }.Build(),
new Num.Builder { Num_ = 2 }.Build(),
@ -51,7 +52,7 @@ namespace math
Console.WriteLine("Sum Result: " + res.Task.Result);
}
public static void DivManyExample(IMathServiceClient stub)
public static void DivManyExample(MathGrpc.IMathServiceClient stub)
{
List<DivArgs> divArgsList = new List<DivArgs>{
new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(),
@ -71,7 +72,7 @@ namespace math
Console.WriteLine("DivMany Result: " + string.Join("|", recorder.ToList().Result));
}
public static void DependendRequestsExample(IMathServiceClient stub)
public static void DependendRequestsExample(MathGrpc.IMathServiceClient stub)
{
var numberList = new List<Num>
{ new Num.Builder{ Num_ = 1 }.Build(),

@ -48,11 +48,9 @@
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Examples.cs" />
<Compile Include="IMathServiceClient.cs" />
<Compile Include="Math.cs" />
<Compile Include="DummyMathServiceClient.cs" />
<Compile Include="MathServiceClientStub.cs" />
<Compile Include="RecordingObserver.cs" />
<Compile Include="MathGrpc.cs" />
<Compile Include="MathServiceImpl.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -1,26 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core;
namespace math
{
/// <summary>
/// Hand-written stub for MathService defined in math.proto.
/// This code will be generated by gRPC codegen in the future.
/// </summary>
public interface IMathServiceClient
{
DivReply Div(DivArgs args, CancellationToken token = default(CancellationToken));
Task<DivReply> DivAsync(DivArgs args, CancellationToken token = default(CancellationToken));
Task Fib(FibArgs args, IObserver<Num> outputs, CancellationToken token = default(CancellationToken));
ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken));
IObserver<DivArgs> DivMany(IObserver<DivReply> outputs, CancellationToken token = default(CancellationToken));
}
}

@ -0,0 +1,124 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core;
namespace math
{
/// <summary>
/// Math service definitions (this is handwritten version of code that will normally be generated).
/// </summary>
public class MathGrpc
{
readonly static Marshaller<DivArgs> divArgsMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom);
readonly static Marshaller<DivReply> divReplyMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom);
readonly static Marshaller<Num> numMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom);
readonly static Marshaller<FibArgs> fibArgsMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), FibArgs.ParseFrom);
readonly static Method<DivArgs, DivReply> divMethod = new Method<DivArgs, DivReply>(
MethodType.Unary,
"/math.Math/Div",
divArgsMarshaller,
divReplyMarshaller
);
readonly static Method<FibArgs, Num> fibMethod = new Method<FibArgs, Num>(
MethodType.ServerStreaming,
"/math.Math/Fib",
fibArgsMarshaller,
numMarshaller
);
readonly static Method<Num, Num> sumMethod = new Method<Num, Num>(
MethodType.ClientStreaming,
"/math.Math/Sum",
numMarshaller,
numMarshaller
);
readonly static Method<DivArgs, DivReply> divManyMethod = new Method<DivArgs, DivReply>(
MethodType.DuplexStreaming,
"/math.Math/DivMany",
divArgsMarshaller,
divReplyMarshaller
);
public interface IMathServiceClient
{
DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken));
Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken));
Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken));
ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken));
IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken));
}
public class MathServiceClientStub : IMathServiceClient
{
readonly Channel channel;
public MathServiceClientStub(Channel channel)
{
this.channel = channel;
}
public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>(divMethod, channel);
return Calls.BlockingUnaryCall(call, request, token);
}
public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>(divMethod, channel);
return Calls.AsyncUnaryCall(call, request, token);
}
public Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<FibArgs, Num>(fibMethod, channel);
return Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
}
public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<Num, Num>(sumMethod, channel);
return Calls.AsyncClientStreamingCall(call, token);
}
public IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>(divManyMethod, channel);
return Calls.DuplexStreamingCall(call, responseObserver, token);
}
}
// server-side interface
public interface IMathService
{
void Div(DivArgs request, IObserver<DivReply> responseObserver);
void Fib(FibArgs request, IObserver<Num> responseObserver);
IObserver<Num> Sum(IObserver<Num> responseObserver);
IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver);
}
public static ServerServiceDefinition BindService(IMathService serviceImpl)
{
return ServerServiceDefinition.CreateBuilder("/math.Math/")
.AddMethod(divMethod, serviceImpl.Div)
.AddMethod(fibMethod, serviceImpl.Fib)
.AddMethod(sumMethod, serviceImpl.Sum)
.AddMethod(divManyMethod, serviceImpl.DivMany).Build();
}
public static IMathServiceClient NewStub(Channel channel)
{
return new MathServiceClientStub(channel);
}
}
}

@ -1,75 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core;
namespace math
{
/// <summary>
/// Implementation of math service stub (this is handwritten version of code
/// that will normally be generated).
/// </summary>
public class MathServiceClientStub : IMathServiceClient
{
readonly Channel channel;
readonly TimeSpan methodTimeout;
public MathServiceClientStub(Channel channel, TimeSpan methodTimeout)
{
this.channel = channel;
this.methodTimeout = methodTimeout;
}
public DivReply Div(DivArgs args, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>("/math.Math/Div", Serialize_DivArgs, Deserialize_DivReply, methodTimeout, channel);
return Calls.BlockingUnaryCall(call, args, token);
}
public Task<DivReply> DivAsync(DivArgs args, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>("/math.Math/Div", Serialize_DivArgs, Deserialize_DivReply, methodTimeout, channel);
return Calls.AsyncUnaryCall(call, args, token);
}
public Task Fib(FibArgs args, IObserver<Num> outputs, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<FibArgs, Num>("/math.Math/Fib", Serialize_FibArgs, Deserialize_Num, methodTimeout, channel);
return Calls.AsyncServerStreamingCall(call, args, outputs, token);
}
public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<Num, Num>("/math.Math/Sum", Serialize_Num, Deserialize_Num, methodTimeout, channel);
return Calls.AsyncClientStreamingCall(call, token);
}
public IObserver<DivArgs> DivMany(IObserver<DivReply> outputs, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>("/math.Math/DivMany", Serialize_DivArgs, Deserialize_DivReply, methodTimeout, channel);
return Calls.DuplexStreamingCall(call, outputs, token);
}
private static byte[] Serialize_DivArgs(DivArgs arg) {
return arg.ToByteArray();
}
private static byte[] Serialize_FibArgs(FibArgs arg) {
return arg.ToByteArray();
}
private static byte[] Serialize_Num(Num arg) {
return arg.ToByteArray();
}
private static DivReply Deserialize_DivReply(byte[] payload) {
return DivReply.CreateBuilder().MergeFrom(payload).Build();
}
private static Num Deserialize_Num(byte[] payload) {
return Num.CreateBuilder().MergeFrom(payload).Build();
}
}
}

@ -0,0 +1,119 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core.Utils;
namespace math
{
/// <summary>
/// Implementation of MathService server
/// </summary>
public class MathServiceImpl : MathGrpc.IMathService
{
public void Div(DivArgs request, IObserver<DivReply> responseObserver)
{
var response = DivInternal(request);
responseObserver.OnNext(response);
responseObserver.OnCompleted();
}
public void Fib(FibArgs request, IObserver<Num> responseObserver)
{
if (request.Limit <= 0)
{
// TODO: support cancellation....
throw new NotImplementedException("Not implemented yet");
}
if (request.Limit > 0)
{
foreach (var num in FibInternal(request.Limit))
{
responseObserver.OnNext(num);
}
responseObserver.OnCompleted();
}
}
public IObserver<Num> Sum(IObserver<Num> responseObserver)
{
var recorder = new RecordingObserver<Num>();
Task.Factory.StartNew(() => {
List<Num> inputs = recorder.ToList().Result;
long sum = 0;
foreach (Num num in inputs)
{
sum += num.Num_;
}
responseObserver.OnNext(Num.CreateBuilder().SetNum_(sum).Build());
responseObserver.OnCompleted();
});
return recorder;
}
public IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver)
{
return new DivObserver(responseObserver);
}
static DivReply DivInternal(DivArgs args)
{
long quotient = args.Dividend / args.Divisor;
long remainder = args.Dividend % args.Divisor;
return new DivReply.Builder { Quotient = quotient, Remainder = remainder }.Build();
}
static IEnumerable<Num> FibInternal(long n)
{
long a = 1;
yield return new Num.Builder { Num_=a }.Build();
long b = 1;
for (long i = 0; i < n - 1; i++)
{
long temp = a;
a = b;
b = temp + b;
yield return new Num.Builder { Num_=a }.Build();
}
}
private class DivObserver : IObserver<DivArgs> {
readonly IObserver<DivReply> responseObserver;
public DivObserver(IObserver<DivReply> responseObserver)
{
this.responseObserver = responseObserver;
}
public void OnCompleted()
{
Task.Factory.StartNew(() =>
responseObserver.OnCompleted());
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnNext(DivArgs value)
{
// TODO: currently we need this indirection because
// responseObserver waits for write to finish, this
// callback is called from grpc threadpool which
// currently only has one thread.
// Same story for OnCompleted().
Task.Factory.StartNew(() =>
responseObserver.OnNext(DivInternal(value)));
}
}
}
}

@ -0,0 +1,2 @@
test-results
bin

@ -0,0 +1,56 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>10.0.0</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{143B1C29-C442-4BE0-BF3F-A8F92288AC9F}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>GrpcApiTests</RootNamespace>
<AssemblyName>GrpcApiTests</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug</OutputPath>
<DefineConstants>DEBUG;</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<ConsolePause>false</ConsolePause>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>full</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release</OutputPath>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<ConsolePause>false</ConsolePause>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="nunit.framework, Version=2.6.0.0, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77">
<Private>False</Private>
</Reference>
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\lib\Google.ProtocolBuffers.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="MathClientServerTests.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
<ProjectReference Include="..\GrpcApi\GrpcApi.csproj">
<Project>{7DC1433E-3225-42C7-B7EA-546D56E27A4B}</Project>
<Name>GrpcApi</Name>
</ProjectReference>
<ProjectReference Include="..\GrpcCore\GrpcCore.csproj">
<Project>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</Project>
<Name>GrpcCore</Name>
</ProjectReference>
</ItemGroup>
</Project>

@ -0,0 +1,115 @@
using System;
using NUnit.Framework;
using Google.GRPC.Core;
using System.Threading;
using System.Threading.Tasks;
using Google.GRPC.Core.Utils;
using System.Collections.Generic;
namespace math.Tests
{
/// <summary>
/// Math client talks to local math server.
/// </summary>
public class MathClientServerTest
{
string serverAddr = "localhost:" + PortPicker.PickUnusedPort();
Server server;
Channel channel;
MathGrpc.IMathServiceClient client;
[TestFixtureSetUp]
public void Init()
{
server = new Server();
server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl()));
server.AddPort(serverAddr);
server.Start();
channel = new Channel(serverAddr);
client = MathGrpc.NewStub(channel);
}
[Test]
public void Div1()
{
DivReply response = client.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build());
Assert.AreEqual(3, response.Quotient);
Assert.AreEqual(1, response.Remainder);
}
[Test]
public void Div2()
{
DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 1 }.Build());
Assert.AreEqual(0, response.Quotient);
Assert.AreEqual(0, response.Remainder);
}
// TODO: test division by zero
[Test]
public void DivAsync()
{
DivReply response = client.DivAsync(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build()).Result;
Assert.AreEqual(3, response.Quotient);
Assert.AreEqual(1, response.Remainder);
}
[Test]
public void Fib()
{
var recorder = new RecordingObserver<Num>();
client.Fib(new FibArgs.Builder { Limit = 6 }.Build(), recorder);
CollectionAssert.AreEqual(new List<long>{1, 1, 2, 3, 5, 8},
recorder.ToList().Result.ConvertAll((n) => n.Num_));
}
// TODO: test Fib with limit=0 and cancellation
[Test]
public void Sum()
{
var res = client.Sum();
foreach (var num in new long[] { 10, 20, 30 }) {
res.Inputs.OnNext(Num.CreateBuilder().SetNum_(num).Build());
}
res.Inputs.OnCompleted();
Assert.AreEqual(60, res.Task.Result.Num_);
}
[Test]
public void DivMany()
{
List<DivArgs> divArgsList = new List<DivArgs>{
new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(),
new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(),
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};
var recorder = new RecordingObserver<DivReply>();
var requestObserver = client.DivMany(recorder);
foreach (var arg in divArgsList)
{
requestObserver.OnNext(arg);
}
requestObserver.OnCompleted();
var result = recorder.ToList().Result;
CollectionAssert.AreEqual(new long[] {3, 4, 3}, result.ConvertAll((divReply) => divReply.Quotient));
CollectionAssert.AreEqual(new long[] {1, 16, 1}, result.ConvertAll((divReply) => divReply.Remainder));
}
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
}
}

@ -0,0 +1,22 @@
using System.Reflection;
using System.Runtime.CompilerServices;
// Information about this assembly is defined by the following attributes.
// Change them to the values specific to your project.
[assembly: AssemblyTitle("GrpcApiTests")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("")]
[assembly: AssemblyCopyright("jtattermusch")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}".
// The form "{Major}.{Minor}.*" will automatically update the build and revision,
// and "{Major}.{Minor}.{Build}.*" will update just the revision.
[assembly: AssemblyVersion("1.0.*")]
// The following attributes are used to specify the signing key for the assembly,
// if desired. See the Mono documentation for more information about signing.
//[assembly: AssemblyDelaySign(false)]
//[assembly: AssemblyKeyFile("")]

@ -24,8 +24,8 @@ namespace Google.GRPC.Core
public Call(Method<TRequest, TResponse> method, Channel channel)
{
this.methodName = method.Name;
this.requestSerializer = method.RequestMarshaller.Serialize;
this.responseDeserializer = method.ResponseMarshaller.Deserialize;
this.requestSerializer = method.RequestMarshaller.Serializer;
this.responseDeserializer = method.ResponseMarshaller.Deserializer;
this.channel = channel;
}

@ -55,13 +55,17 @@
<Compile Include="Internal\ServerSafeHandle.cs" />
<Compile Include="Internal\StreamingInputObserver.cs" />
<Compile Include="Method.cs" />
<Compile Include="IMarshaller.cs" />
<Compile Include="ServerCalls.cs" />
<Compile Include="ServerCallHandler.cs" />
<Compile Include="Internal\ServerWritingObserver.cs" />
<Compile Include="Marshaller.cs" />
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\RecordingObserver.cs" />
<Compile Include="Utils\PortPicker.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
<Folder Include="Internal\" />
<Folder Include="Utils\" />
</ItemGroup>
</Project>

@ -1,31 +0,0 @@
using System;
namespace Google.GRPC.Core
{
/// <summary>
/// For serializing and deserializing messages.
/// </summary>
public interface IMarshaller<T>
{
byte[] Serialize(T value);
T Deserialize(byte[] payload);
}
/// <summary>
/// UTF-8 Marshalling for string. Useful for testing.
/// </summary>
internal class StringMarshaller : IMarshaller<string> {
public byte[] Serialize(string value)
{
return System.Text.Encoding.UTF8.GetBytes(value);
}
public string Deserialize(byte[] payload)
{
return System.Text.Encoding.UTF8.GetString(payload);
}
}
}

@ -0,0 +1,54 @@
using System;
namespace Google.GRPC.Core
{
/// <summary>
/// For serializing and deserializing messages.
/// </summary>
public struct Marshaller<T>
{
readonly Func<T,byte[]> serializer;
readonly Func<byte[],T> deserializer;
public Marshaller(Func<T, byte[]> serializer, Func<byte[], T> deserializer)
{
this.serializer = serializer;
this.deserializer = deserializer;
}
public Func<T, byte[]> Serializer
{
get
{
return this.serializer;
}
}
public Func<byte[], T> Deserializer
{
get
{
return this.deserializer;
}
}
}
public static class Marshallers {
public static Marshaller<T> Create<T>(Func<T,byte[]> serializer, Func<byte[],T> deserializer)
{
return new Marshaller<T>(serializer, deserializer);
}
public static Marshaller<string> StringMarshaller
{
get
{
return new Marshaller<string>(System.Text.Encoding.UTF8.GetBytes,
System.Text.Encoding.UTF8.GetString);
}
}
}
}

@ -17,10 +17,10 @@ namespace Google.GRPC.Core
{
readonly MethodType type;
readonly string name;
readonly IMarshaller<TRequest> requestMarshaller;
readonly IMarshaller<TResponse> responseMarshaller;
readonly Marshaller<TRequest> requestMarshaller;
readonly Marshaller<TResponse> responseMarshaller;
public Method(MethodType type, string name, IMarshaller<TRequest> requestMarshaller, IMarshaller<TResponse> responseMarshaller)
public Method(MethodType type, string name, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller)
{
this.type = type;
this.name = name;
@ -44,7 +44,7 @@ namespace Google.GRPC.Core
}
}
public IMarshaller<TRequest> RequestMarshaller
public Marshaller<TRequest> RequestMarshaller
{
get
{
@ -52,7 +52,7 @@ namespace Google.GRPC.Core
}
}
public IMarshaller<TResponse> ResponseMarshaller
public Marshaller<TResponse> ResponseMarshaller
{
get
{

@ -38,10 +38,14 @@ namespace Google.GRPC.Core
this.serverShutdownHandler = HandleServerShutdown;
}
// only call before Start(), this will be in server builder in the future.
internal void AddCallHandler(string methodName, IServerCallHandler handler) {
callHandlers.Add(methodName, handler);
// only call this before Start()
public void AddServiceDefinition(ServerServiceDefinition serviceDefinition) {
foreach(var entry in serviceDefinition.CallHandlers)
{
callHandlers.Add(entry.Key, entry.Value);
}
}
// only call before Start()
public int AddPort(string addr) {
return handle.AddPort(addr);
@ -113,7 +117,12 @@ namespace Google.GRPC.Core
try
{
var ev = new EventSafeHandleNotOwned(eventPtr);
newRpcQueue.Add(new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod()));
var rpcInfo = new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod());
// after server shutdown, the callback returns with null call
if (!rpcInfo.Call.IsInvalid) {
newRpcQueue.Add(rpcInfo);
}
}
catch (Exception e)
{

@ -22,8 +22,8 @@ namespace Google.GRPC.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCall<TResponse, TRequest>(
(msg) => method.ResponseMarshaller.Serialize(msg),
(payload) => method.RequestMarshaller.Deserialize(payload));
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
@ -34,9 +34,6 @@ namespace Google.GRPC.Core
handler(request, responseObserver);
asyncCall.Halfclosed.Wait();
// TODO: wait until writing is finished
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
asyncCall.Finished.Wait();
}
}
@ -55,8 +52,8 @@ namespace Google.GRPC.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCall<TResponse, TRequest>(
(msg) => method.ResponseMarshaller.Serialize(msg),
(payload) => method.RequestMarshaller.Deserialize(payload));
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
@ -68,8 +65,6 @@ namespace Google.GRPC.Core
asyncCall.StartReadingToStream(requestObserver);
asyncCall.Halfclosed.Wait();
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
asyncCall.Finished.Wait();
}
}

@ -0,0 +1,65 @@
using System;
using System.Collections.Generic;
namespace Google.GRPC.Core
{
public class ServerServiceDefinition
{
readonly string serviceName;
// TODO: we would need an immutable dictionary here...
readonly Dictionary<string, IServerCallHandler> callHandlers;
private ServerServiceDefinition(string serviceName, Dictionary<string, IServerCallHandler> callHandlers)
{
this.serviceName = serviceName;
this.callHandlers = new Dictionary<string, IServerCallHandler>(callHandlers);
}
internal Dictionary<string, IServerCallHandler> CallHandlers
{
get
{
return this.callHandlers;
}
}
public static Builder CreateBuilder(String serviceName)
{
return new Builder(serviceName);
}
public class Builder
{
readonly string serviceName;
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<String, IServerCallHandler>();
public Builder(string serviceName)
{
this.serviceName = serviceName;
}
public Builder AddMethod<TRequest, TResponse>(
Method<TRequest, TResponse> method,
UnaryRequestServerMethod<TRequest, TResponse> handler)
{
callHandlers.Add(method.Name, ServerCalls.UnaryRequestCall(method, handler));
return this;
}
public Builder AddMethod<TRequest, TResponse>(
Method<TRequest, TResponse> method,
StreamingRequestServerMethod<TRequest, TResponse> handler)
{
callHandlers.Add(method.Name, ServerCalls.StreamingRequestCall(method, handler));
return this;
}
public ServerServiceDefinition Build()
{
return new ServerServiceDefinition(serviceName, callHandlers);
}
}
}
}

@ -2,14 +2,12 @@ using System;
using System.Net;
using System.Net.Sockets;
namespace Google.GRPC.Core.Tests
namespace Google.GRPC.Core.Utils
{
/// <summary>
/// Testing utils.
/// </summary>
public class Utils
public class PortPicker
{
static Random random = new Random();
// TODO: cleanup this code a bit
public static int PickUnusedPort()
{
@ -21,6 +19,7 @@ namespace Google.GRPC.Core.Tests
} while(!IsPortAvailable(port));
return port;
}
// TODO: cleanup this code a bit
public static bool IsPortAvailable(int port)
{

@ -2,7 +2,7 @@ using System;
using System.Threading.Tasks;
using System.Collections.Generic;
namespace math
namespace Google.GRPC.Core.Utils
{
public class RecordingObserver<T> : IObserver<T>
{

@ -1,35 +1,37 @@
using System;
using NUnit.Framework;
using Google.GRPC.Core;
using Google.GRPC.Core.Internal;
using System.Threading;
using System.Threading.Tasks;
using Google.GRPC.Core.Utils;
namespace Google.GRPC.Core.Tests
{
public class ClientServerTest
{
string serverAddr = "localhost:" + Utils.PickUnusedPort();
string serverAddr = "localhost:" + PortPicker.PickUnusedPort();
private Method<string, string> unaryEchoStringMethod = new Method<string, string>(
Method<string, string> unaryEchoStringMethod = new Method<string, string>(
MethodType.Unary,
"/tests.Test/UnaryEchoString",
new StringMarshaller(),
new StringMarshaller());
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
[Test]
public void EmptyCall()
{
Server server = new Server();
server.AddCallHandler(unaryEchoStringMethod.Name,
ServerCalls.UnaryRequestCall(unaryEchoStringMethod, HandleUnaryEchoString));
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
.AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
server.AddPort(serverAddr);
server.Start();
using (Channel channel = new Channel(serverAddr))
{
var call = CreateUnaryEchoStringCall(channel);
var call = new Call<string, string>(unaryEchoStringMethod, channel);
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)));
Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken)));
@ -40,11 +42,6 @@ namespace Google.GRPC.Core.Tests
GrpcEnvironment.Shutdown();
}
private Call<string, string> CreateUnaryEchoStringCall(Channel channel)
{
return new Call<string, string>(unaryEchoStringMethod, channel);
}
private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) {
responseObserver.OnNext(request);
responseObserver.OnCompleted();

@ -39,7 +39,6 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" />
<Compile Include="Utils.cs" />
<Compile Include="GrpcEnvironmentTest.cs" />
<Compile Include="TimespecTest.cs" />
</ItemGroup>

@ -1,6 +1,8 @@
using System;
using NUnit.Framework;
using Google.GRPC.Core.Internal;
using Google.GRPC.Core;
using Google.GRPC.Core.Utils;
namespace Google.GRPC.Core.Tests
{
@ -10,7 +12,7 @@ namespace Google.GRPC.Core.Tests
public void StartAndShutdownServer() {
Server server = new Server();
server.AddPort("localhost:" + Utils.PickUnusedPort());
server.AddPort("localhost:" + PortPicker.PickUnusedPort());
server.Start();
server.ShutdownAsync().Wait();

@ -12,7 +12,8 @@ namespace Google.GRPC.Demo
{
using (Channel channel = new Channel("127.0.0.1:23456"))
{
IMathServiceClient stub = new MathServiceClientStub(channel, Timeout.InfiniteTimeSpan);
MathGrpc.IMathServiceClient stub = new MathGrpc.MathServiceClientStub(channel);
Examples.DivExample(stub);
Examples.FibExample(stub);

Loading…
Cancel
Save