From 9ff943b81e1fda799043db08bf31efdb6d781f21 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 12 Sep 2022 17:17:03 -0700 Subject: [PATCH] client_channel: allow LB policy to communicate update errors to resolver (#30809) * client_channel: allow LB policy to communicate update errors to resolver * fix tests * Automated change: Fix sanity tests * fix build * fix another "ignoring return value" warning * fix use-after-move * fix channel to invoke resolver callback when service config fails on initial resolution * remove outdated TODO * improve comments * fix PollingResolver to defer re-resolution requests while waiting for result-health callback * Automated change: Fix sanity tests * absl::exchange -> std::exchange * fix dns_resolver_cooldown_test Co-authored-by: markdroth --- BUILD | 1 + .../filters/client_channel/client_channel.cc | 15 ++- .../filters/client_channel/client_channel.h | 2 +- .../lb_policy/child_policy_handler.cc | 4 +- .../lb_policy/child_policy_handler.h | 3 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 24 ++-- .../outlier_detection/outlier_detection.cc | 6 +- .../lb_policy/pick_first/pick_first.cc | 15 ++- .../lb_policy/priority/priority.cc | 37 ++++-- .../lb_policy/ring_hash/ring_hash.cc | 19 +-- .../client_channel/lb_policy/rls/rls.cc | 35 ++++-- .../lb_policy/round_robin/round_robin.cc | 14 ++- .../weighted_target/weighted_target.cc | 37 ++++-- .../client_channel/lb_policy/xds/cds.cc | 9 +- .../lb_policy/xds/xds_cluster_impl.cc | 18 +-- .../lb_policy/xds/xds_cluster_manager.cc | 33 +++-- .../lb_policy/xds/xds_cluster_resolver.cc | 22 ++-- .../resolver/polling_resolver.cc | 113 +++++++++++++----- .../resolver/polling_resolver.h | 11 ++ src/core/lib/load_balancing/lb_policy.h | 10 +- src/core/lib/resolver/resolver.h | 16 ++- .../resolvers/dns_resolver_cooldown_test.cc | 5 +- test/core/end2end/tests/retry_lb_drop.cc | 3 +- test/core/util/test_lb_policies.cc | 11 +- test/cpp/end2end/client_lb_end2end_test.cc | 67 +++++++++++ 25 files changed, 383 insertions(+), 147 deletions(-) diff --git a/BUILD b/BUILD index 2eeda3d6898..5360ef62815 100644 --- a/BUILD +++ b/BUILD @@ -5451,6 +5451,7 @@ grpc_cc_library( "gpr", "grpc_base", "grpc_resolver", + "grpc_service_config", "grpc_trace", "iomgr_fwd", "iomgr_timer", diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 9eb5961482d..3846f86aeda 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1171,6 +1171,9 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { gpr_log(GPR_INFO, "chand=%p: got resolver result", this); } + // Grab resolver result health callback. + auto resolver_callback = std::move(result.result_health_callback); + absl::Status resolver_result_status; // We only want to trace the address resolution in the follow cases: // (a) Address resolution resulted in service config change. // (b) Address resolution that causes number of backends to go from @@ -1222,6 +1225,8 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { // TRANSIENT_FAILURE. OnResolverErrorLocked(result.service_config.status()); trace_strings.push_back("no valid service config"); + resolver_result_status = + absl::UnavailableError("no valid service config"); } } else if (*result.service_config == nullptr) { // Resolver did not return any service config. @@ -1266,7 +1271,7 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { gpr_log(GPR_INFO, "chand=%p: service config not changed", this); } // Create or update LB policy, as needed. - CreateOrUpdateLbPolicyLocked( + resolver_result_status = CreateOrUpdateLbPolicyLocked( std::move(lb_policy_config), parsed_service_config->health_check_service_name(), std::move(result)); if (service_config_changed || config_selector_changed) { @@ -1280,6 +1285,10 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { trace_strings.push_back("Service config changed"); } } + // Invoke resolver callback if needed. + if (resolver_callback != nullptr) { + resolver_callback(std::move(resolver_result_status)); + } // Add channel trace event. if (!trace_strings.empty()) { std::string message = @@ -1326,7 +1335,7 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) { } } -void ClientChannel::CreateOrUpdateLbPolicyLocked( +absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked( RefCountedPtr lb_policy_config, const absl::optional& health_check_service_name, Resolver::Result result) { @@ -1353,7 +1362,7 @@ void ClientChannel::CreateOrUpdateLbPolicyLocked( gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this, lb_policy_.get()); } - lb_policy_->UpdateLocked(std::move(update_args)); + return lb_policy_->UpdateLocked(std::move(update_args)); } // Creates a new LB policy. diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index e4e4853fe00..0684e4876c1 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -250,7 +250,7 @@ class ClientChannel { void OnResolverErrorLocked(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); - void CreateOrUpdateLbPolicyLocked( + absl::Status CreateOrUpdateLbPolicyLocked( RefCountedPtr lb_policy_config, const absl::optional& health_check_service_name, Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc index 358558002ac..41568e68963 100644 --- a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc +++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc @@ -161,7 +161,7 @@ void ChildPolicyHandler::ShutdownLocked() { } } -void ChildPolicyHandler::UpdateLocked(UpdateArgs args) { +absl::Status ChildPolicyHandler::UpdateLocked(UpdateArgs args) { // If the child policy name changes, we need to create a new child // policy. When this happens, we leave child_policy_ as-is and store // the new child policy in pending_child_policy_. Once the new child @@ -253,7 +253,7 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) { policy_to_update == pending_child_policy_.get() ? "pending " : "", policy_to_update); } - policy_to_update->UpdateLocked(std::move(args)); + return policy_to_update->UpdateLocked(std::move(args)); } void ChildPolicyHandler::ExitIdleLocked() { diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h index d13552848c7..19651aa0a1c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h +++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h @@ -20,6 +20,7 @@ #include +#include "absl/status/status.h" #include "absl/strings/string_view.h" #include "src/core/lib/channel/channel_args.h" @@ -43,7 +44,7 @@ class ChildPolicyHandler : public LoadBalancingPolicy { absl::string_view name() const override { return "child_policy_handler"; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 6d2ecf4fe00..c9b8b5dd7a4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -188,7 +188,7 @@ class GrpcLb : public LoadBalancingPolicy { absl::string_view name() const override { return kGrpclb; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; private: @@ -473,7 +473,7 @@ class GrpcLb : public LoadBalancingPolicy { void ShutdownLocked() override; // Helper functions used in UpdateLocked(). - void UpdateBalancerChannelLocked(const ChannelArgs& args); + absl::Status UpdateBalancerChannelLocked(const ChannelArgs& args); void CancelBalancerChannelConnectivityWatchLocked(); @@ -1524,7 +1524,7 @@ void GrpcLb::ResetBackoffLocked() { } } -void GrpcLb::UpdateLocked(UpdateArgs args) { +absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { const bool is_initial_update = lb_channel_ == nullptr; config_ = args.config; GPR_ASSERT(config_ != nullptr); @@ -1540,7 +1540,7 @@ void GrpcLb::UpdateLocked(UpdateArgs args) { } resolution_note_ = std::move(args.resolution_note); // Update balancer channel. - UpdateBalancerChannelLocked(args.args); + absl::Status status = UpdateBalancerChannelLocked(args.args); // Update the existing child policy, if any. if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked(); // If this is the initial update, start the fallback-at-startup checks @@ -1565,18 +1565,24 @@ void GrpcLb::UpdateLocked(UpdateArgs args) { // Start balancer call. StartBalancerCallLocked(); } + return status; } // // helpers for UpdateLocked() // -void GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) { +absl::Status GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) { // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. args_ = args.Set(GRPC_ARG_LB_POLICY_NAME, "grpclb"); - // Construct args for balancer channel. + // Get balancer addresses. ServerAddressList balancer_addresses = ExtractBalancerAddresses(args); + absl::Status status; + if (balancer_addresses.empty()) { + status = absl::UnavailableError("balancer address list must be non-empty"); + } + // Construct args for balancer channel. ChannelArgs lb_channel_args = BuildBalancerChannelArgs(response_generator_.get(), args); // Create balancer channel if needed. @@ -1604,6 +1610,8 @@ void GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) { result.addresses = std::move(balancer_addresses); result.args = lb_channel_args; response_generator_->SetResponse(std::move(result)); + // Return status. + return status; } void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() { @@ -1794,7 +1802,9 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this, child_policy_.get()); } - child_policy_->UpdateLocked(std::move(update_args)); + // TODO(roth): If we're in fallback mode and the child policy rejects the + // update, we should propagate that failure back to the resolver somehow. + (void)child_policy_->UpdateLocked(std::move(update_args)); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc index 9d4379661a8..ab0496d9c67 100644 --- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc +++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc @@ -126,7 +126,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy { absl::string_view name() const override { return kOutlierDetection; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; @@ -595,7 +595,7 @@ void OutlierDetectionLb::ResetBackoffLocked() { if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); } -void OutlierDetectionLb::UpdateLocked(UpdateArgs args) { +absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] Received update", this); } @@ -692,7 +692,7 @@ void OutlierDetectionLb::UpdateLocked(UpdateArgs args) { "[outlier_detection_lb %p] Updating child policy handler %p", this, child_policy_.get()); } - child_policy_->UpdateLocked(std::move(update_args)); + return child_policy_->UpdateLocked(std::move(update_args)); } void OutlierDetectionLb::MaybeUpdatePickerLocked() { diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 3337d9f5a27..4de551743c4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -69,7 +69,7 @@ class PickFirst : public LoadBalancingPolicy { absl::string_view name() const override { return kPickFirst; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; @@ -232,7 +232,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { this, std::move(addresses), latest_update_args_.args); latest_pending_subchannel_list_->StartWatchingLocked(); // Empty update or no valid subchannels. Put the channel in - // TRANSIENT_FAILURE. + // TRANSIENT_FAILURE and request re-resolution. if (latest_pending_subchannel_list_->num_subchannels() == 0) { absl::Status status = latest_update_args_.addresses.ok() @@ -242,6 +242,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, absl::make_unique(status)); + channel_control_helper()->RequestReresolution(); } // Otherwise, if this is the initial update, report CONNECTING. else if (subchannel_list_.get() == nullptr) { @@ -263,7 +264,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { } } -void PickFirst::UpdateLocked(UpdateArgs args) { +absl::Status PickFirst::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { if (args.addresses.ok()) { gpr_log(GPR_INFO, @@ -276,6 +277,13 @@ void PickFirst::UpdateLocked(UpdateArgs args) { } // Add GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. args.args = args.args.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1); + // Set return status based on the address list. + absl::Status status; + if (!args.addresses.ok()) { + status = args.addresses.status(); + } else if (args.addresses->empty()) { + status = absl::UnavailableError("address list must not be empty"); + } // If the update contains a resolver error and we have a previous update // that was not a resolver error, keep using the previous addresses. if (!args.addresses.ok() && latest_update_args_.config != nullptr) { @@ -288,6 +296,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) { if (!idle_) { AttemptToConnectUsingLatestUpdateArgsLocked(); } + return status; } void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( diff --git a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc index 896c4a81f64..4172ecda1d0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc +++ b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc @@ -110,7 +110,7 @@ class PriorityLb : public LoadBalancingPolicy { absl::string_view name() const override { return kPriority; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; @@ -126,8 +126,8 @@ class PriorityLb : public LoadBalancingPolicy { const std::string& name() const { return name_; } - void UpdateLocked(RefCountedPtr config, - bool ignore_reresolution_requests); + absl::Status UpdateLocked(RefCountedPtr config, + bool ignore_reresolution_requests); void ExitIdleLocked(); void ResetBackoffLocked(); void MaybeDeactivateLocked(); @@ -344,7 +344,7 @@ void PriorityLb::ResetBackoffLocked() { for (const auto& p : children_) p.second->ResetBackoffLocked(); } -void PriorityLb::UpdateLocked(UpdateArgs args) { +absl::Status PriorityLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] received update", this); } @@ -357,6 +357,7 @@ void PriorityLb::UpdateLocked(UpdateArgs args) { resolution_note_ = std::move(args.resolution_note); // Check all existing children against the new config. update_in_progress_ = true; + std::vector errors; for (const auto& p : children_) { const std::string& child_name = p.first; auto& child = p.second; @@ -366,13 +367,24 @@ void PriorityLb::UpdateLocked(UpdateArgs args) { child->MaybeDeactivateLocked(); } else { // Existing child found in new config. Update it. - child->UpdateLocked(config_it->second.config, - config_it->second.ignore_reresolution_requests); + absl::Status status = + child->UpdateLocked(config_it->second.config, + config_it->second.ignore_reresolution_requests); + if (!status.ok()) { + errors.emplace_back( + absl::StrCat("child ", child_name, ": ", status.ToString())); + } } } update_in_progress_ = false; // Try to get connected. ChoosePriorityLocked(); + // Return status. + if (!errors.empty()) { + return absl::UnavailableError(absl::StrCat( + "errors from children: [", absl::StrJoin(errors, "; "), "]")); + } + return absl::OkStatus(); } uint32_t PriorityLb::GetChildPriorityLocked( @@ -416,8 +428,11 @@ void PriorityLb::ChoosePriorityLocked() { Ref(DEBUG_LOCATION, "ChildPriority"), child_name); auto child_config = config_->children().find(child_name); GPR_DEBUG_ASSERT(child_config != config_->children().end()); - child->UpdateLocked(child_config->second.config, - child_config->second.ignore_reresolution_requests); + // TODO(roth): If the child reports a non-OK status with the + // update, we need to propagate that back to the resolver somehow. + (void)child->UpdateLocked( + child_config->second.config, + child_config->second.ignore_reresolution_requests); } else { // The child already exists. Reactivate if needed. child->MaybeReactivateLocked(); @@ -668,10 +683,10 @@ PriorityLb::ChildPriority::GetPicker() { return absl::make_unique(picker_wrapper_); } -void PriorityLb::ChildPriority::UpdateLocked( +absl::Status PriorityLb::ChildPriority::UpdateLocked( RefCountedPtr config, bool ignore_reresolution_requests) { - if (priority_policy_->shutting_down_) return; + if (priority_policy_->shutting_down_) return absl::OkStatus(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update", priority_policy_.get(), name_.c_str(), this); @@ -697,7 +712,7 @@ void PriorityLb::ChildPriority::UpdateLocked( "[priority_lb %p] child %s (%p): updating child policy handler %p", priority_policy_.get(), name_.c_str(), this, child_policy_.get()); } - child_policy_->UpdateLocked(std::move(update_args)); + return child_policy_->UpdateLocked(std::move(update_args)); } OrphanablePtr diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index 0f194a29404..1075e9f6790 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -154,7 +154,7 @@ class RingHash : public LoadBalancingPolicy { absl::string_view name() const override { return kRingHash; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; private: @@ -817,7 +817,7 @@ void RingHash::ResetBackoffLocked() { } } -void RingHash::UpdateLocked(UpdateArgs args) { +absl::Status RingHash::UpdateLocked(UpdateArgs args) { config_ = std::move(args.config); ServerAddressList addresses; if (args.addresses.ok()) { @@ -831,9 +831,9 @@ void RingHash::UpdateLocked(UpdateArgs args) { gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s", this, args.addresses.status().ToString().c_str()); } - // If we already have a subchannel list, then ignore the resolver - // failure and keep using the existing list. - if (subchannel_list_ != nullptr) return; + // If we already have a subchannel list, then keep using the existing + // list, but still report back that the update was not accepted. + if (subchannel_list_ != nullptr) return args.addresses.status(); } if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) && latest_pending_subchannel_list_ != nullptr) { @@ -866,12 +866,13 @@ void RingHash::UpdateLocked(UpdateArgs args) { channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, absl::make_unique(status)); - } else { - // Otherwise, report IDLE. - subchannel_list_->UpdateRingHashConnectivityStateLocked( - /*index=*/0, /*connection_attempt_complete=*/false, absl::OkStatus()); + return status; } + // Otherwise, report IDLE. + subchannel_list_->UpdateRingHashConnectivityStateLocked( + /*index=*/0, /*connection_attempt_complete=*/false, absl::OkStatus()); } + return absl::OkStatus(); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc index 46c630cef7a..ceea9a2a874 100644 --- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc +++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc @@ -208,7 +208,7 @@ class RlsLb : public LoadBalancingPolicy { explicit RlsLb(Args args); absl::string_view name() const override { return kRls; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; @@ -293,7 +293,7 @@ class RlsLb : public LoadBalancingPolicy { // // Both methods grab the data they need from the parent object. void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); - void MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_); + absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_); void ExitIdleLocked() { if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); @@ -811,10 +811,10 @@ void RlsLb::ChildPolicyWrapper::StartUpdate() { } } -void RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() { +absl::Status RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() { // If pending_config_ is not set, that means StartUpdate() failed, so // there's nothing to do here. - if (pending_config_ == nullptr) return; + if (pending_config_ == nullptr) return absl::OkStatus(); // If child policy doesn't yet exist, create it. if (child_policy_ == nullptr) { Args create_args; @@ -844,7 +844,7 @@ void RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() { update_args.config = std::move(pending_config_); update_args.addresses = lb_policy_->addresses_; update_args.args = lb_policy_->channel_args_; - child_policy_->UpdateLocked(std::move(update_args)); + return child_policy_->UpdateLocked(std::move(update_args)); } // @@ -1809,7 +1809,9 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) { // Now that we've released the lock, finish the update on any newly // created child policies. for (ChildPolicyWrapper* child : child_policies_to_finish_update) { - child->MaybeFinishUpdate(); + // TODO(roth): If the child reports an error with the update, we + // need to propagate that back to the resolver somehow. + (void)child->MaybeFinishUpdate(); } } @@ -1897,7 +1899,7 @@ RlsLb::RlsLb(Args args) } } -void RlsLb::UpdateLocked(UpdateArgs args) { +absl::Status RlsLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) { gpr_log(GPR_INFO, "[rlslb %p] policy updated", this); } @@ -1988,19 +1990,28 @@ void RlsLb::UpdateLocked(UpdateArgs args) { } } // Now that we've released the lock, finish update of child policies. + std::vector errors; if (update_child_policies) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) { gpr_log(GPR_INFO, "[rlslb %p] finishing child policy updates", this); } for (auto& p : child_policy_map_) { - p.second->MaybeFinishUpdate(); + absl::Status status = p.second->MaybeFinishUpdate(); + if (!status.ok()) { + errors.emplace_back( + absl::StrCat("target ", p.first, ": ", status.ToString())); + } } } else if (created_default_child) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) { gpr_log(GPR_INFO, "[rlslb %p] finishing default child policy update", this); } - default_child_policy_->MaybeFinishUpdate(); + absl::Status status = default_child_policy_->MaybeFinishUpdate(); + if (!status.ok()) { + errors.emplace_back(absl::StrCat("target ", config_->default_target(), + ": ", status.ToString())); + } } update_in_progress_ = false; // In principle, we need to update the picker here only if the config @@ -2010,6 +2021,12 @@ void RlsLb::UpdateLocked(UpdateArgs args) { // remember to update the code here. So for now, we just unconditionally // update the picker here, even though it's probably redundant. UpdatePickerLocked(); + // Return status. + if (!errors.empty()) { + return absl::UnavailableError(absl::StrCat( + "errors from children: [", absl::StrJoin(errors, "; "), "]")); + } + return absl::OkStatus(); } void RlsLb::ExitIdleLocked() { diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index febc30ee54f..87e028ba176 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -68,7 +68,7 @@ class RoundRobin : public LoadBalancingPolicy { absl::string_view name() const override { return kRoundRobin; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; private: @@ -266,7 +266,7 @@ void RoundRobin::ResetBackoffLocked() { } } -void RoundRobin::UpdateLocked(UpdateArgs args) { +absl::Status RoundRobin::UpdateLocked(UpdateArgs args) { ServerAddressList addresses; if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { @@ -279,9 +279,9 @@ void RoundRobin::UpdateLocked(UpdateArgs args) { gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this, args.addresses.status().ToString().c_str()); } - // If we already have a subchannel list, then ignore the resolver - // failure and keep using the existing list. - if (subchannel_list_ != nullptr) return; + // If we already have a subchannel list, then keep using the existing + // list, but still report back that the update was not accepted. + if (subchannel_list_ != nullptr) return args.addresses.status(); } // Create new subchannel list, replacing the previous pending list, if any. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) && @@ -308,15 +308,17 @@ void RoundRobin::UpdateLocked(UpdateArgs args) { channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, absl::make_unique(status)); + return status; } // Otherwise, if this is the initial update, immediately promote it to // subchannel_list_ and report CONNECTING. - else if (subchannel_list_.get() == nullptr) { + if (subchannel_list_.get() == nullptr) { subchannel_list_ = std::move(latest_pending_subchannel_list_); channel_control_helper()->UpdateState( GRPC_CHANNEL_CONNECTING, absl::Status(), absl::make_unique(Ref(DEBUG_LOCATION, "QueuePicker"))); } + return absl::OkStatus(); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc index e242fc6be0f..7b5c2e98d8e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc @@ -105,7 +105,7 @@ class WeightedTargetLb : public LoadBalancingPolicy { absl::string_view name() const override { return kWeightedTarget; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; private: @@ -149,10 +149,10 @@ class WeightedTargetLb : public LoadBalancingPolicy { void Orphan() override; - void UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config, - absl::StatusOr addresses, - const std::string& resolution_note, - const ChannelArgs& args); + absl::Status UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config, + absl::StatusOr addresses, + const std::string& resolution_note, + const ChannelArgs& args); void ResetBackoffLocked(); void DeactivateLocked(); @@ -301,8 +301,8 @@ void WeightedTargetLb::ResetBackoffLocked() { for (auto& p : targets_) p.second->ResetBackoffLocked(); } -void WeightedTargetLb::UpdateLocked(UpdateArgs args) { - if (shutting_down_) return; +absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) { + if (shutting_down_) return absl::OkStatus(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this); } @@ -320,6 +320,7 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) { // Update all children. absl::StatusOr address_map = MakeHierarchicalAddressMap(args.addresses); + std::vector errors; for (const auto& p : config_->target_map()) { const std::string& name = p.first; const WeightedTargetLbConfig::ChildConfig& config = p.second; @@ -335,8 +336,12 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) { } else { addresses = address_map.status(); } - target->UpdateLocked(config, std::move(addresses), args.resolution_note, - args.args); + absl::Status status = target->UpdateLocked(config, std::move(addresses), + args.resolution_note, args.args); + if (!status.ok()) { + errors.emplace_back( + absl::StrCat("child ", name, ": ", status.ToString())); + } } update_in_progress_ = false; if (config_->target_map().empty()) { @@ -345,9 +350,15 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) { channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, absl::make_unique(status)); - return; + return absl::OkStatus(); } UpdateStateLocked(); + // Return status. + if (!errors.empty()) { + return absl::UnavailableError(absl::StrCat( + "errors from children: [", absl::StrJoin(errors, "; "), "]")); + } + return absl::OkStatus(); } void WeightedTargetLb::UpdateStateLocked() { @@ -553,11 +564,11 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked( return lb_policy; } -void WeightedTargetLb::WeightedChild::UpdateLocked( +absl::Status WeightedTargetLb::WeightedChild::UpdateLocked( const WeightedTargetLbConfig::ChildConfig& config, absl::StatusOr addresses, const std::string& resolution_note, const ChannelArgs& args) { - if (weighted_target_policy_->shutting_down_) return; + if (weighted_target_policy_->shutting_down_) return absl::OkStatus(); // Update child weight. weight_ = config.weight; // Reactivate if needed. @@ -587,7 +598,7 @@ void WeightedTargetLb::WeightedChild::UpdateLocked( weighted_target_policy_.get(), this, name_.c_str(), child_policy_.get()); } - child_policy_->UpdateLocked(std::move(update_args)); + return child_policy_->UpdateLocked(std::move(update_args)); } void WeightedTargetLb::WeightedChild::ResetBackoffLocked() { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 90b47fd6aec..fdccea5fb39 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -98,7 +98,7 @@ class CdsLb : public LoadBalancingPolicy { absl::string_view name() const override { return kCds; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; void ExitIdleLocked() override; @@ -310,7 +310,7 @@ void CdsLb::ExitIdleLocked() { if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); } -void CdsLb::UpdateLocked(UpdateArgs args) { +absl::Status CdsLb::UpdateLocked(UpdateArgs args) { // Update config. auto old_config = std::move(config_); config_ = std::move(args.config); @@ -338,6 +338,7 @@ void CdsLb::UpdateLocked(UpdateArgs args) { XdsClusterResourceType::StartWatch(xds_client_.get(), config_->cluster(), std::move(watcher)); } + return absl::OkStatus(); } // Generates the discovery mechanism config for the specified cluster name. @@ -554,7 +555,9 @@ void CdsLb::OnClusterChanged(const std::string& name, } else { args.args = args_; } - child_policy_->UpdateLocked(std::move(args)); + // TODO(roth): If the child policy reports an error with the update, + // we need to propagate the error to the resolver somehow. + (void)child_policy_->UpdateLocked(std::move(args)); } // Remove entries in watchers_ for any clusters not in clusters_added for (auto it = watchers_.begin(); it != watchers_.end();) { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index c9f02d3482e..8bca406b7fd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -191,7 +191,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy { absl::string_view name() const override { return kXdsClusterImpl; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; @@ -269,9 +269,9 @@ class XdsClusterImplLb : public LoadBalancingPolicy { OrphanablePtr CreateChildPolicyLocked( const ChannelArgs& args); - void UpdateChildPolicyLocked(absl::StatusOr addresses, - std::string resolution_note, - const ChannelArgs& args); + absl::Status UpdateChildPolicyLocked( + absl::StatusOr addresses, std::string resolution_note, + const ChannelArgs& args); void MaybeUpdatePickerLocked(); @@ -483,7 +483,7 @@ void XdsClusterImplLb::ResetBackoffLocked() { if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); } -void XdsClusterImplLb::UpdateLocked(UpdateArgs args) { +absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this); } @@ -525,8 +525,8 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) { MaybeUpdatePickerLocked(); } // Update child policy. - UpdateChildPolicyLocked(std::move(args.addresses), - std::move(args.resolution_note), args.args); + return UpdateChildPolicyLocked(std::move(args.addresses), + std::move(args.resolution_note), args.args); } void XdsClusterImplLb::MaybeUpdatePickerLocked() { @@ -582,7 +582,7 @@ OrphanablePtr XdsClusterImplLb::CreateChildPolicyLocked( return lb_policy; } -void XdsClusterImplLb::UpdateChildPolicyLocked( +absl::Status XdsClusterImplLb::UpdateChildPolicyLocked( absl::StatusOr addresses, std::string resolution_note, const ChannelArgs& args) { // Create policy if needed. @@ -602,7 +602,7 @@ void XdsClusterImplLb::UpdateChildPolicyLocked( "[xds_cluster_impl_lb %p] Updating child policy handler %p", this, child_policy_.get()); } - child_policy_->UpdateLocked(std::move(update_args)); + return child_policy_->UpdateLocked(std::move(update_args)); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc index bd37f38cde1..bb7f4a768d8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc @@ -96,7 +96,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { absl::string_view name() const override { return kXdsClusterManager; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; @@ -144,9 +144,10 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { void Orphan() override; - void UpdateLocked(RefCountedPtr config, - const absl::StatusOr& addresses, - const ChannelArgs& args); + absl::Status UpdateLocked( + RefCountedPtr config, + const absl::StatusOr& addresses, + const ChannelArgs& args); void ExitIdleLocked(); void ResetBackoffLocked(); void DeactivateLocked(); @@ -274,8 +275,8 @@ void XdsClusterManagerLb::ResetBackoffLocked() { for (auto& p : children_) p.second->ResetBackoffLocked(); } -void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) { - if (shutting_down_) return; +absl::Status XdsClusterManagerLb::UpdateLocked(UpdateArgs args) { + if (shutting_down_) return absl::OkStatus(); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this); } @@ -291,6 +292,7 @@ void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) { } } // Add or update the children in the new config. + std::vector errors; for (const auto& p : config_->cluster_map()) { const std::string& name = p.first; const RefCountedPtr& config = p.second; @@ -299,10 +301,21 @@ void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) { child = MakeOrphanable(Ref(DEBUG_LOCATION, "ClusterChild"), name); } - child->UpdateLocked(config, args.addresses, args.args); + absl::Status status = + child->UpdateLocked(config, args.addresses, args.args); + if (!status.ok()) { + errors.emplace_back( + absl::StrCat("child ", name, ": ", status.ToString())); + } } update_in_progress_ = false; UpdateStateLocked(); + // Return status. + if (!errors.empty()) { + return absl::UnavailableError(absl::StrCat( + "errors from children: [", absl::StrJoin(errors, "; "), "]")); + } + return absl::OkStatus(); } void XdsClusterManagerLb::UpdateStateLocked() { @@ -470,11 +483,11 @@ XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked( return lb_policy; } -void XdsClusterManagerLb::ClusterChild::UpdateLocked( +absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked( RefCountedPtr config, const absl::StatusOr& addresses, const ChannelArgs& args) { - if (xds_cluster_manager_policy_->shutting_down_) return; + if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus(); // Update child weight. // Reactivate if needed. if (delayed_removal_timer_callback_pending_) { @@ -499,7 +512,7 @@ void XdsClusterManagerLb::ClusterChild::UpdateLocked( xds_cluster_manager_policy_.get(), this, name_.c_str(), child_policy_.get()); } - child_policy_->UpdateLocked(std::move(update_args)); + return child_policy_->UpdateLocked(std::move(update_args)); } void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index 89aa549f287..c808b5b20f1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -140,7 +140,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { absl::string_view name() const override { return kXdsClusterResolver; } - void UpdateLocked(UpdateArgs args) override; + absl::Status UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; void ExitIdleLocked() override; @@ -382,7 +382,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { void MaybeDestroyChildPolicyLocked(); - void UpdateChildPolicyLocked(); + absl::Status UpdateChildPolicyLocked(); OrphanablePtr CreateChildPolicyLocked( const ChannelArgs& args); ServerAddressList CreateChildPolicyAddressesLocked(); @@ -612,7 +612,7 @@ void XdsClusterResolverLb::MaybeDestroyChildPolicyLocked() { } } -void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) { +absl::Status XdsClusterResolverLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Received update", this); } @@ -623,7 +623,8 @@ void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) { // Update args. args_ = std::move(args.args); // Update child policy if needed. - if (child_policy_ != nullptr) UpdateChildPolicyLocked(); + absl::Status status; + if (child_policy_ != nullptr) status = UpdateChildPolicyLocked(); // Create endpoint watcher if needed. if (is_initial_update) { for (const auto& config : config_->discovery_mechanisms()) { @@ -649,6 +650,7 @@ void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) { discovery_mechanism.discovery_mechanism->Start(); } } + return status; } void XdsClusterResolverLb::ResetBackoffLocked() { @@ -757,7 +759,9 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index, if (!mechanism.latest_update.has_value()) return; } // Update child policy. - UpdateChildPolicyLocked(); + // TODO(roth): If the child policy reports an error with the update, + // we need to propagate that error back to the resolver somehow. + (void)UpdateChildPolicyLocked(); } void XdsClusterResolverLb::OnError(size_t index, std::string resolution_note) { @@ -998,11 +1002,11 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { return std::move(*config); } -void XdsClusterResolverLb::UpdateChildPolicyLocked() { - if (shutting_down_) return; +absl::Status XdsClusterResolverLb::UpdateChildPolicyLocked() { + if (shutting_down_) return absl::OkStatus(); UpdateArgs update_args; update_args.config = CreateChildPolicyConfigLocked(); - if (update_args.config == nullptr) return; + if (update_args.config == nullptr) return absl::OkStatus(); update_args.addresses = CreateChildPolicyAddressesLocked(); update_args.resolution_note = CreateChildPolicyResolutionNoteLocked(); update_args.args = CreateChildPolicyArgsLocked(args_); @@ -1013,7 +1017,7 @@ void XdsClusterResolverLb::UpdateChildPolicyLocked() { gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Updating child policy %p", this, child_policy_.get()); } - child_policy_->UpdateLocked(std::move(update_args)); + return child_policy_->UpdateLocked(std::move(update_args)); } ChannelArgs XdsClusterResolverLb::CreateChildPolicyArgsLocked( diff --git a/src/core/ext/filters/client_channel/resolver/polling_resolver.cc b/src/core/ext/filters/client_channel/resolver/polling_resolver.cc index 9b4863f85cf..4b7dd24baae 100644 --- a/src/core/ext/filters/client_channel/resolver/polling_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/polling_resolver.cc @@ -20,10 +20,13 @@ #include +#include #include +#include #include "absl/status/status.h" #include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "absl/strings/strip.h" @@ -36,6 +39,7 @@ #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/service_config/service_config.h" #include "src/core/lib/uri/uri_parser.h" namespace grpc_core { @@ -69,7 +73,16 @@ void PollingResolver::StartLocked() { MaybeStartResolvingLocked(); } void PollingResolver::RequestReresolutionLocked() { if (request_ == nullptr) { - MaybeStartResolvingLocked(); + // If we're still waiting for a result-health callback from the last + // result we reported, don't trigger the re-resolution until we get + // that callback. + if (result_status_state_ == + ResultStatusState::kResultHealthCallbackPending) { + result_status_state_ = + ResultStatusState::kReresolutionRequestedWhileCallbackWasPending; + } else { + MaybeStartResolvingLocked(); + } } } @@ -126,44 +139,78 @@ void PollingResolver::OnRequestCompleteLocked(Result result) { } request_.reset(); if (!shutdown_) { - if (result.service_config.ok() && result.addresses.ok()) { - // Reset backoff state so that we start from the beginning when the - // next request gets triggered. - backoff_.Reset(); - } else { - if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { - gpr_log(GPR_INFO, - "[polling resolver %p] resolution failed (will retry): " - "address status \"%s\"; service config status \"%s\"", - this, result.addresses.status().ToString().c_str(), - result.service_config.status().ToString().c_str()); - } - // Set up for retry. - // InvalidateNow to avoid getting stuck re-initializing this timer - // in a loop while draining the currently-held WorkSerializer. - // Also see https://github.com/grpc/grpc/issues/26079. - ExecCtx::Get()->InvalidateNow(); - Timestamp next_try = backoff_.NextAttemptTime(); - Duration timeout = next_try - ExecCtx::Get()->Now(); - GPR_ASSERT(!have_next_resolution_timer_); - have_next_resolution_timer_ = true; - if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { - if (timeout > Duration::Zero()) { - gpr_log(GPR_INFO, "[polling resolver %p] retrying in %" PRId64 " ms", - this, timeout.millis()); - } else { - gpr_log(GPR_INFO, "[polling resolver %p] retrying immediately", this); - } - } - Ref(DEBUG_LOCATION, "next_resolution_timer").release(); - GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr); - grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_); + if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { + gpr_log(GPR_INFO, + "[polling resolver %p] returning result: " + "addresses=%s, service_config=%s", + this, + result.addresses.ok() + ? absl::StrCat("<", result.addresses->size(), " addresses>") + .c_str() + : result.addresses.status().ToString().c_str(), + result.service_config.ok() + ? (*result.service_config == nullptr + ? "" + : std::string((*result.service_config)->json_string()) + .c_str()) + : result.service_config.status().ToString().c_str()); } + GPR_ASSERT(result.result_health_callback == nullptr); + RefCountedPtr self = + Ref(DEBUG_LOCATION, "result_health_callback"); + result.result_health_callback = [self = + std::move(self)](absl::Status status) { + self->GetResultStatus(std::move(status)); + }; + result_status_state_ = ResultStatusState::kResultHealthCallbackPending; result_handler_->ReportResult(std::move(result)); } Unref(DEBUG_LOCATION, "OnRequestComplete"); } +void PollingResolver::GetResultStatus(absl::Status status) { + if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { + gpr_log(GPR_INFO, "[polling resolver %p] result status from channel: %s", + this, status.ToString().c_str()); + } + if (status.ok()) { + // Reset backoff state so that we start from the beginning when the + // next request gets triggered. + backoff_.Reset(); + // If a re-resolution attempt was requested while the result-status + // callback was pending, trigger a new request now. + if (std::exchange(result_status_state_, ResultStatusState::kNone) == + ResultStatusState::kReresolutionRequestedWhileCallbackWasPending) { + MaybeStartResolvingLocked(); + } + } else { + // Set up for retry. + // InvalidateNow to avoid getting stuck re-initializing this timer + // in a loop while draining the currently-held WorkSerializer. + // Also see https://github.com/grpc/grpc/issues/26079. + ExecCtx::Get()->InvalidateNow(); + Timestamp next_try = backoff_.NextAttemptTime(); + Duration timeout = next_try - ExecCtx::Get()->Now(); + GPR_ASSERT(!have_next_resolution_timer_); + have_next_resolution_timer_ = true; + if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { + if (timeout > Duration::Zero()) { + gpr_log(GPR_INFO, "[polling resolver %p] retrying in %" PRId64 " ms", + this, timeout.millis()); + } else { + gpr_log(GPR_INFO, "[polling resolver %p] retrying immediately", this); + } + } + Ref(DEBUG_LOCATION, "next_resolution_timer").release(); + GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr); + grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_); + // Reset result_status_state_. Note that even if re-resolution was + // requested while the result-health callback was pending, we can + // ignore it here, because we are in backoff to re-resolve anyway. + result_status_state_ = ResultStatusState::kNone; + } +} + void PollingResolver::MaybeStartResolvingLocked() { // If there is an existing timer, the time it fires is the earliest time we // can start the next resolution. diff --git a/src/core/ext/filters/client_channel/resolver/polling_resolver.h b/src/core/ext/filters/client_channel/resolver/polling_resolver.h index 9a2528c96f1..8abbec16cfd 100644 --- a/src/core/ext/filters/client_channel/resolver/polling_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/polling_resolver.h @@ -22,6 +22,7 @@ #include #include +#include "absl/status/status.h" #include "absl/types/optional.h" #include "src/core/lib/backoff/backoff.h" @@ -77,6 +78,8 @@ class PollingResolver : public Resolver { void OnRequestCompleteLocked(Result result); + void GetResultStatus(absl::Status status); + static void OnNextResolution(void* arg, grpc_error_handle error); void OnNextResolutionLocked(grpc_error_handle error); @@ -105,6 +108,14 @@ class PollingResolver : public Resolver { absl::optional last_resolution_timestamp_; /// retry backoff state BackOff backoff_; + /// state for handling interactions between re-resolution requests and + /// result health callbacks + enum class ResultStatusState { + kNone, + kResultHealthCallbackPending, + kReresolutionRequestedWhileCallbackWasPending, + }; + ResultStatusState result_status_state_ = ResultStatusState::kNone; }; } // namespace grpc_core diff --git a/src/core/lib/load_balancing/lb_policy.h b/src/core/lib/load_balancing/lb_policy.h index 5a642c16c2f..96ed933136d 100644 --- a/src/core/lib/load_balancing/lb_policy.h +++ b/src/core/lib/load_balancing/lb_policy.h @@ -353,11 +353,11 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Updates the policy with new data from the resolver. Will be invoked /// immediately after LB policy is constructed, and then again whenever - /// the resolver returns a new result. - // TODO(roth): Change this to return some indication as to whether the - // update has been accepted, so that we can indicate to the resolver - // whether it should go into backoff to retry the resolution. - virtual void UpdateLocked(UpdateArgs) = 0; // NOLINT + /// the resolver returns a new result. The returned status indicates + /// whether the LB policy accepted the update; if non-OK, informs + /// polling-based resolvers that they should go into backoff delay and + /// eventually reattempt the resolution. + virtual absl::Status UpdateLocked(UpdateArgs) = 0; // NOLINT /// Tries to enter a READY connectivity state. /// This is a no-op by default, since most LB policies never go into diff --git a/src/core/lib/resolver/resolver.h b/src/core/lib/resolver/resolver.h index 255a12cd552..47096858800 100644 --- a/src/core/lib/resolver/resolver.h +++ b/src/core/lib/resolver/resolver.h @@ -19,8 +19,10 @@ #include +#include #include +#include "absl/status/status.h" #include "absl/status/statusor.h" #include "src/core/lib/channel/channel_args.h" @@ -67,6 +69,17 @@ class Resolver : public InternallyRefCounted { // TODO(roth): Before making this a public API, figure out a way to // avoid exposing channel args this way. ChannelArgs args; + // If non-null, this callback will be invoked when the LB policy has + // processed the result. The status value passed to the callback + // indicates whether the LB policy accepted the update. For polling + // resolvers, if the reported status is non-OK, then the resolver + // should put itself into backoff to retry the resolution later. + // The resolver impl must not call ResultHandler::ReportResult() + // again until after this callback has been invoked. + // The callback will be invoked within the channel's WorkSerializer. + // It may or may not be invoked before ResultHandler::ReportResult() + // returns, which is why it's a separate callback. + std::function result_health_callback; }; /// A proxy object used by the resolver to return results to the @@ -76,9 +89,6 @@ class Resolver : public InternallyRefCounted { virtual ~ResultHandler() {} /// Reports a result to the channel. - // TODO(roth): Add a mechanism for the resolver to get back a signal - // indicating if the result was accepted by the LB policy, so that it - // knows whether to go into backoff to retry to resolution. virtual void ReportResult(Result result) = 0; // NOLINT }; diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index c24e03eb9c7..1b2a6a49dae 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -243,7 +243,10 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { state_ = state; } - void ReportResult(grpc_core::Resolver::Result /*result*/) override { + void ReportResult(grpc_core::Resolver::Result result) override { + if (result.result_health_callback != nullptr) { + result.result_health_callback(absl::OkStatus()); + } ASSERT_NE(result_cb_, nullptr); ASSERT_NE(state_, nullptr); ResultCallback cb = result_cb_; diff --git a/test/core/end2end/tests/retry_lb_drop.cc b/test/core/end2end/tests/retry_lb_drop.cc index 51e24853bc2..c1c627464b1 100644 --- a/test/core/end2end/tests/retry_lb_drop.cc +++ b/test/core/end2end/tests/retry_lb_drop.cc @@ -59,9 +59,10 @@ class DropPolicy : public LoadBalancingPolicy { absl::string_view name() const override { return kDropPolicyName; } - void UpdateLocked(UpdateArgs) override { + absl::Status UpdateLocked(UpdateArgs) override { channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(), absl::make_unique()); + return absl::OkStatus(); } void ResetBackoffLocked() override {} diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index aad85eb157f..c45f78e3606 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -76,8 +76,8 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy { ~ForwardingLoadBalancingPolicy() override = default; - void UpdateLocked(UpdateArgs args) override { - delegate_->UpdateLocked(std::move(args)); + absl::Status UpdateLocked(UpdateArgs args) override { + return delegate_->UpdateLocked(std::move(args)); } void ExitIdleLocked() override { delegate_->ExitIdleLocked(); } @@ -454,7 +454,7 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy { absl::string_view name() const override { return kFixedAddressLbPolicyName; } - void UpdateLocked(UpdateArgs args) override { + absl::Status UpdateLocked(UpdateArgs args) override { auto* config = static_cast(args.config.get()); gpr_log(GPR_INFO, "%s: update URI: %s", kFixedAddressLbPolicyName, config->address().c_str()); @@ -471,7 +471,7 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy { kFixedAddressLbPolicyName, uri.status().ToString().c_str()); args.resolution_note = "no address in fixed_address_lb policy"; } - ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args)); + return ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args)); } private: @@ -671,10 +671,11 @@ class FailPolicy : public LoadBalancingPolicy { absl::string_view name() const override { return kFailPolicyName; } - void UpdateLocked(UpdateArgs) override { + absl::Status UpdateLocked(UpdateArgs) override { channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status_, absl::make_unique(status_, pick_counter_)); + return absl::OkStatus(); } void ResetBackoffLocked() override {} diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index e24e2b5afef..7c714fd4395 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -1259,6 +1259,39 @@ TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) { WaitForServer(DEBUG_LOCATION, stub, 0); } +TEST_F(PickFirstTest, FailsEmptyResolverUpdate) { + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("pick_first", response_generator); + auto stub = BuildStub(channel); + gpr_log(GPR_INFO, "****** SENDING INITIAL RESOLVER RESULT *******"); + // Send a resolver result with an empty address list and a callback + // that triggers a notification. + absl::Notification notification; + grpc_core::Resolver::Result result; + result.addresses.emplace(); + result.result_health_callback = [&](absl::Status status) { + EXPECT_EQ(absl::StatusCode::kUnavailable, status.code()); + EXPECT_EQ("address list must not be empty", status.message()) << status; + notification.Notify(); + }; + response_generator.SetResponse(std::move(result)); + // Wait for channel to report TRANSIENT_FAILURE. + gpr_log(GPR_INFO, "****** TELLING CHANNEL TO CONNECT *******"); + auto predicate = [](grpc_connectivity_state state) { + return state == GRPC_CHANNEL_TRANSIENT_FAILURE; + }; + EXPECT_TRUE( + WaitForChannelState(channel.get(), predicate, /*try_to_connect=*/true)); + // Callback should have been run. + ASSERT_TRUE(notification.HasBeenNotified()); + // Return a valid address. + gpr_log(GPR_INFO, "****** SENDING NEXT RESOLVER RESULT *******"); + StartServers(1); + response_generator.SetNextResolution(GetServersPorts()); + gpr_log(GPR_INFO, "****** SENDING WAIT_FOR_READY RPC *******"); + CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true); +} + TEST_F(PickFirstTest, CheckStateBeforeStartWatch) { std::vector ports = {grpc_pick_unused_port_or_die()}; StartServers(1, ports); @@ -1637,6 +1670,40 @@ TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) { WaitForServer(DEBUG_LOCATION, stub, 2); } +TEST_F(RoundRobinTest, FailsEmptyResolverUpdate) { + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); + auto stub = BuildStub(channel); + gpr_log(GPR_INFO, "****** SENDING INITIAL RESOLVER RESULT *******"); + // Send a resolver result with an empty address list and a callback + // that triggers a notification. + absl::Notification notification; + grpc_core::Resolver::Result result; + result.addresses.emplace(); + result.resolution_note = "injected error"; + result.result_health_callback = [&](absl::Status status) { + EXPECT_EQ(absl::StatusCode::kUnavailable, status.code()); + EXPECT_EQ("empty address list: injected error", status.message()) << status; + notification.Notify(); + }; + response_generator.SetResponse(std::move(result)); + // Wait for channel to report TRANSIENT_FAILURE. + gpr_log(GPR_INFO, "****** TELLING CHANNEL TO CONNECT *******"); + auto predicate = [](grpc_connectivity_state state) { + return state == GRPC_CHANNEL_TRANSIENT_FAILURE; + }; + EXPECT_TRUE( + WaitForChannelState(channel.get(), predicate, /*try_to_connect=*/true)); + // Callback should have been run. + ASSERT_TRUE(notification.HasBeenNotified()); + // Return a valid address. + gpr_log(GPR_INFO, "****** SENDING NEXT RESOLVER RESULT *******"); + StartServers(1); + response_generator.SetNextResolution(GetServersPorts()); + gpr_log(GPR_INFO, "****** SENDING WAIT_FOR_READY RPC *******"); + CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true); +} + TEST_F(RoundRobinTest, TransientFailure) { // Start servers and create channel. Channel should go to READY state. const int kNumServers = 3;