From dbdccc4f4d31716215366450d55f718696a532a7 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 17 Apr 2020 13:39:43 +0200 Subject: [PATCH] initial implementation of XdsInteropClient --- .../.gitignore | 3 + .../Grpc.IntegrationTesting.XdsClient.csproj | 24 ++ .../Program.cs | 31 ++ .../Properties/AssemblyInfo.cs | 29 ++ .../XdsInteropClient.cs | 287 ++++++++++++++++++ .../XdsInteropClientTest.cs | 134 ++++++++ src/csharp/Grpc.sln | 6 + src/csharp/tests.json | 3 +- 8 files changed, 516 insertions(+), 1 deletion(-) create mode 100644 src/csharp/Grpc.IntegrationTesting.XdsClient/.gitignore create mode 100755 src/csharp/Grpc.IntegrationTesting.XdsClient/Grpc.IntegrationTesting.XdsClient.csproj create mode 100644 src/csharp/Grpc.IntegrationTesting.XdsClient/Program.cs create mode 100644 src/csharp/Grpc.IntegrationTesting.XdsClient/Properties/AssemblyInfo.cs create mode 100644 src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs create mode 100644 src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs diff --git a/src/csharp/Grpc.IntegrationTesting.XdsClient/.gitignore b/src/csharp/Grpc.IntegrationTesting.XdsClient/.gitignore new file mode 100644 index 00000000000..a382af2294f --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting.XdsClient/.gitignore @@ -0,0 +1,3 @@ +bin +obj + diff --git a/src/csharp/Grpc.IntegrationTesting.XdsClient/Grpc.IntegrationTesting.XdsClient.csproj b/src/csharp/Grpc.IntegrationTesting.XdsClient/Grpc.IntegrationTesting.XdsClient.csproj new file mode 100755 index 00000000000..a5baf96357d --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting.XdsClient/Grpc.IntegrationTesting.XdsClient.csproj @@ -0,0 +1,24 @@ + + + + + + net45;netcoreapp2.1 + Exe + true + + + + + + + + + + + + + + + + diff --git a/src/csharp/Grpc.IntegrationTesting.XdsClient/Program.cs b/src/csharp/Grpc.IntegrationTesting.XdsClient/Program.cs new file mode 100644 index 00000000000..19214439cf1 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting.XdsClient/Program.cs @@ -0,0 +1,31 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using Grpc.IntegrationTesting; + +namespace Grpc.IntegrationTesting.XdsClient +{ + class Program + { + public static void Main(string[] args) + { + XdsInteropClient.Run(args); + } + } +} diff --git a/src/csharp/Grpc.IntegrationTesting.XdsClient/Properties/AssemblyInfo.cs b/src/csharp/Grpc.IntegrationTesting.XdsClient/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..80289be1c21 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting.XdsClient/Properties/AssemblyInfo.cs @@ -0,0 +1,29 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System.Reflection; +using System.Runtime.CompilerServices; + +[assembly: AssemblyTitle("Grpc.IntegrationTesting.XdsClient")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("")] +[assembly: AssemblyCopyright("Google Inc. All rights reserved.")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] diff --git a/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs b/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs new file mode 100644 index 00000000000..b988a41b25d --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs @@ -0,0 +1,287 @@ +#region Copyright notice and license + +// Copyright 2020 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +using CommandLine; +using Grpc.Core; +using Grpc.Core.Logging; +using Grpc.Core.Internal; +using Grpc.Testing; + +namespace Grpc.IntegrationTesting +{ + public class XdsInteropClient + { + internal class ClientOptions + { + [Option("num_channels", Default = 1)] + public int NumChannels { get; set; } + + [Option("qps", Default = 1)] + public int Qps { get; set; } + + [Option("server", Default = "localhost:8080")] + public string Server { get; set; } + + [Option("stats_port", Default = 8081)] + public int StatsPort { get; set; } + + [Option("rpc_timeout_sec", Default = 30)] + public int RpcTimeoutSec { get; set; } + + [Option("print_response", Default = false)] + public bool PrintResponse { get; set; } + } + + ClientOptions options; + + StatsWatcher statsWatcher = new StatsWatcher(); + + // make watcher accessible by tests + internal StatsWatcher StatsWatcher => statsWatcher; + + internal XdsInteropClient(ClientOptions options) + { + this.options = options; + } + + public static void Run(string[] args) + { + GrpcEnvironment.SetLogger(new ConsoleLogger()); + var parserResult = Parser.Default.ParseArguments(args) + .WithNotParsed(errors => Environment.Exit(1)) + .WithParsed(options => + { + var xdsInteropClient = new XdsInteropClient(options); + xdsInteropClient.RunAsync().Wait(); + }); + } + + private async Task RunAsync() + { + var server = new Server + { + Services = { LoadBalancerStatsService.BindService(new LoadBalancerStatsServiceImpl(statsWatcher)) } + }; + + string host = "0.0.0.0"; + server.Ports.Add(host, options.StatsPort, ServerCredentials.Insecure); + Console.WriteLine($"Running server on {host}:{options.StatsPort}"); + server.Start(); + + var cancellationTokenSource = new CancellationTokenSource(); + await RunChannelsAsync(cancellationTokenSource.Token); + + await server.ShutdownAsync(); + } + + // method made internal to make it runnable by tests + internal async Task RunChannelsAsync(CancellationToken cancellationToken) + { + var channelTasks = new List(); + for (int channelId = 0; channelId < options.NumChannels; channelId++) + { + var channelTask = RunSingleChannelAsync(channelId, cancellationToken); + channelTasks.Add(channelTask); + } + + for (int channelId = 0; channelId < options.NumChannels; channelId++) + { + await channelTasks[channelId]; + } + } + + private async Task RunSingleChannelAsync(int channelId, CancellationToken cancellationToken) + { + Console.WriteLine($"Starting channel {channelId}"); + var channel = new Channel(options.Server, ChannelCredentials.Insecure); + var client = new TestService.TestServiceClient(channel); + + var inflightTasks = new List(); + int millisPerQuery = (int)(1000.0 / options.Qps); // qps value is per-channel + while (!cancellationToken.IsCancellationRequested) + { + inflightTasks.Add(RunSingleRpcAsync(client, cancellationToken)); + + await CleanupCompletedTasksAsync(inflightTasks); + + Console.WriteLine($"Currently {inflightTasks.Count} in-flight RPCs"); + await Task.Delay(millisPerQuery); // not accurate, but good enough for low QPS. + } + + Console.WriteLine($"Shutting down channel {channelId}"); + await channel.ShutdownAsync(); + Console.WriteLine($"Channel shutdown {channelId}"); + } + + private async Task RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken) + { + long rpcId = statsWatcher.RpcIdGenerator.Increment(); + try + { + Console.WriteLine($"Starting RPC {rpcId}."); + var response = await client.UnaryCallAsync(new SimpleRequest(), + new CallOptions(cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec))); + + statsWatcher.OnRpcComplete(rpcId, response.Hostname); + if (options.PrintResponse) + { + Console.WriteLine($"Got response {response}"); + } + Console.WriteLine($"RPC {rpcId} succeeded "); + } + catch (RpcException ex) + { + statsWatcher.OnRpcComplete(rpcId, null); + Console.WriteLine($"RPC {rpcId} failed: {ex}"); + } + } + + private async Task CleanupCompletedTasksAsync(List tasks) + { + var toRemove = new List(); + foreach (var task in tasks) + { + if (task.IsCompleted) + { + // awaiting tasks that have already completed should be instantaneous + await task; + } + toRemove.Add(task); + } + foreach (var task in toRemove) + { + tasks.Remove(task); + } + } + } + + internal class StatsWatcher + { + private readonly object myLock = new object(); + private readonly AtomicCounter rpcIdGenerator = new AtomicCounter(0); + + private long? firstAcceptedRpcId; + private int numRpcsWanted; + private int rpcsCompleted; + private int rpcsNoHostname; + private Dictionary rpcsByHostname; + + public AtomicCounter RpcIdGenerator => rpcIdGenerator; + + public StatsWatcher() + { + Reset(); + } + + public void OnRpcComplete(long rpcId, string responseHostname) + { + lock (myLock) + { + if (!firstAcceptedRpcId.HasValue || rpcId < firstAcceptedRpcId || rpcId >= firstAcceptedRpcId + numRpcsWanted) + { + return; + } + + if (string.IsNullOrEmpty(responseHostname)) + { + rpcsNoHostname ++; + } + else + { + if (!rpcsByHostname.ContainsKey(responseHostname)) + { + rpcsByHostname[responseHostname] = 0; + } + rpcsByHostname[responseHostname] += 1; + } + rpcsCompleted += 1; + + if (rpcsCompleted >= numRpcsWanted) + { + Monitor.Pulse(myLock); + } + } + } + + public void Reset() + { + lock (myLock) + { + firstAcceptedRpcId = null; + numRpcsWanted = 0; + rpcsCompleted = 0; + rpcsNoHostname = 0; + rpcsByHostname = new Dictionary(); + } + } + + public LoadBalancerStatsResponse WaitForRpcStatsResponse(int rpcsWanted, int timeoutSec) + { + lock (myLock) + { + if (firstAcceptedRpcId.HasValue) + { + throw new InvalidOperationException("StateWatcher is already collecting stats."); + } + // we are only interested in the next numRpcsWanted RPCs + firstAcceptedRpcId = rpcIdGenerator.Count + 1; + numRpcsWanted = rpcsWanted; + + var deadline = DateTime.UtcNow.AddSeconds(timeoutSec); + while (true) + { + var timeoutMillis = Math.Max((int)(deadline - DateTime.UtcNow).TotalMilliseconds, 0); + if (!Monitor.Wait(myLock, timeoutMillis) || rpcsCompleted >= rpcsWanted) + { + // we collected enough RPCs, or timed out waiting + var response = new LoadBalancerStatsResponse { NumFailures = rpcsNoHostname }; + response.RpcsByPeer.Add(rpcsByHostname); + Reset(); + return response; + } + } + } + } + } + + /// + /// Implementation of LoadBalancerStatsService server + /// + internal class LoadBalancerStatsServiceImpl : LoadBalancerStatsService.LoadBalancerStatsServiceBase + { + StatsWatcher statsWatcher; + + public LoadBalancerStatsServiceImpl(StatsWatcher statsWatcher) + { + this.statsWatcher = statsWatcher; + } + + public override async Task GetClientStats(LoadBalancerStatsRequest request, ServerCallContext context) + { + // run as a task to avoid blocking + var response = await Task.Run(() => statsWatcher.WaitForRpcStatsResponse(request.NumRpcs, request.TimeoutSec)); + Console.WriteLine($"Returning stats {response} (num of requested RPCs: {request.NumRpcs})"); + return response; + } + } +} diff --git a/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs b/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs new file mode 100644 index 00000000000..be4e696d2ae --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs @@ -0,0 +1,134 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Utils; +using Grpc.Testing; +using NUnit.Framework; + +namespace Grpc.IntegrationTesting +{ + public class XdsInteropClientTest + { + const string Host = "localhost"; + + BackendServiceImpl backendService; + + Server backendServer; + Server lbStatsServer; + Channel lbStatsChannel; + LoadBalancerStatsService.LoadBalancerStatsServiceClient lbStatsClient; + + XdsInteropClient xdsInteropClient; + + [OneTimeSetUp] + public void Init() + { + backendService = new BackendServiceImpl(); + + // Disable SO_REUSEPORT to prevent https://github.com/grpc/grpc/issues/10755 + backendServer = new Server(new[] { new ChannelOption(ChannelOptions.SoReuseport, 0) }) + { + Services = { TestService.BindService(backendService) }, + Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } } + }; + backendServer.Start(); + + xdsInteropClient = new XdsInteropClient(new XdsInteropClient.ClientOptions + { + NumChannels = 1, + Qps = 1, + RpcTimeoutSec = 10, + Server = $"{Host}:{backendServer.Ports.Single().BoundPort}", + }); + + // Disable SO_REUSEPORT to prevent https://github.com/grpc/grpc/issues/10755 + lbStatsServer = new Server(new[] { new ChannelOption(ChannelOptions.SoReuseport, 0) }) + { + Services = { LoadBalancerStatsService.BindService(new LoadBalancerStatsServiceImpl(xdsInteropClient.StatsWatcher)) }, + Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } } + }; + lbStatsServer.Start(); + + int port = lbStatsServer.Ports.Single().BoundPort; + lbStatsChannel = new Channel(Host, port, ChannelCredentials.Insecure); + lbStatsClient = new LoadBalancerStatsService.LoadBalancerStatsServiceClient(lbStatsChannel); + } + + [OneTimeTearDown] + public void Cleanup() + { + lbStatsChannel.ShutdownAsync().Wait(); + lbStatsServer.ShutdownAsync().Wait(); + backendServer.ShutdownAsync().Wait(); + } + + [Test] + public async Task SmokeTest() + { + string backendName = "backend1"; + backendService.UnaryHandler = (request, context) => + { + return Task.FromResult(new SimpleResponse { Hostname = backendName}); + }; + + var cancellationTokenSource = new CancellationTokenSource(); + var runChannelsTask = xdsInteropClient.RunChannelsAsync(cancellationTokenSource.Token); + + var stats = await lbStatsClient.GetClientStatsAsync(new LoadBalancerStatsRequest + { + NumRpcs = 5, + TimeoutSec = 10, + }, deadline: DateTime.UtcNow.AddSeconds(30)); + + Assert.AreEqual(0, stats.NumFailures); + Assert.AreEqual(backendName, stats.RpcsByPeer.Keys.Single()); + Assert.AreEqual(5, stats.RpcsByPeer[backendName]); + + await Task.Delay(100); + + var stats2 = await lbStatsClient.GetClientStatsAsync(new LoadBalancerStatsRequest + { + NumRpcs = 3, + TimeoutSec = 10, + }, deadline: DateTime.UtcNow.AddSeconds(30)); + + Assert.AreEqual(0, stats2.NumFailures); + Assert.AreEqual(backendName, stats2.RpcsByPeer.Keys.Single()); + Assert.AreEqual(3, stats2.RpcsByPeer[backendName]); + + cancellationTokenSource.Cancel(); + await runChannelsTask; + } + + public class BackendServiceImpl : TestService.TestServiceBase + { + public UnaryServerMethod UnaryHandler { get; set; } + + public override Task UnaryCall(SimpleRequest request, ServerCallContext context) + { + return UnaryHandler(request, context); + } + } + } +} diff --git a/src/csharp/Grpc.sln b/src/csharp/Grpc.sln index 19f3605d8f6..01d1b94d2a0 100644 --- a/src/csharp/Grpc.sln +++ b/src/csharp/Grpc.sln @@ -45,6 +45,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Tools", "Grpc.Tools\Gr EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Tools.Tests", "Grpc.Tools.Tests\Grpc.Tools.Tests.csproj", "{AEBE9BD8-E433-45B7-8B3D-D458EDBBCFC4}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Grpc.IntegrationTesting.XdsClient", "Grpc.IntegrationTesting.XdsClient\Grpc.IntegrationTesting.XdsClient.csproj", "{7306313A-4853-4CFF-B913-0FCB1A497449}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -135,6 +137,10 @@ Global {AEBE9BD8-E433-45B7-8B3D-D458EDBBCFC4}.Debug|Any CPU.Build.0 = Debug|Any CPU {AEBE9BD8-E433-45B7-8B3D-D458EDBBCFC4}.Release|Any CPU.ActiveCfg = Release|Any CPU {AEBE9BD8-E433-45B7-8B3D-D458EDBBCFC4}.Release|Any CPU.Build.0 = Release|Any CPU + {7306313A-4853-4CFF-B913-0FCB1A497449}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7306313A-4853-4CFF-B913-0FCB1A497449}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7306313A-4853-4CFF-B913-0FCB1A497449}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7306313A-4853-4CFF-B913-0FCB1A497449}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/csharp/tests.json b/src/csharp/tests.json index 50eea00c040..6f0cb6316aa 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -70,7 +70,8 @@ "Grpc.IntegrationTesting.MetadataCredentialsTest", "Grpc.IntegrationTesting.RunnerClientServerTest", "Grpc.IntegrationTesting.SslCredentialsTest", - "Grpc.IntegrationTesting.UnobservedTaskExceptionTest" + "Grpc.IntegrationTesting.UnobservedTaskExceptionTest", + "Grpc.IntegrationTesting.XdsInteropClientTest" ], "Grpc.Reflection.Tests": [ "Grpc.Reflection.Tests.ReflectionClientServerTest",