xds: don't start resource timer after ADS stream restart if resource is already cached (#29668)

* xds: don't start resource timer after ADS stream restart

* fix sanity

* add missing build dep

* attempt to use std::move() for absl::Status

* sanity
pull/29750/head
Mark D. Roth 3 years ago committed by GitHub
parent d4aed9e615
commit a6d70b449b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 22
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  3. 34
      src/core/ext/xds/xds_client.cc
  4. 30
      test/cpp/end2end/xds/xds_core_end2end_test.cc

@ -4219,6 +4219,7 @@ grpc_cc_library(
"iomgr_fwd",
"orphanable",
"ref_counted_ptr",
"server_address",
"time",
"uri_parser",
"useful",

@ -24,6 +24,7 @@
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
@ -79,6 +80,7 @@
#include "src/core/lib/resolver/resolver.h"
#include "src/core/lib/resolver/resolver_factory.h"
#include "src/core/lib/resolver/resolver_registry.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_call_data.h"
@ -788,9 +790,11 @@ void XdsResolver::StartLocked() {
grpc_error_std_string(error).c_str());
std::string error_message;
grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &error_message);
Result result;
result.service_config = absl::UnavailableError(
absl::Status status = absl::UnavailableError(
absl::StrCat("Failed to create XdsClient: ", error_message));
Result result;
result.addresses = status;
result.service_config = std::move(status);
result.args = grpc_channel_args_copy(args_);
result_handler_->ReportResult(std::move(result));
GRPC_ERROR_UNREF(error);
@ -802,10 +806,12 @@ void XdsResolver::StartLocked() {
const auto* authority_config =
xds_client_->bootstrap().LookupAuthority(uri_.authority());
if (authority_config == nullptr) {
Result result;
result.service_config = absl::UnavailableError(
absl::Status status = absl::UnavailableError(
absl::StrCat("Invalid target URI -- authority not found for ",
uri_.authority().c_str()));
Result result;
result.addresses = status;
result.service_config = std::move(status);
result.args = grpc_channel_args_copy(args_);
result_handler_->ReportResult(std::move(result));
return;
@ -956,11 +962,13 @@ void XdsResolver::OnError(absl::string_view context, absl::Status status) {
gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s: %s",
this, std::string(context).c_str(), status.ToString().c_str());
if (xds_client_ == nullptr) return;
status =
absl::UnavailableError(absl::StrCat(context, ": ", status.ToString()));
Result result;
result.addresses = status;
result.service_config = std::move(status);
grpc_arg new_arg = xds_client_->MakeChannelArg();
result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
result.service_config =
absl::UnavailableError(absl::StrCat(context, ": ", status.ToString()));
result_handler_->ReportResult(std::move(result));
}
@ -974,6 +982,7 @@ void XdsResolver::OnResourceDoesNotExist() {
}
current_virtual_host_.routes.clear();
Result result;
result.addresses.emplace();
grpc_error_handle error = GRPC_ERROR_NONE;
result.service_config = ServiceConfigImpl::Create(args_, "{}", &error);
GPR_ASSERT(*result.service_config != nullptr);
@ -1042,6 +1051,7 @@ void XdsResolver::GenerateResult() {
return;
}
Result result;
result.addresses.emplace();
result.service_config = CreateServiceConfig();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,

@ -198,9 +198,20 @@ class XdsClient::ChannelState::AdsCallState
Unref(DEBUG_LOCATION, "Orphan");
}
void MaybeStartTimer(RefCountedPtr<AdsCallState> ads_calld) {
void MaybeStartTimer(RefCountedPtr<AdsCallState> ads_calld)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
if (!timer_start_needed_) return;
timer_start_needed_ = false;
// Check if we already have a cached version of this resource
// (i.e., if this is the initial request for the resource after an
// ADS stream restart). If so, we don't start the timer, because
// (a) we already have the resource and (b) the server may
// optimize by not resending the resource that we already have.
auto& authority_state =
ads_calld->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
if (state.resource != nullptr) return;
// Start timer.
ads_calld_ = std::move(ads_calld);
Ref(DEBUG_LOCATION, "timer").release();
timer_pending_ = true;
@ -245,23 +256,23 @@ class XdsClient::ChannelState::AdsCallState
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
if (error == GRPC_ERROR_NONE && timer_pending_) {
timer_pending_ = false;
absl::Status watcher_error = absl::UnavailableError(absl::StrFormat(
"timeout obtaining resource {type=%s name=%s} from xds server",
type_->type_url(),
XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.key)));
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] xds server %s: %s",
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: timeout obtaining resource "
"{type=%s name=%s} from xds server",
ads_calld_->xds_client(),
ads_calld_->chand()->server_.server_uri.c_str(),
watcher_error.ToString().c_str());
std::string(type_->type_url()).c_str(),
XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.key)
.c_str());
}
auto& authority_state =
ads_calld_->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
ads_calld_->xds_client()->NotifyWatchersOnErrorLocked(state.watchers,
watcher_error);
ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
state.watchers);
}
GRPC_ERROR_UNREF(error);
}
@ -306,7 +317,8 @@ class XdsClient::ChannelState::AdsCallState
// Constructs a list of resource names of a given type for an ADS
// request. Also starts the timer for each resource if needed.
std::vector<std::string> ResourceNamesForRequest(const XdsResourceType* type);
std::vector<std::string> ResourceNamesForRequest(const XdsResourceType* type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<AdsCallState>> parent_;

@ -553,6 +553,36 @@ TEST_P(TimeoutTest, EdsSecondResourceNotPresentInRequest) {
EXPECT_TRUE(error_seen);
}
// Checks that RPCs keep succeeding across a balancer shutdown/restart when
// the restarted balancer is told to ignore requests for all four resource
// types (LDS, RDS, CDS, EDS). The channel therefore has to keep using the
// resources it obtained before the restart.
TEST_P(TimeoutTest, ServerDoesNotResendAfterAdsStreamRestart) {
// Bring up one backend and publish an EDS resource that places it in a
// single locality ("locality0").
CreateAndStartBackends(1);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// NOTE(review): presumably blocks until the backend is reachable, so the
// client holds a full set of resources before the balancer goes down --
// confirm against the fixture.
WaitForAllBackends(DEBUG_LOCATION);
// Stop balancer.
balancer_->Shutdown();
// Tell balancer to require minimum version 1 for all resource types
// and to not reply to the requests.
balancer_->ads_service()->SetResourceMinVersion(kLdsTypeUrl, 1);
balancer_->ads_service()->IgnoreResourceType(kLdsTypeUrl);
balancer_->ads_service()->SetResourceMinVersion(kRdsTypeUrl, 1);
balancer_->ads_service()->IgnoreResourceType(kRdsTypeUrl);
balancer_->ads_service()->SetResourceMinVersion(kCdsTypeUrl, 1);
balancer_->ads_service()->IgnoreResourceType(kCdsTypeUrl);
balancer_->ads_service()->SetResourceMinVersion(kEdsTypeUrl, 1);
balancer_->ads_service()->IgnoreResourceType(kEdsTypeUrl);
// Restart balancer.
balancer_->Start();
// Send RPCs for long enough to cover the ADS stream restart delay,
// the stream restart, and then the resulting timeout period, just to
// be sure that the channel continues to use the resources from before
// the restart.
// The window is 5 seconds, scaled by grpc_test_slowdown_factor().
absl::Time deadline =
absl::Now() + (absl::Seconds(5) * grpc_test_slowdown_factor());
// do/while: at least one RPC is always sent, and the deadline is only
// checked after each RPC returns.
do {
CheckRpcSendOk(DEBUG_LOCATION);
} while (absl::Now() < deadline);
}
//
// BootstrapSourceTest - tests different bootstrap sources
//

Loading…
Cancel
Save