From a6d70b449bb5a26b3e6124d9fe9790b5c4579efb Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 20 May 2022 13:40:43 -0700 Subject: [PATCH] xds: don't start resource timer after ADS stream restart if resource is already cached (#29668) * xds: don't start resource timer after ADS stream restart * fix sanity * add missing build dep * attempt to use std::move() for absl::Status * sanity --- BUILD | 1 + .../resolver/xds/xds_resolver.cc | 22 ++++++++---- src/core/ext/xds/xds_client.cc | 34 +++++++++++++------ test/cpp/end2end/xds/xds_core_end2end_test.cc | 30 ++++++++++++++++ 4 files changed, 70 insertions(+), 17 deletions(-) diff --git a/BUILD b/BUILD index 2779c8b583a..fd7f1eaa1e1 100644 --- a/BUILD +++ b/BUILD @@ -4219,6 +4219,7 @@ grpc_cc_library( "iomgr_fwd", "orphanable", "ref_counted_ptr", + "server_address", "time", "uri_parser", "useful", diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index b1babb8cc46..55f1a37f0a2 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -79,6 +80,7 @@ #include "src/core/lib/resolver/resolver.h" #include "src/core/lib/resolver/resolver_factory.h" #include "src/core/lib/resolver/resolver_registry.h" +#include "src/core/lib/resolver/server_address.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/service_config/service_config.h" #include "src/core/lib/service_config/service_config_call_data.h" @@ -788,9 +790,11 @@ void XdsResolver::StartLocked() { grpc_error_std_string(error).c_str()); std::string error_message; grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &error_message); - Result result; - result.service_config = absl::UnavailableError( + absl::Status status = absl::UnavailableError( absl::StrCat("Failed to create XdsClient: ", error_message)); + Result result; + result.addresses = status; + result.service_config = std::move(status); result.args = grpc_channel_args_copy(args_); result_handler_->ReportResult(std::move(result)); GRPC_ERROR_UNREF(error); @@ -802,10 +806,12 @@ void XdsResolver::StartLocked() { const auto* authority_config = xds_client_->bootstrap().LookupAuthority(uri_.authority()); if (authority_config == nullptr) { - Result result; - result.service_config = absl::UnavailableError( + absl::Status status = absl::UnavailableError( absl::StrCat("Invalid target URI -- authority not found for ", uri_.authority().c_str())); + Result result; + result.addresses = status; + result.service_config = std::move(status); result.args = grpc_channel_args_copy(args_); result_handler_->ReportResult(std::move(result)); return; @@ -956,11 +962,13 @@ void XdsResolver::OnError(absl::string_view context, absl::Status status) { gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s: %s", this, std::string(context).c_str(), status.ToString().c_str()); if (xds_client_ == nullptr) return; + status = + absl::UnavailableError(absl::StrCat(context, ": ", status.ToString())); Result result; + result.addresses = status; + result.service_config = std::move(status); grpc_arg new_arg = xds_client_->MakeChannelArg(); result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1); - result.service_config = - absl::UnavailableError(absl::StrCat(context, ": ", status.ToString())); result_handler_->ReportResult(std::move(result)); } @@ -974,6 +982,7 @@ void XdsResolver::OnResourceDoesNotExist() { } current_virtual_host_.routes.clear(); Result result; + result.addresses.emplace(); grpc_error_handle error = GRPC_ERROR_NONE; result.service_config = ServiceConfigImpl::Create(args_, "{}", &error); GPR_ASSERT(*result.service_config != nullptr); @@ -1042,6 +1051,7 @@ void XdsResolver::GenerateResult() { return; } Result result; + result.addresses.emplace(); result.service_config = CreateServiceConfig(); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this, diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index b2200071eb0..4998c81add3 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -198,9 +198,20 @@ class XdsClient::ChannelState::AdsCallState Unref(DEBUG_LOCATION, "Orphan"); } - void MaybeStartTimer(RefCountedPtr ads_calld) { + void MaybeStartTimer(RefCountedPtr ads_calld) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { if (!timer_start_needed_) return; timer_start_needed_ = false; + // Check if we already have a cached version of this resource + // (i.e., if this is the initial request for the resource after an + // ADS stream restart). If so, we don't start the timer, because + // (a) we already have the resource and (b) the server may + // optimize by not resending the resource that we already have. + auto& authority_state = + ads_calld->xds_client()->authority_state_map_[name_.authority]; + ResourceState& state = authority_state.resource_map[type_][name_.key]; + if (state.resource != nullptr) return; + // Start timer. ads_calld_ = std::move(ads_calld); Ref(DEBUG_LOCATION, "timer").release(); timer_pending_ = true; @@ -245,23 +256,23 @@ class XdsClient::ChannelState::AdsCallState ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { if (error == GRPC_ERROR_NONE && timer_pending_) { timer_pending_ = false; - absl::Status watcher_error = absl::UnavailableError(absl::StrFormat( - "timeout obtaining resource {type=%s name=%s} from xds server", - type_->type_url(), - XdsClient::ConstructFullXdsResourceName( - name_.authority, type_->type_url(), name_.key))); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] xds server %s: %s", + gpr_log(GPR_INFO, + "[xds_client %p] xds server %s: timeout obtaining resource " + "{type=%s name=%s} from xds server", ads_calld_->xds_client(), ads_calld_->chand()->server_.server_uri.c_str(), - watcher_error.ToString().c_str()); + std::string(type_->type_url()).c_str(), + XdsClient::ConstructFullXdsResourceName( + name_.authority, type_->type_url(), name_.key) + .c_str()); } auto& authority_state = ads_calld_->xds_client()->authority_state_map_[name_.authority]; ResourceState& state = authority_state.resource_map[type_][name_.key]; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - ads_calld_->xds_client()->NotifyWatchersOnErrorLocked(state.watchers, - watcher_error); + ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist( + state.watchers); } GRPC_ERROR_UNREF(error); } @@ -306,7 +317,8 @@ class XdsClient::ChannelState::AdsCallState // Constructs a list of resource names of a given type for an ADS // request. Also starts the timer for each resource if needed. - std::vector ResourceNamesForRequest(const XdsResourceType* type); + std::vector ResourceNamesForRequest(const XdsResourceType* type) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); // The owning RetryableCall<>. RefCountedPtr> parent_; diff --git a/test/cpp/end2end/xds/xds_core_end2end_test.cc b/test/cpp/end2end/xds/xds_core_end2end_test.cc index 2150251a0b0..405fbcdfe1e 100644 --- a/test/cpp/end2end/xds/xds_core_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_core_end2end_test.cc @@ -553,6 +553,36 @@ TEST_P(TimeoutTest, EdsSecondResourceNotPresentInRequest) { EXPECT_TRUE(error_seen); } +TEST_P(TimeoutTest, ServerDoesNotResendAfterAdsStreamRestart) { + CreateAndStartBackends(1); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + WaitForAllBackends(DEBUG_LOCATION); + // Stop balancer. + balancer_->Shutdown(); + // Tell balancer to require minimum version 1 for all resource types + // and to not reply to the requests. + balancer_->ads_service()->SetResourceMinVersion(kLdsTypeUrl, 1); + balancer_->ads_service()->IgnoreResourceType(kLdsTypeUrl); + balancer_->ads_service()->SetResourceMinVersion(kRdsTypeUrl, 1); + balancer_->ads_service()->IgnoreResourceType(kRdsTypeUrl); + balancer_->ads_service()->SetResourceMinVersion(kCdsTypeUrl, 1); + balancer_->ads_service()->IgnoreResourceType(kCdsTypeUrl); + balancer_->ads_service()->SetResourceMinVersion(kEdsTypeUrl, 1); + balancer_->ads_service()->IgnoreResourceType(kEdsTypeUrl); + // Restart balancer. + balancer_->Start(); + // Send RPCs for long enough to cover the ADS stream restart delay, + // the stream restart, and then the resulting timeout period, just to + // be sure that the channel continues to use the resources from before + // the restart. + absl::Time deadline = + absl::Now() + (absl::Seconds(5) * grpc_test_slowdown_factor()); + do { + CheckRpcSendOk(DEBUG_LOCATION); + } while (absl::Now() < deadline); +} + // // BootstrapSourceTest - tests different bootstrap sources //