client_channel: allow LB policy to communicate update errors to resolver (#30809)

* client_channel: allow LB policy to communicate update errors to resolver

* fix tests

* Automated change: Fix sanity tests

* fix build

* fix another "ignoring return value" warning

* fix use-after-move

* fix channel to invoke resolver callback when service config fails on initial resolution

* remove outdated TODO

* improve comments

* fix PollingResolver to defer re-resolution requests while waiting for result-health callback

* Automated change: Fix sanity tests

* absl::exchange -> std::exchange

* fix dns_resolver_cooldown_test

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/30956/head
Mark D. Roth 2 years ago committed by GitHub
parent aaec373a10
commit 9ff943b81e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 15
      src/core/ext/filters/client_channel/client_channel.cc
  3. 2
      src/core/ext/filters/client_channel/client_channel.h
  4. 4
      src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
  5. 3
      src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h
  6. 24
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  7. 6
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  8. 15
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  9. 37
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  10. 19
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  11. 35
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  12. 14
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  13. 37
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  14. 9
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  15. 18
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  16. 33
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  17. 22
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  18. 113
      src/core/ext/filters/client_channel/resolver/polling_resolver.cc
  19. 11
      src/core/ext/filters/client_channel/resolver/polling_resolver.h
  20. 10
      src/core/lib/load_balancing/lb_policy.h
  21. 16
      src/core/lib/resolver/resolver.h
  22. 5
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  23. 3
      test/core/end2end/tests/retry_lb_drop.cc
  24. 11
      test/core/util/test_lb_policies.cc
  25. 67
      test/cpp/end2end/client_lb_end2end_test.cc

@ -5451,6 +5451,7 @@ grpc_cc_library(
"gpr", "gpr",
"grpc_base", "grpc_base",
"grpc_resolver", "grpc_resolver",
"grpc_service_config",
"grpc_trace", "grpc_trace",
"iomgr_fwd", "iomgr_fwd",
"iomgr_timer", "iomgr_timer",

@ -1171,6 +1171,9 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO, "chand=%p: got resolver result", this); gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
} }
// Grab resolver result health callback.
auto resolver_callback = std::move(result.result_health_callback);
absl::Status resolver_result_status;
// We only want to trace the address resolution in the follow cases: // We only want to trace the address resolution in the follow cases:
// (a) Address resolution resulted in service config change. // (a) Address resolution resulted in service config change.
// (b) Address resolution that causes number of backends to go from // (b) Address resolution that causes number of backends to go from
@ -1222,6 +1225,8 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
// TRANSIENT_FAILURE. // TRANSIENT_FAILURE.
OnResolverErrorLocked(result.service_config.status()); OnResolverErrorLocked(result.service_config.status());
trace_strings.push_back("no valid service config"); trace_strings.push_back("no valid service config");
resolver_result_status =
absl::UnavailableError("no valid service config");
} }
} else if (*result.service_config == nullptr) { } else if (*result.service_config == nullptr) {
// Resolver did not return any service config. // Resolver did not return any service config.
@ -1266,7 +1271,7 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
gpr_log(GPR_INFO, "chand=%p: service config not changed", this); gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
} }
// Create or update LB policy, as needed. // Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked( resolver_result_status = CreateOrUpdateLbPolicyLocked(
std::move(lb_policy_config), std::move(lb_policy_config),
parsed_service_config->health_check_service_name(), std::move(result)); parsed_service_config->health_check_service_name(), std::move(result));
if (service_config_changed || config_selector_changed) { if (service_config_changed || config_selector_changed) {
@ -1280,6 +1285,10 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
trace_strings.push_back("Service config changed"); trace_strings.push_back("Service config changed");
} }
} }
// Invoke resolver callback if needed.
if (resolver_callback != nullptr) {
resolver_callback(std::move(resolver_result_status));
}
// Add channel trace event. // Add channel trace event.
if (!trace_strings.empty()) { if (!trace_strings.empty()) {
std::string message = std::string message =
@ -1326,7 +1335,7 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) {
} }
} }
void ClientChannel::CreateOrUpdateLbPolicyLocked( absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config, RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
const absl::optional<std::string>& health_check_service_name, const absl::optional<std::string>& health_check_service_name,
Resolver::Result result) { Resolver::Result result) {
@ -1353,7 +1362,7 @@ void ClientChannel::CreateOrUpdateLbPolicyLocked(
gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this, gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
lb_policy_.get()); lb_policy_.get());
} }
lb_policy_->UpdateLocked(std::move(update_args)); return lb_policy_->UpdateLocked(std::move(update_args));
} }
// Creates a new LB policy. // Creates a new LB policy.

@ -250,7 +250,7 @@ class ClientChannel {
void OnResolverErrorLocked(absl::Status status) void OnResolverErrorLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void CreateOrUpdateLbPolicyLocked( absl::Status CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config, RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
const absl::optional<std::string>& health_check_service_name, const absl::optional<std::string>& health_check_service_name,
Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);

@ -161,7 +161,7 @@ void ChildPolicyHandler::ShutdownLocked() {
} }
} }
void ChildPolicyHandler::UpdateLocked(UpdateArgs args) { absl::Status ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
// If the child policy name changes, we need to create a new child // If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store // policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child // the new child policy in pending_child_policy_. Once the new child
@ -253,7 +253,7 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
policy_to_update == pending_child_policy_.get() ? "pending " : "", policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update); policy_to_update);
} }
policy_to_update->UpdateLocked(std::move(args)); return policy_to_update->UpdateLocked(std::move(args));
} }
void ChildPolicyHandler::ExitIdleLocked() { void ChildPolicyHandler::ExitIdleLocked() {

@ -20,6 +20,7 @@
#include <utility> #include <utility>
#include "absl/status/status.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
@ -43,7 +44,7 @@ class ChildPolicyHandler : public LoadBalancingPolicy {
absl::string_view name() const override { return "child_policy_handler"; } absl::string_view name() const override { return "child_policy_handler"; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;

@ -188,7 +188,7 @@ class GrpcLb : public LoadBalancingPolicy {
absl::string_view name() const override { return kGrpclb; } absl::string_view name() const override { return kGrpclb; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
private: private:
@ -473,7 +473,7 @@ class GrpcLb : public LoadBalancingPolicy {
void ShutdownLocked() override; void ShutdownLocked() override;
// Helper functions used in UpdateLocked(). // Helper functions used in UpdateLocked().
void UpdateBalancerChannelLocked(const ChannelArgs& args); absl::Status UpdateBalancerChannelLocked(const ChannelArgs& args);
void CancelBalancerChannelConnectivityWatchLocked(); void CancelBalancerChannelConnectivityWatchLocked();
@ -1524,7 +1524,7 @@ void GrpcLb::ResetBackoffLocked() {
} }
} }
void GrpcLb::UpdateLocked(UpdateArgs args) { absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = lb_channel_ == nullptr; const bool is_initial_update = lb_channel_ == nullptr;
config_ = args.config; config_ = args.config;
GPR_ASSERT(config_ != nullptr); GPR_ASSERT(config_ != nullptr);
@ -1540,7 +1540,7 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
} }
resolution_note_ = std::move(args.resolution_note); resolution_note_ = std::move(args.resolution_note);
// Update balancer channel. // Update balancer channel.
UpdateBalancerChannelLocked(args.args); absl::Status status = UpdateBalancerChannelLocked(args.args);
// Update the existing child policy, if any. // Update the existing child policy, if any.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked(); if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
// If this is the initial update, start the fallback-at-startup checks // If this is the initial update, start the fallback-at-startup checks
@ -1565,18 +1565,24 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
// Start balancer call. // Start balancer call.
StartBalancerCallLocked(); StartBalancerCallLocked();
} }
return status;
} }
// //
// helpers for UpdateLocked() // helpers for UpdateLocked()
// //
void GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) { absl::Status GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter. // since we use this to trigger the client_load_reporting filter.
args_ = args.Set(GRPC_ARG_LB_POLICY_NAME, "grpclb"); args_ = args.Set(GRPC_ARG_LB_POLICY_NAME, "grpclb");
// Construct args for balancer channel. // Get balancer addresses.
ServerAddressList balancer_addresses = ExtractBalancerAddresses(args); ServerAddressList balancer_addresses = ExtractBalancerAddresses(args);
absl::Status status;
if (balancer_addresses.empty()) {
status = absl::UnavailableError("balancer address list must be non-empty");
}
// Construct args for balancer channel.
ChannelArgs lb_channel_args = ChannelArgs lb_channel_args =
BuildBalancerChannelArgs(response_generator_.get(), args); BuildBalancerChannelArgs(response_generator_.get(), args);
// Create balancer channel if needed. // Create balancer channel if needed.
@ -1604,6 +1610,8 @@ void GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
result.addresses = std::move(balancer_addresses); result.addresses = std::move(balancer_addresses);
result.args = lb_channel_args; result.args = lb_channel_args;
response_generator_->SetResponse(std::move(result)); response_generator_->SetResponse(std::move(result));
// Return status.
return status;
} }
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() { void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
@ -1794,7 +1802,9 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this, gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this,
child_policy_.get()); child_policy_.get());
} }
child_policy_->UpdateLocked(std::move(update_args)); // TODO(roth): If we're in fallback mode and the child policy rejects the
// update, we should propagate that failure back to the resolver somehow.
(void)child_policy_->UpdateLocked(std::move(update_args));
} }
// //

@ -126,7 +126,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
absl::string_view name() const override { return kOutlierDetection; } absl::string_view name() const override { return kOutlierDetection; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
@ -595,7 +595,7 @@ void OutlierDetectionLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
} }
void OutlierDetectionLb::UpdateLocked(UpdateArgs args) { absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] Received update", this); gpr_log(GPR_INFO, "[outlier_detection_lb %p] Received update", this);
} }
@ -692,7 +692,7 @@ void OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
"[outlier_detection_lb %p] Updating child policy handler %p", this, "[outlier_detection_lb %p] Updating child policy handler %p", this,
child_policy_.get()); child_policy_.get());
} }
child_policy_->UpdateLocked(std::move(update_args)); return child_policy_->UpdateLocked(std::move(update_args));
} }
void OutlierDetectionLb::MaybeUpdatePickerLocked() { void OutlierDetectionLb::MaybeUpdatePickerLocked() {

@ -69,7 +69,7 @@ class PickFirst : public LoadBalancingPolicy {
absl::string_view name() const override { return kPickFirst; } absl::string_view name() const override { return kPickFirst; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
@ -232,7 +232,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
this, std::move(addresses), latest_update_args_.args); this, std::move(addresses), latest_update_args_.args);
latest_pending_subchannel_list_->StartWatchingLocked(); latest_pending_subchannel_list_->StartWatchingLocked();
// Empty update or no valid subchannels. Put the channel in // Empty update or no valid subchannels. Put the channel in
// TRANSIENT_FAILURE. // TRANSIENT_FAILURE and request re-resolution.
if (latest_pending_subchannel_list_->num_subchannels() == 0) { if (latest_pending_subchannel_list_->num_subchannels() == 0) {
absl::Status status = absl::Status status =
latest_update_args_.addresses.ok() latest_update_args_.addresses.ok()
@ -242,6 +242,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status)); absl::make_unique<TransientFailurePicker>(status));
channel_control_helper()->RequestReresolution();
} }
// Otherwise, if this is the initial update, report CONNECTING. // Otherwise, if this is the initial update, report CONNECTING.
else if (subchannel_list_.get() == nullptr) { else if (subchannel_list_.get() == nullptr) {
@ -263,7 +264,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
} }
} }
void PickFirst::UpdateLocked(UpdateArgs args) { absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
if (args.addresses.ok()) { if (args.addresses.ok()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
@ -276,6 +277,13 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
} }
// Add GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. // Add GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
args.args = args.args.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1); args.args = args.args.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
// Set return status based on the address list.
absl::Status status;
if (!args.addresses.ok()) {
status = args.addresses.status();
} else if (args.addresses->empty()) {
status = absl::UnavailableError("address list must not be empty");
}
// If the update contains a resolver error and we have a previous update // If the update contains a resolver error and we have a previous update
// that was not a resolver error, keep using the previous addresses. // that was not a resolver error, keep using the previous addresses.
if (!args.addresses.ok() && latest_update_args_.config != nullptr) { if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
@ -288,6 +296,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
if (!idle_) { if (!idle_) {
AttemptToConnectUsingLatestUpdateArgsLocked(); AttemptToConnectUsingLatestUpdateArgsLocked();
} }
return status;
} }
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(

@ -110,7 +110,7 @@ class PriorityLb : public LoadBalancingPolicy {
absl::string_view name() const override { return kPriority; } absl::string_view name() const override { return kPriority; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
@ -126,8 +126,8 @@ class PriorityLb : public LoadBalancingPolicy {
const std::string& name() const { return name_; } const std::string& name() const { return name_; }
void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config, absl::Status UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
bool ignore_reresolution_requests); bool ignore_reresolution_requests);
void ExitIdleLocked(); void ExitIdleLocked();
void ResetBackoffLocked(); void ResetBackoffLocked();
void MaybeDeactivateLocked(); void MaybeDeactivateLocked();
@ -344,7 +344,7 @@ void PriorityLb::ResetBackoffLocked() {
for (const auto& p : children_) p.second->ResetBackoffLocked(); for (const auto& p : children_) p.second->ResetBackoffLocked();
} }
void PriorityLb::UpdateLocked(UpdateArgs args) { absl::Status PriorityLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] received update", this); gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
} }
@ -357,6 +357,7 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
resolution_note_ = std::move(args.resolution_note); resolution_note_ = std::move(args.resolution_note);
// Check all existing children against the new config. // Check all existing children against the new config.
update_in_progress_ = true; update_in_progress_ = true;
std::vector<std::string> errors;
for (const auto& p : children_) { for (const auto& p : children_) {
const std::string& child_name = p.first; const std::string& child_name = p.first;
auto& child = p.second; auto& child = p.second;
@ -366,13 +367,24 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
child->MaybeDeactivateLocked(); child->MaybeDeactivateLocked();
} else { } else {
// Existing child found in new config. Update it. // Existing child found in new config. Update it.
child->UpdateLocked(config_it->second.config, absl::Status status =
config_it->second.ignore_reresolution_requests); child->UpdateLocked(config_it->second.config,
config_it->second.ignore_reresolution_requests);
if (!status.ok()) {
errors.emplace_back(
absl::StrCat("child ", child_name, ": ", status.ToString()));
}
} }
} }
update_in_progress_ = false; update_in_progress_ = false;
// Try to get connected. // Try to get connected.
ChoosePriorityLocked(); ChoosePriorityLocked();
// Return status.
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
} }
uint32_t PriorityLb::GetChildPriorityLocked( uint32_t PriorityLb::GetChildPriorityLocked(
@ -416,8 +428,11 @@ void PriorityLb::ChoosePriorityLocked() {
Ref(DEBUG_LOCATION, "ChildPriority"), child_name); Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
auto child_config = config_->children().find(child_name); auto child_config = config_->children().find(child_name);
GPR_DEBUG_ASSERT(child_config != config_->children().end()); GPR_DEBUG_ASSERT(child_config != config_->children().end());
child->UpdateLocked(child_config->second.config, // TODO(roth): If the child reports a non-OK status with the
child_config->second.ignore_reresolution_requests); // update, we need to propagate that back to the resolver somehow.
(void)child->UpdateLocked(
child_config->second.config,
child_config->second.ignore_reresolution_requests);
} else { } else {
// The child already exists. Reactivate if needed. // The child already exists. Reactivate if needed.
child->MaybeReactivateLocked(); child->MaybeReactivateLocked();
@ -668,10 +683,10 @@ PriorityLb::ChildPriority::GetPicker() {
return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_); return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
} }
void PriorityLb::ChildPriority::UpdateLocked( absl::Status PriorityLb::ChildPriority::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config, RefCountedPtr<LoadBalancingPolicy::Config> config,
bool ignore_reresolution_requests) { bool ignore_reresolution_requests) {
if (priority_policy_->shutting_down_) return; if (priority_policy_->shutting_down_) return absl::OkStatus();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update", gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
priority_policy_.get(), name_.c_str(), this); priority_policy_.get(), name_.c_str(), this);
@ -697,7 +712,7 @@ void PriorityLb::ChildPriority::UpdateLocked(
"[priority_lb %p] child %s (%p): updating child policy handler %p", "[priority_lb %p] child %s (%p): updating child policy handler %p",
priority_policy_.get(), name_.c_str(), this, child_policy_.get()); priority_policy_.get(), name_.c_str(), this, child_policy_.get());
} }
child_policy_->UpdateLocked(std::move(update_args)); return child_policy_->UpdateLocked(std::move(update_args));
} }
OrphanablePtr<LoadBalancingPolicy> OrphanablePtr<LoadBalancingPolicy>

@ -154,7 +154,7 @@ class RingHash : public LoadBalancingPolicy {
absl::string_view name() const override { return kRingHash; } absl::string_view name() const override { return kRingHash; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
private: private:
@ -817,7 +817,7 @@ void RingHash::ResetBackoffLocked() {
} }
} }
void RingHash::UpdateLocked(UpdateArgs args) { absl::Status RingHash::UpdateLocked(UpdateArgs args) {
config_ = std::move(args.config); config_ = std::move(args.config);
ServerAddressList addresses; ServerAddressList addresses;
if (args.addresses.ok()) { if (args.addresses.ok()) {
@ -831,9 +831,9 @@ void RingHash::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s", gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",
this, args.addresses.status().ToString().c_str()); this, args.addresses.status().ToString().c_str());
} }
// If we already have a subchannel list, then ignore the resolver // If we already have a subchannel list, then keep using the existing
// failure and keep using the existing list. // list, but still report back that the update was not accepted.
if (subchannel_list_ != nullptr) return; if (subchannel_list_ != nullptr) return args.addresses.status();
} }
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) && if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) &&
latest_pending_subchannel_list_ != nullptr) { latest_pending_subchannel_list_ != nullptr) {
@ -866,12 +866,13 @@ void RingHash::UpdateLocked(UpdateArgs args) {
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status)); absl::make_unique<TransientFailurePicker>(status));
} else { return status;
// Otherwise, report IDLE.
subchannel_list_->UpdateRingHashConnectivityStateLocked(
/*index=*/0, /*connection_attempt_complete=*/false, absl::OkStatus());
} }
// Otherwise, report IDLE.
subchannel_list_->UpdateRingHashConnectivityStateLocked(
/*index=*/0, /*connection_attempt_complete=*/false, absl::OkStatus());
} }
return absl::OkStatus();
} }
// //

@ -208,7 +208,7 @@ class RlsLb : public LoadBalancingPolicy {
explicit RlsLb(Args args); explicit RlsLb(Args args);
absl::string_view name() const override { return kRls; } absl::string_view name() const override { return kRls; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
@ -293,7 +293,7 @@ class RlsLb : public LoadBalancingPolicy {
// //
// Both methods grab the data they need from the parent object. // Both methods grab the data they need from the parent object.
void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
void MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_); absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);
void ExitIdleLocked() { void ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
@ -811,10 +811,10 @@ void RlsLb::ChildPolicyWrapper::StartUpdate() {
} }
} }
void RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() { absl::Status RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
// If pending_config_ is not set, that means StartUpdate() failed, so // If pending_config_ is not set, that means StartUpdate() failed, so
// there's nothing to do here. // there's nothing to do here.
if (pending_config_ == nullptr) return; if (pending_config_ == nullptr) return absl::OkStatus();
// If child policy doesn't yet exist, create it. // If child policy doesn't yet exist, create it.
if (child_policy_ == nullptr) { if (child_policy_ == nullptr) {
Args create_args; Args create_args;
@ -844,7 +844,7 @@ void RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
update_args.config = std::move(pending_config_); update_args.config = std::move(pending_config_);
update_args.addresses = lb_policy_->addresses_; update_args.addresses = lb_policy_->addresses_;
update_args.args = lb_policy_->channel_args_; update_args.args = lb_policy_->channel_args_;
child_policy_->UpdateLocked(std::move(update_args)); return child_policy_->UpdateLocked(std::move(update_args));
} }
// //
@ -1809,7 +1809,9 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
// Now that we've released the lock, finish the update on any newly // Now that we've released the lock, finish the update on any newly
// created child policies. // created child policies.
for (ChildPolicyWrapper* child : child_policies_to_finish_update) { for (ChildPolicyWrapper* child : child_policies_to_finish_update) {
child->MaybeFinishUpdate(); // TODO(roth): If the child reports an error with the update, we
// need to propagate that back to the resolver somehow.
(void)child->MaybeFinishUpdate();
} }
} }
@ -1897,7 +1899,7 @@ RlsLb::RlsLb(Args args)
} }
} }
void RlsLb::UpdateLocked(UpdateArgs args) { absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO, "[rlslb %p] policy updated", this); gpr_log(GPR_INFO, "[rlslb %p] policy updated", this);
} }
@ -1988,19 +1990,28 @@ void RlsLb::UpdateLocked(UpdateArgs args) {
} }
} }
// Now that we've released the lock, finish update of child policies. // Now that we've released the lock, finish update of child policies.
std::vector<std::string> errors;
if (update_child_policies) { if (update_child_policies) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO, "[rlslb %p] finishing child policy updates", this); gpr_log(GPR_INFO, "[rlslb %p] finishing child policy updates", this);
} }
for (auto& p : child_policy_map_) { for (auto& p : child_policy_map_) {
p.second->MaybeFinishUpdate(); absl::Status status = p.second->MaybeFinishUpdate();
if (!status.ok()) {
errors.emplace_back(
absl::StrCat("target ", p.first, ": ", status.ToString()));
}
} }
} else if (created_default_child) { } else if (created_default_child) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO, "[rlslb %p] finishing default child policy update", gpr_log(GPR_INFO, "[rlslb %p] finishing default child policy update",
this); this);
} }
default_child_policy_->MaybeFinishUpdate(); absl::Status status = default_child_policy_->MaybeFinishUpdate();
if (!status.ok()) {
errors.emplace_back(absl::StrCat("target ", config_->default_target(),
": ", status.ToString()));
}
} }
update_in_progress_ = false; update_in_progress_ = false;
// In principle, we need to update the picker here only if the config // In principle, we need to update the picker here only if the config
@ -2010,6 +2021,12 @@ void RlsLb::UpdateLocked(UpdateArgs args) {
// remember to update the code here. So for now, we just unconditionally // remember to update the code here. So for now, we just unconditionally
// update the picker here, even though it's probably redundant. // update the picker here, even though it's probably redundant.
UpdatePickerLocked(); UpdatePickerLocked();
// Return status.
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
} }
void RlsLb::ExitIdleLocked() { void RlsLb::ExitIdleLocked() {

@ -68,7 +68,7 @@ class RoundRobin : public LoadBalancingPolicy {
absl::string_view name() const override { return kRoundRobin; } absl::string_view name() const override { return kRoundRobin; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
private: private:
@ -266,7 +266,7 @@ void RoundRobin::ResetBackoffLocked() {
} }
} }
void RoundRobin::UpdateLocked(UpdateArgs args) { absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
ServerAddressList addresses; ServerAddressList addresses;
if (args.addresses.ok()) { if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
@ -279,9 +279,9 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this, gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
args.addresses.status().ToString().c_str()); args.addresses.status().ToString().c_str());
} }
// If we already have a subchannel list, then ignore the resolver // If we already have a subchannel list, then keep using the existing
// failure and keep using the existing list. // list, but still report back that the update was not accepted.
if (subchannel_list_ != nullptr) return; if (subchannel_list_ != nullptr) return args.addresses.status();
} }
// Create new subchannel list, replacing the previous pending list, if any. // Create new subchannel list, replacing the previous pending list, if any.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) && if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
@ -308,15 +308,17 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status)); absl::make_unique<TransientFailurePicker>(status));
return status;
} }
// Otherwise, if this is the initial update, immediately promote it to // Otherwise, if this is the initial update, immediately promote it to
// subchannel_list_ and report CONNECTING. // subchannel_list_ and report CONNECTING.
else if (subchannel_list_.get() == nullptr) { if (subchannel_list_.get() == nullptr) {
subchannel_list_ = std::move(latest_pending_subchannel_list_); subchannel_list_ = std::move(latest_pending_subchannel_list_);
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(), GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"))); absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
} }
return absl::OkStatus();
} }
// //

@ -105,7 +105,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
absl::string_view name() const override { return kWeightedTarget; } absl::string_view name() const override { return kWeightedTarget; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
private: private:
@ -149,10 +149,10 @@ class WeightedTargetLb : public LoadBalancingPolicy {
void Orphan() override; void Orphan() override;
void UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config, absl::Status UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<ServerAddressList> addresses, absl::StatusOr<ServerAddressList> addresses,
const std::string& resolution_note, const std::string& resolution_note,
const ChannelArgs& args); const ChannelArgs& args);
void ResetBackoffLocked(); void ResetBackoffLocked();
void DeactivateLocked(); void DeactivateLocked();
@ -301,8 +301,8 @@ void WeightedTargetLb::ResetBackoffLocked() {
for (auto& p : targets_) p.second->ResetBackoffLocked(); for (auto& p : targets_) p.second->ResetBackoffLocked();
} }
void WeightedTargetLb::UpdateLocked(UpdateArgs args) { absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
if (shutting_down_) return; if (shutting_down_) return absl::OkStatus();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this); gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this);
} }
@ -320,6 +320,7 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
// Update all children. // Update all children.
absl::StatusOr<HierarchicalAddressMap> address_map = absl::StatusOr<HierarchicalAddressMap> address_map =
MakeHierarchicalAddressMap(args.addresses); MakeHierarchicalAddressMap(args.addresses);
std::vector<std::string> errors;
for (const auto& p : config_->target_map()) { for (const auto& p : config_->target_map()) {
const std::string& name = p.first; const std::string& name = p.first;
const WeightedTargetLbConfig::ChildConfig& config = p.second; const WeightedTargetLbConfig::ChildConfig& config = p.second;
@ -335,8 +336,12 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
} else { } else {
addresses = address_map.status(); addresses = address_map.status();
} }
target->UpdateLocked(config, std::move(addresses), args.resolution_note, absl::Status status = target->UpdateLocked(config, std::move(addresses),
args.args); args.resolution_note, args.args);
if (!status.ok()) {
errors.emplace_back(
absl::StrCat("child ", name, ": ", status.ToString()));
}
} }
update_in_progress_ = false; update_in_progress_ = false;
if (config_->target_map().empty()) { if (config_->target_map().empty()) {
@ -345,9 +350,15 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status)); absl::make_unique<TransientFailurePicker>(status));
return; return absl::OkStatus();
} }
UpdateStateLocked(); UpdateStateLocked();
// Return status.
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
} }
void WeightedTargetLb::UpdateStateLocked() { void WeightedTargetLb::UpdateStateLocked() {
@ -553,11 +564,11 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
return lb_policy; return lb_policy;
} }
void WeightedTargetLb::WeightedChild::UpdateLocked( absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
const WeightedTargetLbConfig::ChildConfig& config, const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<ServerAddressList> addresses, absl::StatusOr<ServerAddressList> addresses,
const std::string& resolution_note, const ChannelArgs& args) { const std::string& resolution_note, const ChannelArgs& args) {
if (weighted_target_policy_->shutting_down_) return; if (weighted_target_policy_->shutting_down_) return absl::OkStatus();
// Update child weight. // Update child weight.
weight_ = config.weight; weight_ = config.weight;
// Reactivate if needed. // Reactivate if needed.
@ -587,7 +598,7 @@ void WeightedTargetLb::WeightedChild::UpdateLocked(
weighted_target_policy_.get(), this, name_.c_str(), weighted_target_policy_.get(), this, name_.c_str(),
child_policy_.get()); child_policy_.get());
} }
child_policy_->UpdateLocked(std::move(update_args)); return child_policy_->UpdateLocked(std::move(update_args));
} }
void WeightedTargetLb::WeightedChild::ResetBackoffLocked() { void WeightedTargetLb::WeightedChild::ResetBackoffLocked() {

@ -98,7 +98,7 @@ class CdsLb : public LoadBalancingPolicy {
absl::string_view name() const override { return kCds; } absl::string_view name() const override { return kCds; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
@ -310,7 +310,7 @@ void CdsLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
} }
void CdsLb::UpdateLocked(UpdateArgs args) { absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
// Update config. // Update config.
auto old_config = std::move(config_); auto old_config = std::move(config_);
config_ = std::move(args.config); config_ = std::move(args.config);
@ -338,6 +338,7 @@ void CdsLb::UpdateLocked(UpdateArgs args) {
XdsClusterResourceType::StartWatch(xds_client_.get(), config_->cluster(), XdsClusterResourceType::StartWatch(xds_client_.get(), config_->cluster(),
std::move(watcher)); std::move(watcher));
} }
return absl::OkStatus();
} }
// Generates the discovery mechanism config for the specified cluster name. // Generates the discovery mechanism config for the specified cluster name.
@ -554,7 +555,9 @@ void CdsLb::OnClusterChanged(const std::string& name,
} else { } else {
args.args = args_; args.args = args_;
} }
child_policy_->UpdateLocked(std::move(args)); // TODO(roth): If the child policy reports an error with the update,
// we need to propagate the error to the resolver somehow.
(void)child_policy_->UpdateLocked(std::move(args));
} }
// Remove entries in watchers_ for any clusters not in clusters_added // Remove entries in watchers_ for any clusters not in clusters_added
for (auto it = watchers_.begin(); it != watchers_.end();) { for (auto it = watchers_.begin(); it != watchers_.end();) {

@ -191,7 +191,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
absl::string_view name() const override { return kXdsClusterImpl; } absl::string_view name() const override { return kXdsClusterImpl; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
@ -269,9 +269,9 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args); const ChannelArgs& args);
void UpdateChildPolicyLocked(absl::StatusOr<ServerAddressList> addresses, absl::Status UpdateChildPolicyLocked(
std::string resolution_note, absl::StatusOr<ServerAddressList> addresses, std::string resolution_note,
const ChannelArgs& args); const ChannelArgs& args);
void MaybeUpdatePickerLocked(); void MaybeUpdatePickerLocked();
@ -483,7 +483,7 @@ void XdsClusterImplLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
} }
void XdsClusterImplLb::UpdateLocked(UpdateArgs args) { absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this); gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this);
} }
@ -525,8 +525,8 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
MaybeUpdatePickerLocked(); MaybeUpdatePickerLocked();
} }
// Update child policy. // Update child policy.
UpdateChildPolicyLocked(std::move(args.addresses), return UpdateChildPolicyLocked(std::move(args.addresses),
std::move(args.resolution_note), args.args); std::move(args.resolution_note), args.args);
} }
void XdsClusterImplLb::MaybeUpdatePickerLocked() { void XdsClusterImplLb::MaybeUpdatePickerLocked() {
@ -582,7 +582,7 @@ OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
return lb_policy; return lb_policy;
} }
void XdsClusterImplLb::UpdateChildPolicyLocked( absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
absl::StatusOr<ServerAddressList> addresses, std::string resolution_note, absl::StatusOr<ServerAddressList> addresses, std::string resolution_note,
const ChannelArgs& args) { const ChannelArgs& args) {
// Create policy if needed. // Create policy if needed.
@ -602,7 +602,7 @@ void XdsClusterImplLb::UpdateChildPolicyLocked(
"[xds_cluster_impl_lb %p] Updating child policy handler %p", this, "[xds_cluster_impl_lb %p] Updating child policy handler %p", this,
child_policy_.get()); child_policy_.get());
} }
child_policy_->UpdateLocked(std::move(update_args)); return child_policy_->UpdateLocked(std::move(update_args));
} }
// //

@ -96,7 +96,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
absl::string_view name() const override { return kXdsClusterManager; } absl::string_view name() const override { return kXdsClusterManager; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
@ -144,9 +144,10 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
void Orphan() override; void Orphan() override;
void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config, absl::Status UpdateLocked(
const absl::StatusOr<ServerAddressList>& addresses, RefCountedPtr<LoadBalancingPolicy::Config> config,
const ChannelArgs& args); const absl::StatusOr<ServerAddressList>& addresses,
const ChannelArgs& args);
void ExitIdleLocked(); void ExitIdleLocked();
void ResetBackoffLocked(); void ResetBackoffLocked();
void DeactivateLocked(); void DeactivateLocked();
@ -274,8 +275,8 @@ void XdsClusterManagerLb::ResetBackoffLocked() {
for (auto& p : children_) p.second->ResetBackoffLocked(); for (auto& p : children_) p.second->ResetBackoffLocked();
} }
void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) { absl::Status XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
if (shutting_down_) return; if (shutting_down_) return absl::OkStatus();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this); gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
} }
@ -291,6 +292,7 @@ void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
} }
} }
// Add or update the children in the new config. // Add or update the children in the new config.
std::vector<std::string> errors;
for (const auto& p : config_->cluster_map()) { for (const auto& p : config_->cluster_map()) {
const std::string& name = p.first; const std::string& name = p.first;
const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second; const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
@ -299,10 +301,21 @@ void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
child = MakeOrphanable<ClusterChild>(Ref(DEBUG_LOCATION, "ClusterChild"), child = MakeOrphanable<ClusterChild>(Ref(DEBUG_LOCATION, "ClusterChild"),
name); name);
} }
child->UpdateLocked(config, args.addresses, args.args); absl::Status status =
child->UpdateLocked(config, args.addresses, args.args);
if (!status.ok()) {
errors.emplace_back(
absl::StrCat("child ", name, ": ", status.ToString()));
}
} }
update_in_progress_ = false; update_in_progress_ = false;
UpdateStateLocked(); UpdateStateLocked();
// Return status.
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
} }
void XdsClusterManagerLb::UpdateStateLocked() { void XdsClusterManagerLb::UpdateStateLocked() {
@ -470,11 +483,11 @@ XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
return lb_policy; return lb_policy;
} }
void XdsClusterManagerLb::ClusterChild::UpdateLocked( absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config, RefCountedPtr<LoadBalancingPolicy::Config> config,
const absl::StatusOr<ServerAddressList>& addresses, const absl::StatusOr<ServerAddressList>& addresses,
const ChannelArgs& args) { const ChannelArgs& args) {
if (xds_cluster_manager_policy_->shutting_down_) return; if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus();
// Update child weight. // Update child weight.
// Reactivate if needed. // Reactivate if needed.
if (delayed_removal_timer_callback_pending_) { if (delayed_removal_timer_callback_pending_) {
@ -499,7 +512,7 @@ void XdsClusterManagerLb::ClusterChild::UpdateLocked(
xds_cluster_manager_policy_.get(), this, name_.c_str(), xds_cluster_manager_policy_.get(), this, name_.c_str(),
child_policy_.get()); child_policy_.get());
} }
child_policy_->UpdateLocked(std::move(update_args)); return child_policy_->UpdateLocked(std::move(update_args));
} }
void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() { void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {

@ -140,7 +140,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
absl::string_view name() const override { return kXdsClusterResolver; } absl::string_view name() const override { return kXdsClusterResolver; }
void UpdateLocked(UpdateArgs args) override; absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
@ -382,7 +382,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
void MaybeDestroyChildPolicyLocked(); void MaybeDestroyChildPolicyLocked();
void UpdateChildPolicyLocked(); absl::Status UpdateChildPolicyLocked();
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args); const ChannelArgs& args);
ServerAddressList CreateChildPolicyAddressesLocked(); ServerAddressList CreateChildPolicyAddressesLocked();
@ -612,7 +612,7 @@ void XdsClusterResolverLb::MaybeDestroyChildPolicyLocked() {
} }
} }
void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) { absl::Status XdsClusterResolverLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Received update", this); gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Received update", this);
} }
@ -623,7 +623,8 @@ void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) {
// Update args. // Update args.
args_ = std::move(args.args); args_ = std::move(args.args);
// Update child policy if needed. // Update child policy if needed.
if (child_policy_ != nullptr) UpdateChildPolicyLocked(); absl::Status status;
if (child_policy_ != nullptr) status = UpdateChildPolicyLocked();
// Create endpoint watcher if needed. // Create endpoint watcher if needed.
if (is_initial_update) { if (is_initial_update) {
for (const auto& config : config_->discovery_mechanisms()) { for (const auto& config : config_->discovery_mechanisms()) {
@ -649,6 +650,7 @@ void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) {
discovery_mechanism.discovery_mechanism->Start(); discovery_mechanism.discovery_mechanism->Start();
} }
} }
return status;
} }
void XdsClusterResolverLb::ResetBackoffLocked() { void XdsClusterResolverLb::ResetBackoffLocked() {
@ -757,7 +759,9 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index,
if (!mechanism.latest_update.has_value()) return; if (!mechanism.latest_update.has_value()) return;
} }
// Update child policy. // Update child policy.
UpdateChildPolicyLocked(); // TODO(roth): If the child policy reports an error with the update,
// we need to propagate that error back to the resolver somehow.
(void)UpdateChildPolicyLocked();
} }
void XdsClusterResolverLb::OnError(size_t index, std::string resolution_note) { void XdsClusterResolverLb::OnError(size_t index, std::string resolution_note) {
@ -998,11 +1002,11 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
return std::move(*config); return std::move(*config);
} }
void XdsClusterResolverLb::UpdateChildPolicyLocked() { absl::Status XdsClusterResolverLb::UpdateChildPolicyLocked() {
if (shutting_down_) return; if (shutting_down_) return absl::OkStatus();
UpdateArgs update_args; UpdateArgs update_args;
update_args.config = CreateChildPolicyConfigLocked(); update_args.config = CreateChildPolicyConfigLocked();
if (update_args.config == nullptr) return; if (update_args.config == nullptr) return absl::OkStatus();
update_args.addresses = CreateChildPolicyAddressesLocked(); update_args.addresses = CreateChildPolicyAddressesLocked();
update_args.resolution_note = CreateChildPolicyResolutionNoteLocked(); update_args.resolution_note = CreateChildPolicyResolutionNoteLocked();
update_args.args = CreateChildPolicyArgsLocked(args_); update_args.args = CreateChildPolicyArgsLocked(args_);
@ -1013,7 +1017,7 @@ void XdsClusterResolverLb::UpdateChildPolicyLocked() {
gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Updating child policy %p", gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Updating child policy %p",
this, child_policy_.get()); this, child_policy_.get());
} }
child_policy_->UpdateLocked(std::move(update_args)); return child_policy_->UpdateLocked(std::move(update_args));
} }
ChannelArgs XdsClusterResolverLb::CreateChildPolicyArgsLocked( ChannelArgs XdsClusterResolverLb::CreateChildPolicyArgsLocked(

@ -20,10 +20,13 @@
#include <inttypes.h> #include <inttypes.h>
#include <functional>
#include <utility> #include <utility>
#include <vector>
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/strings/strip.h" #include "absl/strings/strip.h"
@ -36,6 +39,7 @@
#include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/uri/uri_parser.h" #include "src/core/lib/uri/uri_parser.h"
namespace grpc_core { namespace grpc_core {
@ -69,7 +73,16 @@ void PollingResolver::StartLocked() { MaybeStartResolvingLocked(); }
void PollingResolver::RequestReresolutionLocked() { void PollingResolver::RequestReresolutionLocked() {
if (request_ == nullptr) { if (request_ == nullptr) {
MaybeStartResolvingLocked(); // If we're still waiting for a result-health callback from the last
// result we reported, don't trigger the re-resolution until we get
// that callback.
if (result_status_state_ ==
ResultStatusState::kResultHealthCallbackPending) {
result_status_state_ =
ResultStatusState::kReresolutionRequestedWhileCallbackWasPending;
} else {
MaybeStartResolvingLocked();
}
} }
} }
@ -126,44 +139,78 @@ void PollingResolver::OnRequestCompleteLocked(Result result) {
} }
request_.reset(); request_.reset();
if (!shutdown_) { if (!shutdown_) {
if (result.service_config.ok() && result.addresses.ok()) { if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
// Reset backoff state so that we start from the beginning when the gpr_log(GPR_INFO,
// next request gets triggered. "[polling resolver %p] returning result: "
backoff_.Reset(); "addresses=%s, service_config=%s",
} else { this,
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) { result.addresses.ok()
gpr_log(GPR_INFO, ? absl::StrCat("<", result.addresses->size(), " addresses>")
"[polling resolver %p] resolution failed (will retry): " .c_str()
"address status \"%s\"; service config status \"%s\"", : result.addresses.status().ToString().c_str(),
this, result.addresses.status().ToString().c_str(), result.service_config.ok()
result.service_config.status().ToString().c_str()); ? (*result.service_config == nullptr
} ? "<null>"
// Set up for retry. : std::string((*result.service_config)->json_string())
// InvalidateNow to avoid getting stuck re-initializing this timer .c_str())
// in a loop while draining the currently-held WorkSerializer. : result.service_config.status().ToString().c_str());
// Also see https://github.com/grpc/grpc/issues/26079.
ExecCtx::Get()->InvalidateNow();
Timestamp next_try = backoff_.NextAttemptTime();
Duration timeout = next_try - ExecCtx::Get()->Now();
GPR_ASSERT(!have_next_resolution_timer_);
have_next_resolution_timer_ = true;
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
if (timeout > Duration::Zero()) {
gpr_log(GPR_INFO, "[polling resolver %p] retrying in %" PRId64 " ms",
this, timeout.millis());
} else {
gpr_log(GPR_INFO, "[polling resolver %p] retrying immediately", this);
}
}
Ref(DEBUG_LOCATION, "next_resolution_timer").release();
GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr);
grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_);
} }
GPR_ASSERT(result.result_health_callback == nullptr);
RefCountedPtr<PollingResolver> self =
Ref(DEBUG_LOCATION, "result_health_callback");
result.result_health_callback = [self =
std::move(self)](absl::Status status) {
self->GetResultStatus(std::move(status));
};
result_status_state_ = ResultStatusState::kResultHealthCallbackPending;
result_handler_->ReportResult(std::move(result)); result_handler_->ReportResult(std::move(result));
} }
Unref(DEBUG_LOCATION, "OnRequestComplete"); Unref(DEBUG_LOCATION, "OnRequestComplete");
} }
void PollingResolver::GetResultStatus(absl::Status status) {
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
gpr_log(GPR_INFO, "[polling resolver %p] result status from channel: %s",
this, status.ToString().c_str());
}
if (status.ok()) {
// Reset backoff state so that we start from the beginning when the
// next request gets triggered.
backoff_.Reset();
// If a re-resolution attempt was requested while the result-status
// callback was pending, trigger a new request now.
if (std::exchange(result_status_state_, ResultStatusState::kNone) ==
ResultStatusState::kReresolutionRequestedWhileCallbackWasPending) {
MaybeStartResolvingLocked();
}
} else {
// Set up for retry.
// InvalidateNow to avoid getting stuck re-initializing this timer
// in a loop while draining the currently-held WorkSerializer.
// Also see https://github.com/grpc/grpc/issues/26079.
ExecCtx::Get()->InvalidateNow();
Timestamp next_try = backoff_.NextAttemptTime();
Duration timeout = next_try - ExecCtx::Get()->Now();
GPR_ASSERT(!have_next_resolution_timer_);
have_next_resolution_timer_ = true;
if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
if (timeout > Duration::Zero()) {
gpr_log(GPR_INFO, "[polling resolver %p] retrying in %" PRId64 " ms",
this, timeout.millis());
} else {
gpr_log(GPR_INFO, "[polling resolver %p] retrying immediately", this);
}
}
Ref(DEBUG_LOCATION, "next_resolution_timer").release();
GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, nullptr);
grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_);
// Reset result_status_state_. Note that even if re-resolution was
// requested while the result-health callback was pending, we can
// ignore it here, because we are in backoff to re-resolve anyway.
result_status_state_ = ResultStatusState::kNone;
}
}
void PollingResolver::MaybeStartResolvingLocked() { void PollingResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we // If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution. // can start the next resolution.

@ -22,6 +22,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "absl/status/status.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/backoff/backoff.h"
@ -77,6 +78,8 @@ class PollingResolver : public Resolver {
void OnRequestCompleteLocked(Result result); void OnRequestCompleteLocked(Result result);
void GetResultStatus(absl::Status status);
static void OnNextResolution(void* arg, grpc_error_handle error); static void OnNextResolution(void* arg, grpc_error_handle error);
void OnNextResolutionLocked(grpc_error_handle error); void OnNextResolutionLocked(grpc_error_handle error);
@ -105,6 +108,14 @@ class PollingResolver : public Resolver {
absl::optional<Timestamp> last_resolution_timestamp_; absl::optional<Timestamp> last_resolution_timestamp_;
/// retry backoff state /// retry backoff state
BackOff backoff_; BackOff backoff_;
/// state for handling interactions between re-resolution requests and
/// result health callbacks
enum class ResultStatusState {
kNone,
kResultHealthCallbackPending,
kReresolutionRequestedWhileCallbackWasPending,
};
ResultStatusState result_status_state_ = ResultStatusState::kNone;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -353,11 +353,11 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Updates the policy with new data from the resolver. Will be invoked /// Updates the policy with new data from the resolver. Will be invoked
/// immediately after LB policy is constructed, and then again whenever /// immediately after LB policy is constructed, and then again whenever
/// the resolver returns a new result. /// the resolver returns a new result. The returned status indicates
// TODO(roth): Change this to return some indication as to whether the /// whether the LB policy accepted the update; if non-OK, informs
// update has been accepted, so that we can indicate to the resolver /// polling-based resolvers that they should go into backoff delay and
// whether it should go into backoff to retry the resolution. /// eventually reattempt the resolution.
virtual void UpdateLocked(UpdateArgs) = 0; // NOLINT virtual absl::Status UpdateLocked(UpdateArgs) = 0; // NOLINT
/// Tries to enter a READY connectivity state. /// Tries to enter a READY connectivity state.
/// This is a no-op by default, since most LB policies never go into /// This is a no-op by default, since most LB policies never go into

@ -19,8 +19,10 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <functional>
#include <string> #include <string>
#include "absl/status/status.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
@ -67,6 +69,17 @@ class Resolver : public InternallyRefCounted<Resolver> {
// TODO(roth): Before making this a public API, figure out a way to // TODO(roth): Before making this a public API, figure out a way to
// avoid exposing channel args this way. // avoid exposing channel args this way.
ChannelArgs args; ChannelArgs args;
// If non-null, this callback will be invoked when the LB policy has
// processed the result. The status value passed to the callback
// indicates whether the LB policy accepted the update. For polling
// resolvers, if the reported status is non-OK, then the resolver
// should put itself into backoff to retry the resolution later.
// The resolver impl must not call ResultHandler::ReportResult()
// again until after this callback has been invoked.
// The callback will be invoked within the channel's WorkSerializer.
// It may or may not be invoked before ResultHandler::ReportResult()
// returns, which is why it's a separate callback.
std::function<void(absl::Status)> result_health_callback;
}; };
/// A proxy object used by the resolver to return results to the /// A proxy object used by the resolver to return results to the
@ -76,9 +89,6 @@ class Resolver : public InternallyRefCounted<Resolver> {
virtual ~ResultHandler() {} virtual ~ResultHandler() {}
/// Reports a result to the channel. /// Reports a result to the channel.
// TODO(roth): Add a mechanism for the resolver to get back a signal
// indicating if the result was accepted by the LB policy, so that it
// knows whether to go into backoff to retry to resolution.
virtual void ReportResult(Result result) = 0; // NOLINT virtual void ReportResult(Result result) = 0; // NOLINT
}; };

@ -243,7 +243,10 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
state_ = state; state_ = state;
} }
void ReportResult(grpc_core::Resolver::Result /*result*/) override { void ReportResult(grpc_core::Resolver::Result result) override {
if (result.result_health_callback != nullptr) {
result.result_health_callback(absl::OkStatus());
}
ASSERT_NE(result_cb_, nullptr); ASSERT_NE(result_cb_, nullptr);
ASSERT_NE(state_, nullptr); ASSERT_NE(state_, nullptr);
ResultCallback cb = result_cb_; ResultCallback cb = result_cb_;

@ -59,9 +59,10 @@ class DropPolicy : public LoadBalancingPolicy {
absl::string_view name() const override { return kDropPolicyName; } absl::string_view name() const override { return kDropPolicyName; }
void UpdateLocked(UpdateArgs) override { absl::Status UpdateLocked(UpdateArgs) override {
channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(), channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
absl::make_unique<DropPicker>()); absl::make_unique<DropPicker>());
return absl::OkStatus();
} }
void ResetBackoffLocked() override {} void ResetBackoffLocked() override {}

@ -76,8 +76,8 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
~ForwardingLoadBalancingPolicy() override = default; ~ForwardingLoadBalancingPolicy() override = default;
void UpdateLocked(UpdateArgs args) override { absl::Status UpdateLocked(UpdateArgs args) override {
delegate_->UpdateLocked(std::move(args)); return delegate_->UpdateLocked(std::move(args));
} }
void ExitIdleLocked() override { delegate_->ExitIdleLocked(); } void ExitIdleLocked() override { delegate_->ExitIdleLocked(); }
@ -454,7 +454,7 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
absl::string_view name() const override { return kFixedAddressLbPolicyName; } absl::string_view name() const override { return kFixedAddressLbPolicyName; }
void UpdateLocked(UpdateArgs args) override { absl::Status UpdateLocked(UpdateArgs args) override {
auto* config = static_cast<FixedAddressConfig*>(args.config.get()); auto* config = static_cast<FixedAddressConfig*>(args.config.get());
gpr_log(GPR_INFO, "%s: update URI: %s", kFixedAddressLbPolicyName, gpr_log(GPR_INFO, "%s: update URI: %s", kFixedAddressLbPolicyName,
config->address().c_str()); config->address().c_str());
@ -471,7 +471,7 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
kFixedAddressLbPolicyName, uri.status().ToString().c_str()); kFixedAddressLbPolicyName, uri.status().ToString().c_str());
args.resolution_note = "no address in fixed_address_lb policy"; args.resolution_note = "no address in fixed_address_lb policy";
} }
ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args)); return ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args));
} }
private: private:
@ -671,10 +671,11 @@ class FailPolicy : public LoadBalancingPolicy {
absl::string_view name() const override { return kFailPolicyName; } absl::string_view name() const override { return kFailPolicyName; }
void UpdateLocked(UpdateArgs) override { absl::Status UpdateLocked(UpdateArgs) override {
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status_, GRPC_CHANNEL_TRANSIENT_FAILURE, status_,
absl::make_unique<FailPicker>(status_, pick_counter_)); absl::make_unique<FailPicker>(status_, pick_counter_));
return absl::OkStatus();
} }
void ResetBackoffLocked() override {} void ResetBackoffLocked() override {}

@ -1259,6 +1259,39 @@ TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) {
WaitForServer(DEBUG_LOCATION, stub, 0); WaitForServer(DEBUG_LOCATION, stub, 0);
} }
TEST_F(PickFirstTest, FailsEmptyResolverUpdate) {
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("pick_first", response_generator);
auto stub = BuildStub(channel);
gpr_log(GPR_INFO, "****** SENDING INITIAL RESOLVER RESULT *******");
// Send a resolver result with an empty address list and a callback
// that triggers a notification.
absl::Notification notification;
grpc_core::Resolver::Result result;
result.addresses.emplace();
result.result_health_callback = [&](absl::Status status) {
EXPECT_EQ(absl::StatusCode::kUnavailable, status.code());
EXPECT_EQ("address list must not be empty", status.message()) << status;
notification.Notify();
};
response_generator.SetResponse(std::move(result));
// Wait for channel to report TRANSIENT_FAILURE.
gpr_log(GPR_INFO, "****** TELLING CHANNEL TO CONNECT *******");
auto predicate = [](grpc_connectivity_state state) {
return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
};
EXPECT_TRUE(
WaitForChannelState(channel.get(), predicate, /*try_to_connect=*/true));
// Callback should have been run.
ASSERT_TRUE(notification.HasBeenNotified());
// Return a valid address.
gpr_log(GPR_INFO, "****** SENDING NEXT RESOLVER RESULT *******");
StartServers(1);
response_generator.SetNextResolution(GetServersPorts());
gpr_log(GPR_INFO, "****** SENDING WAIT_FOR_READY RPC *******");
CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true);
}
TEST_F(PickFirstTest, CheckStateBeforeStartWatch) { TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()}; std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports); StartServers(1, ports);
@ -1637,6 +1670,40 @@ TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
WaitForServer(DEBUG_LOCATION, stub, 2); WaitForServer(DEBUG_LOCATION, stub, 2);
} }
TEST_F(RoundRobinTest, FailsEmptyResolverUpdate) {
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
gpr_log(GPR_INFO, "****** SENDING INITIAL RESOLVER RESULT *******");
// Send a resolver result with an empty address list and a callback
// that triggers a notification.
absl::Notification notification;
grpc_core::Resolver::Result result;
result.addresses.emplace();
result.resolution_note = "injected error";
result.result_health_callback = [&](absl::Status status) {
EXPECT_EQ(absl::StatusCode::kUnavailable, status.code());
EXPECT_EQ("empty address list: injected error", status.message()) << status;
notification.Notify();
};
response_generator.SetResponse(std::move(result));
// Wait for channel to report TRANSIENT_FAILURE.
gpr_log(GPR_INFO, "****** TELLING CHANNEL TO CONNECT *******");
auto predicate = [](grpc_connectivity_state state) {
return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
};
EXPECT_TRUE(
WaitForChannelState(channel.get(), predicate, /*try_to_connect=*/true));
// Callback should have been run.
ASSERT_TRUE(notification.HasBeenNotified());
// Return a valid address.
gpr_log(GPR_INFO, "****** SENDING NEXT RESOLVER RESULT *******");
StartServers(1);
response_generator.SetNextResolution(GetServersPorts());
gpr_log(GPR_INFO, "****** SENDING WAIT_FOR_READY RPC *******");
CheckRpcSendOk(DEBUG_LOCATION, stub, /*wait_for_ready=*/true);
}
TEST_F(RoundRobinTest, TransientFailure) { TEST_F(RoundRobinTest, TransientFailure) {
// Start servers and create channel. Channel should go to READY state. // Start servers and create channel. Channel should go to READY state.
const int kNumServers = 3; const int kNumServers = 3;

Loading…
Cancel
Save