[xds] xDS client per data plane target (#35730)

Closes #35730

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35730 from eugeneo:tasks/multiple-xds-clients 30b459658e
PiperOrigin-RevId: 610511488
pull/35931/head
Eugene Ostroukhov 9 months ago committed by Copybara-Service
parent a1b7e1e523
commit 70af2c2fda
  1. 43
      src/core/ext/xds/xds_client_grpc.cc
  2. 13
      src/core/ext/xds/xds_client_grpc.h
  3. 4
      src/core/ext/xds/xds_server_config_fetcher.cc
  4. 3
      src/core/resolver/xds/xds_resolver.cc
  5. 122
      test/cpp/end2end/xds/xds_core_end2end_test.cc
  6. 79
      test/cpp/end2end/xds/xds_csds_end2end_test.cc
  7. 11
      test/cpp/end2end/xds/xds_end2end_test_lib.cc

@ -92,7 +92,9 @@ namespace {
Mutex* g_mu = new Mutex;
const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
GrpcXdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr;
// Key bytes live in clients so they outlive the entries in this map
NoDestruct<std::map<absl::string_view, GrpcXdsClient*>> g_xds_client_map
ABSL_GUARDED_BY(*g_mu);
char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
} // namespace
@ -139,9 +141,9 @@ absl::StatusOr<std::string> GetBootstrapContents(const char* fallback_config) {
std::vector<RefCountedPtr<GrpcXdsClient>> GetAllXdsClients() {
MutexLock lock(g_mu);
std::vector<RefCountedPtr<GrpcXdsClient>> xds_clients;
if (g_xds_client != nullptr) {
for (const auto& key_client : *g_xds_client_map) {
auto xds_client =
g_xds_client->RefIfNonZero(DEBUG_LOCATION, "DumpAllClientConfigs");
key_client.second->RefIfNonZero(DEBUG_LOCATION, "DumpAllClientConfigs");
if (xds_client != nullptr) {
xds_clients.emplace_back(xds_client.TakeAsSubclass<GrpcXdsClient>());
}
@ -152,7 +154,7 @@ std::vector<RefCountedPtr<GrpcXdsClient>> GetAllXdsClients() {
} // namespace
absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
const ChannelArgs& args, const char* reason) {
absl::string_view key, const ChannelArgs& args, const char* reason) {
// If getting bootstrap from channel args, create a local XdsClient
// instance for the channel or server instead of using the global instance.
absl::optional<absl::string_view> bootstrap_config = args.GetString(
@ -164,13 +166,14 @@ absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
auto channel_args = ChannelArgs::FromC(xds_channel_args);
return MakeRefCounted<GrpcXdsClient>(
std::move(*bootstrap), channel_args,
key, std::move(*bootstrap), channel_args,
MakeOrphanable<GrpcXdsTransportFactory>(channel_args));
}
// Otherwise, use the global instance.
MutexLock lock(g_mu);
if (g_xds_client != nullptr) {
auto xds_client = g_xds_client->RefIfNonZero(DEBUG_LOCATION, reason);
auto it = g_xds_client_map->find(key);
if (it != g_xds_client_map->end()) {
auto xds_client = it->second->RefIfNonZero(DEBUG_LOCATION, reason);
if (xds_client != nullptr) {
return xds_client.TakeAsSubclass<GrpcXdsClient>();
}
@ -188,9 +191,13 @@ absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
// Instantiate XdsClient.
auto channel_args = ChannelArgs::FromC(g_channel_args);
auto xds_client = MakeRefCounted<GrpcXdsClient>(
std::move(*bootstrap), channel_args,
key, std::move(*bootstrap), channel_args,
MakeOrphanable<GrpcXdsTransportFactory>(channel_args));
g_xds_client = xds_client.get();
g_xds_client_map->emplace(xds_client->key(), xds_client.get());
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "xDS client for key: %s was created",
std::string(key).c_str());
}
return xds_client;
}
@ -211,6 +218,8 @@ grpc_slice GrpcXdsClient::DumpAllClientConfigs()
arena.ptr());
xds_client->mu()->Lock();
xds_client->DumpClientConfig(&string_pool, arena.ptr(), client_config);
envoy_service_status_v3_ClientConfig_set_client_scope(
client_config, StdStringToUpbString(xds_client->key()));
}
// Serialize the upb message to bytes
size_t output_length;
@ -223,7 +232,8 @@ grpc_slice GrpcXdsClient::DumpAllClientConfigs()
}
GrpcXdsClient::GrpcXdsClient(
std::unique_ptr<GrpcXdsBootstrap> bootstrap, const ChannelArgs& args,
absl::string_view key, std::unique_ptr<GrpcXdsBootstrap> bootstrap,
const ChannelArgs& args,
OrphanablePtr<XdsTransportFactory> transport_factory)
: XdsClient(
std::move(bootstrap), std::move(transport_factory),
@ -237,13 +247,18 @@ GrpcXdsClient::GrpcXdsClient(
args.GetDurationFromIntMillis(
GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS)
.value_or(Duration::Seconds(15)))),
key_(key),
certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
static_cast<const GrpcXdsBootstrap&>(this->bootstrap())
.certificate_providers())) {}
GrpcXdsClient::~GrpcXdsClient() {
void GrpcXdsClient::Orphan() {
MutexLock lock(g_mu);
if (g_xds_client == this) g_xds_client = nullptr;
auto it = g_xds_client_map->find(key_);
if (it != g_xds_client_map->end() && it->second == this) {
g_xds_client_map->erase(it);
}
XdsClient::Orphan();
}
grpc_pollset_set* GrpcXdsClient::interested_parties() const {
@ -258,9 +273,9 @@ void SetXdsChannelArgsForTest(grpc_channel_args* args) {
g_channel_args = args;
}
void UnsetGlobalXdsClientForTest() {
void UnsetGlobalXdsClientsForTest() {
MutexLock lock(g_mu);
g_xds_client = nullptr;
g_xds_client_map->clear();
}
void SetXdsFallbackBootstrapConfig(const char* config) {

@ -43,7 +43,7 @@ class GrpcXdsClient : public XdsClient {
public:
// Factory function to get or create the global XdsClient instance.
static absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GetOrCreate(
const ChannelArgs& args, const char* reason);
absl::string_view key, const ChannelArgs& args, const char* reason);
// Builds ClientStatusResponse containing all resources from all XdsClients
static grpc_slice DumpAllClientConfigs();
@ -60,10 +60,12 @@ class GrpcXdsClient : public XdsClient {
// work for callers that use interested_parties() but not for callers
// that also use certificate_provider_store(), but we should consider
// alternatives for that case as well.
GrpcXdsClient(std::unique_ptr<GrpcXdsBootstrap> bootstrap,
GrpcXdsClient(absl::string_view key,
std::unique_ptr<GrpcXdsBootstrap> bootstrap,
const ChannelArgs& args,
OrphanablePtr<XdsTransportFactory> transport_factory);
~GrpcXdsClient() override;
void Orphan() override;
// Helpers for encoding the XdsClient object in channel args.
static absl::string_view ChannelArgName() {
@ -79,13 +81,16 @@ class GrpcXdsClient : public XdsClient {
return *certificate_provider_store_;
}
absl::string_view key() const { return key_; }
private:
std::string key_;
OrphanablePtr<CertificateProviderStore> certificate_provider_store_;
};
namespace internal {
void SetXdsChannelArgsForTest(grpc_channel_args* args);
void UnsetGlobalXdsClientForTest();
void UnsetGlobalXdsClientsForTest();
// Sets bootstrap config to be used when no env var is set.
// Does not take ownership of config.
void SetXdsFallbackBootstrapConfig(const char* config);

@ -94,6 +94,8 @@ namespace {
using ReadDelayHandle = XdsClient::ReadDelayHandle;
constexpr absl::string_view kServerXdsClientKey = "#server";
TraceFlag grpc_xds_server_config_fetcher_trace(false,
"xds_server_config_fetcher");
@ -1372,7 +1374,7 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
"update=%p, user_data=%p}, args=%p)",
3, (notifier.on_serving_status_update, notifier.user_data, args));
auto xds_client = grpc_core::GrpcXdsClient::GetOrCreate(
channel_args, "XdsServerConfigFetcher");
grpc_core::kServerXdsClientKey, channel_args, "XdsServerConfigFetcher");
if (!xds_client.ok()) {
gpr_log(GPR_ERROR, "Failed to create xds client: %s",
xds_client.status().ToString().c_str());

@ -880,7 +880,8 @@ void XdsResolver::ClusterSelectionFilter::Call::OnClientInitialMetadata(
//
void XdsResolver::StartLocked() {
auto xds_client = GrpcXdsClient::GetOrCreate(args_, "xds resolver");
auto xds_client =
GrpcXdsClient::GetOrCreate(uri_.ToString(), args_, "xds resolver");
if (!xds_client.ok()) {
gpr_log(GPR_ERROR,
"Failed to create xds client -- channel will remain in "

@ -13,8 +13,10 @@
// limitations under the License.
//
#include <algorithm>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>
#include <gmock/gmock.h>
@ -24,9 +26,11 @@
#include "src/core/client_channel/backup_poller.h"
#include "src/core/lib/config/config_vars.h"
#include "src/proto/grpc/testing/xds/v3/listener.pb.h"
#include "test/core/util/resolve_localhost_ip46.h"
#include "test/core/util/scoped_env_var.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
#include "test/cpp/end2end/xds/xds_server.h"
namespace grpc {
namespace testing {
@ -167,7 +171,21 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, GlobalXdsClientTest,
XdsTestType::kBootstrapFromEnvVar)),
&XdsTestType::Name);
TEST_P(GlobalXdsClientTest, MultipleChannelsShareXdsClient) {
TEST_P(GlobalXdsClientTest, MultipleChannelsSameTargetShareXdsClient) {
CreateAndStartBackends(1);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForAllBackends(DEBUG_LOCATION);
// Create second channel and tell it to connect to the same server.
auto channel2 = CreateChannel(/*failover_timeout_ms=*/0, kServerName);
channel2->GetState(/*try_to_connect=*/true);
ASSERT_TRUE(channel2->WaitForConnected(grpc_timeout_seconds_to_deadline(1)));
// Make sure there's only one client connected.
EXPECT_EQ(1UL, balancer_->ads_service()->clients().size());
}
TEST_P(GlobalXdsClientTest,
MultipleChannelsDifferentTargetDoNotShareXdsClient) {
CreateAndStartBackends(1);
const char* kNewServerName = "new-server.example.com";
Listener listener = default_listener_;
@ -181,8 +199,8 @@ TEST_P(GlobalXdsClientTest, MultipleChannelsShareXdsClient) {
auto channel2 = CreateChannel(/*failover_timeout_ms=*/0, kNewServerName);
channel2->GetState(/*try_to_connect=*/true);
ASSERT_TRUE(channel2->WaitForConnected(grpc_timeout_seconds_to_deadline(1)));
// Make sure there's only one client connected.
EXPECT_EQ(1UL, balancer_->ads_service()->clients().size());
// Make sure there are two clients connected.
EXPECT_EQ(2UL, balancer_->ads_service()->clients().size());
}
TEST_P(
@ -219,64 +237,62 @@ TEST_P(
}
// Tests that the NACK for multiple bad LDS resources includes both errors.
// This needs to be in GlobalXdsClientTest because the only way to request
// two LDS resources in the same XdsClient is for two channels to share
// the same XdsClient.
// This needs to use xDS server as this is the only scenario when XdsClient
// is shared.
TEST_P(GlobalXdsClientTest, MultipleBadLdsResources) {
CreateAndStartBackends(1);
constexpr char kServerName2[] = "server.other.com";
constexpr char kServerName3[] = "server.another.com";
auto listener = default_listener_;
listener.clear_api_listener();
balancer_->ads_service()->SetLdsResource(listener);
listener.set_name(kServerName2);
balancer_->ads_service()->SetLdsResource(listener);
listener = default_listener_;
listener.set_name(kServerName3);
SetListenerAndRouteConfiguration(balancer_.get(), listener,
default_route_config_);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
CreateBackends(2, true);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 2)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForLdsNack(DEBUG_LOCATION);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_EQ(response_state->error_message,
"xDS response validation errors: ["
"resource index 0: server.example.com: "
"INVALID_ARGUMENT: Listener has neither address nor ApiListener]");
// Need to create a second channel to subscribe to a second LDS resource.
auto channel2 = CreateChannel(0, kServerName2);
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
{
ClientContext context;
EchoRequest request;
request.set_message(kRequestMessage);
EchoResponse response;
grpc::Status status = stub2->Echo(&context, request, &response);
EXPECT_FALSE(status.ok());
// Wait for second NACK to be reported to xDS server.
const auto response_state = WaitForLdsNack(DEBUG_LOCATION);
auto listener = default_server_listener_;
listener.clear_address();
listener.set_name(GetServerListenerName(backends_[0]->port()));
balancer_->ads_service()->SetLdsResource(listener);
backends_[0]->Start();
auto response_state = WaitForLdsNack(DEBUG_LOCATION);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_EQ(
response_state->error_message,
absl::StrFormat(
"xDS response validation errors: ["
"resource index 0: server.other.com: "
"INVALID_ARGUMENT: Listener has neither address nor ApiListener; "
"resource index 1: server.example.com: "
"INVALID_ARGUMENT: Listener has neither address nor ApiListener]");
"resource index 0: "
"grpc/server?xds.resource.listening_address=127.0.0.1:%lu: "
"INVALID_ARGUMENT: Listener has neither address nor ApiListener]",
backends_[0]->port()));
listener = default_server_listener_;
listener.clear_address();
listener.set_name(GetServerListenerName(backends_[1]->port()));
balancer_->ads_service()->SetLdsResource(listener);
backends_[1]->Start();
constexpr absl::string_view kMessageFormat =
"xDS response validation errors: ["
"resource index 0: "
"grpc/server?xds.resource.listening_address=127.0.0.1:%d: "
"INVALID_ARGUMENT: Listener has neither address nor "
"ApiListener; "
"resource index 1: "
"grpc/server?xds.resource.listening_address=127.0.0.1:%d: "
"INVALID_ARGUMENT: Listener has neither address nor "
"ApiListener"
"]";
const std::string expected_message1 = absl::StrFormat(
kMessageFormat, backends_[0]->port(), backends_[1]->port());
const std::string expected_message2 = absl::StrFormat(
kMessageFormat, backends_[1]->port(), backends_[0]->port());
response_state = WaitForNack(
DEBUG_LOCATION, [&]() -> absl::optional<AdsServiceImpl::ResponseState> {
auto response = balancer_->ads_service()->lds_response_state();
if (response.has_value() &&
response->state == AdsServiceImpl::ResponseState::NACKED) {
if (response->error_message == expected_message1 ||
response->error_message == expected_message2) {
return response;
}
// Now start a new channel with a third server name, this one with a
// valid resource.
auto channel3 = CreateChannel(0, kServerName3);
auto stub3 = grpc::testing::EchoTestService::NewStub(channel3);
{
ClientContext context;
EchoRequest request;
request.set_message(kRequestMessage);
EchoResponse response;
grpc::Status status = stub3->Echo(&context, request, &response);
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
gpr_log(GPR_INFO, "non-matching NACK message: %s",
response->error_message.c_str());
}
return absl::nullopt;
});
EXPECT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
}
// Tests that we don't trigger does-not-exist callbacks for a resource

@ -22,6 +22,7 @@
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/strip.h"
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
@ -344,7 +345,7 @@ TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpVanilla) {
// Fetches the client config
auto csds_response = FetchCsdsResponse();
gpr_log(GPR_INFO, "xDS config dump: %s", csds_response.DebugString().c_str());
EXPECT_EQ(1, csds_response.config_size());
ASSERT_EQ(1, csds_response.config_size());
const auto& client_config = csds_response.config(0);
// Validate the Node information
EXPECT_THAT(client_config.node(),
@ -615,6 +616,82 @@ TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpClusterRequested) {
ClientResourceStatus::REQUESTED, ::testing::_))));
}
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpMultiClient) {
Listener listener = default_listener_;
const char* kServer2Name = "server2.example.com";
listener.set_name(kServer2Name);
balancer_->ads_service()->SetLdsResource(listener);
SetListenerAndRouteConfiguration(balancer_.get(), listener,
default_route_config_);
CreateAndStartBackends(1);
const size_t kNumRpcs = 5;
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Send several RPCs to ensure the xDS setup works
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
// Connect to a second server
auto channel2 = CreateChannel(0, kServer2Name);
channel2->GetState(/*try_to_connect=*/true);
ASSERT_TRUE(channel2->WaitForConnected(grpc_timeout_seconds_to_deadline(1)));
// Fetches the client config
auto csds_response = FetchCsdsResponse();
ASSERT_EQ(2, csds_response.config_size());
std::vector<std::string> scopes;
for (const auto& client_config : csds_response.config()) {
// Validate the Node information
EXPECT_THAT(client_config.node(),
EqNode("xds_end2end_test", ::testing::HasSubstr("C-core"),
::testing::HasSubstr(grpc_version_string()),
::testing::ElementsAre(
"envoy.lb.does_not_support_overprovisioning")));
scopes.emplace_back(client_config.client_scope());
absl::string_view server = client_config.client_scope();
// Listener matcher depends on whether RDS is enabled.
::testing::Matcher<google::protobuf::Any> api_listener_matcher;
if (GetParam().enable_rds_testing()) {
api_listener_matcher = IsRdsEnabledHCM();
} else {
api_listener_matcher =
EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
}
// Construct list of all matchers.
std::vector<::testing::Matcher<
envoy::service::status::v3::ClientConfig_GenericXdsConfig>>
matchers = {
// Listener
EqGenericXdsConfig(
kLdsTypeUrl, absl::StripPrefix(server, "xds:"), "3",
UnpackListener(EqListener(absl::StripPrefix(server, "xds:"),
api_listener_matcher)),
ClientResourceStatus::ACKED, ::testing::_),
// Cluster
EqGenericXdsConfig(kCdsTypeUrl, kDefaultClusterName, "1",
UnpackCluster(EqCluster(kDefaultClusterName)),
ClientResourceStatus::ACKED, ::testing::_),
// ClusterLoadAssignment
EqGenericXdsConfig(
kEdsTypeUrl, kDefaultEdsServiceName, "1",
UnpackClusterLoadAssignment(EqClusterLoadAssignment(
kDefaultEdsServiceName, backends_[0]->port(),
kDefaultLocalityWeight)),
ClientResourceStatus::ACKED, ::testing::_),
};
// If RDS is enabled, add matcher for RDS resource.
if (GetParam().enable_rds_testing()) {
matchers.push_back(EqGenericXdsConfig(
kRdsTypeUrl, kDefaultRouteConfigurationName, "2",
UnpackRouteConfiguration(EqRouteConfiguration(
kDefaultRouteConfigurationName, kDefaultClusterName)),
ClientResourceStatus::ACKED, ::testing::_));
}
// Validate the dumped xDS configs
EXPECT_THAT(client_config.generic_xds_configs(),
::testing::UnorderedElementsAreArray(matchers));
}
EXPECT_THAT(scopes, ::testing::UnorderedElementsAre(
"xds:server.example.com", "xds:server2.example.com"));
}
class CsdsShortAdsTimeoutTest : public ClientStatusDiscoveryServiceTest {
protected:
void SetUp() override {

@ -523,7 +523,7 @@ void XdsEnd2endTest::InitClient(XdsBootstrapBuilder builder,
// because it's not expecting the client to connect. It also
// ensures that each test can independently set the global channel
// args for the xDS channel.
grpc_core::internal::UnsetGlobalXdsClientForTest();
grpc_core::internal::UnsetGlobalXdsClientsForTest();
}
// Create channel and stub.
ResetStub();
@ -558,7 +558,14 @@ std::shared_ptr<Channel> XdsEnd2endTest::CreateChannel(
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
&xds_channel_args_, &kChannelArgsArgVtable);
}
std::string uri = absl::StrCat("xds://", xds_authority, "/", server_name);
std::vector<absl::string_view> parts = {"xds:"};
if (xds_authority != nullptr && xds_authority[0] != '\0') {
parts.emplace_back("//");
parts.emplace_back(xds_authority);
parts.emplace_back("/");
}
parts.emplace_back(server_name);
std::string uri = absl::StrJoin(parts, "");
std::shared_ptr<ChannelCredentials> channel_creds =
GetParam().use_xds_credentials()
? XdsCredentials(CreateTlsFallbackCredentials())

Loading…
Cancel
Save