From 89717aa921f52715c90d55820819cb26e7d74705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thiago=20C=2E=20D=27=C3=81vila?= Date: Mon, 18 May 2020 21:35:36 -0300 Subject: [PATCH 01/14] Initial module docstring --- src/compiler/python_generator.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index 36318a3c031..9464154fad4 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -777,7 +777,8 @@ pair PrivateGenerator::GetGrpcServices() { if (generate_in_pb2_grpc) { out->Print( "# Generated by the gRPC Python protocol compiler plugin. " - "DO NOT EDIT!\n"); + "DO NOT EDIT!\n\"\"\"" + "Protocol buffer generated client and server classes.\"\"\"\n"); if (!PrintPreamble(out.get())) { return make_pair(false, ""); } From d54d1275dd966799441081c2f1925a9f6ae2e0d4 Mon Sep 17 00:00:00 2001 From: Rob Clevenger Date: Fri, 22 May 2020 14:59:29 -0700 Subject: [PATCH 02/14] Fix missing include for std::string Fixes #23019 --- src/core/lib/debug/stats.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/lib/debug/stats.h b/src/core/lib/debug/stats.h index 6117e8e4ff8..bc56c27bba9 100644 --- a/src/core/lib/debug/stats.h +++ b/src/core/lib/debug/stats.h @@ -25,6 +25,8 @@ #include "src/core/lib/debug/stats_data.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include + typedef struct grpc_stats_data { gpr_atm counters[GRPC_STATS_COUNTER_COUNT]; gpr_atm histograms[GRPC_STATS_HISTOGRAM_BUCKETS]; From 280fcdb95fd893b00e401c0ea62a5d94e07b127a Mon Sep 17 00:00:00 2001 From: Kevin Dungs Date: Tue, 26 May 2020 01:06:53 +0200 Subject: [PATCH 03/14] Correct gMock syntax in example code. --- doc/unit_testing.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/unit_testing.md b/doc/unit_testing.md index ca5648130ce..c301e7d4219 100644 --- a/doc/unit_testing.md +++ b/doc/unit_testing.md @@ -144,7 +144,7 @@ Unary RPC: MockEchoTestServiceStub stub; EchoResponse resp; resp.set_message("hello world"); -Expect_CALL(stub, Echo(_,_,_)).Times(Atleast(1)).WillOnce(DoAll(SetArgPointee<2>(resp), Return(Status::OK))); +EXPECT_CALL(stub, Echo(_,_,_)).Times(AtLeast(1)).WillOnce(DoAll(SetArgPointee<2>(resp), Return(Status::OK))); FakeClient client(stub); client.DoEcho(); ``` From 1c20b397abf6087f9aab5fa0402f26f05acaf630 Mon Sep 17 00:00:00 2001 From: Rob Clevenger Date: Thu, 28 May 2020 14:45:30 -0700 Subject: [PATCH 04/14] Move header up. --- src/core/lib/debug/stats.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/debug/stats.h b/src/core/lib/debug/stats.h index a6bf8cefeb0..4f9cceaf9f9 100644 --- a/src/core/lib/debug/stats.h +++ b/src/core/lib/debug/stats.h @@ -21,12 +21,12 @@ #include +#include + #include #include "src/core/lib/debug/stats_data.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include - typedef struct grpc_stats_data { gpr_atm counters[GRPC_STATS_COUNTER_COUNT]; gpr_atm histograms[GRPC_STATS_HISTOGRAM_BUCKETS]; From 942a2b4201456bf0e8afa04a5d0ff63756bc4812 Mon Sep 17 00:00:00 2001 From: staticdev Date: Fri, 29 May 2020 12:45:44 -0300 Subject: [PATCH 05/14] Review of wording --- src/compiler/python_generator.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index 9464154fad4..c53c5f3d27e 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -778,7 +778,8 @@ pair PrivateGenerator::GetGrpcServices() { out->Print( "# Generated by the gRPC Python protocol compiler plugin. " "DO NOT EDIT!\n\"\"\"" - "Protocol buffer generated client and server classes.\"\"\"\n"); + "Client and server classes corresponding to protobuf-defined services." + "\"\"\"\n"); if (!PrintPreamble(out.get())) { return make_pair(false, ""); } From ab41ecbaafb42ac8ccdf8daffcf80058276f0868 Mon Sep 17 00:00:00 2001 From: staticdev Date: Sat, 30 May 2020 10:53:24 -0300 Subject: [PATCH 06/14] clang_format_code --- src/compiler/python_generator.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index c53c5f3d27e..096ac5b86b6 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -778,8 +778,8 @@ pair PrivateGenerator::GetGrpcServices() { out->Print( "# Generated by the gRPC Python protocol compiler plugin. " "DO NOT EDIT!\n\"\"\"" - "Client and server classes corresponding to protobuf-defined services." - "\"\"\"\n"); + "Client and server classes corresponding to protobuf-defined " + "services.\"\"\"\n"); if (!PrintPreamble(out.get())) { return make_pair(false, ""); } From 682e8bd035cf750766b9785c8d2f59c0297f453e Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 2 Jun 2020 16:32:16 +0200 Subject: [PATCH 07/14] add C# xds example --- examples/csharp/Xds/Greeter.sln | 34 +++++++++ examples/csharp/Xds/Greeter/Greeter.csproj | 19 +++++ .../Xds/GreeterClient/GreeterClient.csproj | 12 ++++ examples/csharp/Xds/GreeterClient/Program.cs | 40 +++++++++++ .../Xds/GreeterServer/GreeterServer.csproj | 12 ++++ examples/csharp/Xds/GreeterServer/Program.cs | 71 +++++++++++++++++++ examples/csharp/Xds/README.md | 29 ++++++++ 7 files changed, 217 insertions(+) create mode 100644 examples/csharp/Xds/Greeter.sln create mode 100644 examples/csharp/Xds/Greeter/Greeter.csproj create mode 100644 examples/csharp/Xds/GreeterClient/GreeterClient.csproj create mode 100644 examples/csharp/Xds/GreeterClient/Program.cs create mode 100644 examples/csharp/Xds/GreeterServer/GreeterServer.csproj create mode 100644 examples/csharp/Xds/GreeterServer/Program.cs create mode 100644 examples/csharp/Xds/README.md diff --git a/examples/csharp/Xds/Greeter.sln b/examples/csharp/Xds/Greeter.sln new file mode 100644 index 00000000000..a5ba98d0bed --- /dev/null +++ b/examples/csharp/Xds/Greeter.sln @@ -0,0 +1,34 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.26228.4 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Greeter", "Greeter\Greeter.csproj", "{13B6DFC8-F5F6-4CC2-99DF-57A7CF042033}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GreeterClient", "GreeterClient\GreeterClient.csproj", "{B754FB02-D501-4308-8B89-33AB7119C80D}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GreeterServer", "GreeterServer\GreeterServer.csproj", "{DDBFF994-E076-43AD-B18D-049DFC1B670C}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {13B6DFC8-F5F6-4CC2-99DF-57A7CF042033}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {13B6DFC8-F5F6-4CC2-99DF-57A7CF042033}.Debug|Any CPU.Build.0 = Debug|Any CPU + {13B6DFC8-F5F6-4CC2-99DF-57A7CF042033}.Release|Any CPU.ActiveCfg = Release|Any CPU + {13B6DFC8-F5F6-4CC2-99DF-57A7CF042033}.Release|Any CPU.Build.0 = Release|Any CPU + {B754FB02-D501-4308-8B89-33AB7119C80D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B754FB02-D501-4308-8B89-33AB7119C80D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B754FB02-D501-4308-8B89-33AB7119C80D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B754FB02-D501-4308-8B89-33AB7119C80D}.Release|Any CPU.Build.0 = Release|Any CPU + {DDBFF994-E076-43AD-B18D-049DFC1B670C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DDBFF994-E076-43AD-B18D-049DFC1B670C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DDBFF994-E076-43AD-B18D-049DFC1B670C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DDBFF994-E076-43AD-B18D-049DFC1B670C}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/examples/csharp/Xds/Greeter/Greeter.csproj b/examples/csharp/Xds/Greeter/Greeter.csproj new file mode 100644 index 00000000000..232cf560edf --- /dev/null +++ b/examples/csharp/Xds/Greeter/Greeter.csproj @@ -0,0 +1,19 @@ + + + + netstandard2.0 + + + + + + + + + + + + + + + diff --git a/examples/csharp/Xds/GreeterClient/GreeterClient.csproj b/examples/csharp/Xds/GreeterClient/GreeterClient.csproj new file mode 100644 index 00000000000..ac10d854972 --- /dev/null +++ b/examples/csharp/Xds/GreeterClient/GreeterClient.csproj @@ -0,0 +1,12 @@ + + + + netcoreapp2.1 + Exe + + + + + + + diff --git a/examples/csharp/Xds/GreeterClient/Program.cs b/examples/csharp/Xds/GreeterClient/Program.cs new file mode 100644 index 00000000000..0b2b002b1fd --- /dev/null +++ b/examples/csharp/Xds/GreeterClient/Program.cs @@ -0,0 +1,40 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using Grpc.Core; +using Helloworld; + +namespace GreeterClient +{ + class Program + { + public static void Main(string[] args) + { + // TODO: specify server address.. + + Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure); + + var client = new Greeter.GreeterClient(channel); + String user = "you"; + + var reply = client.SayHello(new HelloRequest { Name = user }); + Console.WriteLine("Greeter client received: " + reply.Message); + + channel.ShutdownAsync().Wait(); + Console.WriteLine("Press any key to exit..."); + Console.ReadKey(); + } + } +} diff --git a/examples/csharp/Xds/GreeterServer/GreeterServer.csproj b/examples/csharp/Xds/GreeterServer/GreeterServer.csproj new file mode 100644 index 00000000000..ac10d854972 --- /dev/null +++ b/examples/csharp/Xds/GreeterServer/GreeterServer.csproj @@ -0,0 +1,12 @@ + + + + netcoreapp2.1 + Exe + + + + + + + diff --git a/examples/csharp/Xds/GreeterServer/Program.cs b/examples/csharp/Xds/GreeterServer/Program.cs new file mode 100644 index 00000000000..030dd61c8b2 --- /dev/null +++ b/examples/csharp/Xds/GreeterServer/Program.cs @@ -0,0 +1,71 @@ +// Copyright 2020 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Net; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.HealthCheck; +using Helloworld; +using Grpc.Health; +using Grpc.Health.V1; +using Grpc.Reflection; +using Grpc.Reflection.V1Alpha; + +namespace GreeterServer +{ + class GreeterImpl : Greeter.GreeterBase + { + // Server side handler of the SayHello RPC + public override Task SayHello(HelloRequest request, ServerCallContext context) + { + String hostName = Dns.GetHostName(); + return Task.FromResult(new HelloReply { Message = $"Hello {request.Name} from {hostName}!"}); + } + } + + class Program + { + const int Port = 50051; + + public static void Main(string[] args) + { + var serviceDescriptors = new [] {Greeter.Descriptor, Health.Descriptor, ServerReflection.Descriptor}; + var greeterImpl = new GreeterImpl(); + var healthServiceImpl = new HealthServiceImpl(); + var reflectionImpl = new ReflectionServiceImpl(serviceDescriptors); + + Server server = new Server + { + Services = { Greeter.BindService(greeterImpl), Health.BindService(healthServiceImpl), ServerReflection.BindService(reflectionImpl) }, + Ports = { new ServerPort("localhost", Port, ServerCredentials.Insecure) } + }; + server.Start(); + + // Mark all services as healthy. + foreach (var serviceDescriptor in serviceDescriptors) + { + healthServiceImpl.SetStatus(serviceDescriptor.FullName, HealthCheckResponse.Types.ServingStatus.Serving); + } + // Mark overall server status as healthy. + healthServiceImpl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Serving); + + Console.WriteLine("Greeter server listening on port " + Port); + Console.WriteLine("Press any key to stop the server..."); + Console.ReadKey(); + + server.ShutdownAsync().Wait(); + } + } +} diff --git a/examples/csharp/Xds/README.md b/examples/csharp/Xds/README.md new file mode 100644 index 00000000000..7c09f307896 --- /dev/null +++ b/examples/csharp/Xds/README.md @@ -0,0 +1,29 @@ +gRPC Hostname example (C#) +======================== + +BACKGROUND +------------- +This is a version of the helloworld example with a server whose response includes its hostname. It also supports health and reflection services. This makes it a good server to test infrastructure, like load balancing. + +PREREQUISITES +------------- + +- The [.NET Core SDK 2.1+](https://www.microsoft.com/net/core) + +You can also build the solution `Greeter.sln` using Visual Studio 2019, +but it's not a requirement. + +BUILD AND RUN +------------- + +- Build and run the server + + ``` + > dotnet run -p GreeterServer + ``` + +- Build and run the client + + ``` + > dotnet run -p GreeterClient + ``` From 8049266e642e3a6a0782ab9eee5fee439f0acfcc Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 2 Jun 2020 17:31:56 +0200 Subject: [PATCH 08/14] improvements --- examples/csharp/Xds/GreeterServer/Program.cs | 6 +- examples/csharp/Xds/README.md | 90 +++++++++++++++++--- 2 files changed, 83 insertions(+), 13 deletions(-) diff --git a/examples/csharp/Xds/GreeterServer/Program.cs b/examples/csharp/Xds/GreeterServer/Program.cs index 030dd61c8b2..e7a2965739f 100644 --- a/examples/csharp/Xds/GreeterServer/Program.cs +++ b/examples/csharp/Xds/GreeterServer/Program.cs @@ -30,14 +30,14 @@ namespace GreeterServer // Server side handler of the SayHello RPC public override Task SayHello(HelloRequest request, ServerCallContext context) { - String hostName = Dns.GetHostName(); + String hostName = Dns.GetHostName(); // TODO: make hostname configurable return Task.FromResult(new HelloReply { Message = $"Hello {request.Name} from {hostName}!"}); } } class Program { - const int Port = 50051; + const int Port = 50051; // TODO: make port configurable public static void Main(string[] args) { @@ -49,7 +49,7 @@ namespace GreeterServer Server server = new Server { Services = { Greeter.BindService(greeterImpl), Health.BindService(healthServiceImpl), ServerReflection.BindService(reflectionImpl) }, - Ports = { new ServerPort("localhost", Port, ServerCredentials.Insecure) } + Ports = { new ServerPort("localhost", Port, ServerCredentials.Insecure) } // TODO: don't listen on just localhost }; server.Start(); diff --git a/examples/csharp/Xds/README.md b/examples/csharp/Xds/README.md index 7c09f307896..9b3f939dcf0 100644 --- a/examples/csharp/Xds/README.md +++ b/examples/csharp/Xds/README.md @@ -3,7 +3,7 @@ gRPC Hostname example (C#) BACKGROUND ------------- -This is a version of the helloworld example with a server whose response includes its hostname. It also supports health and reflection services. This makes it a good server to test infrastructure, like load balancing. +This is a version of the helloworld example with a server whose response includes its hostname. It also supports health and reflection services. This makes it a good server to test infrastructure, such as XDS load balancing. PREREQUISITES ------------- @@ -13,17 +13,87 @@ PREREQUISITES You can also build the solution `Greeter.sln` using Visual Studio 2019, but it's not a requirement. -BUILD AND RUN +RUN THE EXAMPLE ------------- -- Build and run the server +First, build and run the server, then verify the server is running and +check the server is behaving as expected (more on that below). - ``` - > dotnet run -p GreeterServer - ``` +``` +cd GreeterServer +dotnet run +``` -- Build and run the client +After configuring your xDS server to track the gRPC server we just started, +create a bootstrap file as desribed in [gRFC A27](https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md): - ``` - > dotnet run -p GreeterClient - ``` +``` +{ + xds_servers": [ + { + "server_uri": , + "channel_creds": [ + { + "type": , + "config": + } + ] + } + ], + "node": +} +``` + +Then point the `GRPC_XDS_BOOTSTRAP` environment variable at the bootstrap file: + +``` +export GRPC_XDS_BOOTSTRAP=/etc/xds-bootstrap.json +``` + +Finally, run your client: + +``` +cd GreeterClient +dotnet run -- xds-experimental:///my-backend +``` + +VERIFYING THE SERVER +------------- + +`grpcurl` can be used to test your server. If you don't have it, +install [`grpcurl`](https://github.com/fullstorydev/grpcurl/releases). This will allow +you to manually test the service. + +Exercise your server's application-layer service: + +```sh +> grpcurl --plaintext -d '{"name": "you"}' localhost:50051 +{ + "message": "Hello you from jtatt.muc.corp.google.com!" +} +``` + +Make sure that all of your server's services are available via reflection: + +```sh +> grpcurl --plaintext localhost:50051 list +grpc.health.v1.Health +grpc.reflection.v1alpha.ServerReflection +helloworld.Greeter +``` + +Make sure that your services are reporting healthy: + +```sh +> grpcurl --plaintext -d '{"service": "helloworld.Greeter"}' localhost:50051 +grpc.health.v1.Health/Check +{ + "status": "SERVING" +} + +> grpcurl --plaintext -d '{"service": ""}' localhost:50051 +grpc.health.v1.Health/Check +{ + "status": "SERVING" +} +``` From f5ecc0adc89c388a1a0614fe9b7e16eb1427a389 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 2 Jun 2020 17:58:17 +0200 Subject: [PATCH 09/14] add commandline parsing --- examples/csharp/Xds/Greeter/Greeter.csproj | 1 + examples/csharp/Xds/GreeterClient/Program.cs | 15 ++++++++++++-- examples/csharp/Xds/GreeterServer/Program.cs | 21 ++++++++++++++++---- examples/csharp/Xds/README.md | 2 +- 4 files changed, 32 insertions(+), 7 deletions(-) diff --git a/examples/csharp/Xds/Greeter/Greeter.csproj b/examples/csharp/Xds/Greeter/Greeter.csproj index 232cf560edf..9351cbcb53a 100644 --- a/examples/csharp/Xds/Greeter/Greeter.csproj +++ b/examples/csharp/Xds/Greeter/Greeter.csproj @@ -9,6 +9,7 @@ + diff --git a/examples/csharp/Xds/GreeterClient/Program.cs b/examples/csharp/Xds/GreeterClient/Program.cs index 0b2b002b1fd..c6820a8f8d7 100644 --- a/examples/csharp/Xds/GreeterClient/Program.cs +++ b/examples/csharp/Xds/GreeterClient/Program.cs @@ -15,16 +15,27 @@ using System; using Grpc.Core; using Helloworld; +using CommandLine; namespace GreeterClient { class Program { + private class Options + { + [Option("server", Default = "localhost:50051", HelpText = "The address of the server")] + public string Server { get; set; } + } + public static void Main(string[] args) { - // TODO: specify server address.. + Parser.Default.ParseArguments(args) + .WithParsed(options => RunClient(options)); + } - Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure); + private static void RunClient(Options options) + { + Channel channel = new Channel(options.Server, ChannelCredentials.Insecure); var client = new Greeter.GreeterClient(channel); String user = "you"; diff --git a/examples/csharp/Xds/GreeterServer/Program.cs b/examples/csharp/Xds/GreeterServer/Program.cs index e7a2965739f..4c3e46158b3 100644 --- a/examples/csharp/Xds/GreeterServer/Program.cs +++ b/examples/csharp/Xds/GreeterServer/Program.cs @@ -22,6 +22,7 @@ using Grpc.Health; using Grpc.Health.V1; using Grpc.Reflection; using Grpc.Reflection.V1Alpha; +using CommandLine; namespace GreeterServer { @@ -30,16 +31,28 @@ namespace GreeterServer // Server side handler of the SayHello RPC public override Task SayHello(HelloRequest request, ServerCallContext context) { - String hostName = Dns.GetHostName(); // TODO: make hostname configurable + String hostName = Dns.GetHostName(); return Task.FromResult(new HelloReply { Message = $"Hello {request.Name} from {hostName}!"}); } } class Program { - const int Port = 50051; // TODO: make port configurable + class Options + { + [Option("port", Default = 50051, HelpText = "The port to listen on.")] + public int Port { get; set; } + + // TODO: make hostname configurable + } public static void Main(string[] args) + { + Parser.Default.ParseArguments(args) + .WithParsed(options => RunServer(options)); + } + + private static void RunServer(Options options) { var serviceDescriptors = new [] {Greeter.Descriptor, Health.Descriptor, ServerReflection.Descriptor}; var greeterImpl = new GreeterImpl(); @@ -49,7 +62,7 @@ namespace GreeterServer Server server = new Server { Services = { Greeter.BindService(greeterImpl), Health.BindService(healthServiceImpl), ServerReflection.BindService(reflectionImpl) }, - Ports = { new ServerPort("localhost", Port, ServerCredentials.Insecure) } // TODO: don't listen on just localhost + Ports = { new ServerPort("[::]", options.Port, ServerCredentials.Insecure) } }; server.Start(); @@ -61,7 +74,7 @@ namespace GreeterServer // Mark overall server status as healthy. healthServiceImpl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Serving); - Console.WriteLine("Greeter server listening on port " + Port); + Console.WriteLine("Greeter server listening on port " + options.Port); Console.WriteLine("Press any key to stop the server..."); Console.ReadKey(); diff --git a/examples/csharp/Xds/README.md b/examples/csharp/Xds/README.md index 9b3f939dcf0..ce0aa9f7443 100644 --- a/examples/csharp/Xds/README.md +++ b/examples/csharp/Xds/README.md @@ -54,7 +54,7 @@ Finally, run your client: ``` cd GreeterClient -dotnet run -- xds-experimental:///my-backend +dotnet run --server xds-experimental:///my-backend ``` VERIFYING THE SERVER From 0a452b9d1973102ee5d8826eb500aa38fb241827 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 2 Jun 2020 00:29:33 -0700 Subject: [PATCH 10/14] Allow configuring default service configs in grpc_cli --- test/cpp/util/grpc_cli.cc | 3 +++ test/cpp/util/grpc_tool.cc | 9 +++++++++ test/cpp/util/grpc_tool_test.cc | 23 +++++++++++++++++++++++ 3 files changed, 35 insertions(+) diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index a4035c57758..663b44b9f56 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -51,6 +51,9 @@ --decode=grpc.testing.SimpleResponse \ src/proto/grpc/testing/messages.proto \ < output.bin > output.txt + 10. --default_service_config, optional default service config to use + on the channel. Note that this may be ignored if the name resolver + returns a service config. */ #include diff --git a/test/cpp/util/grpc_tool.cc b/test/cpp/util/grpc_tool.cc index 053fd3862c0..a7bdb2b54ff 100644 --- a/test/cpp/util/grpc_tool.cc +++ b/test/cpp/util/grpc_tool.cc @@ -57,6 +57,11 @@ DEFINE_string(proto_path, ".", "Path to look for the proto file."); DEFINE_string(protofiles, "", "Name of the proto file."); DEFINE_bool(binary_input, false, "Input in binary format"); DEFINE_bool(binary_output, false, "Output in binary format"); +DEFINE_string( + default_service_config, "", + "Default service config to use on the channel, if non-empty. Note " + "that this will be ignored if the name resolver returns a service " + "config."); DEFINE_bool(json_input, false, "Input in json format"); DEFINE_bool(json_output, false, "Output in json format"); DEFINE_string(infile, "", "Input file (default is stdin)"); @@ -217,6 +222,10 @@ std::shared_ptr CreateCliChannel( if (!cred.GetSslTargetNameOverride().empty()) { args.SetSslTargetNameOverride(cred.GetSslTargetNameOverride()); } + if (!FLAGS_default_service_config.empty()) { + args.SetString(GRPC_ARG_SERVICE_CONFIG, + FLAGS_default_service_config.c_str()); + } return ::grpc::CreateCustomChannel(server_address, cred.GetCredentials(), args); } diff --git a/test/cpp/util/grpc_tool_test.cc b/test/cpp/util/grpc_tool_test.cc index 175f16c911c..5d15e8c183a 100644 --- a/test/cpp/util/grpc_tool_test.cc +++ b/test/cpp/util/grpc_tool_test.cc @@ -118,6 +118,7 @@ DECLARE_bool(batch); DECLARE_string(metadata); DECLARE_string(protofiles); DECLARE_string(proto_path); +DECLARE_string(default_service_config); namespace { @@ -1192,6 +1193,28 @@ TEST_F(GrpcToolTest, ListCommand_OverrideSslHostName) { ShutdownServer(); } +TEST_F(GrpcToolTest, ConfiguringDefaultServiceConfig) { + // Test input "grpc_cli list localhost: + // --default_service_config={\"loadBalancingConfig\":[{\"pick_first\":{}}]}" + std::stringstream output_stream; + const grpc::string server_address = SetUpServer(); + const char* argv[] = {"grpc_cli", "ls", server_address.c_str()}; + // Just check that the tool is still operational when --default_service_config + // is configured. This particular service config is in reality redundant with + // the channel's default configuration. + FLAGS_l = false; + FLAGS_default_service_config = + "{\"loadBalancingConfig\":[{\"pick_first\":{}}]}"; + EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), + std::bind(PrintStream, &output_stream, + std::placeholders::_1))); + FLAGS_default_service_config = ""; + EXPECT_TRUE(0 == strcmp(output_stream.str().c_str(), + "grpc.testing.EchoTestService\n" + "grpc.reflection.v1alpha.ServerReflection\n")); + ShutdownServer(); +} + } // namespace testing } // namespace grpc From 11c54ada0dc6751914dfcb3181e8e8250386c097 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Tue, 2 Jun 2020 11:11:06 -0700 Subject: [PATCH 11/14] Revert "Fix StartCall: make corking work and allow concurrent Start*" --- .../impl/codegen/client_callback_impl.h | 359 +++++++----------- .../end2end/client_callback_end2end_test.cc | 110 +----- 2 files changed, 163 insertions(+), 306 deletions(-) diff --git a/include/grpcpp/impl/codegen/client_callback_impl.h b/include/grpcpp/impl/codegen/client_callback_impl.h index 86a46d8c870..8e683743e06 100644 --- a/include/grpcpp/impl/codegen/client_callback_impl.h +++ b/include/grpcpp/impl/codegen/client_callback_impl.h @@ -461,51 +461,76 @@ class ClientCallbackReaderWriterImpl // 1. Send initial metadata (unless corked) + recv initial metadata // 2. Any read backlog // 3. Any write backlog - // 4. Recv trailing metadata (unless corked) + // 4. Recv trailing metadata, on_completion callback + started_ = true; + + start_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_, /*can_inline=*/false); if (!start_corked_) { start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); } - + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); call_.PerformOps(&start_ops_); - { - grpc::internal::MutexLock lock(&start_mu_); - - if (backlog_.read_ops) { - call_.PerformOps(&read_ops_); - } - if (backlog_.write_ops) { - call_.PerformOps(&write_ops_); - } - if (backlog_.writes_done_ops) { - call_.PerformOps(&writes_done_ops_); - } - call_.PerformOps(&finish_ops_); - // The last thing in this critical section is to set started_ so that it - // can be used lock-free as well. - started_.store(true, std::memory_order_release); + // Also set up the read and write tags so that they don't have to be set up + // each time + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(); + }, + &write_ops_, /*can_inline=*/false); + write_ops_.set_core_cq_tag(&write_tag_); + + read_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(); + }, + &read_ops_, /*can_inline=*/false); + read_ops_.set_core_cq_tag(&read_tag_); + if (read_ops_at_start_) { + call_.PerformOps(&read_ops_); + } + + if (write_ops_at_start_) { + call_.PerformOps(&write_ops_); + } + + if (writes_done_ops_at_start_) { + call_.PerformOps(&writes_done_ops_); } - // MaybeFinish outside the lock to make sure that destruction of this object - // doesn't take place while holding the lock (which would cause the lock to - // be released after destruction) - this->MaybeFinish(); + + finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, + &finish_ops_, /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); } void Read(Response* msg) override { read_ops_.RecvMessage(msg); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.read_ops = true; - return; - } + if (started_) { + call_.PerformOps(&read_ops_); + } else { + read_ops_at_start_ = true; } - call_.PerformOps(&read_ops_); } void Write(const Request* msg, ::grpc::WriteOptions options) override { + if (start_corked_) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } + if (options.is_last_message()) { options.set_buffer_hint(); write_ops_.ClientSendClose(); @@ -513,22 +538,18 @@ class ClientCallbackReaderWriterImpl // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (GPR_UNLIKELY(corked_write_needed_)) { - write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - corked_write_needed_ = false; - } - - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.write_ops = true; - return; - } + if (started_) { + call_.PerformOps(&write_ops_); + } else { + write_ops_at_start_ = true; } - call_.PerformOps(&write_ops_); } void WritesDone() override { + if (start_corked_) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } writes_done_ops_.ClientSendClose(); writes_done_tag_.Set(call_.call(), [this](bool ok) { @@ -538,19 +559,11 @@ class ClientCallbackReaderWriterImpl &writes_done_ops_, /*can_inline=*/false); writes_done_ops_.set_core_cq_tag(&writes_done_tag_); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (GPR_UNLIKELY(corked_write_needed_)) { - writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - corked_write_needed_ = false; + if (started_) { + call_.PerformOps(&writes_done_ops_); + } else { + writes_done_ops_at_start_ = true; } - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.writes_done_ops = true; - return; - } - } - call_.PerformOps(&writes_done_ops_); } void AddHold(int holds) override { @@ -567,42 +580,8 @@ class ClientCallbackReaderWriterImpl : context_(context), call_(call), reactor_(reactor), - start_corked_(context_->initial_metadata_corked_), - corked_write_needed_(start_corked_) { + start_corked_(context_->initial_metadata_corked_) { this->BindReactor(reactor); - - // Set up the unchanging parts of the start, read, and write tags and ops. - start_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(); - }, - &start_ops_, /*can_inline=*/false); - start_ops_.RecvInitialMetadata(context_); - start_ops_.set_core_cq_tag(&start_tag_); - - write_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnWriteDone(ok); - MaybeFinish(); - }, - &write_ops_, /*can_inline=*/false); - write_ops_.set_core_cq_tag(&write_tag_); - - read_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadDone(ok); - MaybeFinish(); - }, - &read_ops_, /*can_inline=*/false); - read_ops_.set_core_cq_tag(&read_tag_); - - // Also set up the Finish tag and op set. - finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, - &finish_ops_, - /*can_inline=*/false); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); } ::grpc_impl::ClientContext* const context_; @@ -613,9 +592,7 @@ class ClientCallbackReaderWriterImpl grpc::internal::CallOpRecvInitialMetadata> start_ops_; grpc::internal::CallbackWithSuccessTag start_tag_; - const bool start_corked_; - bool corked_write_needed_; // no lock needed since only accessed in - // Write/WritesDone which cannot be concurrent + bool start_corked_; grpc::internal::CallOpSet finish_ops_; grpc::internal::CallbackWithSuccessTag finish_tag_; @@ -626,27 +603,22 @@ class ClientCallbackReaderWriterImpl grpc::internal::CallOpClientSendClose> write_ops_; grpc::internal::CallbackWithSuccessTag write_tag_; + bool write_ops_at_start_{false}; grpc::internal::CallOpSet writes_done_ops_; grpc::internal::CallbackWithSuccessTag writes_done_tag_; + bool writes_done_ops_at_start_{false}; grpc::internal::CallOpSet> read_ops_; grpc::internal::CallbackWithSuccessTag read_tag_; + bool read_ops_at_start_{false}; - struct StartCallBacklog { - bool write_ops = false; - bool writes_done_ops = false; - bool read_ops = false; - }; - StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */; - - // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish - std::atomic callbacks_outstanding_{3}; - std::atomic_bool started_{false}; - grpc::internal::Mutex start_mu_; + // Minimum of 2 callbacks to pre-register for start and finish + std::atomic callbacks_outstanding_{2}; + bool started_{false}; }; template @@ -698,7 +670,8 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata // 2. Any backlog - // 3. Recv trailing metadata + // 3. Recv trailing metadata, on_completion callback + started_ = true; start_tag_.Set(call_.call(), [this](bool ok) { @@ -720,13 +693,8 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { }, &read_ops_, /*can_inline=*/false); read_ops_.set_core_cq_tag(&read_tag_); - - { - grpc::internal::MutexLock lock(&start_mu_); - if (backlog_.read_ops) { - call_.PerformOps(&read_ops_); - } - started_.store(true, std::memory_order_release); + if (read_ops_at_start_) { + call_.PerformOps(&read_ops_); } finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, @@ -739,14 +707,11 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { void Read(Response* msg) override { read_ops_.RecvMessage(msg); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.read_ops = true; - return; - } + if (started_) { + call_.PerformOps(&read_ops_); + } else { + read_ops_at_start_ = true; } - call_.PerformOps(&read_ops_); } void AddHold(int holds) override { @@ -787,16 +752,11 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { grpc::internal::CallOpSet> read_ops_; grpc::internal::CallbackWithSuccessTag read_tag_; - - struct StartCallBacklog { - bool read_ops = false; - }; - StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */; + bool read_ops_at_start_{false}; // Minimum of 2 callbacks to pre-register for start and finish std::atomic callbacks_outstanding_{2}; - std::atomic_bool started_{false}; - grpc::internal::Mutex start_mu_; + bool started_{false}; }; template @@ -849,60 +809,74 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata // 2. Any backlog - // 3. Recv trailing metadata + // 3. Recv trailing metadata, on_completion callback + started_ = true; + start_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_, /*can_inline=*/false); if (!start_corked_) { start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); } + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); call_.PerformOps(&start_ops_); - { - grpc::internal::MutexLock lock(&start_mu_); - - if (backlog_.write_ops) { - call_.PerformOps(&write_ops_); - } - if (backlog_.writes_done_ops) { - call_.PerformOps(&writes_done_ops_); - } - call_.PerformOps(&finish_ops_); - // The last thing in this critical section is to set started_ so that it - // can be used lock-free as well. - started_.store(true, std::memory_order_release); + // Also set up the read and write tags so that they don't have to be set up + // each time + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(); + }, + &write_ops_, /*can_inline=*/false); + write_ops_.set_core_cq_tag(&write_tag_); + + if (write_ops_at_start_) { + call_.PerformOps(&write_ops_); } - // MaybeFinish outside the lock to make sure that destruction of this object - // doesn't take place while holding the lock (which would cause the lock to - // be released after destruction) - this->MaybeFinish(); + + if (writes_done_ops_at_start_) { + call_.PerformOps(&writes_done_ops_); + } + + finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, + &finish_ops_, /*can_inline=*/false); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); } void Write(const Request* msg, ::grpc::WriteOptions options) override { - if (GPR_UNLIKELY(options.is_last_message())) { + if (start_corked_) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } + + if (options.is_last_message()) { options.set_buffer_hint(); write_ops_.ClientSendClose(); } // TODO(vjpai): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok()); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - - if (GPR_UNLIKELY(corked_write_needed_)) { - write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - corked_write_needed_ = false; + if (started_) { + call_.PerformOps(&write_ops_); + } else { + write_ops_at_start_ = true; } - - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.write_ops = true; - return; - } - } - call_.PerformOps(&write_ops_); } - void WritesDone() override { + if (start_corked_) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } writes_done_ops_.ClientSendClose(); writes_done_tag_.Set(call_.call(), [this](bool ok) { @@ -912,21 +886,11 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { &writes_done_ops_, /*can_inline=*/false); writes_done_ops_.set_core_cq_tag(&writes_done_tag_); callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); - - if (GPR_UNLIKELY(corked_write_needed_)) { - writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, - context_->initial_metadata_flags()); - corked_write_needed_ = false; - } - - if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) { - grpc::internal::MutexLock lock(&start_mu_); - if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) { - backlog_.writes_done_ops = true; - return; - } + if (started_) { + call_.PerformOps(&writes_done_ops_); + } else { + writes_done_ops_at_start_ = true; } - call_.PerformOps(&writes_done_ops_); } void AddHold(int holds) override { @@ -945,36 +909,10 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { : context_(context), call_(call), reactor_(reactor), - start_corked_(context_->initial_metadata_corked_), - corked_write_needed_(start_corked_) { + start_corked_(context_->initial_metadata_corked_) { this->BindReactor(reactor); - - // Set up the unchanging parts of the start and write tags and ops. - start_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnReadInitialMetadataDone(ok); - MaybeFinish(); - }, - &start_ops_, /*can_inline=*/false); - start_ops_.RecvInitialMetadata(context_); - start_ops_.set_core_cq_tag(&start_tag_); - - write_tag_.Set(call_.call(), - [this](bool ok) { - reactor_->OnWriteDone(ok); - MaybeFinish(); - }, - &write_ops_, /*can_inline=*/false); - write_ops_.set_core_cq_tag(&write_tag_); - - // Also set up the Finish tag and op set. finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); - finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, - &finish_ops_, - /*can_inline=*/false); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); } ::grpc_impl::ClientContext* const context_; @@ -985,9 +923,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { grpc::internal::CallOpRecvInitialMetadata> start_ops_; grpc::internal::CallbackWithSuccessTag start_tag_; - const bool start_corked_; - bool corked_write_needed_; // no lock needed since only accessed in - // Write/WritesDone which cannot be concurrent + bool start_corked_; grpc::internal::CallOpSet @@ -1000,22 +936,17 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { grpc::internal::CallOpClientSendClose> write_ops_; grpc::internal::CallbackWithSuccessTag write_tag_; + bool write_ops_at_start_{false}; grpc::internal::CallOpSet writes_done_ops_; grpc::internal::CallbackWithSuccessTag writes_done_tag_; + bool writes_done_ops_at_start_{false}; - struct StartCallBacklog { - bool write_ops = false; - bool writes_done_ops = false; - }; - StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */; - - // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish - std::atomic callbacks_outstanding_{3}; - std::atomic_bool started_{false}; - grpc::internal::Mutex start_mu_; + // Minimum of 2 callbacks to pre-register for start and finish + std::atomic callbacks_outstanding_{2}; + bool started_{false}; }; template @@ -1054,6 +985,7 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary { // This call initiates two batches, each with a callback // 1. Send initial metadata + write + writes done + recv initial metadata // 2. Read message, recv trailing metadata + started_ = true; start_tag_.Set(call_.call(), [this](bool ok) { @@ -1121,6 +1053,7 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary { // This call will have 2 callbacks: start and finish std::atomic callbacks_outstanding_{2}; + bool started_{false}; }; class ClientCallbackUnaryFactory { diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index c83d6409db0..4f8bfeba372 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -16,6 +16,12 @@ * */ +#include +#include +#include +#include +#include + #include #include #include @@ -25,14 +31,6 @@ #include #include #include -#include - -#include -#include -#include -#include -#include -#include #include "src/core/lib/gpr/env.h" #include "src/core/lib/iomgr/iomgr.h" @@ -45,6 +43,8 @@ #include "test/cpp/util/string_ref_helper.h" #include "test/cpp/util/test_credentials_provider.h" +#include + // MAYBE_SKIP_TEST is a macro to determine if this particular test configuration // should be skipped based on a decision made at SetUp time. In particular, any // callback tests can only be run if the iomgr can run in the background or if @@ -1079,8 +1079,7 @@ class BidiClient public: BidiClient(grpc::testing::EchoTestService::Stub* stub, ServerTryCancelRequestPhase server_try_cancel, - int num_msgs_to_send, bool cork_metadata, bool first_write_async, - ClientCancelInfo client_cancel = {}) + int num_msgs_to_send, ClientCancelInfo client_cancel = {}) : server_try_cancel_(server_try_cancel), msgs_to_send_{num_msgs_to_send}, client_cancel_{client_cancel} { @@ -1090,9 +1089,8 @@ class BidiClient grpc::to_string(server_try_cancel)); } request_.set_message("Hello fren "); - context_.set_initial_metadata_corked(cork_metadata); stub->experimental_async()->BidiStream(&context_, this); - MaybeAsyncWrite(first_write_async); + MaybeWrite(); StartRead(&response_); StartCall(); } @@ -1113,10 +1111,6 @@ class BidiClient } } void OnWriteDone(bool ok) override { - if (async_write_thread_.joinable()) { - async_write_thread_.join(); - RemoveHold(); - } if (server_try_cancel_ == DO_NOT_CANCEL) { EXPECT_TRUE(ok); } else if (!ok) { @@ -1181,26 +1175,6 @@ class BidiClient } private: - void MaybeAsyncWrite(bool first_write_async) { - if (first_write_async) { - // Make sure that we have a write to issue. - // TODO(vjpai): Make this work with 0 writes case as well. - assert(msgs_to_send_ >= 1); - - AddHold(); - async_write_thread_ = std::thread([this] { - std::unique_lock lock(async_write_thread_mu_); - async_write_thread_cv_.wait( - lock, [this] { return async_write_thread_start_; }); - MaybeWrite(); - }); - std::lock_guard lock(async_write_thread_mu_); - async_write_thread_start_ = true; - async_write_thread_cv_.notify_one(); - return; - } - MaybeWrite(); - } void MaybeWrite() { if (client_cancel_.cancel && writes_complete_ == client_cancel_.ops_before_cancel) { @@ -1222,57 +1196,13 @@ class BidiClient std::mutex mu_; std::condition_variable cv_; bool done_ = false; - std::thread async_write_thread_; - bool async_write_thread_start_ = false; - std::mutex async_write_thread_mu_; - std::condition_variable async_write_thread_cv_; }; TEST_P(ClientCallbackEnd2endTest, BidiStream) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test(stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, - /*cork_metadata=*/false, /*first_write_async=*/false); - test.Await(); - // Make sure that the server interceptors were not notified of a cancel - if (GetParam().use_interceptors) { - EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); - } -} - -TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) { - MAYBE_SKIP_TEST; - ResetStub(); - BidiClient test(stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, - /*cork_metadata=*/false, /*first_write_async=*/true); - test.Await(); - // Make sure that the server interceptors were not notified of a cancel - if (GetParam().use_interceptors) { - EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); - } -} - -TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) { - MAYBE_SKIP_TEST; - ResetStub(); - BidiClient test(stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, - /*cork_metadata=*/true, /*first_write_async=*/false); - test.Await(); - // Make sure that the server interceptors were not notified of a cancel - if (GetParam().use_interceptors) { - EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); - } -} - -TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) { - MAYBE_SKIP_TEST; - ResetStub(); - BidiClient test(stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, - /*cork_metadata=*/true, /*first_write_async=*/true); + BidiClient test{stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend}; test.Await(); // Make sure that the server interceptors were not notified of a cancel if (GetParam().use_interceptors) { @@ -1283,10 +1213,8 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) { TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test(stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, - /*cork_metadata=*/false, /*first_write_async=*/false, - ClientCancelInfo(2)); + BidiClient test{stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, ClientCancelInfo{2}}; test.Await(); // Make sure that the server interceptors were notified of a cancel if (GetParam().use_interceptors) { @@ -1298,8 +1226,7 @@ TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test(stub_.get(), CANCEL_BEFORE_PROCESSING, /*num_msgs_to_send=*/2, - /*cork_metadata=*/false, /*first_write_async=*/false); + BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2}; test.Await(); // Make sure that the server interceptors were notified if (GetParam().use_interceptors) { @@ -1312,9 +1239,7 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test(stub_.get(), CANCEL_DURING_PROCESSING, - /*num_msgs_to_send=*/10, /*cork_metadata=*/false, - /*first_write_async=*/false); + BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10}; test.Await(); // Make sure that the server interceptors were notified if (GetParam().use_interceptors) { @@ -1327,8 +1252,7 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test(stub_.get(), CANCEL_AFTER_PROCESSING, /*num_msgs_to_send=*/5, - /*cork_metadata=*/false, /*first_write_async=*/false); + BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5}; test.Await(); // Make sure that the server interceptors were notified if (GetParam().use_interceptors) { From 171eeb552ab268a3129fa27c927c22fb9d522deb Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 2 Jun 2020 21:08:37 +0200 Subject: [PATCH 12/14] make hostname configurable --- examples/csharp/Xds/GreeterServer/Program.cs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/examples/csharp/Xds/GreeterServer/Program.cs b/examples/csharp/Xds/GreeterServer/Program.cs index 4c3e46158b3..e10b1af9f07 100644 --- a/examples/csharp/Xds/GreeterServer/Program.cs +++ b/examples/csharp/Xds/GreeterServer/Program.cs @@ -28,11 +28,17 @@ namespace GreeterServer { class GreeterImpl : Greeter.GreeterBase { + private string hostname; + + public GreeterImpl(string hostname) + { + this.hostname = hostname; + } + // Server side handler of the SayHello RPC public override Task SayHello(HelloRequest request, ServerCallContext context) { - String hostName = Dns.GetHostName(); - return Task.FromResult(new HelloReply { Message = $"Hello {request.Name} from {hostName}!"}); + return Task.FromResult(new HelloReply { Message = $"Hello {request.Name} from {hostname}!"}); } } @@ -43,7 +49,8 @@ namespace GreeterServer [Option("port", Default = 50051, HelpText = "The port to listen on.")] public int Port { get; set; } - // TODO: make hostname configurable + [Option("hostname", Required = false, HelpText = "The name clients will see in responses. If not specified, machine's hostname will obtain automatically.")] + public string Hostname { get; set; } } public static void Main(string[] args) @@ -54,8 +61,10 @@ namespace GreeterServer private static void RunServer(Options options) { + var hostName = options.Hostname ?? Dns.GetHostName(); + var serviceDescriptors = new [] {Greeter.Descriptor, Health.Descriptor, ServerReflection.Descriptor}; - var greeterImpl = new GreeterImpl(); + var greeterImpl = new GreeterImpl(hostName); var healthServiceImpl = new HealthServiceImpl(); var reflectionImpl = new ReflectionServiceImpl(serviceDescriptors); From 21ee2335d26b7beb4f8b97285cf659f168917227 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 2 Jun 2020 13:44:12 -0700 Subject: [PATCH 13/14] Fix use-after-free bug for ResourceState of unsubscribed RDS resource. --- .../ext/filters/client_channel/xds/xds_api.cc | 184 +++++------------- .../ext/filters/client_channel/xds/xds_api.h | 34 +--- .../filters/client_channel/xds/xds_client.cc | 77 +++----- test/cpp/end2end/xds_end2end_test.cc | 48 ++--- 4 files changed, 103 insertions(+), 240 deletions(-) diff --git a/src/core/ext/filters/client_channel/xds/xds_api.cc b/src/core/ext/filters/client_channel/xds/xds_api.cc index d2d10b634c4..93319313ef3 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.cc +++ b/src/core/ext/filters/client_channel/xds/xds_api.cc @@ -273,42 +273,6 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap::Node* node, arena); } -envoy_api_v2_DiscoveryRequest* CreateDiscoveryRequest( - upb_arena* arena, const char* type_url, const std::string& version, - const std::string& nonce, grpc_error* error) { - // Create a request. - envoy_api_v2_DiscoveryRequest* request = - envoy_api_v2_DiscoveryRequest_new(arena); - // Set type_url. - envoy_api_v2_DiscoveryRequest_set_type_url(request, - upb_strview_makez(type_url)); - // Set version_info. - if (!version.empty()) { - envoy_api_v2_DiscoveryRequest_set_version_info( - request, upb_strview_makez(version.c_str())); - } - // Set nonce. - if (!nonce.empty()) { - envoy_api_v2_DiscoveryRequest_set_response_nonce( - request, upb_strview_makez(nonce.c_str())); - } - // Set error_detail if it's a NACK. - if (error != GRPC_ERROR_NONE) { - grpc_slice error_description_slice; - GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, - &error_description_slice)); - upb_strview error_description_strview = - upb_strview_make(reinterpret_cast( - GPR_SLICE_START_PTR(error_description_slice)), - GPR_SLICE_LENGTH(error_description_slice)); - google_rpc_Status* error_detail = - envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, arena); - google_rpc_Status_set_message(error_detail, error_description_strview); - GRPC_ERROR_UNREF(error); - } - return request; -} - inline absl::string_view UpbStringToAbsl(const upb_strview& str) { return absl::string_view(str.data, str.size); } @@ -479,92 +443,43 @@ grpc_slice SerializeDiscoveryRequest(upb_arena* arena, } // namespace -grpc_slice XdsApi::CreateUnsupportedTypeNackRequest(const std::string& type_url, - const std::string& nonce, - grpc_error* error) { - upb::Arena arena; - envoy_api_v2_DiscoveryRequest* request = CreateDiscoveryRequest( - arena.ptr(), type_url.c_str(), /*version=*/"", nonce, error); - MaybeLogDiscoveryRequest(client_, tracer_, request); - return SerializeDiscoveryRequest(arena.ptr(), request); -} - -grpc_slice XdsApi::CreateLdsRequest(const std::string& server_name, - const std::string& version, - const std::string& nonce, grpc_error* error, - bool populate_node) { - upb::Arena arena; - envoy_api_v2_DiscoveryRequest* request = - CreateDiscoveryRequest(arena.ptr(), kLdsTypeUrl, version, nonce, error); - // Populate node. - if (populate_node) { - envoy_api_v2_core_Node* node_msg = - envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), node_, build_version_, user_agent_name_, "", - node_msg); - } - // Add resource_name. - envoy_api_v2_DiscoveryRequest_add_resource_names( - request, upb_strview_make(server_name.data(), server_name.size()), - arena.ptr()); - MaybeLogDiscoveryRequest(client_, tracer_, request); - return SerializeDiscoveryRequest(arena.ptr(), request); -} - -grpc_slice XdsApi::CreateRdsRequest(const std::string& route_config_name, - const std::string& version, - const std::string& nonce, grpc_error* error, - bool populate_node) { - upb::Arena arena; - envoy_api_v2_DiscoveryRequest* request = - CreateDiscoveryRequest(arena.ptr(), kRdsTypeUrl, version, nonce, error); - // Populate node. - if (populate_node) { - envoy_api_v2_core_Node* node_msg = - envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), node_, build_version_, user_agent_name_, "", - node_msg); - } - // Add resource_name. - envoy_api_v2_DiscoveryRequest_add_resource_names( - request, - upb_strview_make(route_config_name.data(), route_config_name.size()), - arena.ptr()); - MaybeLogDiscoveryRequest(client_, tracer_, request); - return SerializeDiscoveryRequest(arena.ptr(), request); -} - -grpc_slice XdsApi::CreateCdsRequest( - const std::set& cluster_names, +grpc_slice XdsApi::CreateAdsRequest( + const std::string& type_url, + const std::set& resource_names, const std::string& version, const std::string& nonce, grpc_error* error, bool populate_node) { upb::Arena arena; + // Create a request. envoy_api_v2_DiscoveryRequest* request = - CreateDiscoveryRequest(arena.ptr(), kCdsTypeUrl, version, nonce, error); - // Populate node. - if (populate_node) { - envoy_api_v2_core_Node* node_msg = - envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), node_, build_version_, user_agent_name_, "", - node_msg); + envoy_api_v2_DiscoveryRequest_new(arena.ptr()); + // Set type_url. + envoy_api_v2_DiscoveryRequest_set_type_url( + request, upb_strview_make(type_url.data(), type_url.size())); + // Set version_info. + if (!version.empty()) { + envoy_api_v2_DiscoveryRequest_set_version_info( + request, upb_strview_make(version.data(), version.size())); } - // Add resource_names. - for (const auto& cluster_name : cluster_names) { - envoy_api_v2_DiscoveryRequest_add_resource_names( - request, upb_strview_make(cluster_name.data(), cluster_name.size()), - arena.ptr()); + // Set nonce. + if (!nonce.empty()) { + envoy_api_v2_DiscoveryRequest_set_response_nonce( + request, upb_strview_make(nonce.data(), nonce.size())); + } + // Set error_detail if it's a NACK. + if (error != GRPC_ERROR_NONE) { + grpc_slice error_description_slice; + GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, + &error_description_slice)); + upb_strview error_description_strview = + upb_strview_make(reinterpret_cast( + GPR_SLICE_START_PTR(error_description_slice)), + GPR_SLICE_LENGTH(error_description_slice)); + google_rpc_Status* error_detail = + envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, + arena.ptr()); + google_rpc_Status_set_message(error_detail, error_description_strview); + GRPC_ERROR_UNREF(error); } - MaybeLogDiscoveryRequest(client_, tracer_, request); - return SerializeDiscoveryRequest(arena.ptr(), request); -} - -grpc_slice XdsApi::CreateEdsRequest( - const std::set& eds_service_names, - const std::string& version, const std::string& nonce, grpc_error* error, - bool populate_node) { - upb::Arena arena; - envoy_api_v2_DiscoveryRequest* request = - CreateDiscoveryRequest(arena.ptr(), kEdsTypeUrl, version, nonce, error); // Populate node. if (populate_node) { envoy_api_v2_core_Node* node_msg = @@ -573,10 +488,9 @@ grpc_slice XdsApi::CreateEdsRequest( node_msg); } // Add resource_names. - for (const auto& eds_service_name : eds_service_names) { + for (const auto& resource_name : resource_names) { envoy_api_v2_DiscoveryRequest_add_resource_names( - request, - upb_strview_make(eds_service_name.data(), eds_service_name.size()), + request, upb_strview_make(resource_name.data(), resource_name.size()), arena.ptr()); } MaybeLogDiscoveryRequest(client_, tracer_, request); @@ -1288,13 +1202,13 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer, return GRPC_ERROR_NONE; } -grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer, - const envoy_api_v2_DiscoveryResponse* response, - const std::string& expected_server_name, - const std::string& expected_route_config_name, - const bool xds_routing_enabled, - absl::optional* rds_update, - upb_arena* arena) { +grpc_error* RdsResponseParse( + XdsClient* client, TraceFlag* tracer, + const envoy_api_v2_DiscoveryResponse* response, + const std::string& expected_server_name, + const std::set& expected_route_configuration_names, + const bool xds_routing_enabled, + absl::optional* rds_update, upb_arena* arena) { // Get the resources from the response. size_t size; const google_protobuf_Any* const* resources = @@ -1315,10 +1229,14 @@ grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer, return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode route_config."); } // Check route_config_name. Ignore unexpected route_config. - const upb_strview name = envoy_api_v2_RouteConfiguration_name(route_config); - const upb_strview expected_name = - upb_strview_makez(expected_route_config_name.c_str()); - if (!upb_strview_eql(name, expected_name)) continue; + const upb_strview route_config_name = + envoy_api_v2_RouteConfiguration_name(route_config); + absl::string_view route_config_name_strview(route_config_name.data, + route_config_name.size); + if (expected_route_configuration_names.find(route_config_name_strview) == + expected_route_configuration_names.end()) { + continue; + } // Parse the route_config. XdsApi::RdsUpdate local_rds_update; grpc_error* error = @@ -1603,7 +1521,7 @@ grpc_error* EdsResponseParse( grpc_error* XdsApi::ParseAdsResponse( const grpc_slice& encoded_response, const std::string& expected_server_name, - const std::string& expected_route_config_name, + const std::set& expected_route_configuration_names, const std::set& expected_cluster_names, const std::set& expected_eds_service_names, absl::optional* lds_update, @@ -1638,8 +1556,8 @@ grpc_error* XdsApi::ParseAdsResponse( xds_routing_enabled_, lds_update, arena.ptr()); } else if (*type_url == kRdsTypeUrl) { return RdsResponseParse(client_, tracer_, response, expected_server_name, - expected_route_config_name, xds_routing_enabled_, - rds_update, arena.ptr()); + expected_route_configuration_names, + xds_routing_enabled_, rds_update, arena.ptr()); } else if (*type_url == kCdsTypeUrl) { return CdsResponseParse(client_, tracer_, response, expected_cluster_names, cds_update_map, arena.ptr()); diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index 4fa7b2bed31..8971faf79ab 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -230,47 +230,21 @@ class XdsApi { XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap::Node* node); - // Creates a request to nack an unsupported resource type. + // Creates an ADS request. // Takes ownership of \a error. - grpc_slice CreateUnsupportedTypeNackRequest(const std::string& type_url, - const std::string& nonce, - grpc_error* error); - - // Creates an LDS request querying \a server_name. - // Takes ownership of \a error. - grpc_slice CreateLdsRequest(const std::string& server_name, - const std::string& version, - const std::string& nonce, grpc_error* error, - bool populate_node); - - // Creates an RDS request querying \a route_config_name. - // Takes ownership of \a error. - grpc_slice CreateRdsRequest(const std::string& route_config_name, + grpc_slice CreateAdsRequest(const std::string& type_url, + const std::set& resource_names, const std::string& version, const std::string& nonce, grpc_error* error, bool populate_node); - // Creates a CDS request querying \a cluster_names. - // Takes ownership of \a error. - grpc_slice CreateCdsRequest(const std::set& cluster_names, - const std::string& version, - const std::string& nonce, grpc_error* error, - bool populate_node); - - // Creates an EDS request querying \a eds_service_names. - // Takes ownership of \a error. - grpc_slice CreateEdsRequest( - const std::set& eds_service_names, - const std::string& version, const std::string& nonce, grpc_error* error, - bool populate_node); - // Parses the ADS response and outputs the validated update for either CDS or // EDS. If the response can't be parsed at the top level, \a type_url will // point to an empty string; otherwise, it will point to the received data. grpc_error* ParseAdsResponse( const grpc_slice& encoded_response, const std::string& expected_server_name, - const std::string& expected_route_config_name, + const std::set& expected_route_configuration_names, const std::set& expected_cluster_names, const std::set& expected_eds_service_names, absl::optional* lds_update, diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 7a407da627f..db48084297f 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -255,8 +255,8 @@ class XdsClient::ChannelState::AdsCallState bool IsCurrentCallOnChannel() const; - std::set ClusterNamesForRequest(); - std::set EdsServiceNamesForRequest(); + std::set ResourceNamesForRequest( + const std::string& type_url); // The owning RetryableCall<>. RefCountedPtr> parent_; @@ -804,33 +804,13 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( } auto& state = state_map_[type_url]; grpc_slice request_payload_slice; - std::set resource_names; - if (type_url == XdsApi::kLdsTypeUrl) { - resource_names.insert(xds_client()->server_name_); - request_payload_slice = xds_client()->api_.CreateLdsRequest( - xds_client()->server_name_, state.version, state.nonce, - GRPC_ERROR_REF(state.error), !sent_initial_message_); - state.subscribed_resources[xds_client()->server_name_]->Start(Ref()); - } else if (type_url == XdsApi::kRdsTypeUrl) { - resource_names.insert(xds_client()->lds_result_->route_config_name); - request_payload_slice = xds_client()->api_.CreateRdsRequest( - xds_client()->lds_result_->route_config_name, state.version, - state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_); - state.subscribed_resources[xds_client()->lds_result_->route_config_name] - ->Start(Ref()); - } else if (type_url == XdsApi::kCdsTypeUrl) { - resource_names = ClusterNamesForRequest(); - request_payload_slice = xds_client()->api_.CreateCdsRequest( - resource_names, state.version, state.nonce, GRPC_ERROR_REF(state.error), - !sent_initial_message_); - } else if (type_url == XdsApi::kEdsTypeUrl) { - resource_names = EdsServiceNamesForRequest(); - request_payload_slice = xds_client()->api_.CreateEdsRequest( - resource_names, state.version, state.nonce, GRPC_ERROR_REF(state.error), - !sent_initial_message_); - } else { - request_payload_slice = xds_client()->api_.CreateUnsupportedTypeNackRequest( - type_url, state.nonce, GRPC_ERROR_REF(state.error)); + std::set resource_names = + ResourceNamesForRequest(type_url); + request_payload_slice = xds_client()->api_.CreateAdsRequest( + type_url, resource_names, state.version, state.nonce, + GRPC_ERROR_REF(state.error), !sent_initial_message_); + if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl && + type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) { state_map_.erase(type_url); } sent_initial_message_ = true; @@ -1242,12 +1222,10 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { // Note that ParseAdsResponse() also validates the response. grpc_error* parse_error = xds_client()->api_.ParseAdsResponse( response_slice, xds_client()->server_name_, - (xds_client()->lds_result_.has_value() - ? xds_client()->lds_result_->route_config_name - : ""), - ClusterNamesForRequest(), EdsServiceNamesForRequest(), &lds_update, - &rds_update, &cds_update_map, &eds_update_map, &version, &nonce, - &type_url); + ResourceNamesForRequest(XdsApi::kRdsTypeUrl), + ResourceNamesForRequest(XdsApi::kCdsTypeUrl), + ResourceNamesForRequest(XdsApi::kEdsTypeUrl), &lds_update, &rds_update, + &cds_update_map, &eds_update_map, &version, &nonce, &type_url); grpc_slice_unref_internal(response_slice); if (type_url.empty()) { // Ignore unparsable response. @@ -1351,25 +1329,18 @@ bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const { } std::set -XdsClient::ChannelState::AdsCallState::ClusterNamesForRequest() { - std::set cluster_names; - for (auto& p : state_map_[XdsApi::kCdsTypeUrl].subscribed_resources) { - cluster_names.insert(p.first); - OrphanablePtr& state = p.second; - state->Start(Ref()); +XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( + const std::string& type_url) { + std::set resource_names; + auto it = state_map_.find(type_url); + if (it != state_map_.end()) { + for (auto& p : it->second.subscribed_resources) { + resource_names.insert(p.first); + OrphanablePtr& state = p.second; + state->Start(Ref()); + } } - return cluster_names; -} - -std::set -XdsClient::ChannelState::AdsCallState::EdsServiceNamesForRequest() { - std::set eds_names; - for (auto& p : state_map_[XdsApi::kEdsTypeUrl].subscribed_resources) { - eds_names.insert(p.first); - OrphanablePtr& state = p.second; - state->Start(Ref()); - } - return eds_names; + return resource_names; } // diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 271ddae52f3..a163c49c24a 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -1916,30 +1916,6 @@ TEST_P(XdsResolverOnlyTest, ChangeClusters) { EXPECT_EQ(0, std::get<1>(counts)); } -// Tests that we go into TRANSIENT_FAILURE if the Listener is removed. -TEST_P(XdsResolverOnlyTest, ListenerRemoved) { - SetNextResolution({}); - SetNextResolutionForLbChannelAllBalancers(); - AdsServiceImpl::EdsResourceArgs args({ - {"locality0", GetBackendPorts()}, - }); - balancers_[0]->ads_service()->SetEdsResource( - AdsServiceImpl::BuildEdsResource(args)); - // We need to wait for all backends to come online. - WaitForAllBackends(); - // Unset LDS resource. - balancers_[0]->ads_service()->UnsetResource(kLdsTypeUrl, - kDefaultResourceName); - // Wait for RPCs to start failing. - do { - } while (SendRpc(RpcOptions(), nullptr).ok()); - // Make sure RPCs are still failing. - CheckRpcSendFailure(1000); - // Make sure we ACK'ed the update. - EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state().state, - AdsServiceImpl::ResponseState::ACKED); -} - // Tests that we go into TRANSIENT_FAILURE if the Cluster disappears. TEST_P(XdsResolverOnlyTest, ClusterRemoved) { SetNextResolution({}); @@ -2294,6 +2270,30 @@ TEST_P(LdsRdsTest, Vanilla) { AdsServiceImpl::ResponseState::ACKED); } +// Tests that we go into TRANSIENT_FAILURE if the Listener is removed. +TEST_P(LdsRdsTest, ListenerRemoved) { + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args)); + // We need to wait for all backends to come online. + WaitForAllBackends(); + // Unset LDS resource. + balancers_[0]->ads_service()->UnsetResource(kLdsTypeUrl, + kDefaultResourceName); + // Wait for RPCs to start failing. + do { + } while (SendRpc(RpcOptions(), nullptr).ok()); + // Make sure RPCs are still failing. + CheckRpcSendFailure(1000); + // Make sure we ACK'ed the update. + EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state().state, + AdsServiceImpl::ResponseState::ACKED); +} + // Tests that LDS client should send a NACK if matching domain can't be found in // the LDS response. TEST_P(LdsRdsTest, NoMatchedDomain) { From 4cfeb71b635aadaa97b7fa51b2ab0706eb44eb52 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 2 Jun 2020 14:25:48 -0700 Subject: [PATCH 14/14] Enable xds_routing_lb tracer in xds interop tests. --- tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh | 2 +- tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh | 2 +- tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh | 2 +- tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh | 2 +- tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh index 12729543bdb..0fe64ebac52 100755 --- a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh @@ -48,7 +48,7 @@ touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py bazel build //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case=all \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh index 667dc7c7e36..eee8876954d 100755 --- a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh @@ -48,7 +48,7 @@ touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py bazel build test/cpp/interop:xds_interop_client -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case=all \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh index 03ecbc4b32c..85ddb2c8fe9 100755 --- a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh @@ -48,7 +48,7 @@ touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py python tools/run_tests/run_tests.py -l csharp -c opt --build_only -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case=all \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh index c0bbcea535b..fb90fa13c5d 100755 --- a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh @@ -61,7 +61,7 @@ export CC=/usr/bin/gcc composer install && \ ./bin/generate_proto_php.sh) -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case=all \ --project_id=grpc-testing \ diff --git a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh index 8d643e3ad25..6f381729fa9 100644 --- a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh @@ -48,7 +48,7 @@ touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py (cd src/ruby && bundle && rake compile) -GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ +GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ tools/run_tests/run_xds_tests.py \ --test_case=all \ --project_id=grpc-testing \