[XDS] fix federation bug that prevented load reports from being sent (#32826)

This bug occurred when the same xDS server was configured twice in the
same bootstrap config, once in an authority and again as the top-level
server. In that case, we were incorrectly failing to de-dup them and
were creating a separate channel for the LRS stream than the one that
already existed for the ADS stream. We fix this by canonicalizing the
server keys the same way in both cases.

As a separate follow-up item, I will work on trying to find a better way
to key these maps that does not suffer from this kind of fragility.
pull/32849/head^2
Mark D. Roth 2 years ago committed by GitHub
parent f2a7f6d51b
commit 26df3d14e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      src/core/ext/xds/xds_api.cc
  2. 25
      src/core/ext/xds/xds_client.cc
  3. 82
      test/cpp/end2end/xds/xds_core_end2end_test.cc
  4. 3
      test/cpp/end2end/xds/xds_end2end_test_lib.cc
  5. 11
      test/cpp/end2end/xds/xds_end2end_test_lib.h
  6. 9
      test/cpp/end2end/xds/xds_server.cc
  7. 6
      test/cpp/end2end/xds/xds_server.h

@ -504,6 +504,24 @@ std::string XdsApi::CreateLrsRequest(
return SerializeLrsRequest(context, request);
}
namespace {
void MaybeLogLrsResponse(
const XdsApiContext& context,
const envoy_service_load_stats_v3_LoadStatsResponse* response) {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer) &&
gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
const upb_MessageDef* msg_type =
envoy_service_load_stats_v3_LoadStatsResponse_getmsgdef(context.symtab);
char buf[10240];
upb_TextEncode(response, msg_type, nullptr, 0, buf, sizeof(buf));
gpr_log(GPR_DEBUG, "[xds_client %p] received LRS response: %s",
context.client, buf);
}
}
} // namespace
absl::Status XdsApi::ParseLrsResponse(absl::string_view encoded_response,
bool* send_all_clusters,
std::set<std::string>* cluster_names,
@ -517,6 +535,8 @@ absl::Status XdsApi::ParseLrsResponse(absl::string_view encoded_response,
if (decoded_response == nullptr) {
return absl::UnavailableError("Can't decode response.");
}
const XdsApiContext context = {client_, tracer_, symtab_->ptr(), arena.ptr()};
MaybeLogLrsResponse(context, decoded_response);
// Check send_all_clusters.
if (envoy_service_load_stats_v3_LoadStatsResponse_send_all_clusters(
decoded_response)) {

@ -445,8 +445,8 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
xds_client_(std::move(xds_client)),
server_(server) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
xds_client_.get(), server.server_uri().c_str());
gpr_log(GPR_INFO, "[xds_client %p] creating channel %p for server %s",
xds_client_.get(), this, server.server_uri().c_str());
}
absl::Status status;
transport_ = xds_client_->transport_factory_->Create(
@ -473,6 +473,10 @@ XdsClient::ChannelState::~ChannelState() {
// called from DualRefCounted::Unref, which cannot have a lock annotation for
// a lock in this subclass.
void XdsClient::ChannelState::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] orphaning xds channel %p for server %s",
xds_client(), this, server_.server_uri().c_str());
}
shutting_down_ = true;
transport_.reset();
// At this time, all strong refs are removed, remove from channel map to
@ -1208,6 +1212,11 @@ void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: scheduling load report timer",
xds_client(), parent_->chand()->server_.server_uri().c_str());
}
timer_handle_ = xds_client()->engine()->RunAfter(report_interval_, [this]() {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
@ -1354,6 +1363,10 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
return;
}
// Start reporting.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] xds server %s: creating load reporter",
xds_client(), chand()->server_.server_uri().c_str());
}
reporter_ = MakeOrphanable<Reporter>(
Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
}
@ -1566,6 +1579,14 @@ void XdsClient::WatchResource(const XdsResourceType* type,
xds_server = authority->server();
}
if (xds_server == nullptr) xds_server = &bootstrap_->server();
// Canonify the xDS server instance, so that we make sure we're using
// the same instance as will be used in AddClusterDropStats() and
// AddClusterLocalityStats(). This may yield a different result than
// the logic above if the same server is listed both in the authority
// and as the top-level server.
// TODO(roth): This is really ugly -- need to find a better way to
// index the xDS server than by address here.
xds_server = bootstrap_->FindXdsServer(*xds_server);
{
MutexLock lock(&mu_);
MaybeRegisterResourceTypeLocked(type);

@ -1293,6 +1293,8 @@ TEST_P(XdsFederationLoadReportingTest, FederationMultipleLoadReportingTest) {
authority_balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(authority_load_report.size(), 1UL);
ClientStats& authority_client_stats = authority_load_report.front();
EXPECT_EQ(authority_client_stats.cluster_name(), kNewClusterName);
EXPECT_EQ(authority_client_stats.eds_service_name(), kNewEdsServiceName);
EXPECT_EQ(kNumRpcsToAuthorityBalancer,
authority_client_stats.total_successful_requests());
EXPECT_EQ(0U, authority_client_stats.total_requests_in_progress());
@ -1307,6 +1309,8 @@ TEST_P(XdsFederationLoadReportingTest, FederationMultipleLoadReportingTest) {
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(default_load_report.size(), 1UL);
ClientStats& default_client_stats = default_load_report.front();
EXPECT_EQ(default_client_stats.cluster_name(), kDefaultClusterName);
EXPECT_EQ(default_client_stats.eds_service_name(), kDefaultEdsServiceName);
EXPECT_EQ(kNumRpcsToDefaultBalancer,
default_client_stats.total_successful_requests());
EXPECT_EQ(0U, default_client_stats.total_requests_in_progress());
@ -1318,6 +1322,84 @@ TEST_P(XdsFederationLoadReportingTest, FederationMultipleLoadReportingTest) {
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
// This test covers a bug found in the wild whereby we incorrectly failed
// to de-dup xDS servers when the same server is used both in an authority
// and as the top-level server in the bootstrap config. This resulted in
// the ADS call and LRS call being in two different ChannelState objects,
// which resulted in the LRS load reports not being sent.
TEST_P(XdsFederationLoadReportingTest, SameServerInAuthorityAndTopLevel) {
ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION");
const char* kAuthority = "xds.example.com";
const char* kNewServerName = "whee%/server.example.com";
const char* kNewListenerName =
"xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
"whee%25/server.example.com";
const char* kNewRouteConfigName =
"xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
"new_route_config_name";
const char* kNewClusterName =
"xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/"
"cluster_name";
const char* kNewEdsServiceName =
"xdstp://xds.example.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
"edsservice_name";
BootstrapBuilder builder = BootstrapBuilder();
std::string xds_server =
absl::StrCat("localhost:", authority_balancer_->port());
builder.AddAuthority(kAuthority, xds_server);
builder.SetDefaultServer(xds_server);
InitClient(builder);
CreateAndStartBackends(1);
authority_balancer_->lrs_service()->set_send_all_clusters(true);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
authority_balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsServiceName));
// New cluster
Cluster new_cluster = default_cluster_;
new_cluster.set_name(kNewClusterName);
new_cluster.mutable_eds_cluster_config()->set_service_name(
kNewEdsServiceName);
authority_balancer_->ads_service()->SetCdsResource(new_cluster);
// New Route
RouteConfiguration new_route_config = default_route_config_;
new_route_config.set_name(kNewRouteConfigName);
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
// New Listener
Listener listener = default_listener_;
listener.set_name(kNewListenerName);
SetListenerAndRouteConfiguration(authority_balancer_.get(), listener,
new_route_config);
// Create second channel to new target URI and send 1 RPC.
auto channel2 =
CreateChannel(/*failover_timeout_ms=*/0, kNewServerName, kAuthority);
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
ClientContext context;
EchoRequest request;
RpcOptions().SetupRpc(&context, &request);
EchoResponse response;
grpc::Status status = stub2->Echo(&context, request, &response);
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(1U, backends_[0]->backend_service()->request_count());
// Wait for load report.
std::vector<ClientStats> authority_load_report =
authority_balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(authority_load_report.size(), 1UL);
ClientStats& authority_client_stats = authority_load_report.front();
EXPECT_EQ(authority_client_stats.cluster_name(), kNewClusterName);
EXPECT_EQ(authority_client_stats.eds_service_name(), kNewEdsServiceName);
EXPECT_EQ(1U, authority_client_stats.total_successful_requests());
EXPECT_EQ(0U, authority_client_stats.total_requests_in_progress());
EXPECT_EQ(1U, authority_client_stats.total_issued_requests());
EXPECT_EQ(0U, authority_client_stats.total_error_requests());
EXPECT_EQ(0U, authority_client_stats.total_dropped_requests());
EXPECT_EQ(1U, authority_balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, authority_balancer_->lrs_service()->response_count());
}
//
// SecureNamingTest - test that the right authority is used for the xDS server
//

@ -749,7 +749,8 @@ void XdsEnd2endTest::InitClient(BootstrapBuilder builder,
xds_channel_args_.num_args = xds_channel_args_to_add_.size();
xds_channel_args_.args = xds_channel_args_to_add_.data();
// Initialize XdsClient state.
builder.SetDefaultServer(absl::StrCat("localhost:", balancer_->port()));
builder.SetDefaultServer(absl::StrCat("localhost:", balancer_->port()),
/*ignore_if_set=*/true);
bootstrap_ = builder.Build();
if (GetParam().bootstrap_source() == XdsTestType::kBootstrapFromEnvVar) {
grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap_.c_str());

@ -415,8 +415,11 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType> {
ignore_resource_deletion_ = true;
return *this;
}
BootstrapBuilder& SetDefaultServer(const std::string& server) {
top_server_ = server;
// If ignore_if_set is true, sets the default server only if it has
// not already been set.
BootstrapBuilder& SetDefaultServer(const std::string& server,
bool ignore_if_set = false) {
if (!ignore_if_set || top_server_.empty()) top_server_ = server;
return *this;
}
BootstrapBuilder& SetClientDefaultListenerResourceNameTemplate(
@ -432,9 +435,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType> {
return *this;
}
BootstrapBuilder& AddAuthority(
const std::string& authority, const std::string& servers = "",
const std::string& authority, const std::string& server = "",
const std::string& client_listener_resource_name_template = "") {
authorities_[authority] = {servers,
authorities_[authority] = {server,
client_listener_resource_name_template};
return *this;
}

@ -237,13 +237,18 @@ void LrsServiceImpl::Shutdown() {
gpr_log(GPR_INFO, "LRS[%p]: shut down", this);
}
std::vector<LrsServiceImpl::ClientStats> LrsServiceImpl::WaitForLoadReport() {
std::vector<LrsServiceImpl::ClientStats> LrsServiceImpl::WaitForLoadReport(
absl::Duration timeout) {
timeout *= grpc_test_slowdown_factor();
grpc_core::MutexLock lock(&load_report_mu_);
grpc_core::CondVar cv;
if (result_queue_.empty()) {
load_report_cond_ = &cv;
while (result_queue_.empty()) {
cv.Wait(&load_report_mu_);
if (cv.WaitWithTimeout(&load_report_mu_, timeout)) {
gpr_log(GPR_ERROR, "timed out waiting for load report");
return {};
}
}
load_report_cond_ = nullptr;
}

@ -725,7 +725,11 @@ class LrsServiceImpl
void Shutdown();
std::vector<ClientStats> WaitForLoadReport();
// Returns an empty vector if the timeout elapses with no load report.
// TODO(roth): Change the default here to a finite duration and verify
// that it doesn't cause failures in any existing tests.
std::vector<ClientStats> WaitForLoadReport(
absl::Duration timeout = absl::InfiniteDuration());
private:
using LoadStatsRequest = ::envoy::service::load_stats::v3::LoadStatsRequest;

Loading…
Cancel
Save