diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 991fc99c627..533951fb281 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -416,16 +416,11 @@ class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler { GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler"); } - void ReturnResult(Resolver::Result result) override + void ReportResult(Resolver::Result result) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { chand_->OnResolverResultChangedLocked(std::move(result)); } - void ReturnError(grpc_error_handle error) override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { - chand_->OnResolverErrorLocked(error); - } - private: ClientChannel* chand_; }; @@ -1121,7 +1116,6 @@ ClientChannel::~ClientChannel() { } DestroyResolverAndLbPolicyLocked(); grpc_channel_args_destroy(channel_args_); - GRPC_ERROR_UNREF(resolver_transient_failure_error_); // Stop backup polling. grpc_client_channel_stop_backup_polling(interested_parties_); grpc_pollset_set_destroy(interested_parties_); @@ -1202,26 +1196,29 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { // // We track a list of strings to eventually be concatenated and traced. absl::InlinedVector trace_strings; - if (result.addresses.empty() && previous_resolution_contained_addresses_) { + const bool resolution_contains_addresses = + result.addresses.ok() && !result.addresses->empty(); + if (!resolution_contains_addresses && + previous_resolution_contained_addresses_) { trace_strings.push_back("Address list became empty"); - } else if (!result.addresses.empty() && + } else if (resolution_contains_addresses && !previous_resolution_contained_addresses_) { trace_strings.push_back("Address list became non-empty"); } - previous_resolution_contained_addresses_ = !result.addresses.empty(); + previous_resolution_contained_addresses_ = resolution_contains_addresses; std::string service_config_error_string_storage; - if (result.service_config_error != GRPC_ERROR_NONE) { + if (!result.service_config.ok()) { service_config_error_string_storage = - grpc_error_std_string(result.service_config_error); + result.service_config.status().ToString(); trace_strings.push_back(service_config_error_string_storage.c_str()); } // Choose the service config. RefCountedPtr service_config; RefCountedPtr config_selector; - if (result.service_config_error != GRPC_ERROR_NONE) { + if (!result.service_config.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s", - this, grpc_error_std_string(result.service_config_error).c_str()); + this, result.service_config.status().ToString().c_str()); } // If the service config was invalid, then fallback to the // previously returned service config. @@ -1235,13 +1232,13 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { service_config = saved_service_config_; config_selector = saved_config_selector_; } else { - // We received an invalid service config and we don't have a + // We received a service config error and we don't have a // previous service config to fall back to. Put the channel into // TRANSIENT_FAILURE. - OnResolverErrorLocked(GRPC_ERROR_REF(result.service_config_error)); + OnResolverErrorLocked(result.service_config.status()); trace_strings.push_back("no valid service config"); } - } else if (result.service_config == nullptr) { + } else if (*result.service_config == nullptr) { // Resolver did not return any service config. if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, @@ -1252,9 +1249,12 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { service_config = default_service_config_; } else { // Use ServiceConfig and ConfigSelector returned by resolver. - service_config = result.service_config; + service_config = std::move(*result.service_config); config_selector = ConfigSelector::GetFromChannelArgs(*result.args); } + // Note: The only case in which service_config is null here is if the resolver + // returned a service config error and we don't have a previous service + // config to fall back to. if (service_config != nullptr) { // Extract global config for client channel. const internal::ClientChannelGlobalParsedConfig* parsed_service_config = @@ -1306,28 +1306,21 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { } } -void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) { - if (resolver_ == nullptr) { - GRPC_ERROR_UNREF(error); - return; - } +void ClientChannel::OnResolverErrorLocked(absl::Status status) { + if (resolver_ == nullptr) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this, - grpc_error_std_string(error).c_str()); + status.ToString().c_str()); } // If we already have an LB policy from a previous resolution // result, then we continue to let it set the connectivity state. // Otherwise, we go into TRANSIENT_FAILURE. if (lb_policy_ == nullptr) { - grpc_error_handle state_error = - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Resolver transient failure", &error, 1); - absl::Status status = grpc_error_to_absl_status(state_error); + grpc_error_handle error = absl_status_to_grpc_error(status); { MutexLock lock(&resolution_mu_); // Update resolver transient failure. - GRPC_ERROR_UNREF(resolver_transient_failure_error_); - resolver_transient_failure_error_ = state_error; + resolver_transient_failure_error_ = status; // Process calls that were queued waiting for the resolver result. for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr; call = call->next) { @@ -1339,12 +1332,12 @@ void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) { } } } + GRPC_ERROR_UNREF(error); // Update connectivity state. UpdateStateAndPickerLocked( GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure", absl::make_unique(status)); } - GRPC_ERROR_UNREF(error); } void ClientChannel::CreateOrUpdateLbPolicyLocked( @@ -1355,6 +1348,7 @@ void ClientChannel::CreateOrUpdateLbPolicyLocked( LoadBalancingPolicy::UpdateArgs update_args; update_args.addresses = std::move(result.addresses); update_args.config = std::move(lb_policy_config); + update_args.resolution_note = std::move(result.resolution_note); // Add health check service name to channel args. absl::InlinedVector args_to_add; if (health_check_service_name.has_value()) { @@ -1495,8 +1489,7 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { // after releasing the lock to keep the critical section small. { MutexLock lock(&resolution_mu_); - GRPC_ERROR_UNREF(resolver_transient_failure_error_); - resolver_transient_failure_error_ = GRPC_ERROR_NONE; + resolver_transient_failure_error_ = absl::OkStatus(); // Update service config. received_service_config_data_ = true; // Old values will be unreffed after lock is released. @@ -2352,12 +2345,11 @@ bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem, if (GPR_UNLIKELY(!chand->received_service_config_data_)) { // If the resolver returned transient failure before returning the // first service config, fail any non-wait_for_ready calls. - grpc_error_handle resolver_error = chand->resolver_transient_failure_error_; - if (resolver_error != GRPC_ERROR_NONE && - (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) == - 0) { + absl::Status resolver_error = chand->resolver_transient_failure_error_; + if (!resolver_error.ok() && (send_initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { MaybeRemoveCallFromResolverQueuedCallsLocked(elem); - *error = GRPC_ERROR_REF(resolver_error); + *error = absl_status_to_grpc_error(resolver_error); return true; } // Either the resolver has not yet returned a result, or it has diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 471bbe63568..5c4dc5b87e1 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -216,7 +216,7 @@ class ClientChannel { void OnResolverResultChangedLocked(Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); - void OnResolverErrorLocked(grpc_error_handle error) + void OnResolverErrorLocked(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); void CreateOrUpdateLbPolicyLocked( @@ -288,8 +288,8 @@ class ClientChannel { ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) = nullptr; // Data from service config. - grpc_error_handle resolver_transient_failure_error_ - ABSL_GUARDED_BY(resolution_mu_) = GRPC_ERROR_NONE; + absl::Status resolver_transient_failure_error_ + ABSL_GUARDED_BY(resolution_mu_); bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false; RefCountedPtr service_config_ ABSL_GUARDED_BY(resolution_mu_); RefCountedPtr config_selector_ diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 44363b4ac97..e3da7698d70 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -54,27 +54,27 @@ void LoadBalancingPolicy::Orphan() { // LoadBalancingPolicy::UpdateArgs // -LoadBalancingPolicy::UpdateArgs::UpdateArgs(const UpdateArgs& other) { - addresses = other.addresses; - config = other.config; - args = grpc_channel_args_copy(other.args); -} - -LoadBalancingPolicy::UpdateArgs::UpdateArgs(UpdateArgs&& other) noexcept { - addresses = std::move(other.addresses); - config = std::move(other.config); - // TODO(roth): Use std::move() once channel args is converted to C++. - args = other.args; +LoadBalancingPolicy::UpdateArgs::UpdateArgs(const UpdateArgs& other) + : addresses(other.addresses), + config(other.config), + resolution_note(other.resolution_note), + args(grpc_channel_args_copy(other.args)) {} + +LoadBalancingPolicy::UpdateArgs::UpdateArgs(UpdateArgs&& other) noexcept + : addresses(std::move(other.addresses)), + config(std::move(other.config)), + resolution_note(std::move(other.resolution_note)), + // TODO(roth): Use std::move() once channel args is converted to C++. + args(other.args) { other.args = nullptr; } LoadBalancingPolicy::UpdateArgs& LoadBalancingPolicy::UpdateArgs::operator=( const UpdateArgs& other) { - if (&other == this) { - return *this; - } + if (&other == this) return *this; addresses = other.addresses; config = other.config; + resolution_note = other.resolution_note; grpc_channel_args_destroy(args); args = grpc_channel_args_copy(other.args); return *this; @@ -84,6 +84,7 @@ LoadBalancingPolicy::UpdateArgs& LoadBalancingPolicy::UpdateArgs::operator=( UpdateArgs&& other) noexcept { addresses = std::move(other.addresses); config = std::move(other.config); + resolution_note = std::move(other.resolution_note); // TODO(roth): Use std::move() once channel args is converted to C++. grpc_channel_args_destroy(args); args = other.args; diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 4e22d6a6fbe..151be9af267 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -23,6 +23,7 @@ #include #include "absl/status/status.h" +#include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "absl/types/variant.h" @@ -325,8 +326,20 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Data passed to the UpdateLocked() method when new addresses and /// config are available. struct UpdateArgs { - ServerAddressList addresses; + /// A list of addresses, or an error indicating a failure to obtain the + /// list of addresses. + absl::StatusOr addresses; + /// The LB policy config. RefCountedPtr config; + /// A human-readable note providing context about the name resolution that + /// provided this update. LB policies may wish to include this message + /// in RPC failure status messages. For example, if the update has an + /// empty list of addresses, this message might say "no DNS entries + /// found for ". + std::string resolution_note; + + // TODO(roth): Before making this a public API, find a better + // abstraction for representing channel args. const grpc_channel_args* args = nullptr; // TODO(roth): Remove everything below once channel args is @@ -368,6 +381,9 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Updates the policy with new data from the resolver. Will be invoked /// immediately after LB policy is constructed, and then again whenever /// the resolver returns a new result. + // TODO(roth): Change this to return some indication as to whether the + // update has been accepted, so that we can indicate to the resolver + // whether it should go into backoff to retry the resolution. virtual void UpdateLocked(UpdateArgs) = 0; // NOLINT /// Tries to enter a READY connectivity state. @@ -438,4 +454,4 @@ class LoadBalancingPolicy : public InternallyRefCounted { } // namespace grpc_core -#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */ +#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H diff --git a/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc b/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc index c6078e0d8fd..45c3ad00f0c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc +++ b/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc @@ -69,10 +69,11 @@ MakeHierarchicalPathAttribute(std::vector path) { return absl::make_unique(std::move(path)); } -HierarchicalAddressMap MakeHierarchicalAddressMap( - const ServerAddressList& addresses) { +absl::StatusOr MakeHierarchicalAddressMap( + const absl::StatusOr& addresses) { + if (!addresses.ok()) return addresses.status(); HierarchicalAddressMap result; - for (const ServerAddress& address : addresses) { + for (const ServerAddress& address : *addresses) { const HierarchicalPathAttribute* path_attribute = static_cast( address.GetAttribute(kHierarchicalPathAttributeKey)); diff --git a/src/core/ext/filters/client_channel/lb_policy/address_filtering.h b/src/core/ext/filters/client_channel/lb_policy/address_filtering.h index c276c8e2e8d..8c400fa44c2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/address_filtering.h +++ b/src/core/ext/filters/client_channel/lb_policy/address_filtering.h @@ -23,6 +23,8 @@ #include #include +#include "absl/status/statusor.h" + #include "src/core/ext/filters/client_channel/server_address.h" // The resolver returns a flat list of addresses. When a hierarchy of @@ -92,10 +94,9 @@ MakeHierarchicalPathAttribute(std::vector path); using HierarchicalAddressMap = std::map; // Splits up the addresses into a separate list for each child. -HierarchicalAddressMap MakeHierarchicalAddressMap( - const ServerAddressList& addresses); +absl::StatusOr MakeHierarchicalAddressMap( + const absl::StatusOr& addresses); } // namespace grpc_core -#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ADDRESS_FILTERING_H \ - */ +#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ADDRESS_FILTERING_H diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index f90634d8b30..180ee0145f5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -401,10 +401,7 @@ class GrpcLb : public LoadBalancingPolicy { void ShutdownLocked() override; // Helper functions used in UpdateLocked(). - void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses, - const grpc_channel_args& args); - static ServerAddressList AddNullLbTokenToAddresses( - const ServerAddressList& addresses); + void UpdateBalancerChannelLocked(const grpc_channel_args& args); void CancelBalancerChannelConnectivityWatchLocked(); @@ -473,7 +470,10 @@ class GrpcLb : public LoadBalancingPolicy { // Whether we're in fallback mode. bool fallback_mode_ = false; // The backend addresses from the resolver. - ServerAddressList fallback_backend_addresses_; + absl::StatusOr fallback_backend_addresses_; + // The last resolution note from our parent. + // To be passed to child policy when fallback_backend_addresses_ is empty. + std::string resolution_note_; // State for fallback-at-startup checks. // Timeout after startup after which we will go into fallback mode if // we have not received a serverlist from the balancer. @@ -1441,8 +1441,20 @@ void GrpcLb::UpdateLocked(UpdateArgs args) { const bool is_initial_update = lb_channel_ == nullptr; config_ = args.config; GPR_ASSERT(config_ != nullptr); - ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args); - // Update the existing child policy. + // Update fallback address list. + fallback_backend_addresses_ = std::move(args.addresses); + if (fallback_backend_addresses_.ok()) { + // Add null LB token attributes. + for (ServerAddress& address : *fallback_backend_addresses_) { + address = address.WithAttribute( + kGrpcLbAddressAttributeKey, + absl::make_unique("", nullptr)); + } + } + resolution_note_ = std::move(args.resolution_note); + // Update balancer channel. + UpdateBalancerChannelLocked(*args.args); + // Update the existing child policy, if any. if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked(); // If this is the initial update, start the fallback-at-startup checks // and the balancer call. @@ -1471,21 +1483,7 @@ void GrpcLb::UpdateLocked(UpdateArgs args) { // helpers for UpdateLocked() // -ServerAddressList GrpcLb::AddNullLbTokenToAddresses( - const ServerAddressList& addresses) { - ServerAddressList addresses_out; - for (const ServerAddress& address : addresses) { - addresses_out.emplace_back(address.WithAttribute( - kGrpcLbAddressAttributeKey, - absl::make_unique("", nullptr))); - } - return addresses_out; -} - -void GrpcLb::ProcessAddressesAndChannelArgsLocked( - const ServerAddressList& addresses, const grpc_channel_args& args) { - // Update fallback address list. - fallback_backend_addresses_ = AddNullLbTokenToAddresses(addresses); +void GrpcLb::UpdateBalancerChannelLocked(const grpc_channel_args& args) { // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; @@ -1687,9 +1685,14 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't // received any serverlist from the balancer, we use the fallback backends // returned by the resolver. Note that the fallback backend list may be - // empty, in which case the new round_robin policy will keep the requested - // picks pending. + // empty, in which case the new child policy will fail the picks. update_args.addresses = fallback_backend_addresses_; + if (fallback_backend_addresses_.ok() && + fallback_backend_addresses_->empty()) { + update_args.resolution_note = absl::StrCat( + "grpclb in fallback mode without any balancer addresses: ", + resolution_note_); + } } else { update_args.addresses = serverlist_->GetServerAddressList( lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats()); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index ad03e6c0e2c..729449505ac 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -1,20 +1,18 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #include @@ -181,20 +179,25 @@ void PickFirst::ResetBackoffLocked() { } void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { - // Create a subchannel list from the latest_update_args_. + // Create a subchannel list from latest_update_args_. + ServerAddressList addresses; + if (latest_update_args_.addresses.ok()) { + addresses = *latest_update_args_.addresses; + } auto subchannel_list = MakeOrphanable( - this, &grpc_lb_pick_first_trace, latest_update_args_.addresses, + this, &grpc_lb_pick_first_trace, std::move(addresses), *latest_update_args_.args); // Empty update or no valid subchannels. if (subchannel_list->num_subchannels() == 0) { // Unsubscribe from all current subchannels. subchannel_list_ = std::move(subchannel_list); // Empty list. selected_ = nullptr; - // If not idle, put the channel in TRANSIENT_FAILURE. - // (If we are idle, then this will happen in ExitIdleLocked() if we - // haven't gotten a non-empty update by the time the application tries - // to start a new call.) - absl::Status status = absl::UnavailableError("Empty update"); + // Put the channel in TRANSIENT_FAILURE. + absl::Status status = + latest_update_args_.addresses.ok() + ? absl::UnavailableError(absl::StrCat( + "empty address list: ", latest_update_args_.resolution_note)) + : latest_update_args_.addresses.status(); channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, absl::make_unique(status)); @@ -258,17 +261,28 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { void PickFirst::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, - "Pick First %p received update with %" PRIuPTR " addresses", this, - args.addresses.size()); + if (args.addresses.ok()) { + gpr_log(GPR_INFO, + "Pick First %p received update with %" PRIuPTR " addresses", this, + args.addresses->size()); + } else { + gpr_log(GPR_INFO, "Pick First %p received update with address error: %s", + this, args.addresses.status().ToString().c_str()); + } } - // Update the latest_update_args_ + // Add GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. grpc_arg new_arg = grpc_channel_arg_integer_create( const_cast(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1); const grpc_channel_args* new_args = grpc_channel_args_copy_and_add(args.args, &new_arg, 1); std::swap(new_args, args.args); grpc_channel_args_destroy(new_args); + // If the update contains a resolver error and we have a previous update + // that was not a resolver error, keep using the previous addresses. + if (!args.addresses.ok() && latest_update_args_.config != nullptr) { + args.addresses = std::move(latest_update_args_.addresses); + } + // Update latest_update_args_. latest_update_args_ = std::move(args); // If we are not in idle, start connection attempt immediately. // Otherwise, we defer the attempt into ExitIdleLocked(). diff --git a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc index cb9fc3a3e12..c4b10eb3941 100644 --- a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc +++ b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc @@ -224,7 +224,7 @@ class PriorityLb : public LoadBalancingPolicy { // Current channel args and config from the resolver. const grpc_channel_args* args_ = nullptr; RefCountedPtr config_; - HierarchicalAddressMap addresses_; + absl::StatusOr addresses_; // Internal state. bool shutting_down_ = false; @@ -557,7 +557,11 @@ void PriorityLb::ChildPriority::UpdateLocked( // Construct update args. UpdateArgs update_args; update_args.config = std::move(config); - update_args.addresses = priority_policy_->addresses_[name_]; + if (priority_policy_->addresses_.ok()) { + update_args.addresses = (*priority_policy_->addresses_)[name_]; + } else { + update_args.addresses = priority_policy_->addresses_.status(); + } update_args.args = grpc_channel_args_copy(priority_policy_->args_); // Update the policy. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index 2d00fd29f5a..cf0c0712fd0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -581,7 +581,7 @@ void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked( if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log( GPR_INFO, - "[RR %p] connectivity changed for subchannel %p, subchannel_list %p " + "[RH %p] connectivity changed for subchannel %p, subchannel_list %p " "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", p, subchannel(), subchannel_list(), Index(), subchannel_list()->num_subchannels(), @@ -623,7 +623,7 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked( if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log(GPR_INFO, - "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " + "[RH %p] Subchannel %p has gone into TRANSIENT_FAILURE. " "Requesting re-resolution", p, subchannel()); } @@ -685,27 +685,40 @@ void RingHash::ShutdownLocked() { void RingHash::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); } void RingHash::UpdateLocked(UpdateArgs args) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { - gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", - this, args.addresses.size()); - } config_ = std::move(args.config); - // Filter out any address with weight 0. ServerAddressList addresses; - addresses.reserve(args.addresses.size()); - for (ServerAddress& address : args.addresses) { - const ServerAddressWeightAttribute* weight_attribute = - static_cast(address.GetAttribute( - ServerAddressWeightAttribute::kServerAddressWeightAttributeKey)); - if (weight_attribute == nullptr || weight_attribute->weight() > 0) { - addresses.push_back(std::move(address)); + if (args.addresses.ok()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses", + this, args.addresses->size()); + } + // Filter out any address with weight 0. + addresses.reserve(args.addresses->size()); + for (ServerAddress& address : *args.addresses) { + const ServerAddressWeightAttribute* weight_attribute = + static_cast(address.GetAttribute( + ServerAddressWeightAttribute::kServerAddressWeightAttributeKey)); + if (weight_attribute == nullptr || weight_attribute->weight() > 0) { + addresses.emplace_back(std::move(address)); + } + } + } else { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s", + this, args.addresses.status().ToString().c_str()); } + // If we already have a subchannel list, then ignore the resolver + // failure and keep using the existing list. + if (subchannel_list_ != nullptr) return; } subchannel_list_ = MakeOrphanable( this, &grpc_lb_ring_hash_trace, std::move(addresses), *args.args); if (subchannel_list_->num_subchannels() == 0) { // If the new list is empty, immediately transition to TRANSIENT_FAILURE. - absl::Status status = absl::UnavailableError("Empty update"); + absl::Status status = + args.addresses.ok() ? absl::UnavailableError(absl::StrCat( + "empty address list: ", args.resolution_note)) + : args.addresses.status(); channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, absl::make_unique(status)); diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc index 2b0483a69e8..a8c13c74e60 100644 --- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc +++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc @@ -669,7 +669,7 @@ class RlsLb : public LoadBalancingPolicy { OrphanablePtr rls_channel_ ABSL_GUARDED_BY(mu_); // Accessed only from within WorkSerializer. - ServerAddressList addresses_; + absl::StatusOr addresses_; const grpc_channel_args* channel_args_ = nullptr; RefCountedPtr config_; RefCountedPtr default_child_policy_; @@ -1863,19 +1863,28 @@ void RlsLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) { gpr_log(GPR_INFO, "[rlslb %p] policy updated", this); } - // Swap out config, addresses, and channel args. + // Swap out config. RefCountedPtr old_config = std::move(config_); config_ = std::move(args.config); - ServerAddressList old_addresses = std::move(addresses_); - addresses_ = std::move(args.addresses); - grpc_channel_args_destroy(channel_args_); - channel_args_ = grpc_channel_args_copy(args.args); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) && (old_config == nullptr || old_config->child_policy_config() != config_->child_policy_config())) { gpr_log(GPR_INFO, "[rlslb %p] updated child policy config: %s", this, config_->child_policy_config().Dump().c_str()); } + // Swap out addresses. + // If the new address list is an error and we have an existing address list, + // stick with the existing addresses. + absl::StatusOr old_addresses; + if (args.addresses.ok()) { + old_addresses = std::move(addresses_); + addresses_ = std::move(args.addresses); + } else { + old_addresses = addresses_; + } + // Swap out channel args. + grpc_channel_args_destroy(channel_args_); + channel_args_ = grpc_channel_args_copy(args.args); // Determine whether we need to update all child policies. bool update_child_policies = old_config == nullptr || diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 675ecf47742..f7af964fbd2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -1,28 +1,18 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/** Round Robin Policy. - * - * Before every pick, the \a get_next_ready_subchannel_index_locked function - * returns the p->subchannel_list->subchannels index for next subchannel, - * respecting the relative order of the addresses provided upon creation or - * updates. Note however that updates will start picking from the beginning of - * the updated list. */ +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #include @@ -166,15 +156,14 @@ class RoundRobin : public LoadBalancingPolicy { void ShutdownLocked() override; - /** list of subchannels */ + // List of subchannels. OrphanablePtr subchannel_list_; - /** Latest version of the subchannel list. - * Subchannel connectivity callbacks will only promote updated subchannel - * lists if they equal \a latest_pending_subchannel_list. In other words, - * racing callbacks that reference outdated subchannel lists won't perform any - * update. */ + // Latest pending subchannel list. + // When we get an updated address list, we create a new subchannel list + // for it here, and we wait to swap it into subchannel_list_ until the new + // list becomes READY. OrphanablePtr latest_pending_subchannel_list_; - /** are we shutting down? */ + bool shutdown_ = false; }; @@ -303,31 +292,30 @@ void RoundRobin::RoundRobinSubchannelList:: RoundRobin* p = static_cast(policy()); // Only set connectivity state if this is the current subchannel list. if (p->subchannel_list_.get() != this) return; - /* In priority order. The first rule to match terminates the search (ie, if we - * are on rule n, all previous rules were unfulfilled). - * - * 1) RULE: ANY subchannel is READY => policy is READY. - * CHECK: subchannel_list->num_ready > 0. - * - * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. - * CHECK: sd->curr_connectivity_state == CONNECTING. - * - * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is - * TRANSIENT_FAILURE. - * CHECK: subchannel_list->num_transient_failures == - * subchannel_list->num_subchannels. - */ + // In priority order. The first rule to match terminates the search (ie, if we + // are on rule n, all previous rules were unfulfilled). + // + // 1) RULE: ANY subchannel is READY => policy is READY. + // CHECK: subchannel_list->num_ready > 0. + // + // 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. + // CHECK: sd->curr_connectivity_state == CONNECTING. + // + // 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is + // TRANSIENT_FAILURE. + // CHECK: subchannel_list->num_transient_failures == + // subchannel_list->num_subchannels. if (num_ready_ > 0) { - /* 1) READY */ + // 1) READY p->channel_control_helper()->UpdateState( GRPC_CHANNEL_READY, absl::Status(), absl::make_unique(p, this)); } else if (num_connecting_ > 0) { - /* 2) CONNECTING */ + // 2) CONNECTING p->channel_control_helper()->UpdateState( GRPC_CHANNEL_CONNECTING, absl::Status(), absl::make_unique(p->Ref(DEBUG_LOCATION, "QueuePicker"))); } else if (num_transient_failure_ == num_subchannels()) { - /* 3) TRANSIENT_FAILURE */ + // 3) TRANSIENT_FAILURE absl::Status status = absl::UnavailableError("connections to all backends failing"); p->channel_control_helper()->UpdateState( @@ -432,24 +420,38 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( } void RoundRobin::UpdateLocked(UpdateArgs args) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", - this, args.addresses.size()); - } - // Replace latest_pending_subchannel_list_. - if (latest_pending_subchannel_list_ != nullptr) { + ServerAddressList addresses; + if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, - "[RR %p] Shutting down previous pending subchannel list %p", this, - latest_pending_subchannel_list_.get()); + gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", + this, args.addresses->size()); + } + addresses = std::move(*args.addresses); + } else { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this, + args.addresses.status().ToString().c_str()); } + // If we already have a subchannel list, then ignore the resolver + // failure and keep using the existing list. + if (subchannel_list_ != nullptr) return; + } + // Replace latest_pending_subchannel_list_. + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) && + latest_pending_subchannel_list_ != nullptr) { + gpr_log(GPR_INFO, + "[RR %p] Shutting down previous pending subchannel list %p", this, + latest_pending_subchannel_list_.get()); } latest_pending_subchannel_list_ = MakeOrphanable( - this, &grpc_lb_round_robin_trace, std::move(args.addresses), *args.args); + this, &grpc_lb_round_robin_trace, std::move(addresses), *args.args); if (latest_pending_subchannel_list_->num_subchannels() == 0) { // If the new list is empty, immediately promote the new list to the // current list and transition to TRANSIENT_FAILURE. - absl::Status status = absl::UnavailableError("Empty update"); + absl::Status status = + args.addresses.ok() ? absl::UnavailableError(absl::StrCat( + "empty address list: ", args.resolution_note)) + : args.addresses.status(); channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, absl::make_unique(status)); diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc index 785975d61f6..0b02a139468 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc @@ -123,7 +123,7 @@ class WeightedTargetLb : public LoadBalancingPolicy { void Orphan() override; void UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config, - ServerAddressList addresses, + absl::StatusOr addresses, const grpc_channel_args* args); void ResetBackoffLocked(); void DeactivateLocked(); @@ -296,13 +296,18 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) { } } // Update all children. - HierarchicalAddressMap address_map = + absl::StatusOr address_map = MakeHierarchicalAddressMap(args.addresses); for (const auto& p : config_->target_map()) { const std::string& name = p.first; const WeightedTargetLbConfig::ChildConfig& config = p.second; - targets_[name]->UpdateLocked(config, std::move(address_map[name]), - args.args); + absl::StatusOr addresses; + if (address_map.ok()) { + addresses = std::move((*address_map)[name]); + } else { + addresses = address_map.status(); + } + targets_[name]->UpdateLocked(config, std::move(addresses), args.args); } UpdateStateLocked(); } @@ -473,7 +478,8 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked( void WeightedTargetLb::WeightedChild::UpdateLocked( const WeightedTargetLbConfig::ChildConfig& config, - ServerAddressList addresses, const grpc_channel_args* args) { + absl::StatusOr addresses, + const grpc_channel_args* args) { if (weighted_target_policy_->shutting_down_) return; // Update child weight. weight_ = config.weight; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index 49f45b4054b..6f1f3c6fa0e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -238,7 +238,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy { OrphanablePtr CreateChildPolicyLocked( const grpc_channel_args* args); - void UpdateChildPolicyLocked(ServerAddressList addresses, + void UpdateChildPolicyLocked(absl::StatusOr addresses, const grpc_channel_args* args); void MaybeUpdatePickerLocked(); @@ -470,8 +470,8 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) { config_->cluster_name(), config_->eds_service_name()); } else { // Cluster name, EDS service name, and LRS server name should never - // change, because the EDS policy above us should be swapped out if - // that happens. + // change, because the xds_cluster_resolver policy above us should be + // swapped out if that happens. GPR_ASSERT(config_->cluster_name() == old_config->cluster_name()); GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name()); GPR_ASSERT(config_->lrs_load_reporting_server_name() == @@ -541,8 +541,9 @@ OrphanablePtr XdsClusterImplLb::CreateChildPolicyLocked( return lb_policy; } -void XdsClusterImplLb::UpdateChildPolicyLocked(ServerAddressList addresses, - const grpc_channel_args* args) { +void XdsClusterImplLb::UpdateChildPolicyLocked( + absl::StatusOr addresses, + const grpc_channel_args* args) { // Create policy if needed. if (child_policy_ == nullptr) { child_policy_ = CreateChildPolicyLocked(args); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc index 0455204b3e5..45356d9a404 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc @@ -122,7 +122,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { void Orphan() override; void UpdateLocked(RefCountedPtr config, - const ServerAddressList& addresses, + const absl::StatusOr& addresses, const grpc_channel_args* args); void ExitIdleLocked(); void ResetBackoffLocked(); @@ -441,7 +441,8 @@ XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked( void XdsClusterManagerLb::ClusterChild::UpdateLocked( RefCountedPtr config, - const ServerAddressList& addresses, const grpc_channel_args* args) { + const absl::StatusOr& addresses, + const grpc_channel_args* args) { if (xds_cluster_manager_policy_->shutting_down_) return; // Update child weight. // Reactivate if needed. diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index b420b044c09..81408e96019 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -267,9 +267,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { ~ResolverResultHandler() override {} - void ReturnResult(Resolver::Result result) override; - - void ReturnError(grpc_error_handle error) override; + void ReportResult(Resolver::Result result) override; private: RefCountedPtr discovery_mechanism_; @@ -489,13 +487,21 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Orphan() { // void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: - ReturnResult(Resolver::Result result) { - // convert result to eds update + ReportResult(Resolver::Result result) { + if (!result.addresses.ok()) { + discovery_mechanism_->parent()->OnError( + discovery_mechanism_->index(), + absl_status_to_grpc_error(result.addresses.status())); + return; + } + // Convert resolver result to EDS update. + // TODO(roth): Figure out a way to pass resolution_note through to the + // child policy. XdsEndpointResource update; XdsEndpointResource::Priority::Locality locality; locality.name = MakeRefCounted("", "", ""); locality.lb_weight = 1; - locality.endpoints = std::move(result.addresses); + locality.endpoints = std::move(*result.addresses); XdsEndpointResource::Priority priority; priority.localities.emplace(locality.name.get(), std::move(locality)); update.priorities.emplace_back(std::move(priority)); @@ -503,11 +509,6 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: discovery_mechanism_->index(), std::move(update)); } -void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: - ReturnError(grpc_error_handle error) { - discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), error); -} - // // XdsClusterResolverLb public methods // diff --git a/src/core/ext/filters/client_channel/resolver.cc b/src/core/ext/filters/client_channel/resolver.cc index a5494b1666a..c6b251c91b3 100644 --- a/src/core/ext/filters/client_channel/resolver.cc +++ b/src/core/ext/filters/client_channel/resolver.cc @@ -38,35 +38,28 @@ Resolver::Resolver() // Resolver::Result // -Resolver::Result::~Result() { - GRPC_ERROR_UNREF(service_config_error); - grpc_channel_args_destroy(args); -} +Resolver::Result::~Result() { grpc_channel_args_destroy(args); } -Resolver::Result::Result(const Result& other) { - addresses = other.addresses; - service_config = other.service_config; - service_config_error = GRPC_ERROR_REF(other.service_config_error); - args = grpc_channel_args_copy(other.args); -} +Resolver::Result::Result(const Result& other) + : addresses(other.addresses), + service_config(other.service_config), + resolution_note(other.resolution_note), + args(grpc_channel_args_copy(other.args)) {} -Resolver::Result::Result(Result&& other) noexcept { - addresses = std::move(other.addresses); - service_config = std::move(other.service_config); - service_config_error = other.service_config_error; - other.service_config_error = GRPC_ERROR_NONE; - args = other.args; +Resolver::Result::Result(Result&& other) noexcept + : addresses(std::move(other.addresses)), + service_config(std::move(other.service_config)), + resolution_note(std::move(other.resolution_note)), + // TODO(roth): Use std::move() once channel args is converted to C++. + args(other.args) { other.args = nullptr; } Resolver::Result& Resolver::Result::operator=(const Result& other) { - if (&other == this) { - return *this; - } + if (&other == this) return *this; addresses = other.addresses; service_config = other.service_config; - GRPC_ERROR_UNREF(service_config_error); - service_config_error = GRPC_ERROR_REF(other.service_config_error); + resolution_note = other.resolution_note; grpc_channel_args_destroy(args); args = grpc_channel_args_copy(other.args); return *this; @@ -75,9 +68,8 @@ Resolver::Result& Resolver::Result::operator=(const Result& other) { Resolver::Result& Resolver::Result::operator=(Result&& other) noexcept { addresses = std::move(other.addresses); service_config = std::move(other.service_config); - GRPC_ERROR_UNREF(service_config_error); - service_config_error = other.service_config_error; - other.service_config_error = GRPC_ERROR_NONE; + resolution_note = std::move(other.resolution_note); + // TODO(roth): Use std::move() once channel args is converted to C++. grpc_channel_args_destroy(args); args = other.args; other.args = nullptr; diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index 028baa9158a..f9400b91d51 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -1,26 +1,26 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H #include +#include "absl/status/statusor.h" + #include #include "src/core/ext/filters/client_channel/server_address.h" @@ -53,13 +53,23 @@ class Resolver : public InternallyRefCounted { public: /// Results returned by the resolver. struct Result { - ServerAddressList addresses; - RefCountedPtr service_config; - grpc_error_handle service_config_error = GRPC_ERROR_NONE; + /// A list of addresses, or an error. + absl::StatusOr addresses; + /// A service config, or an error. + absl::StatusOr> service_config = nullptr; + /// An optional human-readable note describing context about the resolution, + /// to be passed along to the LB policy for inclusion in RPC failure status + /// messages in cases where neither \a addresses nor \a service_config + /// has a non-OK status. For example, a resolver that returns an empty + /// address list but a valid service config may set to this to something + /// like "no DNS entries found for ". + std::string resolution_note; + // TODO(roth): Before making this a public API, figure out a way to + // avoid exposing channel args this way. const grpc_channel_args* args = nullptr; - // TODO(roth): Remove everything below once grpc_error and - // grpc_channel_args are convert to copyable and movable C++ objects. + // TODO(roth): Remove everything below once grpc_channel_args is + // converted to a copyable and movable C++ object. Result() = default; ~Result(); Result(const Result& other); @@ -74,17 +84,11 @@ class Resolver : public InternallyRefCounted { public: virtual ~ResultHandler() {} - /// Returns a result to the channel. - /// Takes ownership of \a result.args. - virtual void ReturnResult(Result result) = 0; // NOLINT - - /// Returns a transient error to the channel. - /// If the resolver does not set the GRPC_ERROR_INT_GRPC_STATUS - /// attribute on the error, calls will be failed with status UNKNOWN. - virtual void ReturnError(grpc_error_handle error) = 0; - - // TODO(yashkt): As part of the service config error handling - // changes, add a method to parse the service config JSON string. + /// Reports a result to the channel. + // TODO(roth): Add a mechanism for the resolver to get back a signal + // indicating if the result was accepted by the LB policy, so that it + // knows whether to go into backoff to retry to resolution. + virtual void ReportResult(Result result) = 0; // NOLINT }; // Not copyable nor movable. @@ -113,9 +117,6 @@ class Resolver : public InternallyRefCounted { /// Resets the re-resolution backoff, if any. /// This needs to be implemented only by pull-based implementations; /// for push-based implementations, it will be a no-op. - /// TODO(roth): Pull the backoff code out of resolver and into - /// client_channel, so that it can be shared across resolver - /// implementations. At that point, this method can go away. virtual void ResetBackoffLocked() {} // Note: This must be invoked while holding the work_serializer. @@ -133,4 +134,4 @@ class Resolver : public InternallyRefCounted { } // namespace grpc_core -#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H */ +#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H diff --git a/src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc b/src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc index 68f21c80e1a..1a346c11909 100644 --- a/src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc @@ -46,7 +46,7 @@ class BinderResolver : public Resolver { result.addresses = std::move(addresses_); result.args = channel_args_; channel_args_ = nullptr; - result_handler_->ReturnResult(std::move(result)); + result_handler_->ReportResult(std::move(result)); } void ShutdownLocked() override {} diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 2284872184b..6df35cbb964 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -1,20 +1,18 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #include @@ -50,6 +48,7 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/json/json.h" +#include "src/core/lib/transport/error_utils.h" #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 @@ -320,21 +319,35 @@ void AresDnsResolver::OnResolvedLocked(grpc_error_handle error) { GRPC_ERROR_UNREF(error); return; } + // TODO(roth): Change logic to be able to report failures for addresses + // and service config independently of each other. if (addresses_ != nullptr || balancer_addresses_ != nullptr) { Result result; if (addresses_ != nullptr) { result.addresses = std::move(*addresses_); + } else { + result.addresses = ServerAddressList(); } if (service_config_json_ != nullptr) { - std::string service_config_string = ChooseServiceConfig( - service_config_json_, &result.service_config_error); + grpc_error_handle service_config_error = GRPC_ERROR_NONE; + std::string service_config_string = + ChooseServiceConfig(service_config_json_, &service_config_error); gpr_free(service_config_json_); - if (result.service_config_error == GRPC_ERROR_NONE && + RefCountedPtr service_config; + if (service_config_error == GRPC_ERROR_NONE && !service_config_string.empty()) { GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s", this, service_config_string.c_str()); - result.service_config = ServiceConfig::Create( - channel_args_, service_config_string, &result.service_config_error); + service_config = ServiceConfig::Create( + channel_args_, service_config_string, &service_config_error); + } + if (service_config_error != GRPC_ERROR_NONE) { + result.service_config = absl::UnavailableError( + absl::StrCat("failed to parse service config: ", + grpc_error_std_string(service_config_error))); + GRPC_ERROR_UNREF(service_config_error); + } else { + result.service_config = std::move(service_config); } } absl::InlinedVector new_args; @@ -344,7 +357,7 @@ void AresDnsResolver::OnResolvedLocked(grpc_error_handle error) { } result.args = grpc_channel_args_copy_and_add(channel_args_, new_args.data(), new_args.size()); - result_handler_->ReturnResult(std::move(result)); + result_handler_->ReportResult(std::move(result)); addresses_.reset(); balancer_addresses_.reset(); // Reset backoff state so that we start from the beginning when the @@ -353,12 +366,14 @@ void AresDnsResolver::OnResolvedLocked(grpc_error_handle error) { } else { GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", this, grpc_error_std_string(error).c_str()); - std::string error_message = - absl::StrCat("DNS resolution failed for ", name_to_resolve_); - result_handler_->ReturnError(grpc_error_set_int( - GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(error_message.c_str(), - &error, 1), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + std::string error_message; + grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &error_message); + absl::Status status = absl::UnavailableError(absl::StrCat( + "DNS resolution failed for ", name_to_resolve_, ": ", error_message)); + Result result; + result.addresses = status; + result.service_config = status; + result_handler_->ReportResult(std::move(result)); // Set retry timer. // InvalidateNow to avoid getting stuck re-initializing this timer // in a loop while draining the currently-held WorkSerializer. diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index c1bda75f78d..2bafca59fc2 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -1,20 +1,18 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #include @@ -179,15 +177,16 @@ void NativeDnsResolver::OnResolvedLocked(grpc_error_handle error) { return; } if (addresses_ != nullptr) { - Result result; + ServerAddressList addresses; for (size_t i = 0; i < addresses_->naddrs; ++i) { - result.addresses.emplace_back(&addresses_->addrs[i].addr, - addresses_->addrs[i].len, - nullptr /* args */); + addresses.emplace_back(&addresses_->addrs[i].addr, + addresses_->addrs[i].len, nullptr /* args */); } grpc_resolved_addresses_destroy(addresses_); + Result result; + result.addresses = std::move(addresses); result.args = grpc_channel_args_copy(channel_args_); - result_handler_->ReturnResult(std::move(result)); + result_handler_->ReportResult(std::move(result)); // Reset backoff state so that we start from the beginning when the // next request gets triggered. backoff_.Reset(); @@ -195,12 +194,12 @@ void NativeDnsResolver::OnResolvedLocked(grpc_error_handle error) { gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_std_string(error).c_str()); // Return transient error. - std::string error_message = - absl::StrCat("DNS resolution failed for service: ", name_to_resolve_); - result_handler_->ReturnError(grpc_error_set_int( - GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(error_message.c_str(), - &error, 1), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + std::string error_message; + grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &error_message); + Result result; + result.addresses = absl::UnavailableError(absl::StrCat( + "DNS resolution failed for ", name_to_resolve_, ": ", error_message)); + result_handler_->ReportResult(std::move(result)); // Set up for retry. // InvalidateNow to avoid getting stuck re-initializing this timer // in a loop while draining the currently-held WorkSerializer. diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index dceef6f096c..c71c20bd328 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -142,23 +142,22 @@ void FakeResolver::MaybeSendResultLocked() { if (!started_ || shutdown_) return; if (return_failure_) { // TODO(roth): Change resolver result generator to be able to inject - // the error to be returned. - result_handler_->ReturnError(grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver transient failure"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + // the error to be returned and to be able to independently set errors + // for addresses and service config. + Result result; + result.addresses = absl::UnavailableError("Resolver transient failure"); + result.service_config = result.addresses.status(); + result_handler_->ReportResult(std::move(result)); return_failure_ = false; } else if (has_next_result_) { Result result; result.addresses = std::move(next_result_.addresses); result.service_config = std::move(next_result_.service_config); - // TODO(roth): Use std::move() once grpc_error is converted to C++. - result.service_config_error = next_result_.service_config_error; - next_result_.service_config_error = GRPC_ERROR_NONE; // When both next_results_ and channel_args_ contain an arg with the same // name, only the one in next_results_ will be kept since next_results_ is // before channel_args_. result.args = grpc_channel_args_union(next_result_.args, channel_args_); - result_handler_->ReturnResult(std::move(result)); + result_handler_->ReportResult(std::move(result)); has_next_result_ = false; } } diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc index 46efbb06b21..245752392e0 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc @@ -73,7 +73,7 @@ void SockaddrResolver::StartLocked() { // TODO(roth): Use std::move() once channel args is converted to C++. result.args = channel_args_; channel_args_ = nullptr; - result_handler_->ReturnResult(std::move(result)); + result_handler_->ReportResult(std::move(result)); } // diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index fd8a9d304bc..42407fa08d8 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -35,6 +35,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/timeout_encoding.h" namespace grpc_core { @@ -278,8 +279,7 @@ class XdsResolver : public Resolver { void OnError(grpc_error_handle error); void OnResourceDoesNotExist(); - grpc_error_handle CreateServiceConfig( - RefCountedPtr* service_config); + absl::StatusOr> CreateServiceConfig(); void GenerateResult(); void MaybeRemoveUnusedClusters(); @@ -682,7 +682,12 @@ void XdsResolver::StartLocked() { "Failed to create xds client -- channel will remain in " "TRANSIENT_FAILURE: %s", grpc_error_std_string(error).c_str()); - result_handler_->ReturnError(error); + std::string error_message; + grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &error_message); + Result result; + result.service_config = absl::UnavailableError( + absl::StrCat("Failed to create XdsClient: ", error_message)); + result_handler_->ReportResult(std::move(result)); return; } grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), @@ -800,8 +805,9 @@ void XdsResolver::OnError(grpc_error_handle error) { Result result; grpc_arg new_arg = xds_client_->MakeChannelArg(); result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1); - result.service_config_error = error; - result_handler_->ReturnResult(std::move(result)); + result.service_config = grpc_error_to_absl_status(error); + result_handler_->ReportResult(std::move(result)); + GRPC_ERROR_UNREF(error); } void XdsResolver::OnResourceDoesNotExist() { @@ -814,15 +820,15 @@ void XdsResolver::OnResourceDoesNotExist() { } current_virtual_host_.routes.clear(); Result result; - result.service_config = - ServiceConfig::Create(args_, "{}", &result.service_config_error); - GPR_ASSERT(result.service_config != nullptr); + grpc_error_handle error = GRPC_ERROR_NONE; + result.service_config = ServiceConfig::Create(args_, "{}", &error); + GPR_ASSERT(*result.service_config != nullptr); result.args = grpc_channel_args_copy(args_); - result_handler_->ReturnResult(std::move(result)); + result_handler_->ReportResult(std::move(result)); } -grpc_error_handle XdsResolver::CreateServiceConfig( - RefCountedPtr* service_config) { +absl::StatusOr> +XdsResolver::CreateServiceConfig() { std::vector clusters; for (const auto& cluster : cluster_state_map_) { clusters.push_back( @@ -849,8 +855,13 @@ grpc_error_handle XdsResolver::CreateServiceConfig( "}"); std::string json = absl::StrJoin(config_parts, ""); grpc_error_handle error = GRPC_ERROR_NONE; - *service_config = ServiceConfig::Create(args_, json.c_str(), &error); - return error; + absl::StatusOr> result = + ServiceConfig::Create(args_, json.c_str(), &error); + if (error != GRPC_ERROR_NONE) { + result = grpc_error_to_absl_status(error); + GRPC_ERROR_UNREF(error); + } + return result; } void XdsResolver::GenerateResult() { @@ -865,15 +876,12 @@ void XdsResolver::GenerateResult() { return; } Result result; - error = CreateServiceConfig(&result.service_config); - if (error != GRPC_ERROR_NONE) { - OnError(grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE)); - return; - } + result.service_config = CreateServiceConfig(); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this, - result.service_config->json_string().c_str()); + result.service_config.ok() + ? (*result.service_config)->json_string().c_str() + : result.service_config.status().ToString().c_str()); } grpc_arg new_args[] = { xds_client_->MakeChannelArg(), @@ -881,7 +889,7 @@ void XdsResolver::GenerateResult() { }; result.args = grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args)); - result_handler_->ReturnResult(std::move(result)); + result_handler_->ReportResult(std::move(result)); } void XdsResolver::MaybeRemoveUnusedClusters() { diff --git a/test/core/client_channel/resolvers/binder_resolver_test.cc b/test/core/client_channel/resolvers/binder_resolver_test.cc index 10431378625..30b12b3d5ea 100644 --- a/test/core/client_channel/resolvers/binder_resolver_test.cc +++ b/test/core/client_channel/resolvers/binder_resolver_test.cc @@ -65,10 +65,11 @@ class BinderResolverTest : public ::testing::Test { explicit ResultHandler(const std::string& expected_binder_id) : expect_result_(true), expected_binder_id_(expected_binder_id) {} - void ReturnResult(grpc_core::Resolver::Result result) override { + void ReportResult(grpc_core::Resolver::Result result) override { EXPECT_TRUE(expect_result_); - ASSERT_TRUE(result.addresses.size() == 1); - grpc_core::ServerAddress addr = result.addresses[0]; + ASSERT_TRUE(result.addresses.ok()); + ASSERT_EQ(result.addresses->size(), 1); + grpc_core::ServerAddress addr = (*result.addresses)[0]; const struct sockaddr_un* un = reinterpret_cast(addr.address().addr); EXPECT_EQ(addr.address().len, @@ -77,12 +78,8 @@ class BinderResolverTest : public ::testing::Test { EXPECT_EQ(un->sun_path, expected_binder_id_); } - void ReturnError(grpc_error_handle error) override { - GRPC_ERROR_UNREF(error); - } - private: - // Whether we expect ReturnResult function to be invoked + // Whether we expect ReportResult function to be invoked bool expect_result_ = false; std::string expected_binder_id_; diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc index d82d92d459f..9486ad9ea05 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc @@ -113,31 +113,20 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { public: struct ResolverOutput { grpc_core::Resolver::Result result; - grpc_error_handle error = GRPC_ERROR_NONE; gpr_event ev; ResolverOutput() { gpr_event_init(&ev); } - ~ResolverOutput() { GRPC_ERROR_UNREF(error); } }; void SetOutput(ResolverOutput* output) { gpr_atm_rel_store(&output_, reinterpret_cast(output)); } - void ReturnResult(grpc_core::Resolver::Result result) override { + void ReportResult(grpc_core::Resolver::Result result) override { ResolverOutput* output = reinterpret_cast(gpr_atm_acq_load(&output_)); GPR_ASSERT(output != nullptr); output->result = std::move(result); - output->error = GRPC_ERROR_NONE; - gpr_event_set(&output->ev, reinterpret_cast(1)); - } - - void ReturnError(grpc_error_handle error) override { - ResolverOutput* output = - reinterpret_cast(gpr_atm_acq_load(&output_)); - GPR_ASSERT(output != nullptr); - output->error = error; gpr_event_set(&output->ev, reinterpret_cast(1)); } @@ -180,15 +169,14 @@ int main(int argc, char** argv) { resolver->StartLocked(); grpc_core::ExecCtx::Get()->Flush(); GPR_ASSERT(wait_loop(5, &output1.ev)); - GPR_ASSERT(output1.result.addresses.empty()); - GPR_ASSERT(output1.error != GRPC_ERROR_NONE); + GPR_ASSERT(!output1.result.addresses.ok()); ResultHandler::ResolverOutput output2; result_handler->SetOutput(&output2); grpc_core::ExecCtx::Get()->Flush(); GPR_ASSERT(wait_loop(30, &output2.ev)); - GPR_ASSERT(!output2.result.addresses.empty()); - GPR_ASSERT(output2.error == GRPC_ERROR_NONE); + GPR_ASSERT(output2.result.addresses.ok()); + GPR_ASSERT(!output2.result.addresses->empty()); } grpc_shutdown(); diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index ed5402216ad..66bdea2e5b2 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -202,7 +202,7 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { state_ = state; } - void ReturnResult(grpc_core::Resolver::Result /*result*/) override { + void ReportResult(grpc_core::Resolver::Result /*result*/) override { GPR_ASSERT(result_cb_ != nullptr); GPR_ASSERT(state_ != nullptr); ResultCallback cb = result_cb_; @@ -212,12 +212,6 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { cb(state); } - void ReturnError(grpc_error_handle error) override { - gpr_log(GPR_ERROR, "resolver returned error: %s", - grpc_error_std_string(error).c_str()); - GPR_ASSERT(false); - } - private: ResultCallback result_cb_ = nullptr; OnResolutionCallbackArg* state_ = nullptr; diff --git a/test/core/client_channel/resolvers/dns_resolver_test.cc b/test/core/client_channel/resolvers/dns_resolver_test.cc index 1e176c493db..d7e5c49e8d6 100644 --- a/test/core/client_channel/resolvers/dns_resolver_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_test.cc @@ -32,8 +32,7 @@ static std::shared_ptr* g_work_serializer; class TestResultHandler : public grpc_core::Resolver::ResultHandler { - void ReturnResult(grpc_core::Resolver::Result /*result*/) override {} - void ReturnError(grpc_error_handle /*error*/) override {} + void ReportResult(grpc_core::Resolver::Result /*result*/) override {} }; static void test_succeeds(grpc_core::ResolverFactory* factory, diff --git a/test/core/client_channel/resolvers/fake_resolver_test.cc b/test/core/client_channel/resolvers/fake_resolver_test.cc index 010fb1cf879..ae8ed5ac3c9 100644 --- a/test/core/client_channel/resolvers/fake_resolver_test.cc +++ b/test/core/client_channel/resolvers/fake_resolver_test.cc @@ -45,21 +45,20 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { ev_ = ev; } - void ReturnResult(grpc_core::Resolver::Result actual) override { + void ReportResult(grpc_core::Resolver::Result actual) override { GPR_ASSERT(ev_ != nullptr); // We only check the addresses, because that's the only thing // explicitly set by the test via // FakeResolverResponseGenerator::SetResponse(). - GPR_ASSERT(actual.addresses.size() == expected_.addresses.size()); - for (size_t i = 0; i < expected_.addresses.size(); ++i) { - GPR_ASSERT(actual.addresses[i] == expected_.addresses[i]); + GPR_ASSERT(actual.addresses.ok()); + GPR_ASSERT(actual.addresses->size() == expected_.addresses->size()); + for (size_t i = 0; i < expected_.addresses->size(); ++i) { + GPR_ASSERT((*actual.addresses)[i] == (*expected_.addresses)[i]); } gpr_event_set(ev_, reinterpret_cast(1)); ev_ = nullptr; } - void ReturnError(grpc_error_handle /*error*/) override {} - private: grpc_core::Resolver::Result expected_; gpr_event* ev_ = nullptr; @@ -89,7 +88,7 @@ static grpc_core::Resolver::Result create_new_resolver_result() { static size_t test_counter = 0; const size_t num_addresses = 2; // Create address list. - grpc_core::Resolver::Result result; + grpc_core::ServerAddressList addresses; for (size_t i = 0; i < num_addresses; ++i) { std::string uri_string = absl::StrFormat("ipv4:127.0.0.1:100%" PRIuPTR, test_counter * num_addresses + i); @@ -98,11 +97,12 @@ static grpc_core::Resolver::Result create_new_resolver_result() { grpc_resolved_address address; GPR_ASSERT(grpc_parse_uri(*uri, &address)); absl::InlinedVector args_to_add; - result.addresses.emplace_back( - address.addr, address.len, - grpc_channel_args_copy_and_add(nullptr, nullptr, 0)); + addresses.emplace_back(address.addr, address.len, + grpc_channel_args_copy_and_add(nullptr, nullptr, 0)); } ++test_counter; + grpc_core::Resolver::Result result; + result.addresses = std::move(addresses); return result; } diff --git a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc index bdb2830b246..53543483b86 100644 --- a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc +++ b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc @@ -32,11 +32,7 @@ static std::shared_ptr* g_work_serializer; class ResultHandler : public grpc_core::Resolver::ResultHandler { public: - void ReturnResult(grpc_core::Resolver::Result /*result*/) override {} - - void ReturnError(grpc_error_handle error) override { - GRPC_ERROR_UNREF(error); - } + void ReportResult(grpc_core::Resolver::Result /*result*/) override {} }; static void test_succeeds(grpc_core::ResolverFactory* factory, diff --git a/test/core/iomgr/stranded_event_test.cc b/test/core/iomgr/stranded_event_test.cc index 3634b2cc494..6766297209e 100644 --- a/test/core/iomgr/stranded_event_test.cc +++ b/test/core/iomgr/stranded_event_test.cc @@ -293,6 +293,7 @@ class TestServer { grpc_core::Resolver::Result BuildResolverResponse( const std::vector& addresses) { grpc_core::Resolver::Result result; + result.addresses = grpc_core::ServerAddressList(); for (const auto& address_str : addresses) { absl::StatusOr uri = grpc_core::URI::Parse(address_str); if (!uri.ok()) { @@ -302,7 +303,7 @@ grpc_core::Resolver::Result BuildResolverResponse( } grpc_resolved_address address; GPR_ASSERT(grpc_parse_uri(*uri, &address)); - result.addresses.emplace_back(address.addr, address.len, nullptr); + result.addresses->emplace_back(address.addr, address.len, nullptr); } return result; } diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc index d1a6ec7c415..c469badbb1a 100644 --- a/test/core/transport/chttp2/too_many_pings_test.cc +++ b/test/core/transport/chttp2/too_many_pings_test.cc @@ -403,6 +403,7 @@ TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleChannels) { grpc_core::Resolver::Result BuildResolverResult( const std::vector& addresses) { grpc_core::Resolver::Result result; + result.addresses = grpc_core::ServerAddressList(); for (const auto& address_str : addresses) { absl::StatusOr uri = grpc_core::URI::Parse(address_str); if (!uri.ok()) { @@ -412,7 +413,7 @@ grpc_core::Resolver::Result BuildResolverResult( } grpc_resolved_address address; GPR_ASSERT(grpc_parse_uri(*uri, &address)); - result.addresses.emplace_back(address.addr, address.len, nullptr); + result.addresses->emplace_back(address.addr, address.len, nullptr); } return result; } diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index 067ca942944..056597a24d5 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -447,11 +447,11 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy { config->address().c_str()); auto uri = URI::Parse(config->address()); args.config.reset(); - args.addresses.clear(); + args.addresses = ServerAddressList(); if (uri.ok()) { grpc_resolved_address address; GPR_ASSERT(grpc_parse_uri(*uri, &address)); - args.addresses.emplace_back(address, /*args=*/nullptr); + args.addresses->emplace_back(address, /*args=*/nullptr); } else { gpr_log(GPR_ERROR, "%s: could not parse URI (%s), using empty address list", diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index d8f9f75bebd..e66d8709a67 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -199,6 +199,7 @@ class FakeResolverResponseGeneratorWrapper { std::unique_ptr attribute = nullptr) { grpc_core::Resolver::Result result; + result.addresses = grpc_core::ServerAddressList(); for (const int& port : ports) { absl::StatusOr lb_uri = grpc_core::URI::Parse( absl::StrCat(ipv6_only ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port)); @@ -211,13 +212,14 @@ class FakeResolverResponseGeneratorWrapper { if (attribute != nullptr) { attributes[attribute_key] = attribute->Copy(); } - result.addresses.emplace_back(address.addr, address.len, - nullptr /* args */, std::move(attributes)); + result.addresses->emplace_back(address.addr, address.len, + nullptr /* args */, std::move(attributes)); } if (service_config_json != nullptr) { + grpc_error_handle error = GRPC_ERROR_NONE; result.service_config = grpc_core::ServiceConfig::Create( - nullptr, service_config_json, &result.service_config_error); - GPR_ASSERT(result.service_config != nullptr); + nullptr, service_config_json, &error); + GPR_ASSERT(*result.service_config != nullptr); } return result; } diff --git a/test/cpp/end2end/rls_end2end_test.cc b/test/cpp/end2end/rls_end2end_test.cc index b34a7ec4cdc..fd503d524fa 100644 --- a/test/cpp/end2end/rls_end2end_test.cc +++ b/test/cpp/end2end/rls_end2end_test.cc @@ -228,13 +228,13 @@ class FakeResolverResponseGeneratorWrapper { static grpc_core::Resolver::Result BuildFakeResults( absl::string_view service_config_json) { grpc_core::Resolver::Result result; - result.service_config_error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; result.service_config = grpc_core::ServiceConfig::Create( - result.args, service_config_json, &result.service_config_error); - EXPECT_EQ(result.service_config_error, GRPC_ERROR_NONE) + result.args, service_config_json, &error); + EXPECT_EQ(error, GRPC_ERROR_NONE) << "JSON: " << service_config_json - << "Error: " << grpc_error_std_string(result.service_config_error); - EXPECT_NE(result.service_config, nullptr); + << "Error: " << grpc_error_std_string(error); + EXPECT_NE(*result.service_config, nullptr); return result; } diff --git a/test/cpp/end2end/service_config_end2end_test.cc b/test/cpp/end2end/service_config_end2end_test.cc index bd64e1722f8..2de452fa575 100644 --- a/test/cpp/end2end/service_config_end2end_test.cc +++ b/test/cpp/end2end/service_config_end2end_test.cc @@ -55,6 +55,7 @@ #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/security/credentials/fake/fake_credentials.h" +#include "src/core/lib/transport/error_utils.h" #include "src/cpp/client/secure_credentials.h" #include "src/cpp/server/secure_server_credentials.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -173,6 +174,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test { grpc_core::Resolver::Result BuildFakeResults(const std::vector& ports) { grpc_core::Resolver::Result result; + result.addresses = grpc_core::ServerAddressList(); for (const int& port : ports) { std::string lb_uri_str = absl::StrCat(ipv6_only_ ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port); @@ -180,8 +182,8 @@ class ServiceConfigEnd2endTest : public ::testing::Test { GPR_ASSERT(lb_uri.ok()); grpc_resolved_address address; GPR_ASSERT(grpc_parse_uri(*lb_uri, &address)); - result.addresses.emplace_back(address.addr, address.len, - nullptr /* args */); + result.addresses->emplace_back(address.addr, address.len, + nullptr /* args */); } return result; } @@ -195,16 +197,18 @@ class ServiceConfigEnd2endTest : public ::testing::Test { void SetNextResolutionValidServiceConfig(const std::vector& ports) { grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result = BuildFakeResults(ports); - result.service_config = grpc_core::ServiceConfig::Create( - nullptr, "{}", &result.service_config_error); + grpc_error_handle error = GRPC_ERROR_NONE; + result.service_config = + grpc_core::ServiceConfig::Create(nullptr, "{}", &error); + ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error); response_generator_->SetResponse(result); } void SetNextResolutionInvalidServiceConfig(const std::vector& ports) { grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result = BuildFakeResults(ports); - result.service_config = grpc_core::ServiceConfig::Create( - nullptr, "{", &result.service_config_error); + result.service_config = + absl::InvalidArgumentError("error parsing service config"); response_generator_->SetResponse(result); } @@ -212,8 +216,13 @@ class ServiceConfigEnd2endTest : public ::testing::Test { const char* svc_cfg) { grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result = BuildFakeResults(ports); - result.service_config = grpc_core::ServiceConfig::Create( - nullptr, svc_cfg, &result.service_config_error); + grpc_error_handle error = GRPC_ERROR_NONE; + result.service_config = + grpc_core::ServiceConfig::Create(nullptr, svc_cfg, &error); + if (error != GRPC_ERROR_NONE) { + result.service_config = grpc_error_to_absl_status(error); + GRPC_ERROR_UNREF(error); + } response_generator_->SetResponse(result); } diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index 47b4c8607e0..8bd38efc88d 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -145,12 +145,10 @@ class AssertFailureResultHandler : public grpc_core::Resolver::ResultHandler { gpr_mu_unlock(args_->mu); } - void ReturnResult(grpc_core::Resolver::Result /*result*/) override { + void ReportResult(grpc_core::Resolver::Result /*result*/) override { GPR_ASSERT(false); } - void ReturnError(grpc_error_handle /*error*/) override { GPR_ASSERT(false); } - private: ArgsStruct* args_; }; diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index a230b5a49fe..8a453bca62e 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -264,19 +264,18 @@ void PollPollsetUntilRequestDone(ArgsStruct* args) { } void CheckServiceConfigResultLocked(const char* service_config_json, - grpc_error_handle service_config_error, + absl::Status service_config_error, ArgsStruct* args) { if (!args->expected_service_config_string.empty()) { - GPR_ASSERT(service_config_json != nullptr); + ASSERT_NE(service_config_json, nullptr); EXPECT_EQ(service_config_json, args->expected_service_config_string); } if (args->expected_service_config_error.empty()) { - EXPECT_EQ(service_config_error, GRPC_ERROR_NONE); + EXPECT_TRUE(service_config_error.ok()); } else { - EXPECT_THAT(grpc_error_std_string(service_config_error), + EXPECT_THAT(service_config_error.ToString(), testing::HasSubstr(args->expected_service_config_error)); } - GRPC_ERROR_UNREF(service_config_error); } void CheckLBPolicyResultLocked(const grpc_channel_args* channel_args, @@ -430,7 +429,7 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { explicit ResultHandler(ArgsStruct* args) : args_(args) {} - void ReturnResult(grpc_core::Resolver::Result result) override { + void ReportResult(grpc_core::Resolver::Result result) override { CheckResult(result); gpr_atm_rel_store(&args_->done_atm, 1); gpr_mu_lock(args_->mu); @@ -439,12 +438,6 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { gpr_mu_unlock(args_->mu); } - void ReturnError(grpc_error_handle error) override { - gpr_log(GPR_ERROR, "resolver returned error: %s", - grpc_error_std_string(error).c_str()); - GPR_ASSERT(false); - } - virtual void CheckResult(const grpc_core::Resolver::Result& /*result*/) {} protected: @@ -465,9 +458,10 @@ class CheckingResultHandler : public ResultHandler { explicit CheckingResultHandler(ArgsStruct* args) : ResultHandler(args) {} void CheckResult(const grpc_core::Resolver::Result& result) override { + ASSERT_TRUE(result.addresses.ok()) << result.addresses.status().ToString(); ArgsStruct* args = args_struct(); std::vector found_lb_addrs; - AddActualAddresses(result.addresses, /*is_balancer=*/false, + AddActualAddresses(*result.addresses, /*is_balancer=*/false, &found_lb_addrs); const grpc_core::ServerAddressList* balancer_addresses = grpc_core::FindGrpclbBalancerAddressesInChannelArgs(*result.args); @@ -478,7 +472,7 @@ class CheckingResultHandler : public ResultHandler { gpr_log(GPR_INFO, "found %" PRIdPTR " backend addresses and %" PRIdPTR " balancer addresses", - result.addresses.size(), + result.addresses->size(), balancer_addresses == nullptr ? 0L : balancer_addresses->size()); if (args->expected_addrs.size() != found_lb_addrs.size()) { gpr_log(GPR_DEBUG, @@ -499,12 +493,17 @@ class CheckingResultHandler : public ResultHandler { absl::GetFlag(FLAGS_do_ordered_address_comparison).c_str()); GPR_ASSERT(0); } - const char* service_config_json = - result.service_config == nullptr - ? nullptr - : result.service_config->json_string().c_str(); - CheckServiceConfigResultLocked( - service_config_json, GRPC_ERROR_REF(result.service_config_error), args); + if (!result.service_config.ok()) { + CheckServiceConfigResultLocked(nullptr, result.service_config.status(), + args); + } else { + const char* service_config_json = + *result.service_config == nullptr + ? nullptr + : (*result.service_config)->json_string().c_str(); + CheckServiceConfigResultLocked(service_config_json, absl::OkStatus(), + args); + } if (args->expected_service_config_string.empty()) { CheckLBPolicyResultLocked(result.args, args); } diff --git a/test/cpp/naming/resolver_component_tests_runner.py b/test/cpp/naming/resolver_component_tests_runner.py index fbf0d22183a..47cca33839c 100755 --- a/test/cpp/naming/resolver_component_tests_runner.py +++ b/test/cpp/naming/resolver_component_tests_runner.py @@ -612,7 +612,7 @@ current_test_subprocess = subprocess.Popen([ '--target_name', 'ipv4-config-causing-fallback-to-tcp-inject-broken-nameservers.resolver-tests-version-4.grpctestingexp.', '--do_ordered_address_comparison', 'False', '--expected_addrs', '1.2.3.4:443,False', - '--expected_chosen_service_config', '{"loadBalancingPolicy":["round_robin"]}', + '--expected_chosen_service_config', '', '--expected_service_config_error', 'field:loadBalancingPolicy error:type should be string', '--expected_lb_policy', '', '--enable_srv_queries', 'True', diff --git a/test/cpp/naming/resolver_test_record_groups.yaml b/test/cpp/naming/resolver_test_record_groups.yaml index af89fb73637..8121b67faaa 100644 --- a/test/cpp/naming/resolver_test_record_groups.yaml +++ b/test/cpp/naming/resolver_test_record_groups.yaml @@ -464,7 +464,7 @@ resolver_component_tests: - expected_addrs: - {address: '1.2.3.4:443', is_balancer: false} do_ordered_address_comparison: false - expected_chosen_service_config: '{"loadBalancingPolicy":["round_robin"]}' + expected_chosen_service_config: null expected_service_config_error: 'field:loadBalancingPolicy error:type should be string' expected_lb_policy: null enable_srv_queries: true