diff --git a/src/csharp/Grpc.IntegrationTesting/Messages.cs b/src/csharp/Grpc.IntegrationTesting/Messages.cs index 6911a44b786..a1f353453bc 100644 --- a/src/csharp/Grpc.IntegrationTesting/Messages.cs +++ b/src/csharp/Grpc.IntegrationTesting/Messages.cs @@ -58,14 +58,22 @@ namespace Grpc.Testing { "eF9yZWNvbm5lY3RfYmFja29mZl9tcxgBIAEoBSIzCg1SZWNvbm5lY3RJbmZv", "Eg4KBnBhc3NlZBgBIAEoCBISCgpiYWNrb2ZmX21zGAIgAygFIkEKGExvYWRC", "YWxhbmNlclN0YXRzUmVxdWVzdBIQCghudW1fcnBjcxgBIAEoBRITCgt0aW1l", - "b3V0X3NlYxgCIAEoBSKzAQoZTG9hZEJhbGFuY2VyU3RhdHNSZXNwb25zZRJN", + "b3V0X3NlYxgCIAEoBSKLBAoZTG9hZEJhbGFuY2VyU3RhdHNSZXNwb25zZRJN", "CgxycGNzX2J5X3BlZXIYASADKAsyNy5ncnBjLnRlc3RpbmcuTG9hZEJhbGFu", "Y2VyU3RhdHNSZXNwb25zZS5ScGNzQnlQZWVyRW50cnkSFAoMbnVtX2ZhaWx1", - "cmVzGAIgASgFGjEKD1JwY3NCeVBlZXJFbnRyeRILCgNrZXkYASABKAkSDQoF", - "dmFsdWUYAiABKAU6AjgBKh8KC1BheWxvYWRUeXBlEhAKDENPTVBSRVNTQUJM", - "RRAAKm8KD0dycGNsYlJvdXRlVHlwZRIdChlHUlBDTEJfUk9VVEVfVFlQRV9V", - "TktOT1dOEAASHgoaR1JQQ0xCX1JPVVRFX1RZUEVfRkFMTEJBQ0sQARIdChlH", - "UlBDTEJfUk9VVEVfVFlQRV9CQUNLRU5EEAJiBnByb3RvMw==")); + "cmVzGAIgASgFElEKDnJwY3NfYnlfbWV0aG9kGAMgAygLMjkuZ3JwYy50ZXN0", + "aW5nLkxvYWRCYWxhbmNlclN0YXRzUmVzcG9uc2UuUnBjc0J5TWV0aG9kRW50", + "cnkamQEKClJwY3NCeVBlZXISWAoMcnBjc19ieV9wZWVyGAEgAygLMkIuZ3Jw", + "Yy50ZXN0aW5nLkxvYWRCYWxhbmNlclN0YXRzUmVzcG9uc2UuUnBjc0J5UGVl", + "ci5ScGNzQnlQZWVyRW50cnkaMQoPUnBjc0J5UGVlckVudHJ5EgsKA2tleRgB", + "IAEoCRINCgV2YWx1ZRgCIAEoBToCOAEaMQoPUnBjc0J5UGVlckVudHJ5EgsK", + "A2tleRgBIAEoCRINCgV2YWx1ZRgCIAEoBToCOAEaZwoRUnBjc0J5TWV0aG9k", + "RW50cnkSCwoDa2V5GAEgASgJEkEKBXZhbHVlGAIgASgLMjIuZ3JwYy50ZXN0", + "aW5nLkxvYWRCYWxhbmNlclN0YXRzUmVzcG9uc2UuUnBjc0J5UGVlcjoCOAEq", + "HwoLUGF5bG9hZFR5cGUSEAoMQ09NUFJFU1NBQkxFEAAqbwoPR3JwY2xiUm91", + "dGVUeXBlEh0KGUdSUENMQl9ST1VURV9UWVBFX1VOS05PV04QABIeChpHUlBD", + "TEJfUk9VVEVfVFlQRV9GQUxMQkFDSxABEh0KGUdSUENMQl9ST1VURV9UWVBF", + "X0JBQ0tFTkQQAmIGcHJvdG8z")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, new pbr::GeneratedClrTypeInfo(new[] {typeof(global::Grpc.Testing.PayloadType), typeof(global::Grpc.Testing.GrpclbRouteType), }, null, new pbr::GeneratedClrTypeInfo[] { @@ -82,7 +90,8 @@ namespace Grpc.Testing { new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ReconnectParams), global::Grpc.Testing.ReconnectParams.Parser, new[]{ "MaxReconnectBackoffMs" }, null, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ReconnectInfo), global::Grpc.Testing.ReconnectInfo.Parser, new[]{ "Passed", "BackoffMs" }, null, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.LoadBalancerStatsRequest), global::Grpc.Testing.LoadBalancerStatsRequest.Parser, new[]{ "NumRpcs", "TimeoutSec" }, null, null, null, null), - new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.LoadBalancerStatsResponse), global::Grpc.Testing.LoadBalancerStatsResponse.Parser, new[]{ "RpcsByPeer", "NumFailures" }, null, null, null, new pbr::GeneratedClrTypeInfo[] { null, }) + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.LoadBalancerStatsResponse), global::Grpc.Testing.LoadBalancerStatsResponse.Parser, new[]{ "RpcsByPeer", "NumFailures", "RpcsByMethod" }, null, null, null, new pbr::GeneratedClrTypeInfo[] { new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer), global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer.Parser, new[]{ "RpcsByPeer_" }, null, null, null, new pbr::GeneratedClrTypeInfo[] { null, }), + null, null, }) })); } #endregion @@ -2706,6 +2715,7 @@ namespace Grpc.Testing { public LoadBalancerStatsResponse(LoadBalancerStatsResponse other) : this() { rpcsByPeer_ = other.rpcsByPeer_.Clone(); numFailures_ = other.numFailures_; + rpcsByMethod_ = other.rpcsByMethod_.Clone(); _unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields); } @@ -2741,6 +2751,16 @@ namespace Grpc.Testing { } } + /// Field number for the "rpcs_by_method" field. + public const int RpcsByMethodFieldNumber = 3; + private static readonly pbc::MapField.Codec _map_rpcsByMethod_codec + = new pbc::MapField.Codec(pb::FieldCodec.ForString(10, ""), pb::FieldCodec.ForMessage(18, global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer.Parser), 26); + private readonly pbc::MapField rpcsByMethod_ = new pbc::MapField(); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public pbc::MapField RpcsByMethod { + get { return rpcsByMethod_; } + } + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] public override bool Equals(object other) { return Equals(other as LoadBalancerStatsResponse); @@ -2756,6 +2776,7 @@ namespace Grpc.Testing { } if (!RpcsByPeer.Equals(other.RpcsByPeer)) return false; if (NumFailures != other.NumFailures) return false; + if (!RpcsByMethod.Equals(other.RpcsByMethod)) return false; return Equals(_unknownFields, other._unknownFields); } @@ -2764,6 +2785,7 @@ namespace Grpc.Testing { int hash = 1; hash ^= RpcsByPeer.GetHashCode(); if (NumFailures != 0) hash ^= NumFailures.GetHashCode(); + hash ^= RpcsByMethod.GetHashCode(); if (_unknownFields != null) { hash ^= _unknownFields.GetHashCode(); } @@ -2782,6 +2804,7 @@ namespace Grpc.Testing { output.WriteRawTag(16); output.WriteInt32(NumFailures); } + rpcsByMethod_.WriteTo(output, _map_rpcsByMethod_codec); if (_unknownFields != null) { _unknownFields.WriteTo(output); } @@ -2794,6 +2817,7 @@ namespace Grpc.Testing { if (NumFailures != 0) { size += 1 + pb::CodedOutputStream.ComputeInt32Size(NumFailures); } + size += rpcsByMethod_.CalculateSize(_map_rpcsByMethod_codec); if (_unknownFields != null) { size += _unknownFields.CalculateSize(); } @@ -2809,6 +2833,7 @@ namespace Grpc.Testing { if (other.NumFailures != 0) { NumFailures = other.NumFailures; } + rpcsByMethod_.Add(other.rpcsByMethod_); _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields); } @@ -2828,9 +2853,144 @@ namespace Grpc.Testing { NumFailures = input.ReadInt32(); break; } + case 26: { + rpcsByMethod_.AddEntriesFrom(input, _map_rpcsByMethod_codec); + break; + } + } + } + } + + #region Nested types + /// Container for nested types declared in the LoadBalancerStatsResponse message type. + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static partial class Types { + public sealed partial class RpcsByPeer : pb::IMessage { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new RpcsByPeer()); + private pb::UnknownFieldSet _unknownFields; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public static pbr::MessageDescriptor Descriptor { + get { return global::Grpc.Testing.LoadBalancerStatsResponse.Descriptor.NestedTypes[0]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RpcsByPeer() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RpcsByPeer(RpcsByPeer other) : this() { + rpcsByPeer_ = other.rpcsByPeer_.Clone(); + _unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public RpcsByPeer Clone() { + return new RpcsByPeer(this); + } + + /// Field number for the "rpcs_by_peer" field. + public const int RpcsByPeer_FieldNumber = 1; + private static readonly pbc::MapField.Codec _map_rpcsByPeer_codec + = new pbc::MapField.Codec(pb::FieldCodec.ForString(10, ""), pb::FieldCodec.ForInt32(16, 0), 10); + private readonly pbc::MapField rpcsByPeer_ = new pbc::MapField(); + /// + /// The number of completed RPCs for each peer. + /// + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public pbc::MapField RpcsByPeer_ { + get { return rpcsByPeer_; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override bool Equals(object other) { + return Equals(other as RpcsByPeer); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public bool Equals(RpcsByPeer other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (!RpcsByPeer_.Equals(other.RpcsByPeer_)) return false; + return Equals(_unknownFields, other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override int GetHashCode() { + int hash = 1; + hash ^= RpcsByPeer_.GetHashCode(); + if (_unknownFields != null) { + hash ^= _unknownFields.GetHashCode(); + } + return hash; } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void WriteTo(pb::CodedOutputStream output) { + rpcsByPeer_.WriteTo(output, _map_rpcsByPeer_codec); + if (_unknownFields != null) { + _unknownFields.WriteTo(output); + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public int CalculateSize() { + int size = 0; + size += rpcsByPeer_.CalculateSize(_map_rpcsByPeer_codec); + if (_unknownFields != null) { + size += _unknownFields.CalculateSize(); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(RpcsByPeer other) { + if (other == null) { + return; + } + rpcsByPeer_.Add(other.rpcsByPeer_); + _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input); + break; + case 10: { + rpcsByPeer_.AddEntriesFrom(input, _map_rpcsByPeer_codec); + break; + } + } + } + } + } + } + #endregion } diff --git a/src/csharp/Grpc.IntegrationTesting/Test.cs b/src/csharp/Grpc.IntegrationTesting/Test.cs index 992908f5877..8967af6f3d9 100644 --- a/src/csharp/Grpc.IntegrationTesting/Test.cs +++ b/src/csharp/Grpc.IntegrationTesting/Test.cs @@ -50,7 +50,10 @@ namespace Grpc.Testing { "RW1wdHkaGy5ncnBjLnRlc3RpbmcuUmVjb25uZWN0SW5mbzJ/ChhMb2FkQmFs", "YW5jZXJTdGF0c1NlcnZpY2USYwoOR2V0Q2xpZW50U3RhdHMSJi5ncnBjLnRl", "c3RpbmcuTG9hZEJhbGFuY2VyU3RhdHNSZXF1ZXN0GicuZ3JwYy50ZXN0aW5n", - "LkxvYWRCYWxhbmNlclN0YXRzUmVzcG9uc2UiAGIGcHJvdG8z")); + "LkxvYWRCYWxhbmNlclN0YXRzUmVzcG9uc2UiADKLAQoWWGRzVXBkYXRlSGVh", + "bHRoU2VydmljZRI2CgpTZXRTZXJ2aW5nEhMuZ3JwYy50ZXN0aW5nLkVtcHR5", + "GhMuZ3JwYy50ZXN0aW5nLkVtcHR5EjkKDVNldE5vdFNlcnZpbmcSEy5ncnBj", + "LnRlc3RpbmcuRW1wdHkaEy5ncnBjLnRlc3RpbmcuRW1wdHliBnByb3RvMw==")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { global::Grpc.Testing.EmptyReflection.Descriptor, global::Grpc.Testing.MessagesReflection.Descriptor, }, new pbr::GeneratedClrTypeInfo(null, null, null)); diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs index 5f63fd9a5d8..ffa8388e3fb 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs @@ -950,5 +950,132 @@ namespace Grpc.Testing { } } + /// + /// A service to remotely control health status of an xDS test server. + /// + public static partial class XdsUpdateHealthService + { + static readonly string __ServiceName = "grpc.testing.XdsUpdateHealthService"; + + static readonly grpc::Marshaller __Marshaller_grpc_testing_Empty = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Empty.Parser.ParseFrom); + + static readonly grpc::Method __Method_SetServing = new grpc::Method( + grpc::MethodType.Unary, + __ServiceName, + "SetServing", + __Marshaller_grpc_testing_Empty, + __Marshaller_grpc_testing_Empty); + + static readonly grpc::Method __Method_SetNotServing = new grpc::Method( + grpc::MethodType.Unary, + __ServiceName, + "SetNotServing", + __Marshaller_grpc_testing_Empty, + __Marshaller_grpc_testing_Empty); + + /// Service descriptor + public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor + { + get { return global::Grpc.Testing.TestReflection.Descriptor.Services[4]; } + } + + /// Base class for server-side implementations of XdsUpdateHealthService + [grpc::BindServiceMethod(typeof(XdsUpdateHealthService), "BindService")] + public abstract partial class XdsUpdateHealthServiceBase + { + public virtual global::System.Threading.Tasks.Task SetServing(global::Grpc.Testing.Empty request, grpc::ServerCallContext context) + { + throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); + } + + public virtual global::System.Threading.Tasks.Task SetNotServing(global::Grpc.Testing.Empty request, grpc::ServerCallContext context) + { + throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); + } + + } + + /// Client for XdsUpdateHealthService + public partial class XdsUpdateHealthServiceClient : grpc::ClientBase + { + /// Creates a new client for XdsUpdateHealthService + /// The channel to use to make remote calls. + public XdsUpdateHealthServiceClient(grpc::ChannelBase channel) : base(channel) + { + } + /// Creates a new client for XdsUpdateHealthService that uses a custom CallInvoker. + /// The callInvoker to use to make remote calls. + public XdsUpdateHealthServiceClient(grpc::CallInvoker callInvoker) : base(callInvoker) + { + } + /// Protected parameterless constructor to allow creation of test doubles. + protected XdsUpdateHealthServiceClient() : base() + { + } + /// Protected constructor to allow creation of configured clients. + /// The client configuration. + protected XdsUpdateHealthServiceClient(ClientBaseConfiguration configuration) : base(configuration) + { + } + + public virtual global::Grpc.Testing.Empty SetServing(global::Grpc.Testing.Empty request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) + { + return SetServing(request, new grpc::CallOptions(headers, deadline, cancellationToken)); + } + public virtual global::Grpc.Testing.Empty SetServing(global::Grpc.Testing.Empty request, grpc::CallOptions options) + { + return CallInvoker.BlockingUnaryCall(__Method_SetServing, null, options, request); + } + public virtual grpc::AsyncUnaryCall SetServingAsync(global::Grpc.Testing.Empty request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) + { + return SetServingAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken)); + } + public virtual grpc::AsyncUnaryCall SetServingAsync(global::Grpc.Testing.Empty request, grpc::CallOptions options) + { + return CallInvoker.AsyncUnaryCall(__Method_SetServing, null, options, request); + } + public virtual global::Grpc.Testing.Empty SetNotServing(global::Grpc.Testing.Empty request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) + { + return SetNotServing(request, new grpc::CallOptions(headers, deadline, cancellationToken)); + } + public virtual global::Grpc.Testing.Empty SetNotServing(global::Grpc.Testing.Empty request, grpc::CallOptions options) + { + return CallInvoker.BlockingUnaryCall(__Method_SetNotServing, null, options, request); + } + public virtual grpc::AsyncUnaryCall SetNotServingAsync(global::Grpc.Testing.Empty request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) + { + return SetNotServingAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken)); + } + public virtual grpc::AsyncUnaryCall SetNotServingAsync(global::Grpc.Testing.Empty request, grpc::CallOptions options) + { + return CallInvoker.AsyncUnaryCall(__Method_SetNotServing, null, options, request); + } + /// Creates a new instance of client from given ClientBaseConfiguration. + protected override XdsUpdateHealthServiceClient NewInstance(ClientBaseConfiguration configuration) + { + return new XdsUpdateHealthServiceClient(configuration); + } + } + + /// Creates service definition that can be registered with a server + /// An object implementing the server-side handling logic. + public static grpc::ServerServiceDefinition BindService(XdsUpdateHealthServiceBase serviceImpl) + { + return grpc::ServerServiceDefinition.CreateBuilder() + .AddMethod(__Method_SetServing, serviceImpl.SetServing) + .AddMethod(__Method_SetNotServing, serviceImpl.SetNotServing).Build(); + } + + /// Register service method with a service binder with or without implementation. Useful when customizing the service binding logic. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// Service methods will be bound by calling AddMethod on this object. + /// An object implementing the server-side handling logic. + public static void BindService(grpc::ServiceBinderBase serviceBinder, XdsUpdateHealthServiceBase serviceImpl) + { + serviceBinder.AddMethod(__Method_SetServing, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.SetServing)); + serviceBinder.AddMethod(__Method_SetNotServing, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.SetNotServing)); + } + + } } #endregion diff --git a/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs b/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs index b9d7799fba6..0ad6c4bc920 100644 --- a/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs @@ -39,7 +39,7 @@ namespace Grpc.IntegrationTesting [Option("qps", Default = 1)] - // The desired QPS per channel. + // The desired QPS per channel, for each type of RPC. public int Qps { get; set; } [Option("server", Default = "localhost:8080")] @@ -53,18 +53,37 @@ namespace Grpc.IntegrationTesting [Option("print_response", Default = false)] public bool PrintResponse { get; set; } + + // Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall + [Option("rpc", Default = "UnaryCall")] + public string Rpc { get; set; } + + // The metadata to send with each RPC, in the format EmptyCall:key1:value1,UnaryCall:key2:value2 + [Option("metadata", Default = null)] + public string Metadata { get; set; } + } + + internal enum RpcType + { + UnaryCall, + EmptyCall } ClientOptions options; StatsWatcher statsWatcher = new StatsWatcher(); + List rpcs; + Dictionary metadata; + // make watcher accessible by tests internal StatsWatcher StatsWatcher => statsWatcher; internal XdsInteropClient(ClientOptions options) { this.options = options; + this.rpcs = ParseRpcArgument(this.options.Rpc); + this.metadata = ParseMetadataArgument(this.options.Metadata); } public static void Run(string[] args) @@ -124,8 +143,11 @@ namespace Grpc.IntegrationTesting var stopwatch = Stopwatch.StartNew(); while (!cancellationToken.IsCancellationRequested) { - inflightTasks.Add(RunSingleRpcAsync(client, cancellationToken)); - rpcsStarted++; + foreach (var rpcType in rpcs) + { + inflightTasks.Add(RunSingleRpcAsync(client, cancellationToken, rpcType)); + rpcsStarted++; + } // only cleanup calls that have already completed, calls that are still inflight will be cleaned up later. await CleanupCompletedTasksAsync(inflightTasks); @@ -133,7 +155,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine($"Currently {inflightTasks.Count} in-flight RPCs"); // if needed, wait a bit before we start the next RPC. - int nextDueInMillis = (int) Math.Max(0, (1000 * rpcsStarted / options.Qps) - stopwatch.ElapsedMilliseconds); + int nextDueInMillis = (int) Math.Max(0, (1000 * rpcsStarted / options.Qps / rpcs.Count) - stopwatch.ElapsedMilliseconds); if (nextDueInMillis > 0) { await Task.Delay(nextDueInMillis); @@ -146,25 +168,61 @@ namespace Grpc.IntegrationTesting Console.WriteLine($"Channel shutdown {channelId}"); } - private async Task RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken) + private async Task RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken, RpcType rpcType) { long rpcId = statsWatcher.RpcIdGenerator.Increment(); try { - Console.WriteLine($"Starting RPC {rpcId}."); - var response = await client.UnaryCallAsync(new SimpleRequest(), - new CallOptions(cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec))); - - statsWatcher.OnRpcComplete(rpcId, response.Hostname); - if (options.PrintResponse) + Console.WriteLine($"Starting RPC {rpcId} of type {rpcType}"); + + // metadata to send with the RPC + var headers = new Metadata(); + if (metadata.ContainsKey(rpcType)) { - Console.WriteLine($"Got response {response}"); + headers = metadata[rpcType]; + if (headers.Count > 0) + { + var printableHeaders = "[" + string.Join(", ", headers) + "]"; + Console.WriteLine($"Will send metadata {printableHeaders}"); + } } - Console.WriteLine($"RPC {rpcId} succeeded "); + + if (rpcType == RpcType.UnaryCall) + { + + var call = client.UnaryCallAsync(new SimpleRequest(), + new CallOptions(headers: headers, cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec))); + + var response = await call; + var hostname = (await call.ResponseHeadersAsync).GetValue("hostname") ?? response.Hostname; + statsWatcher.OnRpcComplete(rpcId, rpcType, hostname); + if (options.PrintResponse) + { + Console.WriteLine($"Got response {response}"); + } + } + else if (rpcType == RpcType.EmptyCall) + { + var call = client.EmptyCallAsync(new Empty(), + new CallOptions(headers: headers, cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec))); + + var response = await call; + var hostname = (await call.ResponseHeadersAsync).GetValue("hostname"); + statsWatcher.OnRpcComplete(rpcId, rpcType, hostname); + if (options.PrintResponse) + { + Console.WriteLine($"Got response {response}"); + } + } + else + { + throw new InvalidOperationException($"Unsupported RPC type ${rpcType}"); + } + Console.WriteLine($"RPC {rpcId} succeeded"); } catch (RpcException ex) { - statsWatcher.OnRpcComplete(rpcId, null); + statsWatcher.OnRpcComplete(rpcId, rpcType, null); Console.WriteLine($"RPC {rpcId} failed: {ex}"); } } @@ -186,6 +244,66 @@ namespace Grpc.IntegrationTesting tasks.Remove(task); } } + + private static List ParseRpcArgument(string rpcArg) + { + var result = new List(); + foreach (var part in rpcArg.Split(',')) + { + result.Add(ParseRpc(part)); + } + return result; + } + + private static RpcType ParseRpc(string rpc) + { + switch (rpc) + { + case "UnaryCall": + return RpcType.UnaryCall; + case "EmptyCall": + return RpcType.EmptyCall; + default: + throw new ArgumentException($"Unknown RPC: \"{rpc}\""); + } + } + + private static Dictionary ParseMetadataArgument(string metadataArg) + { + var rpcMetadata = new Dictionary(); + if (string.IsNullOrEmpty(metadataArg)) + { + return rpcMetadata; + } + + foreach (var metadata in metadataArg.Split(',')) + { + var parts = metadata.Split(':'); + if (parts.Length != 3) + { + throw new ArgumentException($"Invalid metadata: \"{metadata}\""); + } + var rpc = ParseRpc(parts[0]); + var key = parts[1]; + var value = parts[2]; + + var md = new Metadata { {key, value} }; + + if (rpcMetadata.ContainsKey(rpc)) + { + var existingMetadata = rpcMetadata[rpc]; + foreach (var entry in md) + { + existingMetadata.Add(entry); + } + } + else + { + rpcMetadata.Add(rpc, md); + } + } + return rpcMetadata; + } } internal class StatsWatcher @@ -198,6 +316,7 @@ namespace Grpc.IntegrationTesting private int rpcsCompleted; private int rpcsNoHostname; private Dictionary rpcsByHostname; + private Dictionary> rpcsByMethod; public AtomicCounter RpcIdGenerator => rpcIdGenerator; @@ -206,7 +325,7 @@ namespace Grpc.IntegrationTesting Reset(); } - public void OnRpcComplete(long rpcId, string responseHostname) + public void OnRpcComplete(long rpcId, XdsInteropClient.RpcType rpcType, string responseHostname) { lock (myLock) { @@ -221,11 +340,24 @@ namespace Grpc.IntegrationTesting } else { + // update rpcsByHostname if (!rpcsByHostname.ContainsKey(responseHostname)) { rpcsByHostname[responseHostname] = 0; } rpcsByHostname[responseHostname] += 1; + + // update rpcsByMethod + var method = rpcType.ToString(); + if (!rpcsByMethod.ContainsKey(method)) + { + rpcsByMethod[method] = new Dictionary(); + } + if (!rpcsByMethod[method].ContainsKey(responseHostname)) + { + rpcsByMethod[method][responseHostname] = 0; + } + rpcsByMethod[method][responseHostname] += 1; } rpcsCompleted += 1; @@ -245,6 +377,7 @@ namespace Grpc.IntegrationTesting rpcsCompleted = 0; rpcsNoHostname = 0; rpcsByHostname = new Dictionary(); + rpcsByMethod = new Dictionary>(); } } @@ -269,6 +402,14 @@ namespace Grpc.IntegrationTesting // we collected enough RPCs, or timed out waiting var response = new LoadBalancerStatsResponse { NumFailures = rpcsNoHostname }; response.RpcsByPeer.Add(rpcsByHostname); + + response.RpcsByMethod.Clear(); + foreach (var methodEntry in rpcsByMethod) + { + var rpcsByPeer = new LoadBalancerStatsResponse.Types.RpcsByPeer(); + rpcsByPeer.RpcsByPeer_.Add(methodEntry.Value); + response.RpcsByMethod[methodEntry.Key] = rpcsByPeer; + } Reset(); return response; } diff --git a/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs b/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs index be4e696d2ae..4025d806404 100644 --- a/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs @@ -59,6 +59,7 @@ namespace Grpc.IntegrationTesting NumChannels = 1, Qps = 1, RpcTimeoutSec = 10, + Rpc = "UnaryCall", Server = $"{Host}:{backendServer.Ports.Single().BoundPort}", }); @@ -89,7 +90,7 @@ namespace Grpc.IntegrationTesting string backendName = "backend1"; backendService.UnaryHandler = (request, context) => { - return Task.FromResult(new SimpleResponse { Hostname = backendName}); + return Task.FromResult(new SimpleResponse { Hostname = backendName }); }; var cancellationTokenSource = new CancellationTokenSource(); @@ -104,6 +105,9 @@ namespace Grpc.IntegrationTesting Assert.AreEqual(0, stats.NumFailures); Assert.AreEqual(backendName, stats.RpcsByPeer.Keys.Single()); Assert.AreEqual(5, stats.RpcsByPeer[backendName]); + Assert.AreEqual("UnaryCall", stats.RpcsByMethod.Keys.Single()); + Assert.AreEqual(backendName, stats.RpcsByMethod["UnaryCall"].RpcsByPeer_.Keys.Single()); + Assert.AreEqual(5, stats.RpcsByMethod["UnaryCall"].RpcsByPeer_[backendName]); await Task.Delay(100); @@ -116,6 +120,36 @@ namespace Grpc.IntegrationTesting Assert.AreEqual(0, stats2.NumFailures); Assert.AreEqual(backendName, stats2.RpcsByPeer.Keys.Single()); Assert.AreEqual(3, stats2.RpcsByPeer[backendName]); + Assert.AreEqual("UnaryCall", stats2.RpcsByMethod.Keys.Single()); + Assert.AreEqual(backendName, stats2.RpcsByMethod["UnaryCall"].RpcsByPeer_.Keys.Single()); + Assert.AreEqual(3, stats2.RpcsByMethod["UnaryCall"].RpcsByPeer_[backendName]); + + cancellationTokenSource.Cancel(); + await runChannelsTask; + } + + [Test] + public async Task HostnameReadFromResponseHeaders() + { + string correctBackendName = "backend1"; + backendService.UnaryHandler = async (request, context) => + { + await context.WriteResponseHeadersAsync(new Metadata { {"hostname", correctBackendName} }); + return new SimpleResponse { Hostname = "wrong_hostname" }; + }; + + var cancellationTokenSource = new CancellationTokenSource(); + var runChannelsTask = xdsInteropClient.RunChannelsAsync(cancellationTokenSource.Token); + + var stats = await lbStatsClient.GetClientStatsAsync(new LoadBalancerStatsRequest + { + NumRpcs = 3, + TimeoutSec = 10, + }, deadline: DateTime.UtcNow.AddSeconds(30)); + + Assert.AreEqual(0, stats.NumFailures); + Assert.AreEqual(correctBackendName, stats.RpcsByPeer.Keys.Single()); + Assert.AreEqual(3, stats.RpcsByPeer[correctBackendName]); cancellationTokenSource.Cancel(); await runChannelsTask; @@ -124,11 +158,17 @@ namespace Grpc.IntegrationTesting public class BackendServiceImpl : TestService.TestServiceBase { public UnaryServerMethod UnaryHandler { get; set; } + public UnaryServerMethod EmptyHandler { get; set; } public override Task UnaryCall(SimpleRequest request, ServerCallContext context) { return UnaryHandler(request, context); } + + public override Task EmptyCall(Empty request, ServerCallContext context) + { + return EmptyHandler(request, context); + } } } } diff --git a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh index 85ddb2c8fe9..f9c9a8ad0f5 100755 --- a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh @@ -48,12 +48,17 @@ touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py python tools/run_tests/run_tests.py -l csharp -c opt --build_only +# Test cases "path_matching" and "header_matching" are not included in "all", +# because not all interop clients in all languages support these new tests. +# +# TODO(jtattermusch): remove "path_matching" and "header_matching" from +# --test_case after they are added into "all". GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ - --test_case=all \ + --test_case="all,path_matching,header_matching" \ --project_id=grpc-testing \ - --source_image=projects/grpc-testing/global/images/xds-test-server \ + --source_image=projects/grpc-testing/global/images/xds-test-server-2 \ --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \ --gcp_suffix=$(date '+%s') \ --verbose \ - --client_cmd='dotnet exec src/csharp/Grpc.IntegrationTesting.XdsClient/bin/Release/netcoreapp2.1/Grpc.IntegrationTesting.XdsClient.dll -- --server=xds:///{server_uri} --stats_port={stats_port} --qps={qps}' + --client_cmd='dotnet exec src/csharp/Grpc.IntegrationTesting.XdsClient/bin/Release/netcoreapp2.1/Grpc.IntegrationTesting.XdsClient.dll -- --server=xds:///{server_uri} --stats_port={stats_port} --qps={qps} {rpcs_to_send} {metadata_to_send}' diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 95241b55a82..a5d6fa5c454 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -1715,7 +1715,12 @@ try: metadata_to_send = '--metadata="EmptyCall:{key}:{value}"'.format( key=_TEST_METADATA_KEY, value=_TEST_METADATA_VALUE) else: - metadata_to_send = '--metadata=""' + # Setting the arg explicitly to empty with '--metadata=""' + # makes C# client fail + # (see https://github.com/commandlineparser/commandline/issues/412), + # so instead we just rely on clients using the default when + # metadata arg is not specified. + metadata_to_send = '' if test_case in _TESTS_TO_FAIL_ON_RPC_FAILURE: # TODO(ericgribkoff) Unconditional wait is recommended by TD