diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc index ac5b82e98b8..8d8003d185a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc +++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc @@ -49,7 +49,8 @@ class ChildPolicyHandler::Helper std::unique_ptr picker) override { if (parent_->shutting_down_) return; // If this request is from the pending child policy, ignore it until - // it reports READY, at which point we swap it into place. + // it reports something other than CONNECTING, at which point we swap it + // into place. if (CalledByPendingChild()) { if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) { gpr_log(GPR_INFO, @@ -57,7 +58,7 @@ class ChildPolicyHandler::Helper "reports state=%s", parent_.get(), this, child_, ConnectivityStateName(state)); } - if (state != GRPC_CHANNEL_READY) return; + if (state == GRPC_CHANNEL_CONNECTING) return; grpc_pollset_set_del_pollset_set( parent_->child_policy_->interested_parties(), parent_->interested_parties()); @@ -202,6 +203,10 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) { // Cases 1, 2b, and 3b: create a new child policy. // If child_policy_ is null, we set it (case 1), else we set // pending_child_policy_ (cases 2b and 3b). + // TODO(roth): In cases 2b and 3b, we should start a timer here, so + // that there's an upper bound on the amount of time it takes us to + // switch to the new policy, even if the new policy stays in + // CONNECTING for a very long period of time. if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[child_policy_handler %p] creating new %schild policy %s", this, diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 3c1a38a8c0c..58ce4516619 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -18,6 +18,8 @@ #include +#include "absl/strings/str_cat.h" + #include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" @@ -65,6 +67,7 @@ class CdsLb : public LoadBalancingPolicy { : parent_(std::move(parent)) {} void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override; void OnError(grpc_error* error) override; + void OnResourceDoesNotExist() override; private: RefCountedPtr parent_; @@ -211,6 +214,25 @@ void CdsLb::ClusterWatcher::OnError(grpc_error* error) { } } +void CdsLb::ClusterWatcher::OnResourceDoesNotExist() { + gpr_log(GPR_ERROR, "[cdslb %p] CDS resource for %s does not exist", + parent_.get(), parent_->config_->cluster().c_str()); + // Go into TRANSIENT_FAILURE if we have not yet created the child + // policy (i.e., we have not yet received data from xds). Otherwise, + // we keep running with the data we had previously. + // TODO(roth): Once traffic splitting is implemented, this should be + // fixed to report TRANSIENT_FAILURE unconditionally. + if (parent_->child_policy_ == nullptr) { + parent_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, + absl::make_unique( + GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("CDS resource \"", parent_->config_->cluster(), + "\" does not exist") + .c_str()))); + } +} + // // CdsLb::Helper // diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc index 9a159b3f4d0..cd49192fc4c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc @@ -341,6 +341,23 @@ class EdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface { } } + void OnResourceDoesNotExist() override { + gpr_log(GPR_ERROR, "[edslb %p] EDS resource does not exist", + eds_policy_.get()); + // Go into TRANSIENT_FAILURE if we have not yet created the child + // policy (i.e., we have not yet received data from xds). Otherwise, + // we keep running with the data we had previously. + // TODO(roth): Once traffic splitting is implemented, this should be + // fixed to report TRANSIENT_FAILURE unconditionally. + if (eds_policy_->child_policy_ == nullptr) { + eds_policy_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, + absl::make_unique( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "EDS resource does not exist"))); + } + } + private: RefCountedPtr eds_policy_; }; diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index 5dec92ba7ef..28ea4bb5c17 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -71,6 +71,7 @@ class XdsResolver : public Resolver { void OnServiceConfigChanged( RefCountedPtr service_config) override; void OnError(grpc_error* error) override; + void OnResourceDoesNotExist() override; private: RefCountedPtr resolver_; @@ -109,6 +110,20 @@ void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) { resolver_->result_handler()->ReturnResult(std::move(result)); } +void XdsResolver::ServiceConfigWatcher::OnResourceDoesNotExist() { + if (resolver_->xds_client_ == nullptr) return; + gpr_log(GPR_ERROR, + "[xds_resolver %p] LDS/RDS resource does not exist -- returning " + "empty service config", + resolver_.get()); + Result result; + result.service_config = + ServiceConfig::Create("{}", &result.service_config_error); + GPR_ASSERT(result.service_config != nullptr); + result.args = grpc_channel_args_copy(resolver_->args_); + resolver_->result_handler()->ReturnResult(std::move(result)); +} + void XdsResolver::StartLocked() { grpc_error* error = GRPC_ERROR_NONE; xds_client_ = MakeOrphanable( diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index a9a5a286670..973b03ceb00 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -895,9 +895,15 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( gpr_log(GPR_INFO, "[xds_client %p] LDS update does not include requested resource", xds_client()); - xds_client()->service_config_watcher_->OnError( - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "LDS update does not include requested resource")); + if (xds_client()->lds_result_.has_value() && + !xds_client()->lds_result_->route_config_name.empty()) { + Unsubscribe(XdsApi::kRdsTypeUrl, + xds_client()->lds_result_->route_config_name, + /*delay_unsubscription=*/false); + xds_client()->rds_result_.reset(); + } + xds_client()->lds_result_.reset(); + xds_client()->service_config_watcher_->OnResourceDoesNotExist(); return; } if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -936,6 +942,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( Unsubscribe( XdsApi::kRdsTypeUrl, xds_client()->lds_result_->route_config_name, /*delay_unsubscription=*/!lds_update->route_config_name.empty()); + xds_client()->rds_result_.reset(); } xds_client()->lds_result_ = std::move(lds_update); if (xds_client()->lds_result_->rds_update.has_value()) { @@ -963,9 +970,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( gpr_log(GPR_INFO, "[xds_client %p] RDS update does not include requested resource", xds_client()); - xds_client()->service_config_watcher_->OnError( - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "RDS update does not include requested resource")); + xds_client()->rds_result_.reset(); + xds_client()->service_config_watcher_->OnResourceDoesNotExist(); return; } if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -1051,20 +1057,20 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( } } // For any subscribed resource that is not present in the update, - // remove it from the cache and notify watchers of the error. + // remove it from the cache and notify watchers that it does not exist. for (const auto& p : cds_state.subscribed_resources) { const std::string& cluster_name = p.first; if (cds_update_map.find(cluster_name) == cds_update_map.end()) { ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name]; cluster_state.update.reset(); for (const auto& p : cluster_state.watchers) { - p.first->OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Cluster not present in CDS update")); + p.first->OnResourceDoesNotExist(); } } } - // Also remove any EDS resources that are no longer referred to by any CDS - // resources. + // For any EDS resource that is no longer referred to by any CDS + // resources, remove it from the cache and notify watchers that it + // does not exist. auto& eds_state = state_map_[XdsApi::kEdsTypeUrl]; for (const auto& p : eds_state.subscribed_resources) { const std::string& eds_resource_name = p.first; @@ -1074,8 +1080,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( xds_client()->endpoint_map_[eds_resource_name]; endpoint_state.update.reset(); for (const auto& p : endpoint_state.watchers) { - p.first->OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "ClusterLoadAssignment resource removed due to CDS update")); + p.first->OnResourceDoesNotExist(); } } } diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index 725c5da011a..8233c5d2e08 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -50,6 +50,8 @@ class XdsClient : public InternallyRefCounted { RefCountedPtr service_config) = 0; virtual void OnError(grpc_error* error) = 0; + + virtual void OnResourceDoesNotExist() = 0; }; // Cluster data watcher interface. Implemented by callers. @@ -60,6 +62,8 @@ class XdsClient : public InternallyRefCounted { virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0; virtual void OnError(grpc_error* error) = 0; + + virtual void OnResourceDoesNotExist() = 0; }; // Endpoint data watcher interface. Implemented by callers. @@ -70,6 +74,8 @@ class XdsClient : public InternallyRefCounted { virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0; virtual void OnError(grpc_error* error) = 0; + + virtual void OnResourceDoesNotExist() = 0; }; // If *error is not GRPC_ERROR_NONE after construction, then there was diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 2e3b4e9ca78..ac727688ca5 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -890,84 +890,6 @@ TEST_F(SingleBalancerTest, SwapChildPolicy) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } -TEST_F(SingleBalancerTest, UpdatesGoToMostRecentChildPolicy) { - const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); - ResetStub(kFallbackTimeoutMs); - int unreachable_balancer_port = grpc_pick_unused_port_or_die(); - int unreachable_backend_port = grpc_pick_unused_port_or_die(); - // Phase 1: Start with RR pointing to first backend. - gpr_log(GPR_INFO, "PHASE 1: Initial setup with RR with first backend"); - SetNextResolution( - { - // Unreachable balancer. - {unreachable_balancer_port, ""}, - }, - { - // Fallback address: first backend. - {backends_[0]->port_, ""}, - }, - "{\n" - " \"loadBalancingConfig\":[\n" - " { \"grpclb\":{\n" - " \"childPolicy\":[\n" - " { \"round_robin\":{} }\n" - " ]\n" - " } }\n" - " ]\n" - "}"); - // RPCs should go to first backend. - WaitForBackend(0); - // Phase 2: Switch to PF pointing to unreachable backend. - gpr_log(GPR_INFO, "PHASE 2: Update to use PF with unreachable backend"); - SetNextResolution( - { - // Unreachable balancer. - {unreachable_balancer_port, ""}, - }, - { - // Fallback address: unreachable backend. - {unreachable_backend_port, ""}, - }, - "{\n" - " \"loadBalancingConfig\":[\n" - " { \"grpclb\":{\n" - " \"childPolicy\":[\n" - " { \"pick_first\":{} }\n" - " ]\n" - " } }\n" - " ]\n" - "}"); - // RPCs should continue to go to the first backend, because the new - // PF child policy will never go into state READY. - WaitForBackend(0); - // Phase 3: Switch back to RR pointing to second and third backends. - // This ensures that we create a new policy rather than updating the - // pending PF policy. - gpr_log(GPR_INFO, "PHASE 3: Update to use RR again with two backends"); - SetNextResolution( - { - // Unreachable balancer. - {unreachable_balancer_port, ""}, - }, - { - // Fallback address: second and third backends. - {backends_[1]->port_, ""}, - {backends_[2]->port_, ""}, - }, - "{\n" - " \"loadBalancingConfig\":[\n" - " { \"grpclb\":{\n" - " \"childPolicy\":[\n" - " { \"round_robin\":{} }\n" - " ]\n" - " } }\n" - " ]\n" - "}"); - // RPCs should go to the second and third backends. - WaitForBackend(1); - WaitForBackend(2); -} - TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) { SetNextResolutionAllBalancers(); // Same backend listed twice. diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 7e0b2733bb7..49a975ca2f4 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -1897,6 +1897,30 @@ TEST_P(XdsResolverOnlyTest, ChangeClusters) { EXPECT_EQ(0, std::get<1>(counts)); } +// Tests that we go into TRANSIENT_FAILURE if the Listener is removed. +TEST_P(XdsResolverOnlyTest, ListenerRemoved) { + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args)); + // We need to wait for all backends to come online. + WaitForAllBackends(); + // Unset CDS resource. + balancers_[0]->ads_service()->UnsetResource(kLdsTypeUrl, + kDefaultResourceName); + // Wait for RPCs to start failing. + do { + } while (SendRpc(RpcOptions(), nullptr).ok()); + // Make sure RPCs are still failing. + CheckRpcSendFailure(1000); + // Make sure we ACK'ed the update. + EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state().state, + AdsServiceImpl::ResponseState::ACKED); +} + // Tests that things keep workng if the cluster resource disappears. TEST_P(XdsResolverOnlyTest, ClusterRemoved) { SetNextResolution({});