From e45ca5f59286ef8a3b617e5f9c49f07f9fcfeefd Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 21 Mar 2016 14:59:08 -0700 Subject: [PATCH] add support for C# generic client --- .../Grpc.IntegrationTesting/ClientRunners.cs | 67 +++++++++++++++++-- 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index 76e877d4aa3..a749f4a8a34 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -97,22 +97,32 @@ namespace Grpc.IntegrationTesting return new SimpleClientRunner(channel, config.ClientType, config.RpcType, - config.PayloadConfig.SimpleParams, + config.PayloadConfig, config.HistogramParams); } } /// - /// Client that starts synchronous unary calls in a closed loop. + /// Simple protobuf client. /// public class SimpleClientRunner : IClientRunner { const double SecondsToNanos = 1e9; + readonly static Marshaller ByteArrayMarshaller = new Marshaller((b) => b, (b) => b); + + readonly static Method StreamingCallMethod = new Method( + MethodType.DuplexStreaming, + "grpc.testing.BenchmarkService", + "StreamingCall", + ByteArrayMarshaller, + ByteArrayMarshaller + ); + readonly Channel channel; readonly ClientType clientType; readonly RpcType rpcType; - readonly SimpleProtoParams payloadParams; + readonly PayloadConfig payloadConfig; readonly Histogram histogram; readonly BenchmarkService.IBenchmarkServiceClient client; @@ -120,12 +130,12 @@ namespace Grpc.IntegrationTesting readonly CancellationTokenSource stoppedCts; readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); - public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, SimpleProtoParams payloadParams, HistogramParams histogramParams) + public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) { this.channel = GrpcPreconditions.CheckNotNull(channel); this.clientType = clientType; this.rpcType = rpcType; - this.payloadParams = payloadParams; + this.payloadConfig = payloadConfig; this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); this.stoppedCts = new CancellationTokenSource(); @@ -213,8 +223,45 @@ namespace Grpc.IntegrationTesting } } + private async Task RunGenericClosedLoopStreamingAsync() + { + var request = CreateByteBufferRequest(); + var stopwatch = new Stopwatch(); + + var callDetails = new CallInvocationDetails(channel, StreamingCallMethod, new CallOptions()); + + using (var call = Calls.AsyncDuplexStreamingCall(callDetails)) + { + while (!stoppedCts.Token.IsCancellationRequested) + { + stopwatch.Restart(); + await call.RequestStream.WriteAsync(request); + await call.ResponseStream.MoveNext(); + stopwatch.Stop(); + + // spec requires data point in nanoseconds. + histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + } + + // finish the streaming call + await call.RequestStream.CompleteAsync(); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + } + } + private Action GetThreadBody() { + if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams) + { + GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API"); + GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls"); + return () => + { + RunGenericClosedLoopStreamingAsync().Wait(); + }; + } + + GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams); if (clientType == ClientType.SYNC_CLIENT) { GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#"); @@ -241,13 +288,19 @@ namespace Grpc.IntegrationTesting private SimpleRequest CreateSimpleRequest() { + GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams); return new SimpleRequest { - Payload = CreateZerosPayload(payloadParams.ReqSize), - ResponseSize = payloadParams.RespSize + Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize), + ResponseSize = payloadConfig.SimpleParams.RespSize }; } + private byte[] CreateByteBufferRequest() + { + return new byte[payloadConfig.BytebufParams.ReqSize]; + } + private static Payload CreateZerosPayload(int size) { return new Payload { Body = ByteString.CopyFrom(new byte[size]) };