Merge pull request #22858 from markdroth/xds_on_resource_does_not_exist

xds: When listener does not exist, put channel in TRANSIENT_FAILURE.
pull/22918/head
Mark D. Roth 5 years ago committed by GitHub
commit e4801d2ecb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
  2. 22
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  3. 17
      src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
  4. 15
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  5. 31
      src/core/ext/filters/client_channel/xds/xds_client.cc
  6. 6
      src/core/ext/filters/client_channel/xds/xds_client.h
  7. 78
      test/cpp/end2end/grpclb_end2end_test.cc
  8. 24
      test/cpp/end2end/xds_end2end_test.cc

@ -49,7 +49,8 @@ class ChildPolicyHandler::Helper
std::unique_ptr<SubchannelPicker> picker) override {
if (parent_->shutting_down_) return;
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
// it reports something other than CONNECTING, at which point we swap it
// into place.
if (CalledByPendingChild()) {
if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
gpr_log(GPR_INFO,
@ -57,7 +58,7 @@ class ChildPolicyHandler::Helper
"reports state=%s",
parent_.get(), this, child_, ConnectivityStateName(state));
}
if (state != GRPC_CHANNEL_READY) return;
if (state == GRPC_CHANNEL_CONNECTING) return;
grpc_pollset_set_del_pollset_set(
parent_->child_policy_->interested_parties(),
parent_->interested_parties());
@ -202,6 +203,10 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
// TODO(roth): In cases 2b and 3b, we should start a timer here, so
// that there's an upper bound on the amount of time it takes us to
// switch to the new policy, even if the new policy stays in
// CONNECTING for a very long period of time.
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO,
"[child_policy_handler %p] creating new %schild policy %s", this,

@ -18,6 +18,8 @@
#include <string.h>
#include "absl/strings/str_cat.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
@ -65,6 +67,7 @@ class CdsLb : public LoadBalancingPolicy {
: parent_(std::move(parent)) {}
void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override;
void OnError(grpc_error* error) override;
void OnResourceDoesNotExist() override;
private:
RefCountedPtr<CdsLb> parent_;
@ -211,6 +214,25 @@ void CdsLb::ClusterWatcher::OnError(grpc_error* error) {
}
}
void CdsLb::ClusterWatcher::OnResourceDoesNotExist() {
gpr_log(GPR_ERROR, "[cdslb %p] CDS resource for %s does not exist",
parent_.get(), parent_->config_->cluster().c_str());
// Go into TRANSIENT_FAILURE if we have not yet created the child
// policy (i.e., we have not yet received data from xds). Otherwise,
// we keep running with the data we had previously.
// TODO(roth): Once traffic splitting is implemented, this should be
// fixed to report TRANSIENT_FAILURE unconditionally.
if (parent_->child_policy_ == nullptr) {
parent_->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::make_unique<TransientFailurePicker>(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("CDS resource \"", parent_->config_->cluster(),
"\" does not exist")
.c_str())));
}
}
//
// CdsLb::Helper
//

@ -341,6 +341,23 @@ class EdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface {
}
}
void OnResourceDoesNotExist() override {
gpr_log(GPR_ERROR, "[edslb %p] EDS resource does not exist",
eds_policy_.get());
// Go into TRANSIENT_FAILURE if we have not yet created the child
// policy (i.e., we have not yet received data from xds). Otherwise,
// we keep running with the data we had previously.
// TODO(roth): Once traffic splitting is implemented, this should be
// fixed to report TRANSIENT_FAILURE unconditionally.
if (eds_policy_->child_policy_ == nullptr) {
eds_policy_->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::make_unique<TransientFailurePicker>(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"EDS resource does not exist")));
}
}
private:
RefCountedPtr<EdsLb> eds_policy_;
};

@ -71,6 +71,7 @@ class XdsResolver : public Resolver {
void OnServiceConfigChanged(
RefCountedPtr<ServiceConfig> service_config) override;
void OnError(grpc_error* error) override;
void OnResourceDoesNotExist() override;
private:
RefCountedPtr<XdsResolver> resolver_;
@ -109,6 +110,20 @@ void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) {
resolver_->result_handler()->ReturnResult(std::move(result));
}
void XdsResolver::ServiceConfigWatcher::OnResourceDoesNotExist() {
if (resolver_->xds_client_ == nullptr) return;
gpr_log(GPR_ERROR,
"[xds_resolver %p] LDS/RDS resource does not exist -- returning "
"empty service config",
resolver_.get());
Result result;
result.service_config =
ServiceConfig::Create("{}", &result.service_config_error);
GPR_ASSERT(result.service_config != nullptr);
result.args = grpc_channel_args_copy(resolver_->args_);
resolver_->result_handler()->ReturnResult(std::move(result));
}
void XdsResolver::StartLocked() {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(

@ -895,9 +895,15 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
gpr_log(GPR_INFO,
"[xds_client %p] LDS update does not include requested resource",
xds_client());
xds_client()->service_config_watcher_->OnError(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"LDS update does not include requested resource"));
if (xds_client()->lds_result_.has_value() &&
!xds_client()->lds_result_->route_config_name.empty()) {
Unsubscribe(XdsApi::kRdsTypeUrl,
xds_client()->lds_result_->route_config_name,
/*delay_unsubscription=*/false);
xds_client()->rds_result_.reset();
}
xds_client()->lds_result_.reset();
xds_client()->service_config_watcher_->OnResourceDoesNotExist();
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
@ -936,6 +942,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
Unsubscribe(
XdsApi::kRdsTypeUrl, xds_client()->lds_result_->route_config_name,
/*delay_unsubscription=*/!lds_update->route_config_name.empty());
xds_client()->rds_result_.reset();
}
xds_client()->lds_result_ = std::move(lds_update);
if (xds_client()->lds_result_->rds_update.has_value()) {
@ -963,9 +970,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
gpr_log(GPR_INFO,
"[xds_client %p] RDS update does not include requested resource",
xds_client());
xds_client()->service_config_watcher_->OnError(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RDS update does not include requested resource"));
xds_client()->rds_result_.reset();
xds_client()->service_config_watcher_->OnResourceDoesNotExist();
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
@ -1051,20 +1057,20 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
}
}
// For any subscribed resource that is not present in the update,
// remove it from the cache and notify watchers of the error.
// remove it from the cache and notify watchers that it does not exist.
for (const auto& p : cds_state.subscribed_resources) {
const std::string& cluster_name = p.first;
if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
cluster_state.update.reset();
for (const auto& p : cluster_state.watchers) {
p.first->OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Cluster not present in CDS update"));
p.first->OnResourceDoesNotExist();
}
}
}
// Also remove any EDS resources that are no longer referred to by any CDS
// resources.
// For any EDS resource that is no longer referred to by any CDS
// resources, remove it from the cache and notify watchers that it
// does not exist.
auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
for (const auto& p : eds_state.subscribed_resources) {
const std::string& eds_resource_name = p.first;
@ -1074,8 +1080,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
xds_client()->endpoint_map_[eds_resource_name];
endpoint_state.update.reset();
for (const auto& p : endpoint_state.watchers) {
p.first->OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"ClusterLoadAssignment resource removed due to CDS update"));
p.first->OnResourceDoesNotExist();
}
}
}

@ -50,6 +50,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
RefCountedPtr<ServiceConfig> service_config) = 0;
virtual void OnError(grpc_error* error) = 0;
virtual void OnResourceDoesNotExist() = 0;
};
// Cluster data watcher interface. Implemented by callers.
@ -60,6 +62,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
virtual void OnError(grpc_error* error) = 0;
virtual void OnResourceDoesNotExist() = 0;
};
// Endpoint data watcher interface. Implemented by callers.
@ -70,6 +74,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
virtual void OnError(grpc_error* error) = 0;
virtual void OnResourceDoesNotExist() = 0;
};
// If *error is not GRPC_ERROR_NONE after construction, then there was

@ -890,84 +890,6 @@ TEST_F(SingleBalancerTest, SwapChildPolicy) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(SingleBalancerTest, UpdatesGoToMostRecentChildPolicy) {
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
int unreachable_balancer_port = grpc_pick_unused_port_or_die();
int unreachable_backend_port = grpc_pick_unused_port_or_die();
// Phase 1: Start with RR pointing to first backend.
gpr_log(GPR_INFO, "PHASE 1: Initial setup with RR with first backend");
SetNextResolution(
{
// Unreachable balancer.
{unreachable_balancer_port, ""},
},
{
// Fallback address: first backend.
{backends_[0]->port_, ""},
},
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
" \"childPolicy\":[\n"
" { \"round_robin\":{} }\n"
" ]\n"
" } }\n"
" ]\n"
"}");
// RPCs should go to first backend.
WaitForBackend(0);
// Phase 2: Switch to PF pointing to unreachable backend.
gpr_log(GPR_INFO, "PHASE 2: Update to use PF with unreachable backend");
SetNextResolution(
{
// Unreachable balancer.
{unreachable_balancer_port, ""},
},
{
// Fallback address: unreachable backend.
{unreachable_backend_port, ""},
},
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
" \"childPolicy\":[\n"
" { \"pick_first\":{} }\n"
" ]\n"
" } }\n"
" ]\n"
"}");
// RPCs should continue to go to the first backend, because the new
// PF child policy will never go into state READY.
WaitForBackend(0);
// Phase 3: Switch back to RR pointing to second and third backends.
// This ensures that we create a new policy rather than updating the
// pending PF policy.
gpr_log(GPR_INFO, "PHASE 3: Update to use RR again with two backends");
SetNextResolution(
{
// Unreachable balancer.
{unreachable_balancer_port, ""},
},
{
// Fallback address: second and third backends.
{backends_[1]->port_, ""},
{backends_[2]->port_, ""},
},
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
" \"childPolicy\":[\n"
" { \"round_robin\":{} }\n"
" ]\n"
" } }\n"
" ]\n"
"}");
// RPCs should go to the second and third backends.
WaitForBackend(1);
WaitForBackend(2);
}
TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
SetNextResolutionAllBalancers();
// Same backend listed twice.

@ -1897,6 +1897,30 @@ TEST_P(XdsResolverOnlyTest, ChangeClusters) {
EXPECT_EQ(0, std::get<1>(counts));
}
// Tests that we go into TRANSIENT_FAILURE if the Listener is removed.
TEST_P(XdsResolverOnlyTest, ListenerRemoved) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts()},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args));
// We need to wait for all backends to come online.
WaitForAllBackends();
// Unset CDS resource.
balancers_[0]->ads_service()->UnsetResource(kLdsTypeUrl,
kDefaultResourceName);
// Wait for RPCs to start failing.
do {
} while (SendRpc(RpcOptions(), nullptr).ok());
// Make sure RPCs are still failing.
CheckRpcSendFailure(1000);
// Make sure we ACK'ed the update.
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state().state,
AdsServiceImpl::ResponseState::ACKED);
}
// Tests that things keep workng if the cluster resource disappears.
TEST_P(XdsResolverOnlyTest, ClusterRemoved) {
SetNextResolution({});

Loading…
Cancel
Save