initial implementation of XdsInteropClient

pull/22704/head
Jan Tattermusch 5 years ago
parent 5a5105b89c
commit dbdccc4f4d
  1. 3
      src/csharp/Grpc.IntegrationTesting.XdsClient/.gitignore
  2. 24
      src/csharp/Grpc.IntegrationTesting.XdsClient/Grpc.IntegrationTesting.XdsClient.csproj
  3. 31
      src/csharp/Grpc.IntegrationTesting.XdsClient/Program.cs
  4. 29
      src/csharp/Grpc.IntegrationTesting.XdsClient/Properties/AssemblyInfo.cs
  5. 287
      src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs
  6. 134
      src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs
  7. 6
      src/csharp/Grpc.sln
  8. 3
      src/csharp/tests.json

@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
<TargetFrameworks>net45;netcoreapp2.1</TargetFrameworks>
<OutputType>Exe</OutputType>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="../Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\Grpc.Core.Api\Version.cs" />
</ItemGroup>
</Project>

@ -0,0 +1,31 @@
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using Grpc.IntegrationTesting;
namespace Grpc.IntegrationTesting.XdsClient
{
class Program
{
public static void Main(string[] args)
{
XdsInteropClient.Run(args);
}
}
}

@ -0,0 +1,29 @@
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System.Reflection;
using System.Runtime.CompilerServices;
[assembly: AssemblyTitle("Grpc.IntegrationTesting.XdsClient")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("")]
[assembly: AssemblyCopyright("Google Inc. All rights reserved.")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

@ -0,0 +1,287 @@
#region Copyright notice and license
// Copyright 2020 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using CommandLine;
using Grpc.Core;
using Grpc.Core.Logging;
using Grpc.Core.Internal;
using Grpc.Testing;
namespace Grpc.IntegrationTesting
{
public class XdsInteropClient
{
internal class ClientOptions
{
[Option("num_channels", Default = 1)]
public int NumChannels { get; set; }
[Option("qps", Default = 1)]
public int Qps { get; set; }
[Option("server", Default = "localhost:8080")]
public string Server { get; set; }
[Option("stats_port", Default = 8081)]
public int StatsPort { get; set; }
[Option("rpc_timeout_sec", Default = 30)]
public int RpcTimeoutSec { get; set; }
[Option("print_response", Default = false)]
public bool PrintResponse { get; set; }
}
ClientOptions options;
StatsWatcher statsWatcher = new StatsWatcher();
// make watcher accessible by tests
internal StatsWatcher StatsWatcher => statsWatcher;
internal XdsInteropClient(ClientOptions options)
{
this.options = options;
}
public static void Run(string[] args)
{
GrpcEnvironment.SetLogger(new ConsoleLogger());
var parserResult = Parser.Default.ParseArguments<ClientOptions>(args)
.WithNotParsed(errors => Environment.Exit(1))
.WithParsed(options =>
{
var xdsInteropClient = new XdsInteropClient(options);
xdsInteropClient.RunAsync().Wait();
});
}
private async Task RunAsync()
{
var server = new Server
{
Services = { LoadBalancerStatsService.BindService(new LoadBalancerStatsServiceImpl(statsWatcher)) }
};
string host = "0.0.0.0";
server.Ports.Add(host, options.StatsPort, ServerCredentials.Insecure);
Console.WriteLine($"Running server on {host}:{options.StatsPort}");
server.Start();
var cancellationTokenSource = new CancellationTokenSource();
await RunChannelsAsync(cancellationTokenSource.Token);
await server.ShutdownAsync();
}
// method made internal to make it runnable by tests
internal async Task RunChannelsAsync(CancellationToken cancellationToken)
{
var channelTasks = new List<Task>();
for (int channelId = 0; channelId < options.NumChannels; channelId++)
{
var channelTask = RunSingleChannelAsync(channelId, cancellationToken);
channelTasks.Add(channelTask);
}
for (int channelId = 0; channelId < options.NumChannels; channelId++)
{
await channelTasks[channelId];
}
}
private async Task RunSingleChannelAsync(int channelId, CancellationToken cancellationToken)
{
Console.WriteLine($"Starting channel {channelId}");
var channel = new Channel(options.Server, ChannelCredentials.Insecure);
var client = new TestService.TestServiceClient(channel);
var inflightTasks = new List<Task>();
int millisPerQuery = (int)(1000.0 / options.Qps); // qps value is per-channel
while (!cancellationToken.IsCancellationRequested)
{
inflightTasks.Add(RunSingleRpcAsync(client, cancellationToken));
await CleanupCompletedTasksAsync(inflightTasks);
Console.WriteLine($"Currently {inflightTasks.Count} in-flight RPCs");
await Task.Delay(millisPerQuery); // not accurate, but good enough for low QPS.
}
Console.WriteLine($"Shutting down channel {channelId}");
await channel.ShutdownAsync();
Console.WriteLine($"Channel shutdown {channelId}");
}
private async Task RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken)
{
long rpcId = statsWatcher.RpcIdGenerator.Increment();
try
{
Console.WriteLine($"Starting RPC {rpcId}.");
var response = await client.UnaryCallAsync(new SimpleRequest(),
new CallOptions(cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec)));
statsWatcher.OnRpcComplete(rpcId, response.Hostname);
if (options.PrintResponse)
{
Console.WriteLine($"Got response {response}");
}
Console.WriteLine($"RPC {rpcId} succeeded ");
}
catch (RpcException ex)
{
statsWatcher.OnRpcComplete(rpcId, null);
Console.WriteLine($"RPC {rpcId} failed: {ex}");
}
}
private async Task CleanupCompletedTasksAsync(List<Task> tasks)
{
var toRemove = new List<Task>();
foreach (var task in tasks)
{
if (task.IsCompleted)
{
// awaiting tasks that have already completed should be instantaneous
await task;
}
toRemove.Add(task);
}
foreach (var task in toRemove)
{
tasks.Remove(task);
}
}
}
internal class StatsWatcher
{
private readonly object myLock = new object();
private readonly AtomicCounter rpcIdGenerator = new AtomicCounter(0);
private long? firstAcceptedRpcId;
private int numRpcsWanted;
private int rpcsCompleted;
private int rpcsNoHostname;
private Dictionary<string, int> rpcsByHostname;
public AtomicCounter RpcIdGenerator => rpcIdGenerator;
public StatsWatcher()
{
Reset();
}
public void OnRpcComplete(long rpcId, string responseHostname)
{
lock (myLock)
{
if (!firstAcceptedRpcId.HasValue || rpcId < firstAcceptedRpcId || rpcId >= firstAcceptedRpcId + numRpcsWanted)
{
return;
}
if (string.IsNullOrEmpty(responseHostname))
{
rpcsNoHostname ++;
}
else
{
if (!rpcsByHostname.ContainsKey(responseHostname))
{
rpcsByHostname[responseHostname] = 0;
}
rpcsByHostname[responseHostname] += 1;
}
rpcsCompleted += 1;
if (rpcsCompleted >= numRpcsWanted)
{
Monitor.Pulse(myLock);
}
}
}
public void Reset()
{
lock (myLock)
{
firstAcceptedRpcId = null;
numRpcsWanted = 0;
rpcsCompleted = 0;
rpcsNoHostname = 0;
rpcsByHostname = new Dictionary<string, int>();
}
}
public LoadBalancerStatsResponse WaitForRpcStatsResponse(int rpcsWanted, int timeoutSec)
{
lock (myLock)
{
if (firstAcceptedRpcId.HasValue)
{
throw new InvalidOperationException("StateWatcher is already collecting stats.");
}
// we are only interested in the next numRpcsWanted RPCs
firstAcceptedRpcId = rpcIdGenerator.Count + 1;
numRpcsWanted = rpcsWanted;
var deadline = DateTime.UtcNow.AddSeconds(timeoutSec);
while (true)
{
var timeoutMillis = Math.Max((int)(deadline - DateTime.UtcNow).TotalMilliseconds, 0);
if (!Monitor.Wait(myLock, timeoutMillis) || rpcsCompleted >= rpcsWanted)
{
// we collected enough RPCs, or timed out waiting
var response = new LoadBalancerStatsResponse { NumFailures = rpcsNoHostname };
response.RpcsByPeer.Add(rpcsByHostname);
Reset();
return response;
}
}
}
}
}
/// <summary>
/// Implementation of LoadBalancerStatsService server
/// </summary>
internal class LoadBalancerStatsServiceImpl : LoadBalancerStatsService.LoadBalancerStatsServiceBase
{
StatsWatcher statsWatcher;
public LoadBalancerStatsServiceImpl(StatsWatcher statsWatcher)
{
this.statsWatcher = statsWatcher;
}
public override async Task<LoadBalancerStatsResponse> GetClientStats(LoadBalancerStatsRequest request, ServerCallContext context)
{
// run as a task to avoid blocking
var response = await Task.Run(() => statsWatcher.WaitForRpcStatsResponse(request.NumRpcs, request.TimeoutSec));
Console.WriteLine($"Returning stats {response} (num of requested RPCs: {request.NumRpcs})");
return response;
}
}
}

@ -0,0 +1,134 @@
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Utils;
using Grpc.Testing;
using NUnit.Framework;
namespace Grpc.IntegrationTesting
{
public class XdsInteropClientTest
{
const string Host = "localhost";
BackendServiceImpl backendService;
Server backendServer;
Server lbStatsServer;
Channel lbStatsChannel;
LoadBalancerStatsService.LoadBalancerStatsServiceClient lbStatsClient;
XdsInteropClient xdsInteropClient;
[OneTimeSetUp]
public void Init()
{
backendService = new BackendServiceImpl();
// Disable SO_REUSEPORT to prevent https://github.com/grpc/grpc/issues/10755
backendServer = new Server(new[] { new ChannelOption(ChannelOptions.SoReuseport, 0) })
{
Services = { TestService.BindService(backendService) },
Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
};
backendServer.Start();
xdsInteropClient = new XdsInteropClient(new XdsInteropClient.ClientOptions
{
NumChannels = 1,
Qps = 1,
RpcTimeoutSec = 10,
Server = $"{Host}:{backendServer.Ports.Single().BoundPort}",
});
// Disable SO_REUSEPORT to prevent https://github.com/grpc/grpc/issues/10755
lbStatsServer = new Server(new[] { new ChannelOption(ChannelOptions.SoReuseport, 0) })
{
Services = { LoadBalancerStatsService.BindService(new LoadBalancerStatsServiceImpl(xdsInteropClient.StatsWatcher)) },
Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
};
lbStatsServer.Start();
int port = lbStatsServer.Ports.Single().BoundPort;
lbStatsChannel = new Channel(Host, port, ChannelCredentials.Insecure);
lbStatsClient = new LoadBalancerStatsService.LoadBalancerStatsServiceClient(lbStatsChannel);
}
[OneTimeTearDown]
public void Cleanup()
{
lbStatsChannel.ShutdownAsync().Wait();
lbStatsServer.ShutdownAsync().Wait();
backendServer.ShutdownAsync().Wait();
}
[Test]
public async Task SmokeTest()
{
string backendName = "backend1";
backendService.UnaryHandler = (request, context) =>
{
return Task.FromResult(new SimpleResponse { Hostname = backendName});
};
var cancellationTokenSource = new CancellationTokenSource();
var runChannelsTask = xdsInteropClient.RunChannelsAsync(cancellationTokenSource.Token);
var stats = await lbStatsClient.GetClientStatsAsync(new LoadBalancerStatsRequest
{
NumRpcs = 5,
TimeoutSec = 10,
}, deadline: DateTime.UtcNow.AddSeconds(30));
Assert.AreEqual(0, stats.NumFailures);
Assert.AreEqual(backendName, stats.RpcsByPeer.Keys.Single());
Assert.AreEqual(5, stats.RpcsByPeer[backendName]);
await Task.Delay(100);
var stats2 = await lbStatsClient.GetClientStatsAsync(new LoadBalancerStatsRequest
{
NumRpcs = 3,
TimeoutSec = 10,
}, deadline: DateTime.UtcNow.AddSeconds(30));
Assert.AreEqual(0, stats2.NumFailures);
Assert.AreEqual(backendName, stats2.RpcsByPeer.Keys.Single());
Assert.AreEqual(3, stats2.RpcsByPeer[backendName]);
cancellationTokenSource.Cancel();
await runChannelsTask;
}
public class BackendServiceImpl : TestService.TestServiceBase
{
public UnaryServerMethod<SimpleRequest, SimpleResponse> UnaryHandler { get; set; }
public override Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context)
{
return UnaryHandler(request, context);
}
}
}
}

@ -45,6 +45,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Tools", "Grpc.Tools\Gr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Tools.Tests", "Grpc.Tools.Tests\Grpc.Tools.Tests.csproj", "{AEBE9BD8-E433-45B7-8B3D-D458EDBBCFC4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Grpc.IntegrationTesting.XdsClient", "Grpc.IntegrationTesting.XdsClient\Grpc.IntegrationTesting.XdsClient.csproj", "{7306313A-4853-4CFF-B913-0FCB1A497449}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -135,6 +137,10 @@ Global
{AEBE9BD8-E433-45B7-8B3D-D458EDBBCFC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AEBE9BD8-E433-45B7-8B3D-D458EDBBCFC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AEBE9BD8-E433-45B7-8B3D-D458EDBBCFC4}.Release|Any CPU.Build.0 = Release|Any CPU
{7306313A-4853-4CFF-B913-0FCB1A497449}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7306313A-4853-4CFF-B913-0FCB1A497449}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7306313A-4853-4CFF-B913-0FCB1A497449}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7306313A-4853-4CFF-B913-0FCB1A497449}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

@ -70,7 +70,8 @@
"Grpc.IntegrationTesting.MetadataCredentialsTest",
"Grpc.IntegrationTesting.RunnerClientServerTest",
"Grpc.IntegrationTesting.SslCredentialsTest",
"Grpc.IntegrationTesting.UnobservedTaskExceptionTest"
"Grpc.IntegrationTesting.UnobservedTaskExceptionTest",
"Grpc.IntegrationTesting.XdsInteropClientTest"
],
"Grpc.Reflection.Tests": [
"Grpc.Reflection.Tests.ReflectionClientServerTest",

Loading…
Cancel
Save