From 253769e92d5ff1883c1623fd0ee130ae4ce4b380 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 21 Mar 2016 16:25:59 -0700 Subject: [PATCH] add ASYNC_GENERIC_SERVER support for C# --- .../Grpc.IntegrationTesting/ClientRunners.cs | 21 ++---- .../Grpc.IntegrationTesting/GenericService.cs | 71 +++++++++++++++++++ .../Grpc.IntegrationTesting.csproj | 1 + .../Grpc.IntegrationTesting/ServerRunners.cs | 46 ++++++++++-- 4 files changed, 116 insertions(+), 23 deletions(-) create mode 100644 src/csharp/Grpc.IntegrationTesting/GenericService.cs diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index a749f4a8a34..e6dc2321c4c 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -94,7 +94,7 @@ namespace Grpc.IntegrationTesting } var channel = new Channel(target, credentials, channelOptions); - return new SimpleClientRunner(channel, + return new ClientRunnerImpl(channel, config.ClientType, config.RpcType, config.PayloadConfig, @@ -102,23 +102,10 @@ namespace Grpc.IntegrationTesting } } - /// - /// Simple protobuf client. - /// - public class SimpleClientRunner : IClientRunner + public class ClientRunnerImpl : IClientRunner { const double SecondsToNanos = 1e9; - readonly static Marshaller ByteArrayMarshaller = new Marshaller((b) => b, (b) => b); - - readonly static Method StreamingCallMethod = new Method( - MethodType.DuplexStreaming, - "grpc.testing.BenchmarkService", - "StreamingCall", - ByteArrayMarshaller, - ByteArrayMarshaller - ); - readonly Channel channel; readonly ClientType clientType; readonly RpcType rpcType; @@ -130,7 +117,7 @@ namespace Grpc.IntegrationTesting readonly CancellationTokenSource stoppedCts; readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); - public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) + public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) { this.channel = GrpcPreconditions.CheckNotNull(channel); this.clientType = clientType; @@ -228,7 +215,7 @@ namespace Grpc.IntegrationTesting var request = CreateByteBufferRequest(); var stopwatch = new Stopwatch(); - var callDetails = new CallInvocationDetails(channel, StreamingCallMethod, new CallOptions()); + var callDetails = new CallInvocationDetails(channel, GenericService.StreamingCallMethod, new CallOptions()); using (var call = Calls.AsyncDuplexStreamingCall(callDetails)) { diff --git a/src/csharp/Grpc.IntegrationTesting/GenericService.cs b/src/csharp/Grpc.IntegrationTesting/GenericService.cs new file mode 100644 index 00000000000..c6128264ac5 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/GenericService.cs @@ -0,0 +1,71 @@ +#region Copyright notice and license + +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using Google.Protobuf; +using Grpc.Core; +using Grpc.Core.Utils; +using NUnit.Framework; +using Grpc.Testing; + +namespace Grpc.IntegrationTesting +{ + /// + /// Utility methods for defining and calling a service that doesn't use protobufs + /// for serialization/deserialization. + /// + public static class GenericService + { + readonly static Marshaller ByteArrayMarshaller = new Marshaller((b) => b, (b) => b); + + public readonly static Method StreamingCallMethod = new Method( + MethodType.DuplexStreaming, + "grpc.testing.BenchmarkService", + "StreamingCall", + ByteArrayMarshaller, + ByteArrayMarshaller + ); + + public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod handler) + { + return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName) + .AddMethod(StreamingCallMethod, handler).Build(); + } + } +} diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 372991374ee..4c049944eaf 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -120,6 +120,7 @@ + diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs index 516436ac5ac..c326378cfac 100644 --- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs @@ -61,7 +61,6 @@ namespace Grpc.IntegrationTesting public static IServerRunner CreateStarted(ServerConfig config) { Logger.Debug("ServerConfig: {0}", config); - GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER, "Only ASYNC_SERVER supported for C# QpsWorker"); var credentials = config.SecurityParams != null ? TestCredentials.CreateSslServerCredentials() : ServerCredentials.Insecure; if (config.AsyncServerThreads != 0) @@ -77,17 +76,53 @@ namespace Grpc.IntegrationTesting Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value"); } - GrpcPreconditions.CheckArgument(config.PayloadConfig == null, - "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server."); + ServerServiceDefinition service = null; + if (config.ServerType == ServerType.ASYNC_SERVER) + { + GrpcPreconditions.CheckArgument(config.PayloadConfig == null, + "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server."); + service = BenchmarkService.BindService(new BenchmarkServiceImpl()); + } + else if (config.ServerType == ServerType.ASYNC_GENERIC_SERVER) + { + var genericService = new GenericServiceImpl(config.PayloadConfig.BytebufParams.RespSize); + service = GenericService.BindHandler(genericService.StreamingCall); + } + else + { + throw new ArgumentException("Unsupported ServerType"); + } + var server = new Server { - Services = { BenchmarkService.BindService(new BenchmarkServiceImpl()) }, + Services = { service }, Ports = { new ServerPort("[::]", config.Port, credentials) } }; server.Start(); return new ServerRunnerImpl(server); } + + private class GenericServiceImpl + { + readonly byte[] response; + + public GenericServiceImpl(int responseSize) + { + this.response = new byte[responseSize]; + } + + /// + /// Generic streaming call handler. + /// + public async Task StreamingCall(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + await requestStream.ForEachAsync(async request => + { + await responseStream.WriteAsync(response); + }); + } + } } /// @@ -136,6 +171,5 @@ namespace Grpc.IntegrationTesting { return server.ShutdownAsync(); } - } - + } }