From db54a04d1e3d6aef95c055bbdc6628bcd5afcdab Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 5 Aug 2020 14:11:26 +0200 Subject: [PATCH] support xDS traffic splitting and routing in c# interop --- .../XdsInteropClient.cs | 171 ++++++++++++++++-- .../XdsInteropClientTest.cs | 42 ++++- 2 files changed, 197 insertions(+), 16 deletions(-) diff --git a/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs b/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs index b9d7799fba6..0ad6c4bc920 100644 --- a/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs @@ -39,7 +39,7 @@ namespace Grpc.IntegrationTesting [Option("qps", Default = 1)] - // The desired QPS per channel. + // The desired QPS per channel, for each type of RPC. public int Qps { get; set; } [Option("server", Default = "localhost:8080")] @@ -53,18 +53,37 @@ namespace Grpc.IntegrationTesting [Option("print_response", Default = false)] public bool PrintResponse { get; set; } + + // Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall + [Option("rpc", Default = "UnaryCall")] + public string Rpc { get; set; } + + // The metadata to send with each RPC, in the format EmptyCall:key1:value1,UnaryCall:key2:value2 + [Option("metadata", Default = null)] + public string Metadata { get; set; } + } + + internal enum RpcType + { + UnaryCall, + EmptyCall } ClientOptions options; StatsWatcher statsWatcher = new StatsWatcher(); + List rpcs; + Dictionary metadata; + // make watcher accessible by tests internal StatsWatcher StatsWatcher => statsWatcher; internal XdsInteropClient(ClientOptions options) { this.options = options; + this.rpcs = ParseRpcArgument(this.options.Rpc); + this.metadata = ParseMetadataArgument(this.options.Metadata); } public static void Run(string[] args) @@ -124,8 +143,11 @@ namespace Grpc.IntegrationTesting var stopwatch = Stopwatch.StartNew(); while (!cancellationToken.IsCancellationRequested) { - inflightTasks.Add(RunSingleRpcAsync(client, cancellationToken)); - rpcsStarted++; + foreach (var rpcType in rpcs) + { + inflightTasks.Add(RunSingleRpcAsync(client, cancellationToken, rpcType)); + rpcsStarted++; + } // only cleanup calls that have already completed, calls that are still inflight will be cleaned up later. await CleanupCompletedTasksAsync(inflightTasks); @@ -133,7 +155,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine($"Currently {inflightTasks.Count} in-flight RPCs"); // if needed, wait a bit before we start the next RPC. - int nextDueInMillis = (int) Math.Max(0, (1000 * rpcsStarted / options.Qps) - stopwatch.ElapsedMilliseconds); + int nextDueInMillis = (int) Math.Max(0, (1000 * rpcsStarted / options.Qps / rpcs.Count) - stopwatch.ElapsedMilliseconds); if (nextDueInMillis > 0) { await Task.Delay(nextDueInMillis); @@ -146,25 +168,61 @@ namespace Grpc.IntegrationTesting Console.WriteLine($"Channel shutdown {channelId}"); } - private async Task RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken) + private async Task RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken, RpcType rpcType) { long rpcId = statsWatcher.RpcIdGenerator.Increment(); try { - Console.WriteLine($"Starting RPC {rpcId}."); - var response = await client.UnaryCallAsync(new SimpleRequest(), - new CallOptions(cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec))); - - statsWatcher.OnRpcComplete(rpcId, response.Hostname); - if (options.PrintResponse) + Console.WriteLine($"Starting RPC {rpcId} of type {rpcType}"); + + // metadata to send with the RPC + var headers = new Metadata(); + if (metadata.ContainsKey(rpcType)) { - Console.WriteLine($"Got response {response}"); + headers = metadata[rpcType]; + if (headers.Count > 0) + { + var printableHeaders = "[" + string.Join(", ", headers) + "]"; + Console.WriteLine($"Will send metadata {printableHeaders}"); + } } - Console.WriteLine($"RPC {rpcId} succeeded "); + + if (rpcType == RpcType.UnaryCall) + { + + var call = client.UnaryCallAsync(new SimpleRequest(), + new CallOptions(headers: headers, cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec))); + + var response = await call; + var hostname = (await call.ResponseHeadersAsync).GetValue("hostname") ?? response.Hostname; + statsWatcher.OnRpcComplete(rpcId, rpcType, hostname); + if (options.PrintResponse) + { + Console.WriteLine($"Got response {response}"); + } + } + else if (rpcType == RpcType.EmptyCall) + { + var call = client.EmptyCallAsync(new Empty(), + new CallOptions(headers: headers, cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec))); + + var response = await call; + var hostname = (await call.ResponseHeadersAsync).GetValue("hostname"); + statsWatcher.OnRpcComplete(rpcId, rpcType, hostname); + if (options.PrintResponse) + { + Console.WriteLine($"Got response {response}"); + } + } + else + { + throw new InvalidOperationException($"Unsupported RPC type ${rpcType}"); + } + Console.WriteLine($"RPC {rpcId} succeeded"); } catch (RpcException ex) { - statsWatcher.OnRpcComplete(rpcId, null); + statsWatcher.OnRpcComplete(rpcId, rpcType, null); Console.WriteLine($"RPC {rpcId} failed: {ex}"); } } @@ -186,6 +244,66 @@ namespace Grpc.IntegrationTesting tasks.Remove(task); } } + + private static List ParseRpcArgument(string rpcArg) + { + var result = new List(); + foreach (var part in rpcArg.Split(',')) + { + result.Add(ParseRpc(part)); + } + return result; + } + + private static RpcType ParseRpc(string rpc) + { + switch (rpc) + { + case "UnaryCall": + return RpcType.UnaryCall; + case "EmptyCall": + return RpcType.EmptyCall; + default: + throw new ArgumentException($"Unknown RPC: \"{rpc}\""); + } + } + + private static Dictionary ParseMetadataArgument(string metadataArg) + { + var rpcMetadata = new Dictionary(); + if (string.IsNullOrEmpty(metadataArg)) + { + return rpcMetadata; + } + + foreach (var metadata in metadataArg.Split(',')) + { + var parts = metadata.Split(':'); + if (parts.Length != 3) + { + throw new ArgumentException($"Invalid metadata: \"{metadata}\""); + } + var rpc = ParseRpc(parts[0]); + var key = parts[1]; + var value = parts[2]; + + var md = new Metadata { {key, value} }; + + if (rpcMetadata.ContainsKey(rpc)) + { + var existingMetadata = rpcMetadata[rpc]; + foreach (var entry in md) + { + existingMetadata.Add(entry); + } + } + else + { + rpcMetadata.Add(rpc, md); + } + } + return rpcMetadata; + } } internal class StatsWatcher @@ -198,6 +316,7 @@ namespace Grpc.IntegrationTesting private int rpcsCompleted; private int rpcsNoHostname; private Dictionary rpcsByHostname; + private Dictionary> rpcsByMethod; public AtomicCounter RpcIdGenerator => rpcIdGenerator; @@ -206,7 +325,7 @@ namespace Grpc.IntegrationTesting Reset(); } - public void OnRpcComplete(long rpcId, string responseHostname) + public void OnRpcComplete(long rpcId, XdsInteropClient.RpcType rpcType, string responseHostname) { lock (myLock) { @@ -221,11 +340,24 @@ namespace Grpc.IntegrationTesting } else { + // update rpcsByHostname if (!rpcsByHostname.ContainsKey(responseHostname)) { rpcsByHostname[responseHostname] = 0; } rpcsByHostname[responseHostname] += 1; + + // update rpcsByMethod + var method = rpcType.ToString(); + if (!rpcsByMethod.ContainsKey(method)) + { + rpcsByMethod[method] = new Dictionary(); + } + if (!rpcsByMethod[method].ContainsKey(responseHostname)) + { + rpcsByMethod[method][responseHostname] = 0; + } + rpcsByMethod[method][responseHostname] += 1; } rpcsCompleted += 1; @@ -245,6 +377,7 @@ namespace Grpc.IntegrationTesting rpcsCompleted = 0; rpcsNoHostname = 0; rpcsByHostname = new Dictionary(); + rpcsByMethod = new Dictionary>(); } } @@ -269,6 +402,14 @@ namespace Grpc.IntegrationTesting // we collected enough RPCs, or timed out waiting var response = new LoadBalancerStatsResponse { NumFailures = rpcsNoHostname }; response.RpcsByPeer.Add(rpcsByHostname); + + response.RpcsByMethod.Clear(); + foreach (var methodEntry in rpcsByMethod) + { + var rpcsByPeer = new LoadBalancerStatsResponse.Types.RpcsByPeer(); + rpcsByPeer.RpcsByPeer_.Add(methodEntry.Value); + response.RpcsByMethod[methodEntry.Key] = rpcsByPeer; + } Reset(); return response; } diff --git a/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs b/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs index be4e696d2ae..4025d806404 100644 --- a/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs @@ -59,6 +59,7 @@ namespace Grpc.IntegrationTesting NumChannels = 1, Qps = 1, RpcTimeoutSec = 10, + Rpc = "UnaryCall", Server = $"{Host}:{backendServer.Ports.Single().BoundPort}", }); @@ -89,7 +90,7 @@ namespace Grpc.IntegrationTesting string backendName = "backend1"; backendService.UnaryHandler = (request, context) => { - return Task.FromResult(new SimpleResponse { Hostname = backendName}); + return Task.FromResult(new SimpleResponse { Hostname = backendName }); }; var cancellationTokenSource = new CancellationTokenSource(); @@ -104,6 +105,9 @@ namespace Grpc.IntegrationTesting Assert.AreEqual(0, stats.NumFailures); Assert.AreEqual(backendName, stats.RpcsByPeer.Keys.Single()); Assert.AreEqual(5, stats.RpcsByPeer[backendName]); + Assert.AreEqual("UnaryCall", stats.RpcsByMethod.Keys.Single()); + Assert.AreEqual(backendName, stats.RpcsByMethod["UnaryCall"].RpcsByPeer_.Keys.Single()); + Assert.AreEqual(5, stats.RpcsByMethod["UnaryCall"].RpcsByPeer_[backendName]); await Task.Delay(100); @@ -116,6 +120,36 @@ namespace Grpc.IntegrationTesting Assert.AreEqual(0, stats2.NumFailures); Assert.AreEqual(backendName, stats2.RpcsByPeer.Keys.Single()); Assert.AreEqual(3, stats2.RpcsByPeer[backendName]); + Assert.AreEqual("UnaryCall", stats2.RpcsByMethod.Keys.Single()); + Assert.AreEqual(backendName, stats2.RpcsByMethod["UnaryCall"].RpcsByPeer_.Keys.Single()); + Assert.AreEqual(3, stats2.RpcsByMethod["UnaryCall"].RpcsByPeer_[backendName]); + + cancellationTokenSource.Cancel(); + await runChannelsTask; + } + + [Test] + public async Task HostnameReadFromResponseHeaders() + { + string correctBackendName = "backend1"; + backendService.UnaryHandler = async (request, context) => + { + await context.WriteResponseHeadersAsync(new Metadata { {"hostname", correctBackendName} }); + return new SimpleResponse { Hostname = "wrong_hostname" }; + }; + + var cancellationTokenSource = new CancellationTokenSource(); + var runChannelsTask = xdsInteropClient.RunChannelsAsync(cancellationTokenSource.Token); + + var stats = await lbStatsClient.GetClientStatsAsync(new LoadBalancerStatsRequest + { + NumRpcs = 3, + TimeoutSec = 10, + }, deadline: DateTime.UtcNow.AddSeconds(30)); + + Assert.AreEqual(0, stats.NumFailures); + Assert.AreEqual(correctBackendName, stats.RpcsByPeer.Keys.Single()); + Assert.AreEqual(3, stats.RpcsByPeer[correctBackendName]); cancellationTokenSource.Cancel(); await runChannelsTask; @@ -124,11 +158,17 @@ namespace Grpc.IntegrationTesting public class BackendServiceImpl : TestService.TestServiceBase { public UnaryServerMethod UnaryHandler { get; set; } + public UnaryServerMethod EmptyHandler { get; set; } public override Task UnaryCall(SimpleRequest request, ServerCallContext context) { return UnaryHandler(request, context); } + + public override Task EmptyCall(Empty request, ServerCallContext context) + { + return EmptyHandler(request, context); + } } } }