Merge pull request #23817 from jtattermusch/upmerge_v1_31_x

Upmerge changes from v1.31.x branch to master
pull/23832/head
Jan Tattermusch 4 years ago committed by GitHub
commit e4386e5657
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 174
      src/csharp/Grpc.IntegrationTesting/Messages.cs
  2. 5
      src/csharp/Grpc.IntegrationTesting/Test.cs
  3. 127
      src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
  4. 171
      src/csharp/Grpc.IntegrationTesting/XdsInteropClient.cs
  5. 42
      src/csharp/Grpc.IntegrationTesting/XdsInteropClientTest.cs
  6. 11
      tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
  7. 7
      tools/run_tests/run_xds_tests.py

@ -58,14 +58,22 @@ namespace Grpc.Testing {
"eF9yZWNvbm5lY3RfYmFja29mZl9tcxgBIAEoBSIzCg1SZWNvbm5lY3RJbmZv",
"Eg4KBnBhc3NlZBgBIAEoCBISCgpiYWNrb2ZmX21zGAIgAygFIkEKGExvYWRC",
"YWxhbmNlclN0YXRzUmVxdWVzdBIQCghudW1fcnBjcxgBIAEoBRITCgt0aW1l",
"b3V0X3NlYxgCIAEoBSKzAQoZTG9hZEJhbGFuY2VyU3RhdHNSZXNwb25zZRJN",
"b3V0X3NlYxgCIAEoBSKLBAoZTG9hZEJhbGFuY2VyU3RhdHNSZXNwb25zZRJN",
"CgxycGNzX2J5X3BlZXIYASADKAsyNy5ncnBjLnRlc3RpbmcuTG9hZEJhbGFu",
"Y2VyU3RhdHNSZXNwb25zZS5ScGNzQnlQZWVyRW50cnkSFAoMbnVtX2ZhaWx1",
"cmVzGAIgASgFGjEKD1JwY3NCeVBlZXJFbnRyeRILCgNrZXkYASABKAkSDQoF",
"dmFsdWUYAiABKAU6AjgBKh8KC1BheWxvYWRUeXBlEhAKDENPTVBSRVNTQUJM",
"RRAAKm8KD0dycGNsYlJvdXRlVHlwZRIdChlHUlBDTEJfUk9VVEVfVFlQRV9V",
"TktOT1dOEAASHgoaR1JQQ0xCX1JPVVRFX1RZUEVfRkFMTEJBQ0sQARIdChlH",
"UlBDTEJfUk9VVEVfVFlQRV9CQUNLRU5EEAJiBnByb3RvMw=="));
"cmVzGAIgASgFElEKDnJwY3NfYnlfbWV0aG9kGAMgAygLMjkuZ3JwYy50ZXN0",
"aW5nLkxvYWRCYWxhbmNlclN0YXRzUmVzcG9uc2UuUnBjc0J5TWV0aG9kRW50",
"cnkamQEKClJwY3NCeVBlZXISWAoMcnBjc19ieV9wZWVyGAEgAygLMkIuZ3Jw",
"Yy50ZXN0aW5nLkxvYWRCYWxhbmNlclN0YXRzUmVzcG9uc2UuUnBjc0J5UGVl",
"ci5ScGNzQnlQZWVyRW50cnkaMQoPUnBjc0J5UGVlckVudHJ5EgsKA2tleRgB",
"IAEoCRINCgV2YWx1ZRgCIAEoBToCOAEaMQoPUnBjc0J5UGVlckVudHJ5EgsK",
"A2tleRgBIAEoCRINCgV2YWx1ZRgCIAEoBToCOAEaZwoRUnBjc0J5TWV0aG9k",
"RW50cnkSCwoDa2V5GAEgASgJEkEKBXZhbHVlGAIgASgLMjIuZ3JwYy50ZXN0",
"aW5nLkxvYWRCYWxhbmNlclN0YXRzUmVzcG9uc2UuUnBjc0J5UGVlcjoCOAEq",
"HwoLUGF5bG9hZFR5cGUSEAoMQ09NUFJFU1NBQkxFEAAqbwoPR3JwY2xiUm91",
"dGVUeXBlEh0KGUdSUENMQl9ST1VURV9UWVBFX1VOS05PV04QABIeChpHUlBD",
"TEJfUk9VVEVfVFlQRV9GQUxMQkFDSxABEh0KGUdSUENMQl9ST1VURV9UWVBF",
"X0JBQ0tFTkQQAmIGcHJvdG8z"));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { },
new pbr::GeneratedClrTypeInfo(new[] {typeof(global::Grpc.Testing.PayloadType), typeof(global::Grpc.Testing.GrpclbRouteType), }, null, new pbr::GeneratedClrTypeInfo[] {
@ -82,7 +90,8 @@ namespace Grpc.Testing {
new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ReconnectParams), global::Grpc.Testing.ReconnectParams.Parser, new[]{ "MaxReconnectBackoffMs" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ReconnectInfo), global::Grpc.Testing.ReconnectInfo.Parser, new[]{ "Passed", "BackoffMs" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.LoadBalancerStatsRequest), global::Grpc.Testing.LoadBalancerStatsRequest.Parser, new[]{ "NumRpcs", "TimeoutSec" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.LoadBalancerStatsResponse), global::Grpc.Testing.LoadBalancerStatsResponse.Parser, new[]{ "RpcsByPeer", "NumFailures" }, null, null, null, new pbr::GeneratedClrTypeInfo[] { null, })
new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.LoadBalancerStatsResponse), global::Grpc.Testing.LoadBalancerStatsResponse.Parser, new[]{ "RpcsByPeer", "NumFailures", "RpcsByMethod" }, null, null, null, new pbr::GeneratedClrTypeInfo[] { new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer), global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer.Parser, new[]{ "RpcsByPeer_" }, null, null, null, new pbr::GeneratedClrTypeInfo[] { null, }),
null, null, })
}));
}
#endregion
@ -2706,6 +2715,7 @@ namespace Grpc.Testing {
public LoadBalancerStatsResponse(LoadBalancerStatsResponse other) : this() {
rpcsByPeer_ = other.rpcsByPeer_.Clone();
numFailures_ = other.numFailures_;
rpcsByMethod_ = other.rpcsByMethod_.Clone();
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
}
@ -2741,6 +2751,16 @@ namespace Grpc.Testing {
}
}
/// <summary>Field number for the "rpcs_by_method" field.</summary>
public const int RpcsByMethodFieldNumber = 3;
private static readonly pbc::MapField<string, global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer>.Codec _map_rpcsByMethod_codec
= new pbc::MapField<string, global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer>.Codec(pb::FieldCodec.ForString(10, ""), pb::FieldCodec.ForMessage(18, global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer.Parser), 26);
private readonly pbc::MapField<string, global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer> rpcsByMethod_ = new pbc::MapField<string, global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer>();
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public pbc::MapField<string, global::Grpc.Testing.LoadBalancerStatsResponse.Types.RpcsByPeer> RpcsByMethod {
get { return rpcsByMethod_; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public override bool Equals(object other) {
return Equals(other as LoadBalancerStatsResponse);
@ -2756,6 +2776,7 @@ namespace Grpc.Testing {
}
if (!RpcsByPeer.Equals(other.RpcsByPeer)) return false;
if (NumFailures != other.NumFailures) return false;
if (!RpcsByMethod.Equals(other.RpcsByMethod)) return false;
return Equals(_unknownFields, other._unknownFields);
}
@ -2764,6 +2785,7 @@ namespace Grpc.Testing {
int hash = 1;
hash ^= RpcsByPeer.GetHashCode();
if (NumFailures != 0) hash ^= NumFailures.GetHashCode();
hash ^= RpcsByMethod.GetHashCode();
if (_unknownFields != null) {
hash ^= _unknownFields.GetHashCode();
}
@ -2782,6 +2804,7 @@ namespace Grpc.Testing {
output.WriteRawTag(16);
output.WriteInt32(NumFailures);
}
rpcsByMethod_.WriteTo(output, _map_rpcsByMethod_codec);
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
@ -2794,6 +2817,7 @@ namespace Grpc.Testing {
if (NumFailures != 0) {
size += 1 + pb::CodedOutputStream.ComputeInt32Size(NumFailures);
}
size += rpcsByMethod_.CalculateSize(_map_rpcsByMethod_codec);
if (_unknownFields != null) {
size += _unknownFields.CalculateSize();
}
@ -2809,6 +2833,7 @@ namespace Grpc.Testing {
if (other.NumFailures != 0) {
NumFailures = other.NumFailures;
}
rpcsByMethod_.Add(other.rpcsByMethod_);
_unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
}
@ -2828,9 +2853,144 @@ namespace Grpc.Testing {
NumFailures = input.ReadInt32();
break;
}
case 26: {
rpcsByMethod_.AddEntriesFrom(input, _map_rpcsByMethod_codec);
break;
}
}
}
}
#region Nested types
/// <summary>Container for nested types declared in the LoadBalancerStatsResponse message type.</summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static partial class Types {
public sealed partial class RpcsByPeer : pb::IMessage<RpcsByPeer> {
private static readonly pb::MessageParser<RpcsByPeer> _parser = new pb::MessageParser<RpcsByPeer>(() => new RpcsByPeer());
private pb::UnknownFieldSet _unknownFields;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pb::MessageParser<RpcsByPeer> Parser { get { return _parser; } }
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public static pbr::MessageDescriptor Descriptor {
get { return global::Grpc.Testing.LoadBalancerStatsResponse.Descriptor.NestedTypes[0]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
pbr::MessageDescriptor pb::IMessage.Descriptor {
get { return Descriptor; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public RpcsByPeer() {
OnConstruction();
}
partial void OnConstruction();
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public RpcsByPeer(RpcsByPeer other) : this() {
rpcsByPeer_ = other.rpcsByPeer_.Clone();
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public RpcsByPeer Clone() {
return new RpcsByPeer(this);
}
/// <summary>Field number for the "rpcs_by_peer" field.</summary>
public const int RpcsByPeer_FieldNumber = 1;
private static readonly pbc::MapField<string, int>.Codec _map_rpcsByPeer_codec
= new pbc::MapField<string, int>.Codec(pb::FieldCodec.ForString(10, ""), pb::FieldCodec.ForInt32(16, 0), 10);
private readonly pbc::MapField<string, int> rpcsByPeer_ = new pbc::MapField<string, int>();
/// <summary>
/// The number of completed RPCs for each peer.
/// </summary>
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public pbc::MapField<string, int> RpcsByPeer_ {
get { return rpcsByPeer_; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public override bool Equals(object other) {
return Equals(other as RpcsByPeer);
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public bool Equals(RpcsByPeer other) {
if (ReferenceEquals(other, null)) {
return false;
}
if (ReferenceEquals(other, this)) {
return true;
}
if (!RpcsByPeer_.Equals(other.RpcsByPeer_)) return false;
return Equals(_unknownFields, other._unknownFields);
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public override int GetHashCode() {
int hash = 1;
hash ^= RpcsByPeer_.GetHashCode();
if (_unknownFields != null) {
hash ^= _unknownFields.GetHashCode();
}
return hash;
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public override string ToString() {
return pb::JsonFormatter.ToDiagnosticString(this);
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public void WriteTo(pb::CodedOutputStream output) {
rpcsByPeer_.WriteTo(output, _map_rpcsByPeer_codec);
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public int CalculateSize() {
int size = 0;
size += rpcsByPeer_.CalculateSize(_map_rpcsByPeer_codec);
if (_unknownFields != null) {
size += _unknownFields.CalculateSize();
}
return size;
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public void MergeFrom(RpcsByPeer other) {
if (other == null) {
return;
}
rpcsByPeer_.Add(other.rpcsByPeer_);
_unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
public void MergeFrom(pb::CodedInputStream input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
break;
case 10: {
rpcsByPeer_.AddEntriesFrom(input, _map_rpcsByPeer_codec);
break;
}
}
}
}
}
}
#endregion
}

@ -50,7 +50,10 @@ namespace Grpc.Testing {
"RW1wdHkaGy5ncnBjLnRlc3RpbmcuUmVjb25uZWN0SW5mbzJ/ChhMb2FkQmFs",
"YW5jZXJTdGF0c1NlcnZpY2USYwoOR2V0Q2xpZW50U3RhdHMSJi5ncnBjLnRl",
"c3RpbmcuTG9hZEJhbGFuY2VyU3RhdHNSZXF1ZXN0GicuZ3JwYy50ZXN0aW5n",
"LkxvYWRCYWxhbmNlclN0YXRzUmVzcG9uc2UiAGIGcHJvdG8z"));
"LkxvYWRCYWxhbmNlclN0YXRzUmVzcG9uc2UiADKLAQoWWGRzVXBkYXRlSGVh",
"bHRoU2VydmljZRI2CgpTZXRTZXJ2aW5nEhMuZ3JwYy50ZXN0aW5nLkVtcHR5",
"GhMuZ3JwYy50ZXN0aW5nLkVtcHR5EjkKDVNldE5vdFNlcnZpbmcSEy5ncnBj",
"LnRlc3RpbmcuRW1wdHkaEy5ncnBjLnRlc3RpbmcuRW1wdHliBnByb3RvMw=="));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { global::Grpc.Testing.EmptyReflection.Descriptor, global::Grpc.Testing.MessagesReflection.Descriptor, },
new pbr::GeneratedClrTypeInfo(null, null, null));

@ -950,5 +950,132 @@ namespace Grpc.Testing {
}
}
/// <summary>
/// A service to remotely control health status of an xDS test server.
/// </summary>
public static partial class XdsUpdateHealthService
{
static readonly string __ServiceName = "grpc.testing.XdsUpdateHealthService";
static readonly grpc::Marshaller<global::Grpc.Testing.Empty> __Marshaller_grpc_testing_Empty = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Empty.Parser.ParseFrom);
static readonly grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty> __Method_SetServing = new grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty>(
grpc::MethodType.Unary,
__ServiceName,
"SetServing",
__Marshaller_grpc_testing_Empty,
__Marshaller_grpc_testing_Empty);
static readonly grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty> __Method_SetNotServing = new grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty>(
grpc::MethodType.Unary,
__ServiceName,
"SetNotServing",
__Marshaller_grpc_testing_Empty,
__Marshaller_grpc_testing_Empty);
/// <summary>Service descriptor</summary>
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{
get { return global::Grpc.Testing.TestReflection.Descriptor.Services[4]; }
}
/// <summary>Base class for server-side implementations of XdsUpdateHealthService</summary>
[grpc::BindServiceMethod(typeof(XdsUpdateHealthService), "BindService")]
public abstract partial class XdsUpdateHealthServiceBase
{
public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> SetServing(global::Grpc.Testing.Empty request, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> SetNotServing(global::Grpc.Testing.Empty request, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
}
/// <summary>Client for XdsUpdateHealthService</summary>
public partial class XdsUpdateHealthServiceClient : grpc::ClientBase<XdsUpdateHealthServiceClient>
{
/// <summary>Creates a new client for XdsUpdateHealthService</summary>
/// <param name="channel">The channel to use to make remote calls.</param>
public XdsUpdateHealthServiceClient(grpc::ChannelBase channel) : base(channel)
{
}
/// <summary>Creates a new client for XdsUpdateHealthService that uses a custom <c>CallInvoker</c>.</summary>
/// <param name="callInvoker">The callInvoker to use to make remote calls.</param>
public XdsUpdateHealthServiceClient(grpc::CallInvoker callInvoker) : base(callInvoker)
{
}
/// <summary>Protected parameterless constructor to allow creation of test doubles.</summary>
protected XdsUpdateHealthServiceClient() : base()
{
}
/// <summary>Protected constructor to allow creation of configured clients.</summary>
/// <param name="configuration">The client configuration.</param>
protected XdsUpdateHealthServiceClient(ClientBaseConfiguration configuration) : base(configuration)
{
}
public virtual global::Grpc.Testing.Empty SetServing(global::Grpc.Testing.Empty request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return SetServing(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
public virtual global::Grpc.Testing.Empty SetServing(global::Grpc.Testing.Empty request, grpc::CallOptions options)
{
return CallInvoker.BlockingUnaryCall(__Method_SetServing, null, options, request);
}
public virtual grpc::AsyncUnaryCall<global::Grpc.Testing.Empty> SetServingAsync(global::Grpc.Testing.Empty request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return SetServingAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
public virtual grpc::AsyncUnaryCall<global::Grpc.Testing.Empty> SetServingAsync(global::Grpc.Testing.Empty request, grpc::CallOptions options)
{
return CallInvoker.AsyncUnaryCall(__Method_SetServing, null, options, request);
}
public virtual global::Grpc.Testing.Empty SetNotServing(global::Grpc.Testing.Empty request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return SetNotServing(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
public virtual global::Grpc.Testing.Empty SetNotServing(global::Grpc.Testing.Empty request, grpc::CallOptions options)
{
return CallInvoker.BlockingUnaryCall(__Method_SetNotServing, null, options, request);
}
public virtual grpc::AsyncUnaryCall<global::Grpc.Testing.Empty> SetNotServingAsync(global::Grpc.Testing.Empty request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return SetNotServingAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
public virtual grpc::AsyncUnaryCall<global::Grpc.Testing.Empty> SetNotServingAsync(global::Grpc.Testing.Empty request, grpc::CallOptions options)
{
return CallInvoker.AsyncUnaryCall(__Method_SetNotServing, null, options, request);
}
/// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary>
protected override XdsUpdateHealthServiceClient NewInstance(ClientBaseConfiguration configuration)
{
return new XdsUpdateHealthServiceClient(configuration);
}
}
/// <summary>Creates service definition that can be registered with a server</summary>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
public static grpc::ServerServiceDefinition BindService(XdsUpdateHealthServiceBase serviceImpl)
{
return grpc::ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_SetServing, serviceImpl.SetServing)
.AddMethod(__Method_SetNotServing, serviceImpl.SetNotServing).Build();
}
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.</summary>
/// <param name="serviceBinder">Service methods will be bound by calling <c>AddMethod</c> on this object.</param>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
public static void BindService(grpc::ServiceBinderBase serviceBinder, XdsUpdateHealthServiceBase serviceImpl)
{
serviceBinder.AddMethod(__Method_SetServing, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty>(serviceImpl.SetServing));
serviceBinder.AddMethod(__Method_SetNotServing, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty>(serviceImpl.SetNotServing));
}
}
}
#endregion

@ -39,7 +39,7 @@ namespace Grpc.IntegrationTesting
[Option("qps", Default = 1)]
// The desired QPS per channel.
// The desired QPS per channel, for each type of RPC.
public int Qps { get; set; }
[Option("server", Default = "localhost:8080")]
@ -53,18 +53,37 @@ namespace Grpc.IntegrationTesting
[Option("print_response", Default = false)]
public bool PrintResponse { get; set; }
// Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall
[Option("rpc", Default = "UnaryCall")]
public string Rpc { get; set; }
// The metadata to send with each RPC, in the format EmptyCall:key1:value1,UnaryCall:key2:value2
[Option("metadata", Default = null)]
public string Metadata { get; set; }
}
internal enum RpcType
{
UnaryCall,
EmptyCall
}
ClientOptions options;
StatsWatcher statsWatcher = new StatsWatcher();
List<RpcType> rpcs;
Dictionary<RpcType, Metadata> metadata;
// make watcher accessible by tests
internal StatsWatcher StatsWatcher => statsWatcher;
internal XdsInteropClient(ClientOptions options)
{
this.options = options;
this.rpcs = ParseRpcArgument(this.options.Rpc);
this.metadata = ParseMetadataArgument(this.options.Metadata);
}
public static void Run(string[] args)
@ -124,8 +143,11 @@ namespace Grpc.IntegrationTesting
var stopwatch = Stopwatch.StartNew();
while (!cancellationToken.IsCancellationRequested)
{
inflightTasks.Add(RunSingleRpcAsync(client, cancellationToken));
rpcsStarted++;
foreach (var rpcType in rpcs)
{
inflightTasks.Add(RunSingleRpcAsync(client, cancellationToken, rpcType));
rpcsStarted++;
}
// only cleanup calls that have already completed, calls that are still inflight will be cleaned up later.
await CleanupCompletedTasksAsync(inflightTasks);
@ -133,7 +155,7 @@ namespace Grpc.IntegrationTesting
Console.WriteLine($"Currently {inflightTasks.Count} in-flight RPCs");
// if needed, wait a bit before we start the next RPC.
int nextDueInMillis = (int) Math.Max(0, (1000 * rpcsStarted / options.Qps) - stopwatch.ElapsedMilliseconds);
int nextDueInMillis = (int) Math.Max(0, (1000 * rpcsStarted / options.Qps / rpcs.Count) - stopwatch.ElapsedMilliseconds);
if (nextDueInMillis > 0)
{
await Task.Delay(nextDueInMillis);
@ -146,25 +168,61 @@ namespace Grpc.IntegrationTesting
Console.WriteLine($"Channel shutdown {channelId}");
}
private async Task RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken)
private async Task RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken, RpcType rpcType)
{
long rpcId = statsWatcher.RpcIdGenerator.Increment();
try
{
Console.WriteLine($"Starting RPC {rpcId}.");
var response = await client.UnaryCallAsync(new SimpleRequest(),
new CallOptions(cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec)));
statsWatcher.OnRpcComplete(rpcId, response.Hostname);
if (options.PrintResponse)
Console.WriteLine($"Starting RPC {rpcId} of type {rpcType}");
// metadata to send with the RPC
var headers = new Metadata();
if (metadata.ContainsKey(rpcType))
{
Console.WriteLine($"Got response {response}");
headers = metadata[rpcType];
if (headers.Count > 0)
{
var printableHeaders = "[" + string.Join(", ", headers) + "]";
Console.WriteLine($"Will send metadata {printableHeaders}");
}
}
Console.WriteLine($"RPC {rpcId} succeeded ");
if (rpcType == RpcType.UnaryCall)
{
var call = client.UnaryCallAsync(new SimpleRequest(),
new CallOptions(headers: headers, cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec)));
var response = await call;
var hostname = (await call.ResponseHeadersAsync).GetValue("hostname") ?? response.Hostname;
statsWatcher.OnRpcComplete(rpcId, rpcType, hostname);
if (options.PrintResponse)
{
Console.WriteLine($"Got response {response}");
}
}
else if (rpcType == RpcType.EmptyCall)
{
var call = client.EmptyCallAsync(new Empty(),
new CallOptions(headers: headers, cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec)));
var response = await call;
var hostname = (await call.ResponseHeadersAsync).GetValue("hostname");
statsWatcher.OnRpcComplete(rpcId, rpcType, hostname);
if (options.PrintResponse)
{
Console.WriteLine($"Got response {response}");
}
}
else
{
throw new InvalidOperationException($"Unsupported RPC type ${rpcType}");
}
Console.WriteLine($"RPC {rpcId} succeeded");
}
catch (RpcException ex)
{
statsWatcher.OnRpcComplete(rpcId, null);
statsWatcher.OnRpcComplete(rpcId, rpcType, null);
Console.WriteLine($"RPC {rpcId} failed: {ex}");
}
}
@ -186,6 +244,66 @@ namespace Grpc.IntegrationTesting
tasks.Remove(task);
}
}
private static List<RpcType> ParseRpcArgument(string rpcArg)
{
var result = new List<RpcType>();
foreach (var part in rpcArg.Split(','))
{
result.Add(ParseRpc(part));
}
return result;
}
private static RpcType ParseRpc(string rpc)
{
switch (rpc)
{
case "UnaryCall":
return RpcType.UnaryCall;
case "EmptyCall":
return RpcType.EmptyCall;
default:
throw new ArgumentException($"Unknown RPC: \"{rpc}\"");
}
}
private static Dictionary<RpcType, Metadata> ParseMetadataArgument(string metadataArg)
{
var rpcMetadata = new Dictionary<RpcType, Metadata>();
if (string.IsNullOrEmpty(metadataArg))
{
return rpcMetadata;
}
foreach (var metadata in metadataArg.Split(','))
{
var parts = metadata.Split(':');
if (parts.Length != 3)
{
throw new ArgumentException($"Invalid metadata: \"{metadata}\"");
}
var rpc = ParseRpc(parts[0]);
var key = parts[1];
var value = parts[2];
var md = new Metadata { {key, value} };
if (rpcMetadata.ContainsKey(rpc))
{
var existingMetadata = rpcMetadata[rpc];
foreach (var entry in md)
{
existingMetadata.Add(entry);
}
}
else
{
rpcMetadata.Add(rpc, md);
}
}
return rpcMetadata;
}
}
internal class StatsWatcher
@ -198,6 +316,7 @@ namespace Grpc.IntegrationTesting
private int rpcsCompleted;
private int rpcsNoHostname;
private Dictionary<string, int> rpcsByHostname;
private Dictionary<string, Dictionary<string, int>> rpcsByMethod;
public AtomicCounter RpcIdGenerator => rpcIdGenerator;
@ -206,7 +325,7 @@ namespace Grpc.IntegrationTesting
Reset();
}
public void OnRpcComplete(long rpcId, string responseHostname)
public void OnRpcComplete(long rpcId, XdsInteropClient.RpcType rpcType, string responseHostname)
{
lock (myLock)
{
@ -221,11 +340,24 @@ namespace Grpc.IntegrationTesting
}
else
{
// update rpcsByHostname
if (!rpcsByHostname.ContainsKey(responseHostname))
{
rpcsByHostname[responseHostname] = 0;
}
rpcsByHostname[responseHostname] += 1;
// update rpcsByMethod
var method = rpcType.ToString();
if (!rpcsByMethod.ContainsKey(method))
{
rpcsByMethod[method] = new Dictionary<string, int>();
}
if (!rpcsByMethod[method].ContainsKey(responseHostname))
{
rpcsByMethod[method][responseHostname] = 0;
}
rpcsByMethod[method][responseHostname] += 1;
}
rpcsCompleted += 1;
@ -245,6 +377,7 @@ namespace Grpc.IntegrationTesting
rpcsCompleted = 0;
rpcsNoHostname = 0;
rpcsByHostname = new Dictionary<string, int>();
rpcsByMethod = new Dictionary<string, Dictionary<string, int>>();
}
}
@ -269,6 +402,14 @@ namespace Grpc.IntegrationTesting
// we collected enough RPCs, or timed out waiting
var response = new LoadBalancerStatsResponse { NumFailures = rpcsNoHostname };
response.RpcsByPeer.Add(rpcsByHostname);
response.RpcsByMethod.Clear();
foreach (var methodEntry in rpcsByMethod)
{
var rpcsByPeer = new LoadBalancerStatsResponse.Types.RpcsByPeer();
rpcsByPeer.RpcsByPeer_.Add(methodEntry.Value);
response.RpcsByMethod[methodEntry.Key] = rpcsByPeer;
}
Reset();
return response;
}

@ -59,6 +59,7 @@ namespace Grpc.IntegrationTesting
NumChannels = 1,
Qps = 1,
RpcTimeoutSec = 10,
Rpc = "UnaryCall",
Server = $"{Host}:{backendServer.Ports.Single().BoundPort}",
});
@ -89,7 +90,7 @@ namespace Grpc.IntegrationTesting
string backendName = "backend1";
backendService.UnaryHandler = (request, context) =>
{
return Task.FromResult(new SimpleResponse { Hostname = backendName});
return Task.FromResult(new SimpleResponse { Hostname = backendName });
};
var cancellationTokenSource = new CancellationTokenSource();
@ -104,6 +105,9 @@ namespace Grpc.IntegrationTesting
Assert.AreEqual(0, stats.NumFailures);
Assert.AreEqual(backendName, stats.RpcsByPeer.Keys.Single());
Assert.AreEqual(5, stats.RpcsByPeer[backendName]);
Assert.AreEqual("UnaryCall", stats.RpcsByMethod.Keys.Single());
Assert.AreEqual(backendName, stats.RpcsByMethod["UnaryCall"].RpcsByPeer_.Keys.Single());
Assert.AreEqual(5, stats.RpcsByMethod["UnaryCall"].RpcsByPeer_[backendName]);
await Task.Delay(100);
@ -116,6 +120,36 @@ namespace Grpc.IntegrationTesting
Assert.AreEqual(0, stats2.NumFailures);
Assert.AreEqual(backendName, stats2.RpcsByPeer.Keys.Single());
Assert.AreEqual(3, stats2.RpcsByPeer[backendName]);
Assert.AreEqual("UnaryCall", stats2.RpcsByMethod.Keys.Single());
Assert.AreEqual(backendName, stats2.RpcsByMethod["UnaryCall"].RpcsByPeer_.Keys.Single());
Assert.AreEqual(3, stats2.RpcsByMethod["UnaryCall"].RpcsByPeer_[backendName]);
cancellationTokenSource.Cancel();
await runChannelsTask;
}
[Test]
public async Task HostnameReadFromResponseHeaders()
{
string correctBackendName = "backend1";
backendService.UnaryHandler = async (request, context) =>
{
await context.WriteResponseHeadersAsync(new Metadata { {"hostname", correctBackendName} });
return new SimpleResponse { Hostname = "wrong_hostname" };
};
var cancellationTokenSource = new CancellationTokenSource();
var runChannelsTask = xdsInteropClient.RunChannelsAsync(cancellationTokenSource.Token);
var stats = await lbStatsClient.GetClientStatsAsync(new LoadBalancerStatsRequest
{
NumRpcs = 3,
TimeoutSec = 10,
}, deadline: DateTime.UtcNow.AddSeconds(30));
Assert.AreEqual(0, stats.NumFailures);
Assert.AreEqual(correctBackendName, stats.RpcsByPeer.Keys.Single());
Assert.AreEqual(3, stats.RpcsByPeer[correctBackendName]);
cancellationTokenSource.Cancel();
await runChannelsTask;
@ -124,11 +158,17 @@ namespace Grpc.IntegrationTesting
public class BackendServiceImpl : TestService.TestServiceBase
{
public UnaryServerMethod<SimpleRequest, SimpleResponse> UnaryHandler { get; set; }
public UnaryServerMethod<Empty, Empty> EmptyHandler { get; set; }
public override Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context)
{
return UnaryHandler(request, context);
}
public override Task<Empty> EmptyCall(Empty request, ServerCallContext context)
{
return EmptyHandler(request, context);
}
}
}
}

@ -48,12 +48,17 @@ touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py
python tools/run_tests/run_tests.py -l csharp -c opt --build_only
# Test cases "path_matching" and "header_matching" are not included in "all",
# because not all interop clients in all languages support these new tests.
#
# TODO(jtattermusch): remove "path_matching" and "header_matching" from
# --test_case after they are added into "all".
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case=all \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server \
--source_image=projects/grpc-testing/global/images/xds-test-server-2 \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
--gcp_suffix=$(date '+%s') \
--verbose \
--client_cmd='dotnet exec src/csharp/Grpc.IntegrationTesting.XdsClient/bin/Release/netcoreapp2.1/Grpc.IntegrationTesting.XdsClient.dll -- --server=xds:///{server_uri} --stats_port={stats_port} --qps={qps}'
--client_cmd='dotnet exec src/csharp/Grpc.IntegrationTesting.XdsClient/bin/Release/netcoreapp2.1/Grpc.IntegrationTesting.XdsClient.dll -- --server=xds:///{server_uri} --stats_port={stats_port} --qps={qps} {rpcs_to_send} {metadata_to_send}'

@ -1743,7 +1743,12 @@ try:
metadata_to_send = '--metadata="EmptyCall:{key}:{value}"'.format(
key=_TEST_METADATA_KEY, value=_TEST_METADATA_VALUE)
else:
metadata_to_send = '--metadata=""'
# Setting the arg explicitly to empty with '--metadata=""'
# makes C# client fail
# (see https://github.com/commandlineparser/commandline/issues/412),
# so instead we just rely on clients using the default when
# metadata arg is not specified.
metadata_to_send = ''
if test_case in _TESTS_TO_FAIL_ON_RPC_FAILURE:
# TODO(ericgribkoff) Unconditional wait is recommended by TD

Loading…
Cancel
Save