From 70af2c2fdaf2c3f102827fae869f14334b847ed6 Mon Sep 17 00:00:00 2001 From: Eugene Ostroukhov Date: Mon, 26 Feb 2024 13:35:25 -0800 Subject: [PATCH] [xds] xDS client per data plane target (#35730) Closes #35730 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35730 from eugeneo:tasks/multiple-xds-clients 30b459658ec1ddf62ba416741769fe49b585c333 PiperOrigin-RevId: 610511488 --- src/core/ext/xds/xds_client_grpc.cc | 43 ++++-- src/core/ext/xds/xds_client_grpc.h | 13 +- src/core/ext/xds/xds_server_config_fetcher.cc | 4 +- src/core/resolver/xds/xds_resolver.cc | 3 +- test/cpp/end2end/xds/xds_core_end2end_test.cc | 132 ++++++++++-------- test/cpp/end2end/xds/xds_csds_end2end_test.cc | 79 ++++++++++- test/cpp/end2end/xds/xds_end2end_test_lib.cc | 11 +- 7 files changed, 204 insertions(+), 81 deletions(-) diff --git a/src/core/ext/xds/xds_client_grpc.cc b/src/core/ext/xds/xds_client_grpc.cc index 720ea4208fb..d5e4447cc02 100644 --- a/src/core/ext/xds/xds_client_grpc.cc +++ b/src/core/ext/xds/xds_client_grpc.cc @@ -92,7 +92,9 @@ namespace { Mutex* g_mu = new Mutex; const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr; -GrpcXdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr; +// Key bytes live in clients so they outlive the entries in this map +NoDestruct> g_xds_client_map + ABSL_GUARDED_BY(*g_mu); char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; } // namespace @@ -139,9 +141,9 @@ absl::StatusOr GetBootstrapContents(const char* fallback_config) { std::vector> GetAllXdsClients() { MutexLock lock(g_mu); std::vector> xds_clients; - if (g_xds_client != nullptr) { + for (const auto& key_client : *g_xds_client_map) { auto xds_client = - g_xds_client->RefIfNonZero(DEBUG_LOCATION, "DumpAllClientConfigs"); + key_client.second->RefIfNonZero(DEBUG_LOCATION, "DumpAllClientConfigs"); if (xds_client != nullptr) { xds_clients.emplace_back(xds_client.TakeAsSubclass()); } @@ -152,7 +154,7 @@ std::vector> GetAllXdsClients() { } // namespace absl::StatusOr> GrpcXdsClient::GetOrCreate( - const ChannelArgs& args, const char* reason) { + absl::string_view key, const ChannelArgs& args, const char* reason) { // If getting bootstrap from channel args, create a local XdsClient // instance for the channel or server instead of using the global instance. absl::optional bootstrap_config = args.GetString( @@ -164,13 +166,14 @@ absl::StatusOr> GrpcXdsClient::GetOrCreate( GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS); auto channel_args = ChannelArgs::FromC(xds_channel_args); return MakeRefCounted( - std::move(*bootstrap), channel_args, + key, std::move(*bootstrap), channel_args, MakeOrphanable(channel_args)); } // Otherwise, use the global instance. MutexLock lock(g_mu); - if (g_xds_client != nullptr) { - auto xds_client = g_xds_client->RefIfNonZero(DEBUG_LOCATION, reason); + auto it = g_xds_client_map->find(key); + if (it != g_xds_client_map->end()) { + auto xds_client = it->second->RefIfNonZero(DEBUG_LOCATION, reason); if (xds_client != nullptr) { return xds_client.TakeAsSubclass(); } @@ -188,9 +191,13 @@ absl::StatusOr> GrpcXdsClient::GetOrCreate( // Instantiate XdsClient. auto channel_args = ChannelArgs::FromC(g_channel_args); auto xds_client = MakeRefCounted( - std::move(*bootstrap), channel_args, + key, std::move(*bootstrap), channel_args, MakeOrphanable(channel_args)); - g_xds_client = xds_client.get(); + g_xds_client_map->emplace(xds_client->key(), xds_client.get()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "xDS client for key: %s was created", + std::string(key).c_str()); + } return xds_client; } @@ -211,6 +218,8 @@ grpc_slice GrpcXdsClient::DumpAllClientConfigs() arena.ptr()); xds_client->mu()->Lock(); xds_client->DumpClientConfig(&string_pool, arena.ptr(), client_config); + envoy_service_status_v3_ClientConfig_set_client_scope( + client_config, StdStringToUpbString(xds_client->key())); } // Serialize the upb message to bytes size_t output_length; @@ -223,7 +232,8 @@ grpc_slice GrpcXdsClient::DumpAllClientConfigs() } GrpcXdsClient::GrpcXdsClient( - std::unique_ptr bootstrap, const ChannelArgs& args, + absl::string_view key, std::unique_ptr bootstrap, + const ChannelArgs& args, OrphanablePtr transport_factory) : XdsClient( std::move(bootstrap), std::move(transport_factory), @@ -237,13 +247,18 @@ GrpcXdsClient::GrpcXdsClient( args.GetDurationFromIntMillis( GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS) .value_or(Duration::Seconds(15)))), + key_(key), certificate_provider_store_(MakeOrphanable( static_cast(this->bootstrap()) .certificate_providers())) {} -GrpcXdsClient::~GrpcXdsClient() { +void GrpcXdsClient::Orphan() { MutexLock lock(g_mu); - if (g_xds_client == this) g_xds_client = nullptr; + auto it = g_xds_client_map->find(key_); + if (it != g_xds_client_map->end() && it->second == this) { + g_xds_client_map->erase(it); + } + XdsClient::Orphan(); } grpc_pollset_set* GrpcXdsClient::interested_parties() const { @@ -258,9 +273,9 @@ void SetXdsChannelArgsForTest(grpc_channel_args* args) { g_channel_args = args; } -void UnsetGlobalXdsClientForTest() { +void UnsetGlobalXdsClientsForTest() { MutexLock lock(g_mu); - g_xds_client = nullptr; + g_xds_client_map->clear(); } void SetXdsFallbackBootstrapConfig(const char* config) { diff --git a/src/core/ext/xds/xds_client_grpc.h b/src/core/ext/xds/xds_client_grpc.h index 7093ec4017b..72df525ab6e 100644 --- a/src/core/ext/xds/xds_client_grpc.h +++ b/src/core/ext/xds/xds_client_grpc.h @@ -43,7 +43,7 @@ class GrpcXdsClient : public XdsClient { public: // Factory function to get or create the global XdsClient instance. static absl::StatusOr> GetOrCreate( - const ChannelArgs& args, const char* reason); + absl::string_view key, const ChannelArgs& args, const char* reason); // Builds ClientStatusResponse containing all resources from all XdsClients static grpc_slice DumpAllClientConfigs(); @@ -60,10 +60,12 @@ class GrpcXdsClient : public XdsClient { // work for callers that use interested_parties() but not for callers // that also use certificate_provider_store(), but we should consider // alternatives for that case as well. - GrpcXdsClient(std::unique_ptr bootstrap, + GrpcXdsClient(absl::string_view key, + std::unique_ptr bootstrap, const ChannelArgs& args, OrphanablePtr transport_factory); - ~GrpcXdsClient() override; + + void Orphan() override; // Helpers for encoding the XdsClient object in channel args. static absl::string_view ChannelArgName() { @@ -79,13 +81,16 @@ class GrpcXdsClient : public XdsClient { return *certificate_provider_store_; } + absl::string_view key() const { return key_; } + private: + std::string key_; OrphanablePtr certificate_provider_store_; }; namespace internal { void SetXdsChannelArgsForTest(grpc_channel_args* args); -void UnsetGlobalXdsClientForTest(); +void UnsetGlobalXdsClientsForTest(); // Sets bootstrap config to be used when no env var is set. // Does not take ownership of config. void SetXdsFallbackBootstrapConfig(const char* config); diff --git a/src/core/ext/xds/xds_server_config_fetcher.cc b/src/core/ext/xds/xds_server_config_fetcher.cc index b996a7b56d8..6e406843a64 100644 --- a/src/core/ext/xds/xds_server_config_fetcher.cc +++ b/src/core/ext/xds/xds_server_config_fetcher.cc @@ -94,6 +94,8 @@ namespace { using ReadDelayHandle = XdsClient::ReadDelayHandle; +constexpr absl::string_view kServerXdsClientKey = "#server"; + TraceFlag grpc_xds_server_config_fetcher_trace(false, "xds_server_config_fetcher"); @@ -1372,7 +1374,7 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create( "update=%p, user_data=%p}, args=%p)", 3, (notifier.on_serving_status_update, notifier.user_data, args)); auto xds_client = grpc_core::GrpcXdsClient::GetOrCreate( - channel_args, "XdsServerConfigFetcher"); + grpc_core::kServerXdsClientKey, channel_args, "XdsServerConfigFetcher"); if (!xds_client.ok()) { gpr_log(GPR_ERROR, "Failed to create xds client: %s", xds_client.status().ToString().c_str()); diff --git a/src/core/resolver/xds/xds_resolver.cc b/src/core/resolver/xds/xds_resolver.cc index ce2f33482dd..39f7688a219 100644 --- a/src/core/resolver/xds/xds_resolver.cc +++ b/src/core/resolver/xds/xds_resolver.cc @@ -880,7 +880,8 @@ void XdsResolver::ClusterSelectionFilter::Call::OnClientInitialMetadata( // void XdsResolver::StartLocked() { - auto xds_client = GrpcXdsClient::GetOrCreate(args_, "xds resolver"); + auto xds_client = + GrpcXdsClient::GetOrCreate(uri_.ToString(), args_, "xds resolver"); if (!xds_client.ok()) { gpr_log(GPR_ERROR, "Failed to create xds client -- channel will remain in " diff --git a/test/cpp/end2end/xds/xds_core_end2end_test.cc b/test/cpp/end2end/xds/xds_core_end2end_test.cc index 4f4f792affd..a0afbaaec82 100644 --- a/test/cpp/end2end/xds/xds_core_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_core_end2end_test.cc @@ -13,8 +13,10 @@ // limitations under the License. // +#include #include #include +#include #include #include @@ -24,9 +26,11 @@ #include "src/core/client_channel/backup_poller.h" #include "src/core/lib/config/config_vars.h" +#include "src/proto/grpc/testing/xds/v3/listener.pb.h" #include "test/core/util/resolve_localhost_ip46.h" #include "test/core/util/scoped_env_var.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" +#include "test/cpp/end2end/xds/xds_server.h" namespace grpc { namespace testing { @@ -167,7 +171,21 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, GlobalXdsClientTest, XdsTestType::kBootstrapFromEnvVar)), &XdsTestType::Name); -TEST_P(GlobalXdsClientTest, MultipleChannelsShareXdsClient) { +TEST_P(GlobalXdsClientTest, MultipleChannelsSameTargetShareXdsClient) { + CreateAndStartBackends(1); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + WaitForAllBackends(DEBUG_LOCATION); + // Create second channel and tell it to connect to the same server. + auto channel2 = CreateChannel(/*failover_timeout_ms=*/0, kServerName); + channel2->GetState(/*try_to_connect=*/true); + ASSERT_TRUE(channel2->WaitForConnected(grpc_timeout_seconds_to_deadline(1))); + // Make sure there's only one client connected. + EXPECT_EQ(1UL, balancer_->ads_service()->clients().size()); +} + +TEST_P(GlobalXdsClientTest, + MultipleChannelsDifferentTargetDoNotShareXdsClient) { CreateAndStartBackends(1); const char* kNewServerName = "new-server.example.com"; Listener listener = default_listener_; @@ -181,8 +199,8 @@ TEST_P(GlobalXdsClientTest, MultipleChannelsShareXdsClient) { auto channel2 = CreateChannel(/*failover_timeout_ms=*/0, kNewServerName); channel2->GetState(/*try_to_connect=*/true); ASSERT_TRUE(channel2->WaitForConnected(grpc_timeout_seconds_to_deadline(1))); - // Make sure there's only one client connected. - EXPECT_EQ(1UL, balancer_->ads_service()->clients().size()); + // Make sure there are two clients connected. + EXPECT_EQ(2UL, balancer_->ads_service()->clients().size()); } TEST_P( @@ -219,64 +237,62 @@ TEST_P( } // Tests that the NACK for multiple bad LDS resources includes both errors. -// This needs to be in GlobalXdsClientTest because the only way to request -// two LDS resources in the same XdsClient is for two channels to share -// the same XdsClient. +// This needs to use xDS server as this is the only scenario when XdsClient +// is shared. TEST_P(GlobalXdsClientTest, MultipleBadLdsResources) { - CreateAndStartBackends(1); - constexpr char kServerName2[] = "server.other.com"; - constexpr char kServerName3[] = "server.another.com"; - auto listener = default_listener_; - listener.clear_api_listener(); - balancer_->ads_service()->SetLdsResource(listener); - listener.set_name(kServerName2); - balancer_->ads_service()->SetLdsResource(listener); - listener = default_listener_; - listener.set_name(kServerName3); - SetListenerAndRouteConfiguration(balancer_.get(), listener, - default_route_config_); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + CreateBackends(2, true); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 2)}}); balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - const auto response_state = WaitForLdsNack(DEBUG_LOCATION); + auto listener = default_server_listener_; + listener.clear_address(); + listener.set_name(GetServerListenerName(backends_[0]->port())); + balancer_->ads_service()->SetLdsResource(listener); + backends_[0]->Start(); + auto response_state = WaitForLdsNack(DEBUG_LOCATION); ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_EQ(response_state->error_message, - "xDS response validation errors: [" - "resource index 0: server.example.com: " - "INVALID_ARGUMENT: Listener has neither address nor ApiListener]"); - // Need to create a second channel to subscribe to a second LDS resource. - auto channel2 = CreateChannel(0, kServerName2); - auto stub2 = grpc::testing::EchoTestService::NewStub(channel2); - { - ClientContext context; - EchoRequest request; - request.set_message(kRequestMessage); - EchoResponse response; - grpc::Status status = stub2->Echo(&context, request, &response); - EXPECT_FALSE(status.ok()); - // Wait for second NACK to be reported to xDS server. - const auto response_state = WaitForLdsNack(DEBUG_LOCATION); - ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_EQ( - response_state->error_message, - "xDS response validation errors: [" - "resource index 0: server.other.com: " - "INVALID_ARGUMENT: Listener has neither address nor ApiListener; " - "resource index 1: server.example.com: " - "INVALID_ARGUMENT: Listener has neither address nor ApiListener]"); - } - // Now start a new channel with a third server name, this one with a - // valid resource. - auto channel3 = CreateChannel(0, kServerName3); - auto stub3 = grpc::testing::EchoTestService::NewStub(channel3); - { - ClientContext context; - EchoRequest request; - request.set_message(kRequestMessage); - EchoResponse response; - grpc::Status status = stub3->Echo(&context, request, &response); - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - } + EXPECT_EQ( + response_state->error_message, + absl::StrFormat( + "xDS response validation errors: [" + "resource index 0: " + "grpc/server?xds.resource.listening_address=127.0.0.1:%lu: " + "INVALID_ARGUMENT: Listener has neither address nor ApiListener]", + backends_[0]->port())); + listener = default_server_listener_; + listener.clear_address(); + listener.set_name(GetServerListenerName(backends_[1]->port())); + balancer_->ads_service()->SetLdsResource(listener); + backends_[1]->Start(); + constexpr absl::string_view kMessageFormat = + "xDS response validation errors: [" + "resource index 0: " + "grpc/server?xds.resource.listening_address=127.0.0.1:%d: " + "INVALID_ARGUMENT: Listener has neither address nor " + "ApiListener; " + "resource index 1: " + "grpc/server?xds.resource.listening_address=127.0.0.1:%d: " + "INVALID_ARGUMENT: Listener has neither address nor " + "ApiListener" + "]"; + const std::string expected_message1 = absl::StrFormat( + kMessageFormat, backends_[0]->port(), backends_[1]->port()); + const std::string expected_message2 = absl::StrFormat( + kMessageFormat, backends_[1]->port(), backends_[0]->port()); + response_state = WaitForNack( + DEBUG_LOCATION, [&]() -> absl::optional { + auto response = balancer_->ads_service()->lds_response_state(); + if (response.has_value() && + response->state == AdsServiceImpl::ResponseState::NACKED) { + if (response->error_message == expected_message1 || + response->error_message == expected_message2) { + return response; + } + gpr_log(GPR_INFO, "non-matching NACK message: %s", + response->error_message.c_str()); + } + return absl::nullopt; + }); + EXPECT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; } // Tests that we don't trigger does-not-exist callbacks for a resource diff --git a/test/cpp/end2end/xds/xds_csds_end2end_test.cc b/test/cpp/end2end/xds/xds_csds_end2end_test.cc index 117e5555285..cde8148b9c5 100644 --- a/test/cpp/end2end/xds/xds_csds_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_csds_end2end_test.cc @@ -22,6 +22,7 @@ #include "absl/memory/memory.h" #include "absl/strings/str_cat.h" +#include "absl/strings/strip.h" #include #include @@ -344,7 +345,7 @@ TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpVanilla) { // Fetches the client config auto csds_response = FetchCsdsResponse(); gpr_log(GPR_INFO, "xDS config dump: %s", csds_response.DebugString().c_str()); - EXPECT_EQ(1, csds_response.config_size()); + ASSERT_EQ(1, csds_response.config_size()); const auto& client_config = csds_response.config(0); // Validate the Node information EXPECT_THAT(client_config.node(), @@ -615,6 +616,82 @@ TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpClusterRequested) { ClientResourceStatus::REQUESTED, ::testing::_)))); } +TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpMultiClient) { + Listener listener = default_listener_; + const char* kServer2Name = "server2.example.com"; + listener.set_name(kServer2Name); + balancer_->ads_service()->SetLdsResource(listener); + SetListenerAndRouteConfiguration(balancer_.get(), listener, + default_route_config_); + CreateAndStartBackends(1); + const size_t kNumRpcs = 5; + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Send several RPCs to ensure the xDS setup works + CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs); + // Connect to a second server + auto channel2 = CreateChannel(0, kServer2Name); + channel2->GetState(/*try_to_connect=*/true); + ASSERT_TRUE(channel2->WaitForConnected(grpc_timeout_seconds_to_deadline(1))); + // Fetches the client config + auto csds_response = FetchCsdsResponse(); + ASSERT_EQ(2, csds_response.config_size()); + std::vector scopes; + for (const auto& client_config : csds_response.config()) { + // Validate the Node information + EXPECT_THAT(client_config.node(), + EqNode("xds_end2end_test", ::testing::HasSubstr("C-core"), + ::testing::HasSubstr(grpc_version_string()), + ::testing::ElementsAre( + "envoy.lb.does_not_support_overprovisioning"))); + scopes.emplace_back(client_config.client_scope()); + absl::string_view server = client_config.client_scope(); + // Listener matcher depends on whether RDS is enabled. + ::testing::Matcher api_listener_matcher; + if (GetParam().enable_rds_testing()) { + api_listener_matcher = IsRdsEnabledHCM(); + } else { + api_listener_matcher = + EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName); + } + // Construct list of all matchers. + std::vector<::testing::Matcher< + envoy::service::status::v3::ClientConfig_GenericXdsConfig>> + matchers = { + // Listener + EqGenericXdsConfig( + kLdsTypeUrl, absl::StripPrefix(server, "xds:"), "3", + UnpackListener(EqListener(absl::StripPrefix(server, "xds:"), + api_listener_matcher)), + ClientResourceStatus::ACKED, ::testing::_), + // Cluster + EqGenericXdsConfig(kCdsTypeUrl, kDefaultClusterName, "1", + UnpackCluster(EqCluster(kDefaultClusterName)), + ClientResourceStatus::ACKED, ::testing::_), + // ClusterLoadAssignment + EqGenericXdsConfig( + kEdsTypeUrl, kDefaultEdsServiceName, "1", + UnpackClusterLoadAssignment(EqClusterLoadAssignment( + kDefaultEdsServiceName, backends_[0]->port(), + kDefaultLocalityWeight)), + ClientResourceStatus::ACKED, ::testing::_), + }; + // If RDS is enabled, add matcher for RDS resource. + if (GetParam().enable_rds_testing()) { + matchers.push_back(EqGenericXdsConfig( + kRdsTypeUrl, kDefaultRouteConfigurationName, "2", + UnpackRouteConfiguration(EqRouteConfiguration( + kDefaultRouteConfigurationName, kDefaultClusterName)), + ClientResourceStatus::ACKED, ::testing::_)); + } + // Validate the dumped xDS configs + EXPECT_THAT(client_config.generic_xds_configs(), + ::testing::UnorderedElementsAreArray(matchers)); + } + EXPECT_THAT(scopes, ::testing::UnorderedElementsAre( + "xds:server.example.com", "xds:server2.example.com")); +} + class CsdsShortAdsTimeoutTest : public ClientStatusDiscoveryServiceTest { protected: void SetUp() override { diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc index b95d91a1dd9..c398694266f 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -523,7 +523,7 @@ void XdsEnd2endTest::InitClient(XdsBootstrapBuilder builder, // because it's not expecting the client to connect. It also // ensures that each test can independently set the global channel // args for the xDS channel. - grpc_core::internal::UnsetGlobalXdsClientForTest(); + grpc_core::internal::UnsetGlobalXdsClientsForTest(); } // Create channel and stub. ResetStub(); @@ -558,7 +558,14 @@ std::shared_ptr XdsEnd2endTest::CreateChannel( GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS, &xds_channel_args_, &kChannelArgsArgVtable); } - std::string uri = absl::StrCat("xds://", xds_authority, "/", server_name); + std::vector parts = {"xds:"}; + if (xds_authority != nullptr && xds_authority[0] != '\0') { + parts.emplace_back("//"); + parts.emplace_back(xds_authority); + parts.emplace_back("/"); + } + parts.emplace_back(server_name); + std::string uri = absl::StrJoin(parts, ""); std::shared_ptr channel_creds = GetParam().use_xds_credentials() ? XdsCredentials(CreateTlsFallbackCredentials())