From 76c93769e2fbf28ac8b7d9b4a69598691e396e53 Mon Sep 17 00:00:00 2001
From: "Mark D. Roth"
Date: Wed, 1 May 2024 10:04:11 -0700
Subject: [PATCH] [LB policies] fix handling of UpdateLocked() result (#36463)

This fixes some TODOs added in #30809 for cases where LB policies lazily create child policies. Credit to @ejona86 for pointing out that simply calling `RequestReresolution()` in this case will ultimately result in the exponential backoff behavior we want.

This also adds some missing plumbing in code added as part of the dualstack work (in the endpoint_list library and in ring_hash) to propagate non-OK statuses from `UpdateLocked()`. When I first made the dualstack changes, I didn't bother with this plumbing, for two reasons. First, there are no cases today where these code paths will actually see a non-OK status (`EndpointAddresses` won't allow creating an endpoint with 0 addresses, and that's the only case where pick_first will return a non-OK status). Second, I wasn't sure if we would stick with the approach of returning status from `UpdateLocked()` due to the aforementioned lazy creation case. However, now that we have a good solution for the lazy creation case, I've added the necessary plumbing, just so that we don't have a bug if in the future pick_first winds up returning non-OK status in some other case.

I have not bothered to fix the propagation in the grpclb policy, since that looked like it would be slightly more work than it's really worth at this point.
Closes #36463 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36463 from markdroth:lb_reresolve_for_lazy_child_creation 49043b2d80b52134294f1ac4b84c56441edd218b PiperOrigin-RevId: 629755047 --- src/core/load_balancing/endpoint_list.cc | 6 +-- src/core/load_balancing/endpoint_list.h | 21 +++++++---- src/core/load_balancing/priority/priority.cc | 11 ++++-- .../load_balancing/ring_hash/ring_hash.cc | 37 ++++++++++++++----- src/core/load_balancing/rls/rls.cc | 13 +++++-- .../load_balancing/round_robin/round_robin.cc | 21 ++++++++--- .../weighted_round_robin.cc | 20 +++++++--- 7 files changed, 92 insertions(+), 37 deletions(-) diff --git a/src/core/load_balancing/endpoint_list.cc b/src/core/load_balancing/endpoint_list.cc index f86d5af16ff..5b952490740 100644 --- a/src/core/load_balancing/endpoint_list.cc +++ b/src/core/load_balancing/endpoint_list.cc @@ -87,7 +87,7 @@ class EndpointList::Endpoint::Helper final // EndpointList::Endpoint // -void EndpointList::Endpoint::Init( +absl::Status EndpointList::Endpoint::Init( const EndpointAddresses& addresses, const ChannelArgs& args, std::shared_ptr work_serializer) { ChannelArgs child_args = @@ -123,9 +123,7 @@ void EndpointList::Endpoint::Init( update_args.addresses = std::make_shared(addresses); update_args.args = child_args; update_args.config = std::move(*config); - // TODO(roth): If the child reports a non-OK status with the update, - // we need to propagate that back to the resolver somehow. 
- (void)child_policy_->UpdateLocked(std::move(update_args)); + return child_policy_->UpdateLocked(std::move(update_args)); } void EndpointList::Endpoint::Orphan() { diff --git a/src/core/load_balancing/endpoint_list.h b/src/core/load_balancing/endpoint_list.h index daf4f1fe366..cd8276bf51a 100644 --- a/src/core/load_balancing/endpoint_list.h +++ b/src/core/load_balancing/endpoint_list.h @@ -53,7 +53,8 @@ class MyEndpointList : public EndpointList { public: MyEndpointList(RefCountedPtr lb_policy, EndpointAddressesIterator* endpoints, - const ChannelArgs& args) + const ChannelArgs& args, + std::vector* errors) : EndpointList(std::move(lb_policy), GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer) ? "MyEndpointList" @@ -63,7 +64,7 @@ class MyEndpointList : public EndpointList { const EndpointAddresses& addresses, const ChannelArgs& args) { return MakeOrphanable( std::move(endpoint_list), addresses, args, - policy()->work_serializer()); + policy()->work_serializer(), errors); }); } @@ -71,10 +72,15 @@ class MyEndpointList : public EndpointList { class MyEndpoint : public Endpoint { public: MyEndpoint(RefCountedPtr endpoint_list, - const EndpointAddresses& address, const ChannelArgs& args, - std::shared_ptr work_serializer) + const EndpointAddresses& addresses, const ChannelArgs& args, + std::shared_ptr work_serializer, + std::vector* errors) : Endpoint(std::move(endpoint_list)) { - Init(addresses, args, std::move(work_serializer)); + absl::Status status = Init(addresses, args, std::move(work_serializer)); + if (!status.ok()) { + errors->emplace_back(absl::StrCat( + "endpoint ", addresses.ToString(), ": ", status.ToString())); + } } private: @@ -120,8 +126,9 @@ class EndpointList : public InternallyRefCounted { explicit Endpoint(RefCountedPtr endpoint_list) : endpoint_list_(std::move(endpoint_list)) {} - void Init(const EndpointAddresses& addresses, const ChannelArgs& args, - std::shared_ptr work_serializer); + absl::Status Init(const EndpointAddresses& addresses, + const 
ChannelArgs& args, + std::shared_ptr work_serializer); // Templated for convenience, to provide a short-hand for // down-casting in the caller. diff --git a/src/core/load_balancing/priority/priority.cc b/src/core/load_balancing/priority/priority.cc index ac3eea9c371..0ccfe20c953 100644 --- a/src/core/load_balancing/priority/priority.cc +++ b/src/core/load_balancing/priority/priority.cc @@ -415,11 +415,16 @@ void PriorityLb::ChoosePriorityLocked() { child_name); auto child_config = config_->children().find(child_name); GPR_DEBUG_ASSERT(child_config != config_->children().end()); - // TODO(roth): If the child reports a non-OK status with the - // update, we need to propagate that back to the resolver somehow. - (void)child->UpdateLocked( + // If the child policy returns a non-OK status, request re-resolution. + // Note that this will initially cause fixed backoff delay in the + // resolver instead of exponential delay. However, once the + // resolver returns the initial re-resolution, we will be able to + // return non-OK from UpdateLocked(), which will trigger + // exponential backoff instead. + absl::Status status = child->UpdateLocked( child_config->second.config, child_config->second.ignore_reresolution_requests); + if (!status.ok()) channel_control_helper()->RequestReresolution(); } else { // The child already exists. Reactivate if needed. 
child->MaybeReactivateLocked(); diff --git a/src/core/load_balancing/ring_hash/ring_hash.cc b/src/core/load_balancing/ring_hash/ring_hash.cc index 41782c70655..667b23440cc 100644 --- a/src/core/load_balancing/ring_hash/ring_hash.cc +++ b/src/core/load_balancing/ring_hash/ring_hash.cc @@ -169,7 +169,7 @@ class RingHash final : public LoadBalancingPolicy { size_t index() const { return index_; } - void UpdateLocked(size_t index); + absl::Status UpdateLocked(size_t index); grpc_connectivity_state connectivity_state() const { return connectivity_state_; @@ -196,7 +196,7 @@ class RingHash final : public LoadBalancingPolicy { class Helper; void CreateChildPolicy(); - void UpdateChildPolicyLocked(); + absl::Status UpdateChildPolicyLocked(); // Called when the child policy reports a connectivity state update. void OnStateUpdate(grpc_connectivity_state new_state, @@ -498,9 +498,10 @@ void RingHash::RingHashEndpoint::Orphan() { Unref(); } -void RingHash::RingHashEndpoint::UpdateLocked(size_t index) { +absl::Status RingHash::RingHashEndpoint::UpdateLocked(size_t index) { index_ = index; - if (child_policy_ != nullptr) UpdateChildPolicyLocked(); + if (child_policy_ == nullptr) return absl::OkStatus(); + return UpdateChildPolicyLocked(); } void RingHash::RingHashEndpoint::ResetBackoffLocked() { @@ -541,10 +542,19 @@ void RingHash::RingHashEndpoint::CreateChildPolicy() { // this policy, which in turn is tied to the application's call. grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), ring_hash_->interested_parties()); - UpdateChildPolicyLocked(); + // If the child policy returns a non-OK status, request re-resolution. + // Note that this will initially cause fixed backoff delay in the + // resolver instead of exponential delay. However, once the + // resolver returns the initial re-resolution, we will be able to + // return non-OK from UpdateLocked(), which will trigger + // exponential backoff instead. 
+ absl::Status status = UpdateChildPolicyLocked(); + if (!status.ok()) { + ring_hash_->channel_control_helper()->RequestReresolution(); + } } -void RingHash::RingHashEndpoint::UpdateChildPolicyLocked() { +absl::Status RingHash::RingHashEndpoint::UpdateChildPolicyLocked() { // Construct pick_first config. auto config = CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( @@ -557,9 +567,7 @@ void RingHash::RingHashEndpoint::UpdateChildPolicyLocked() { std::make_shared(ring_hash_->endpoints_[index_]); update_args.args = ring_hash_->args_; update_args.config = std::move(*config); - // TODO(roth): If the child reports a non-OK status with the update, - // we need to propagate that back to the resolver somehow. - (void)child_policy_->UpdateLocked(std::move(update_args)); + return child_policy_->UpdateLocked(std::move(update_args)); } void RingHash::RingHashEndpoint::OnStateUpdate( @@ -667,13 +675,18 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) { this, static_cast(args.config.get())); // Update endpoint map. std::map> endpoint_map; + std::vector errors; for (size_t i = 0; i < endpoints_.size(); ++i) { const EndpointAddresses& addresses = endpoints_[i]; const EndpointAddressSet address_set(addresses.addresses()); // If present in old map, retain it; otherwise, create a new one. auto it = endpoint_map_.find(address_set); if (it != endpoint_map_.end()) { - it->second->UpdateLocked(i); + absl::Status status = it->second->UpdateLocked(i); + if (!status.ok()) { + errors.emplace_back(absl::StrCat("endpoint ", address_set.ToString(), + ": ", status.ToString())); + } endpoint_map.emplace(address_set, std::move(it->second)); } else { endpoint_map.emplace(address_set, MakeOrphanable( @@ -695,6 +708,10 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) { // Return a new picker. 
UpdateAggregatedConnectivityStateLocked(/*entered_transient_failure=*/false, absl::OkStatus()); + if (!errors.empty()) { + return absl::UnavailableError(absl::StrCat( + "errors from children: [", absl::StrJoin(errors, "; "), "]")); + } return absl::OkStatus(); } diff --git a/src/core/load_balancing/rls/rls.cc b/src/core/load_balancing/rls/rls.cc index 93de1b7ae9f..1207daac8c3 100644 --- a/src/core/load_balancing/rls/rls.cc +++ b/src/core/load_balancing/rls/rls.cc @@ -1860,9 +1860,16 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) { // Now that we've released the lock, finish the update on any newly // created child policies. for (ChildPolicyWrapper* child : child_policies_to_finish_update) { - // TODO(roth): If the child reports an error with the update, we - // need to propagate that back to the resolver somehow. - (void)child->MaybeFinishUpdate(); + // If the child policy returns a non-OK status, request re-resolution. + // Note that this will initially cause fixed backoff delay in the + // resolver instead of exponential delay. However, once the + // resolver returns the initial re-resolution, we will be able to + // return non-OK from UpdateLocked(), which will trigger + // exponential backoff instead. 
+ absl::Status status = child->MaybeFinishUpdate(); + if (!status.ok()) { + lb_policy_->channel_control_helper()->RequestReresolution(); + } } } diff --git a/src/core/load_balancing/round_robin/round_robin.cc b/src/core/load_balancing/round_robin/round_robin.cc index 85d524e10f3..a232b3fdc17 100644 --- a/src/core/load_balancing/round_robin/round_robin.cc +++ b/src/core/load_balancing/round_robin/round_robin.cc @@ -72,7 +72,8 @@ class RoundRobin final : public LoadBalancingPolicy { public: RoundRobinEndpointList(RefCountedPtr round_robin, EndpointAddressesIterator* endpoints, - const ChannelArgs& args) + const ChannelArgs& args, + std::vector* errors) : EndpointList(std::move(round_robin), GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) ? "RoundRobinEndpointList" @@ -82,7 +83,7 @@ class RoundRobin final : public LoadBalancingPolicy { const EndpointAddresses& addresses, const ChannelArgs& args) { return MakeOrphanable( std::move(endpoint_list), addresses, args, - policy()->work_serializer()); + policy()->work_serializer(), errors); }); } @@ -92,9 +93,14 @@ class RoundRobin final : public LoadBalancingPolicy { RoundRobinEndpoint(RefCountedPtr endpoint_list, const EndpointAddresses& addresses, const ChannelArgs& args, - std::shared_ptr work_serializer) + std::shared_ptr work_serializer, + std::vector* errors) : Endpoint(std::move(endpoint_list)) { - Init(addresses, args, std::move(work_serializer)); + absl::Status status = Init(addresses, args, std::move(work_serializer)); + if (!status.ok()) { + errors->emplace_back(absl::StrCat("endpoint ", addresses.ToString(), + ": ", status.ToString())); + } } private: @@ -255,9 +261,10 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) { gpr_log(GPR_INFO, "[RR %p] replacing previous pending child list %p", this, latest_pending_endpoint_list_.get()); } + std::vector errors; latest_pending_endpoint_list_ = MakeOrphanable( RefAsSubclass(DEBUG_LOCATION, "RoundRobinEndpointList"), - addresses, args.args); + addresses, 
args.args, &errors); // If the new list is empty, immediately promote it to // endpoint_list_ and report TRANSIENT_FAILURE. if (latest_pending_endpoint_list_->size() == 0) { @@ -281,6 +288,10 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) { if (endpoint_list_ == nullptr) { endpoint_list_ = std::move(latest_pending_endpoint_list_); } + if (!errors.empty()) { + return absl::UnavailableError(absl::StrCat( + "errors from children: [", absl::StrJoin(errors, "; "), "]")); + } return absl::OkStatus(); } diff --git a/src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc b/src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc index b93f51f410a..73311eb07eb 100644 --- a/src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc +++ b/src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc @@ -221,11 +221,16 @@ class WeightedRoundRobin final : public LoadBalancingPolicy { public: WrrEndpoint(RefCountedPtr endpoint_list, const EndpointAddresses& addresses, const ChannelArgs& args, - std::shared_ptr work_serializer) + std::shared_ptr work_serializer, + std::vector* errors) : Endpoint(std::move(endpoint_list)), weight_(policy()->GetOrCreateWeight( addresses.addresses())) { - Init(addresses, args, std::move(work_serializer)); + absl::Status status = Init(addresses, args, std::move(work_serializer)); + if (!status.ok()) { + errors->emplace_back(absl::StrCat("endpoint ", addresses.ToString(), + ": ", status.ToString())); + } } RefCountedPtr weight() const { return weight_; } @@ -261,7 +266,7 @@ class WeightedRoundRobin final : public LoadBalancingPolicy { WrrEndpointList(RefCountedPtr wrr, EndpointAddressesIterator* endpoints, - const ChannelArgs& args) + const ChannelArgs& args, std::vector* errors) : EndpointList(std::move(wrr), GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) ? 
"WrrEndpointList" @@ -271,7 +276,7 @@ class WeightedRoundRobin final : public LoadBalancingPolicy { const EndpointAddresses& addresses, const ChannelArgs& args) { return MakeOrphanable( std::move(endpoint_list), addresses, args, - policy()->work_serializer()); + policy()->work_serializer(), errors); }); } @@ -767,8 +772,9 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) { gpr_log(GPR_INFO, "[WRR %p] replacing previous pending endpoint list %p", this, latest_pending_endpoint_list_.get()); } + std::vector errors; latest_pending_endpoint_list_ = MakeOrphanable( - RefAsSubclass(), addresses.get(), args.args); + RefAsSubclass(), addresses.get(), args.args, &errors); // If the new list is empty, immediately promote it to // endpoint_list_ and report TRANSIENT_FAILURE. if (latest_pending_endpoint_list_->size() == 0) { @@ -792,6 +798,10 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) { if (endpoint_list_.get() == nullptr) { endpoint_list_ = std::move(latest_pending_endpoint_list_); } + if (!errors.empty()) { + return absl::UnavailableError(absl::StrCat( + "errors from children: [", absl::StrJoin(errors, "; "), "]")); + } return absl::OkStatus(); }