improve RPC status messages when name resolution fails (#28091)

* improve RPC status messages when name resolution fails

* fix tests

* clang-format

* fix rls_end2end_test

* fix build

* fix service_config_end2end_test

* fix too_many_pings_test

* fix tests

* fix client_channel to propagate resolution_note to LB policy

* improve comment
pull/28301/head
Mark D. Roth 3 years ago committed by GitHub
parent 858eff5ab9
commit ac70281e74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 68
      src/core/ext/filters/client_channel/client_channel.cc
  2. 6
      src/core/ext/filters/client_channel/client_channel.h
  3. 27
      src/core/ext/filters/client_channel/lb_policy.cc
  4. 20
      src/core/ext/filters/client_channel/lb_policy.h
  5. 7
      src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
  6. 9
      src/core/ext/filters/client_channel/lb_policy/address_filtering.h
  7. 51
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  8. 66
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  9. 8
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  10. 33
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  11. 21
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  12. 112
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  13. 16
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  14. 9
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  15. 5
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  16. 23
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  17. 40
      src/core/ext/filters/client_channel/resolver.cc
  18. 75
      src/core/ext/filters/client_channel/resolver.h
  19. 2
      src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc
  20. 73
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  21. 55
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  22. 15
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  23. 2
      src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
  24. 50
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  25. 13
      test/core/client_channel/resolvers/binder_resolver_test.cc
  26. 20
      test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc
  27. 8
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  28. 3
      test/core/client_channel/resolvers/dns_resolver_test.cc
  29. 18
      test/core/client_channel/resolvers/fake_resolver_test.cc
  30. 6
      test/core/client_channel/resolvers/sockaddr_resolver_test.cc
  31. 3
      test/core/iomgr/stranded_event_test.cc
  32. 3
      test/core/transport/chttp2/too_many_pings_test.cc
  33. 4
      test/core/util/test_lb_policies.cc
  34. 8
      test/cpp/end2end/client_lb_end2end_test.cc
  35. 10
      test/cpp/end2end/rls_end2end_test.cc
  36. 23
      test/cpp/end2end/service_config_end2end_test.cc
  37. 4
      test/cpp/naming/cancel_ares_query_test.cc
  38. 35
      test/cpp/naming/resolver_component_test.cc
  39. 2
      test/cpp/naming/resolver_component_tests_runner.py
  40. 2
      test/cpp/naming/resolver_test_record_groups.yaml

@ -416,16 +416,11 @@ class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler {
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
}
void ReturnResult(Resolver::Result result) override
void ReportResult(Resolver::Result result) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
chand_->OnResolverResultChangedLocked(std::move(result));
}
void ReturnError(grpc_error_handle error) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
chand_->OnResolverErrorLocked(error);
}
private:
ClientChannel* chand_;
};
@ -1121,7 +1116,6 @@ ClientChannel::~ClientChannel() {
}
DestroyResolverAndLbPolicyLocked();
grpc_channel_args_destroy(channel_args_);
GRPC_ERROR_UNREF(resolver_transient_failure_error_);
// Stop backup polling.
grpc_client_channel_stop_backup_polling(interested_parties_);
grpc_pollset_set_destroy(interested_parties_);
@ -1202,26 +1196,29 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
//
// We track a list of strings to eventually be concatenated and traced.
absl::InlinedVector<const char*, 3> trace_strings;
if (result.addresses.empty() && previous_resolution_contained_addresses_) {
const bool resolution_contains_addresses =
result.addresses.ok() && !result.addresses->empty();
if (!resolution_contains_addresses &&
previous_resolution_contained_addresses_) {
trace_strings.push_back("Address list became empty");
} else if (!result.addresses.empty() &&
} else if (resolution_contains_addresses &&
!previous_resolution_contained_addresses_) {
trace_strings.push_back("Address list became non-empty");
}
previous_resolution_contained_addresses_ = !result.addresses.empty();
previous_resolution_contained_addresses_ = resolution_contains_addresses;
std::string service_config_error_string_storage;
if (result.service_config_error != GRPC_ERROR_NONE) {
if (!result.service_config.ok()) {
service_config_error_string_storage =
grpc_error_std_string(result.service_config_error);
result.service_config.status().ToString();
trace_strings.push_back(service_config_error_string_storage.c_str());
}
// Choose the service config.
RefCountedPtr<ServiceConfig> service_config;
RefCountedPtr<ConfigSelector> config_selector;
if (result.service_config_error != GRPC_ERROR_NONE) {
if (!result.service_config.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
this, grpc_error_std_string(result.service_config_error).c_str());
this, result.service_config.status().ToString().c_str());
}
// If the service config was invalid, then fallback to the
// previously returned service config.
@ -1235,13 +1232,13 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
service_config = saved_service_config_;
config_selector = saved_config_selector_;
} else {
// We received an invalid service config and we don't have a
// We received a service config error and we don't have a
// previous service config to fall back to. Put the channel into
// TRANSIENT_FAILURE.
OnResolverErrorLocked(GRPC_ERROR_REF(result.service_config_error));
OnResolverErrorLocked(result.service_config.status());
trace_strings.push_back("no valid service config");
}
} else if (result.service_config == nullptr) {
} else if (*result.service_config == nullptr) {
// Resolver did not return any service config.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
@ -1252,9 +1249,12 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
service_config = default_service_config_;
} else {
// Use ServiceConfig and ConfigSelector returned by resolver.
service_config = result.service_config;
service_config = std::move(*result.service_config);
config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
}
// Note: The only case in which service_config is null here is if the resolver
// returned a service config error and we don't have a previous service
// config to fall back to.
if (service_config != nullptr) {
// Extract global config for client channel.
const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
@ -1306,28 +1306,21 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
}
}
void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) {
if (resolver_ == nullptr) {
GRPC_ERROR_UNREF(error);
return;
}
void ClientChannel::OnResolverErrorLocked(absl::Status status) {
if (resolver_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
grpc_error_std_string(error).c_str());
status.ToString().c_str());
}
// If we already have an LB policy from a previous resolution
// result, then we continue to let it set the connectivity state.
// Otherwise, we go into TRANSIENT_FAILURE.
if (lb_policy_ == nullptr) {
grpc_error_handle state_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolver transient failure", &error, 1);
absl::Status status = grpc_error_to_absl_status(state_error);
grpc_error_handle error = absl_status_to_grpc_error(status);
{
MutexLock lock(&resolution_mu_);
// Update resolver transient failure.
GRPC_ERROR_UNREF(resolver_transient_failure_error_);
resolver_transient_failure_error_ = state_error;
resolver_transient_failure_error_ = status;
// Process calls that were queued waiting for the resolver result.
for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
call = call->next) {
@ -1339,12 +1332,12 @@ void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) {
}
}
}
GRPC_ERROR_UNREF(error);
// Update connectivity state.
UpdateStateAndPickerLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure",
absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(status));
}
GRPC_ERROR_UNREF(error);
}
void ClientChannel::CreateOrUpdateLbPolicyLocked(
@ -1355,6 +1348,7 @@ void ClientChannel::CreateOrUpdateLbPolicyLocked(
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses = std::move(result.addresses);
update_args.config = std::move(lb_policy_config);
update_args.resolution_note = std::move(result.resolution_note);
// Add health check service name to channel args.
absl::InlinedVector<grpc_arg, 1> args_to_add;
if (health_check_service_name.has_value()) {
@ -1495,8 +1489,7 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
// after releasing the lock to keep the critical section small.
{
MutexLock lock(&resolution_mu_);
GRPC_ERROR_UNREF(resolver_transient_failure_error_);
resolver_transient_failure_error_ = GRPC_ERROR_NONE;
resolver_transient_failure_error_ = absl::OkStatus();
// Update service config.
received_service_config_data_ = true;
// Old values will be unreffed after lock is released.
@ -2352,12 +2345,11 @@ bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem,
if (GPR_UNLIKELY(!chand->received_service_config_data_)) {
// If the resolver returned transient failure before returning the
// first service config, fail any non-wait_for_ready calls.
grpc_error_handle resolver_error = chand->resolver_transient_failure_error_;
if (resolver_error != GRPC_ERROR_NONE &&
(send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) ==
0) {
absl::Status resolver_error = chand->resolver_transient_failure_error_;
if (!resolver_error.ok() && (send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
*error = GRPC_ERROR_REF(resolver_error);
*error = absl_status_to_grpc_error(resolver_error);
return true;
}
// Either the resolver has not yet returned a result, or it has

@ -216,7 +216,7 @@ class ClientChannel {
void OnResolverResultChangedLocked(Resolver::Result result)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void OnResolverErrorLocked(grpc_error_handle error)
void OnResolverErrorLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void CreateOrUpdateLbPolicyLocked(
@ -288,8 +288,8 @@ class ClientChannel {
ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) =
nullptr;
// Data from service config.
grpc_error_handle resolver_transient_failure_error_
ABSL_GUARDED_BY(resolution_mu_) = GRPC_ERROR_NONE;
absl::Status resolver_transient_failure_error_
ABSL_GUARDED_BY(resolution_mu_);
bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false;
RefCountedPtr<ServiceConfig> service_config_ ABSL_GUARDED_BY(resolution_mu_);
RefCountedPtr<ConfigSelector> config_selector_

@ -54,27 +54,27 @@ void LoadBalancingPolicy::Orphan() {
// LoadBalancingPolicy::UpdateArgs
//
LoadBalancingPolicy::UpdateArgs::UpdateArgs(const UpdateArgs& other) {
addresses = other.addresses;
config = other.config;
args = grpc_channel_args_copy(other.args);
}
LoadBalancingPolicy::UpdateArgs::UpdateArgs(UpdateArgs&& other) noexcept {
addresses = std::move(other.addresses);
config = std::move(other.config);
LoadBalancingPolicy::UpdateArgs::UpdateArgs(const UpdateArgs& other)
: addresses(other.addresses),
config(other.config),
resolution_note(other.resolution_note),
args(grpc_channel_args_copy(other.args)) {}
LoadBalancingPolicy::UpdateArgs::UpdateArgs(UpdateArgs&& other) noexcept
: addresses(std::move(other.addresses)),
config(std::move(other.config)),
resolution_note(std::move(other.resolution_note)),
// TODO(roth): Use std::move() once channel args is converted to C++.
args = other.args;
args(other.args) {
other.args = nullptr;
}
LoadBalancingPolicy::UpdateArgs& LoadBalancingPolicy::UpdateArgs::operator=(
const UpdateArgs& other) {
if (&other == this) {
return *this;
}
if (&other == this) return *this;
addresses = other.addresses;
config = other.config;
resolution_note = other.resolution_note;
grpc_channel_args_destroy(args);
args = grpc_channel_args_copy(other.args);
return *this;
@ -84,6 +84,7 @@ LoadBalancingPolicy::UpdateArgs& LoadBalancingPolicy::UpdateArgs::operator=(
UpdateArgs&& other) noexcept {
addresses = std::move(other.addresses);
config = std::move(other.config);
resolution_note = std::move(other.resolution_note);
// TODO(roth): Use std::move() once channel args is converted to C++.
grpc_channel_args_destroy(args);
args = other.args;

@ -23,6 +23,7 @@
#include <iterator>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/variant.h"
@ -325,8 +326,20 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Data passed to the UpdateLocked() method when new addresses and
/// config are available.
struct UpdateArgs {
ServerAddressList addresses;
/// A list of addresses, or an error indicating a failure to obtain the
/// list of addresses.
absl::StatusOr<ServerAddressList> addresses;
/// The LB policy config.
RefCountedPtr<Config> config;
/// A human-readable note providing context about the name resolution that
/// provided this update. LB policies may wish to include this message
/// in RPC failure status messages. For example, if the update has an
/// empty list of addresses, this message might say "no DNS entries
/// found for <name>".
std::string resolution_note;
// TODO(roth): Before making this a public API, find a better
// abstraction for representing channel args.
const grpc_channel_args* args = nullptr;
// TODO(roth): Remove everything below once channel args is
@ -368,6 +381,9 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Updates the policy with new data from the resolver. Will be invoked
/// immediately after LB policy is constructed, and then again whenever
/// the resolver returns a new result.
// TODO(roth): Change this to return some indication as to whether the
// update has been accepted, so that we can indicate to the resolver
// whether it should go into backoff to retry the resolution.
virtual void UpdateLocked(UpdateArgs) = 0; // NOLINT
/// Tries to enter a READY connectivity state.
@ -438,4 +454,4 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H

@ -69,10 +69,11 @@ MakeHierarchicalPathAttribute(std::vector<std::string> path) {
return absl::make_unique<HierarchicalPathAttribute>(std::move(path));
}
HierarchicalAddressMap MakeHierarchicalAddressMap(
const ServerAddressList& addresses) {
absl::StatusOr<HierarchicalAddressMap> MakeHierarchicalAddressMap(
const absl::StatusOr<ServerAddressList>& addresses) {
if (!addresses.ok()) return addresses.status();
HierarchicalAddressMap result;
for (const ServerAddress& address : addresses) {
for (const ServerAddress& address : *addresses) {
const HierarchicalPathAttribute* path_attribute =
static_cast<const HierarchicalPathAttribute*>(
address.GetAttribute(kHierarchicalPathAttributeKey));

@ -23,6 +23,8 @@
#include <string>
#include <vector>
#include "absl/status/statusor.h"
#include "src/core/ext/filters/client_channel/server_address.h"
// The resolver returns a flat list of addresses. When a hierarchy of
@ -92,10 +94,9 @@ MakeHierarchicalPathAttribute(std::vector<std::string> path);
using HierarchicalAddressMap = std::map<std::string, ServerAddressList>;
// Splits up the addresses into a separate list for each child.
HierarchicalAddressMap MakeHierarchicalAddressMap(
const ServerAddressList& addresses);
absl::StatusOr<HierarchicalAddressMap> MakeHierarchicalAddressMap(
const absl::StatusOr<ServerAddressList>& addresses);
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ADDRESS_FILTERING_H \
*/
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ADDRESS_FILTERING_H

@ -401,10 +401,7 @@ class GrpcLb : public LoadBalancingPolicy {
void ShutdownLocked() override;
// Helper functions used in UpdateLocked().
void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
const grpc_channel_args& args);
static ServerAddressList AddNullLbTokenToAddresses(
const ServerAddressList& addresses);
void UpdateBalancerChannelLocked(const grpc_channel_args& args);
void CancelBalancerChannelConnectivityWatchLocked();
@ -473,7 +470,10 @@ class GrpcLb : public LoadBalancingPolicy {
// Whether we're in fallback mode.
bool fallback_mode_ = false;
// The backend addresses from the resolver.
ServerAddressList fallback_backend_addresses_;
absl::StatusOr<ServerAddressList> fallback_backend_addresses_;
// The last resolution note from our parent.
// To be passed to child policy when fallback_backend_addresses_ is empty.
std::string resolution_note_;
// State for fallback-at-startup checks.
// Timeout after startup after which we will go into fallback mode if
// we have not received a serverlist from the balancer.
@ -1441,8 +1441,20 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = lb_channel_ == nullptr;
config_ = args.config;
GPR_ASSERT(config_ != nullptr);
ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args);
// Update the existing child policy.
// Update fallback address list.
fallback_backend_addresses_ = std::move(args.addresses);
if (fallback_backend_addresses_.ok()) {
// Add null LB token attributes.
for (ServerAddress& address : *fallback_backend_addresses_) {
address = address.WithAttribute(
kGrpcLbAddressAttributeKey,
absl::make_unique<TokenAndClientStatsAttribute>("", nullptr));
}
}
resolution_note_ = std::move(args.resolution_note);
// Update balancer channel.
UpdateBalancerChannelLocked(*args.args);
// Update the existing child policy, if any.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
// If this is the initial update, start the fallback-at-startup checks
// and the balancer call.
@ -1471,21 +1483,7 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
// helpers for UpdateLocked()
//
ServerAddressList GrpcLb::AddNullLbTokenToAddresses(
const ServerAddressList& addresses) {
ServerAddressList addresses_out;
for (const ServerAddress& address : addresses) {
addresses_out.emplace_back(address.WithAttribute(
kGrpcLbAddressAttributeKey,
absl::make_unique<TokenAndClientStatsAttribute>("", nullptr)));
}
return addresses_out;
}
void GrpcLb::ProcessAddressesAndChannelArgsLocked(
const ServerAddressList& addresses, const grpc_channel_args& args) {
// Update fallback address list.
fallback_backend_addresses_ = AddNullLbTokenToAddresses(addresses);
void GrpcLb::UpdateBalancerChannelLocked(const grpc_channel_args& args) {
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
@ -1687,9 +1685,14 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new round_robin policy will keep the requested
// picks pending.
// empty, in which case the new child policy will fail the picks.
update_args.addresses = fallback_backend_addresses_;
if (fallback_backend_addresses_.ok() &&
fallback_backend_addresses_->empty()) {
update_args.resolution_note = absl::StrCat(
"grpclb in fallback mode without any balancer addresses: ",
resolution_note_);
}
} else {
update_args.addresses = serverlist_->GetServerAddressList(
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());

@ -1,20 +1,18 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
@ -181,20 +179,25 @@ void PickFirst::ResetBackoffLocked() {
}
void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
// Create a subchannel list from the latest_update_args_.
// Create a subchannel list from latest_update_args_.
ServerAddressList addresses;
if (latest_update_args_.addresses.ok()) {
addresses = *latest_update_args_.addresses;
}
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, latest_update_args_.addresses,
this, &grpc_lb_pick_first_trace, std::move(addresses),
*latest_update_args_.args);
// Empty update or no valid subchannels.
if (subchannel_list->num_subchannels() == 0) {
// Unsubscribe from all current subchannels.
subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
// If not idle, put the channel in TRANSIENT_FAILURE.
// (If we are idle, then this will happen in ExitIdleLocked() if we
// haven't gotten a non-empty update by the time the application tries
// to start a new call.)
absl::Status status = absl::UnavailableError("Empty update");
// Put the channel in TRANSIENT_FAILURE.
absl::Status status =
latest_update_args_.addresses.ok()
? absl::UnavailableError(absl::StrCat(
"empty address list: ", latest_update_args_.resolution_note))
: latest_update_args_.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
@ -258,17 +261,28 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
void PickFirst::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
if (args.addresses.ok()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
args.addresses.size());
args.addresses->size());
} else {
gpr_log(GPR_INFO, "Pick First %p received update with address error: %s",
this, args.addresses.status().ToString().c_str());
}
// Update the latest_update_args_
}
// Add GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
grpc_arg new_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
const grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(args.args, &new_arg, 1);
std::swap(new_args, args.args);
grpc_channel_args_destroy(new_args);
// If the update contains a resolver error and we have a previous update
// that was not a resolver error, keep using the previous addresses.
if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
args.addresses = std::move(latest_update_args_.addresses);
}
// Update latest_update_args_.
latest_update_args_ = std::move(args);
// If we are not in idle, start connection attempt immediately.
// Otherwise, we defer the attempt into ExitIdleLocked().

@ -224,7 +224,7 @@ class PriorityLb : public LoadBalancingPolicy {
// Current channel args and config from the resolver.
const grpc_channel_args* args_ = nullptr;
RefCountedPtr<PriorityLbConfig> config_;
HierarchicalAddressMap addresses_;
absl::StatusOr<HierarchicalAddressMap> addresses_;
// Internal state.
bool shutting_down_ = false;
@ -557,7 +557,11 @@ void PriorityLb::ChildPriority::UpdateLocked(
// Construct update args.
UpdateArgs update_args;
update_args.config = std::move(config);
update_args.addresses = priority_policy_->addresses_[name_];
if (priority_policy_->addresses_.ok()) {
update_args.addresses = (*priority_policy_->addresses_)[name_];
} else {
update_args.addresses = priority_policy_->addresses_.status();
}
update_args.args = grpc_channel_args_copy(priority_policy_->args_);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {

@ -581,7 +581,7 @@ void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked(
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(
GPR_INFO,
"[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
"[RH %p] connectivity changed for subchannel %p, subchannel_list %p "
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(),
@ -623,7 +623,7 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
"[RH %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
"Requesting re-resolution",
p, subchannel());
}
@ -685,27 +685,40 @@ void RingHash::ShutdownLocked() {
void RingHash::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); }
void RingHash::UpdateLocked(UpdateArgs args) {
config_ = std::move(args.config);
ServerAddressList addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, args.addresses.size());
gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
}
config_ = std::move(args.config);
// Filter out any address with weight 0.
ServerAddressList addresses;
addresses.reserve(args.addresses.size());
for (ServerAddress& address : args.addresses) {
addresses.reserve(args.addresses->size());
for (ServerAddress& address : *args.addresses) {
const ServerAddressWeightAttribute* weight_attribute =
static_cast<const ServerAddressWeightAttribute*>(address.GetAttribute(
ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
if (weight_attribute == nullptr || weight_attribute->weight() > 0) {
addresses.push_back(std::move(address));
addresses.emplace_back(std::move(address));
}
}
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",
this, args.addresses.status().ToString().c_str());
}
// If we already have a subchannel list, then ignore the resolver
// failure and keep using the existing list.
if (subchannel_list_ != nullptr) return;
}
subchannel_list_ = MakeOrphanable<RingHashSubchannelList>(
this, &grpc_lb_ring_hash_trace, std::move(addresses), *args.args);
if (subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately transition to TRANSIENT_FAILURE.
absl::Status status = absl::UnavailableError("Empty update");
absl::Status status =
args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
"empty address list: ", args.resolution_note))
: args.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));

@ -669,7 +669,7 @@ class RlsLb : public LoadBalancingPolicy {
OrphanablePtr<RlsChannel> rls_channel_ ABSL_GUARDED_BY(mu_);
// Accessed only from within WorkSerializer.
ServerAddressList addresses_;
absl::StatusOr<ServerAddressList> addresses_;
const grpc_channel_args* channel_args_ = nullptr;
RefCountedPtr<RlsLbConfig> config_;
RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
@ -1863,19 +1863,28 @@ void RlsLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO, "[rlslb %p] policy updated", this);
}
// Swap out config, addresses, and channel args.
// Swap out config.
RefCountedPtr<RlsLbConfig> old_config = std::move(config_);
config_ = std::move(args.config);
ServerAddressList old_addresses = std::move(addresses_);
addresses_ = std::move(args.addresses);
grpc_channel_args_destroy(channel_args_);
channel_args_ = grpc_channel_args_copy(args.args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) &&
(old_config == nullptr ||
old_config->child_policy_config() != config_->child_policy_config())) {
gpr_log(GPR_INFO, "[rlslb %p] updated child policy config: %s", this,
config_->child_policy_config().Dump().c_str());
}
// Swap out addresses.
// If the new address list is an error and we have an existing address list,
// stick with the existing addresses.
absl::StatusOr<ServerAddressList> old_addresses;
if (args.addresses.ok()) {
old_addresses = std::move(addresses_);
addresses_ = std::move(args.addresses);
} else {
old_addresses = addresses_;
}
// Swap out channel args.
grpc_channel_args_destroy(channel_args_);
channel_args_ = grpc_channel_args_copy(args.args);
// Determine whether we need to update all child policies.
bool update_child_policies =
old_config == nullptr ||

@ -1,28 +1,18 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/** Round Robin Policy.
*
* Before every pick, the \a get_next_ready_subchannel_index_locked function
* returns the p->subchannel_list->subchannels index for next subchannel,
* respecting the relative order of the addresses provided upon creation or
* updates. Note however that updates will start picking from the beginning of
* the updated list. */
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
@ -166,15 +156,14 @@ class RoundRobin : public LoadBalancingPolicy {
void ShutdownLocked() override;
/** list of subchannels */
// List of subchannels.
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
/** Latest version of the subchannel list.
* Subchannel connectivity callbacks will only promote updated subchannel
* lists if they equal \a latest_pending_subchannel_list. In other words,
* racing callbacks that reference outdated subchannel lists won't perform any
* update. */
// Latest pending subchannel list.
// When we get an updated address list, we create a new subchannel list
// for it here, and we wait to swap it into subchannel_list_ until the new
// list becomes READY.
OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
/** are we shutting down? */
bool shutdown_ = false;
};
@ -303,31 +292,30 @@ void RoundRobin::RoundRobinSubchannelList::
RoundRobin* p = static_cast<RoundRobin*>(policy());
// Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_.get() != this) return;
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
* 1) RULE: ANY subchannel is READY => policy is READY.
* CHECK: subchannel_list->num_ready > 0.
*
* 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
* CHECK: subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
*/
// In priority order. The first rule to match terminates the search (ie, if we
// are on rule n, all previous rules were unfulfilled).
//
// 1) RULE: ANY subchannel is READY => policy is READY.
// CHECK: subchannel_list->num_ready > 0.
//
// 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
// CHECK: sd->curr_connectivity_state == CONNECTING.
//
// 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
// TRANSIENT_FAILURE.
// CHECK: subchannel_list->num_transient_failures ==
// subchannel_list->num_subchannels.
if (num_ready_ > 0) {
/* 1) READY */
// 1) READY
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(), absl::make_unique<Picker>(p, this));
} else if (num_connecting_ > 0) {
/* 2) CONNECTING */
// 2) CONNECTING
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
} else if (num_transient_failure_ == num_subchannels()) {
/* 3) TRANSIENT_FAILURE */
// 3) TRANSIENT_FAILURE
absl::Status status =
absl::UnavailableError("connections to all backends failing");
p->channel_control_helper()->UpdateState(
@ -432,24 +420,38 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
}
void RoundRobin::UpdateLocked(UpdateArgs args) {
ServerAddressList addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, args.addresses.size());
this, args.addresses->size());
}
// Replace latest_pending_subchannel_list_.
if (latest_pending_subchannel_list_ != nullptr) {
addresses = std::move(*args.addresses);
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
args.addresses.status().ToString().c_str());
}
// If we already have a subchannel list, then ignore the resolver
// failure and keep using the existing list.
if (subchannel_list_ != nullptr) return;
}
// Replace latest_pending_subchannel_list_.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
latest_pending_subchannel_list_ != nullptr) {
gpr_log(GPR_INFO,
"[RR %p] Shutting down previous pending subchannel list %p", this,
latest_pending_subchannel_list_.get());
}
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
this, &grpc_lb_round_robin_trace, std::move(args.addresses), *args.args);
this, &grpc_lb_round_robin_trace, std::move(addresses), *args.args);
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately promote the new list to the
// current list and transition to TRANSIENT_FAILURE.
absl::Status status = absl::UnavailableError("Empty update");
absl::Status status =
args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
"empty address list: ", args.resolution_note))
: args.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));

@ -123,7 +123,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
void Orphan() override;
void UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config,
ServerAddressList addresses,
absl::StatusOr<ServerAddressList> addresses,
const grpc_channel_args* args);
void ResetBackoffLocked();
void DeactivateLocked();
@ -296,13 +296,18 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
}
}
// Update all children.
HierarchicalAddressMap address_map =
absl::StatusOr<HierarchicalAddressMap> address_map =
MakeHierarchicalAddressMap(args.addresses);
for (const auto& p : config_->target_map()) {
const std::string& name = p.first;
const WeightedTargetLbConfig::ChildConfig& config = p.second;
targets_[name]->UpdateLocked(config, std::move(address_map[name]),
args.args);
absl::StatusOr<ServerAddressList> addresses;
if (address_map.ok()) {
addresses = std::move((*address_map)[name]);
} else {
addresses = address_map.status();
}
targets_[name]->UpdateLocked(config, std::move(addresses), args.args);
}
UpdateStateLocked();
}
@ -473,7 +478,8 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
void WeightedTargetLb::WeightedChild::UpdateLocked(
const WeightedTargetLbConfig::ChildConfig& config,
ServerAddressList addresses, const grpc_channel_args* args) {
absl::StatusOr<ServerAddressList> addresses,
const grpc_channel_args* args) {
if (weighted_target_policy_->shutting_down_) return;
// Update child weight.
weight_ = config.weight;

@ -238,7 +238,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const grpc_channel_args* args);
void UpdateChildPolicyLocked(ServerAddressList addresses,
void UpdateChildPolicyLocked(absl::StatusOr<ServerAddressList> addresses,
const grpc_channel_args* args);
void MaybeUpdatePickerLocked();
@ -470,8 +470,8 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
config_->cluster_name(), config_->eds_service_name());
} else {
// Cluster name, EDS service name, and LRS server name should never
// change, because the EDS policy above us should be swapped out if
// that happens.
// change, because the xds_cluster_resolver policy above us should be
// swapped out if that happens.
GPR_ASSERT(config_->cluster_name() == old_config->cluster_name());
GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name());
GPR_ASSERT(config_->lrs_load_reporting_server_name() ==
@ -541,7 +541,8 @@ OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
return lb_policy;
}
void XdsClusterImplLb::UpdateChildPolicyLocked(ServerAddressList addresses,
void XdsClusterImplLb::UpdateChildPolicyLocked(
absl::StatusOr<ServerAddressList> addresses,
const grpc_channel_args* args) {
// Create policy if needed.
if (child_policy_ == nullptr) {

@ -122,7 +122,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
void Orphan() override;
void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
const ServerAddressList& addresses,
const absl::StatusOr<ServerAddressList>& addresses,
const grpc_channel_args* args);
void ExitIdleLocked();
void ResetBackoffLocked();
@ -441,7 +441,8 @@ XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
void XdsClusterManagerLb::ClusterChild::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
const ServerAddressList& addresses, const grpc_channel_args* args) {
const absl::StatusOr<ServerAddressList>& addresses,
const grpc_channel_args* args) {
if (xds_cluster_manager_policy_->shutting_down_) return;
// Update child weight.
// Reactivate if needed.

@ -267,9 +267,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
~ResolverResultHandler() override {}
void ReturnResult(Resolver::Result result) override;
void ReturnError(grpc_error_handle error) override;
void ReportResult(Resolver::Result result) override;
private:
RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism_;
@ -489,13 +487,21 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Orphan() {
//
void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
ReturnResult(Resolver::Result result) {
// convert result to eds update
ReportResult(Resolver::Result result) {
if (!result.addresses.ok()) {
discovery_mechanism_->parent()->OnError(
discovery_mechanism_->index(),
absl_status_to_grpc_error(result.addresses.status()));
return;
}
// Convert resolver result to EDS update.
// TODO(roth): Figure out a way to pass resolution_note through to the
// child policy.
XdsEndpointResource update;
XdsEndpointResource::Priority::Locality locality;
locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
locality.lb_weight = 1;
locality.endpoints = std::move(result.addresses);
locality.endpoints = std::move(*result.addresses);
XdsEndpointResource::Priority priority;
priority.localities.emplace(locality.name.get(), std::move(locality));
update.priorities.emplace_back(std::move(priority));
@ -503,11 +509,6 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
discovery_mechanism_->index(), std::move(update));
}
void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
ReturnError(grpc_error_handle error) {
discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), error);
}
//
// XdsClusterResolverLb public methods
//

@ -38,35 +38,28 @@ Resolver::Resolver()
// Resolver::Result
//
Resolver::Result::~Result() {
GRPC_ERROR_UNREF(service_config_error);
grpc_channel_args_destroy(args);
}
Resolver::Result::~Result() { grpc_channel_args_destroy(args); }
Resolver::Result::Result(const Result& other) {
addresses = other.addresses;
service_config = other.service_config;
service_config_error = GRPC_ERROR_REF(other.service_config_error);
args = grpc_channel_args_copy(other.args);
}
Resolver::Result::Result(const Result& other)
: addresses(other.addresses),
service_config(other.service_config),
resolution_note(other.resolution_note),
args(grpc_channel_args_copy(other.args)) {}
Resolver::Result::Result(Result&& other) noexcept {
addresses = std::move(other.addresses);
service_config = std::move(other.service_config);
service_config_error = other.service_config_error;
other.service_config_error = GRPC_ERROR_NONE;
args = other.args;
Resolver::Result::Result(Result&& other) noexcept
: addresses(std::move(other.addresses)),
service_config(std::move(other.service_config)),
resolution_note(std::move(other.resolution_note)),
// TODO(roth): Use std::move() once channel args is converted to C++.
args(other.args) {
other.args = nullptr;
}
Resolver::Result& Resolver::Result::operator=(const Result& other) {
if (&other == this) {
return *this;
}
if (&other == this) return *this;
addresses = other.addresses;
service_config = other.service_config;
GRPC_ERROR_UNREF(service_config_error);
service_config_error = GRPC_ERROR_REF(other.service_config_error);
resolution_note = other.resolution_note;
grpc_channel_args_destroy(args);
args = grpc_channel_args_copy(other.args);
return *this;
@ -75,9 +68,8 @@ Resolver::Result& Resolver::Result::operator=(const Result& other) {
Resolver::Result& Resolver::Result::operator=(Result&& other) noexcept {
addresses = std::move(other.addresses);
service_config = std::move(other.service_config);
GRPC_ERROR_UNREF(service_config_error);
service_config_error = other.service_config_error;
other.service_config_error = GRPC_ERROR_NONE;
resolution_note = std::move(other.resolution_note);
// TODO(roth): Use std::move() once channel args is converted to C++.
grpc_channel_args_destroy(args);
args = other.args;
other.args = nullptr;

@ -1,26 +1,26 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H
#include <grpc/support/port_platform.h>
#include "absl/status/statusor.h"
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/ext/filters/client_channel/server_address.h"
@ -53,13 +53,23 @@ class Resolver : public InternallyRefCounted<Resolver> {
public:
/// Results returned by the resolver.
struct Result {
ServerAddressList addresses;
RefCountedPtr<ServiceConfig> service_config;
grpc_error_handle service_config_error = GRPC_ERROR_NONE;
/// A list of addresses, or an error.
absl::StatusOr<ServerAddressList> addresses;
/// A service config, or an error.
absl::StatusOr<RefCountedPtr<ServiceConfig>> service_config = nullptr;
/// An optional human-readable note describing context about the resolution,
/// to be passed along to the LB policy for inclusion in RPC failure status
/// messages in cases where neither \a addresses nor \a service_config
/// has a non-OK status. For example, a resolver that returns an empty
/// address list but a valid service config may set to this to something
/// like "no DNS entries found for <name>".
std::string resolution_note;
// TODO(roth): Before making this a public API, figure out a way to
// avoid exposing channel args this way.
const grpc_channel_args* args = nullptr;
// TODO(roth): Remove everything below once grpc_error and
// grpc_channel_args are convert to copyable and movable C++ objects.
// TODO(roth): Remove everything below once grpc_channel_args is
// converted to a copyable and movable C++ object.
Result() = default;
~Result();
Result(const Result& other);
@ -74,17 +84,11 @@ class Resolver : public InternallyRefCounted<Resolver> {
public:
virtual ~ResultHandler() {}
/// Returns a result to the channel.
/// Takes ownership of \a result.args.
virtual void ReturnResult(Result result) = 0; // NOLINT
/// Returns a transient error to the channel.
/// If the resolver does not set the GRPC_ERROR_INT_GRPC_STATUS
/// attribute on the error, calls will be failed with status UNKNOWN.
virtual void ReturnError(grpc_error_handle error) = 0;
// TODO(yashkt): As part of the service config error handling
// changes, add a method to parse the service config JSON string.
/// Reports a result to the channel.
// TODO(roth): Add a mechanism for the resolver to get back a signal
// indicating if the result was accepted by the LB policy, so that it
// knows whether to go into backoff to retry to resolution.
virtual void ReportResult(Result result) = 0; // NOLINT
};
// Not copyable nor movable.
@ -113,9 +117,6 @@ class Resolver : public InternallyRefCounted<Resolver> {
/// Resets the re-resolution backoff, if any.
/// This needs to be implemented only by pull-based implementations;
/// for push-based implementations, it will be a no-op.
/// TODO(roth): Pull the backoff code out of resolver and into
/// client_channel, so that it can be shared across resolver
/// implementations. At that point, this method can go away.
virtual void ResetBackoffLocked() {}
// Note: This must be invoked while holding the work_serializer.
@ -133,4 +134,4 @@ class Resolver : public InternallyRefCounted<Resolver> {
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H */
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_H

@ -46,7 +46,7 @@ class BinderResolver : public Resolver {
result.addresses = std::move(addresses_);
result.args = channel_args_;
channel_args_ = nullptr;
result_handler_->ReturnResult(std::move(result));
result_handler_->ReportResult(std::move(result));
}
void ShutdownLocked() override {}

@ -1,20 +1,18 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
@ -50,6 +48,7 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/transport/error_utils.h"
#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
@ -320,21 +319,35 @@ void AresDnsResolver::OnResolvedLocked(grpc_error_handle error) {
GRPC_ERROR_UNREF(error);
return;
}
// TODO(roth): Change logic to be able to report failures for addresses
// and service config independently of each other.
if (addresses_ != nullptr || balancer_addresses_ != nullptr) {
Result result;
if (addresses_ != nullptr) {
result.addresses = std::move(*addresses_);
} else {
result.addresses = ServerAddressList();
}
if (service_config_json_ != nullptr) {
std::string service_config_string = ChooseServiceConfig(
service_config_json_, &result.service_config_error);
grpc_error_handle service_config_error = GRPC_ERROR_NONE;
std::string service_config_string =
ChooseServiceConfig(service_config_json_, &service_config_error);
gpr_free(service_config_json_);
if (result.service_config_error == GRPC_ERROR_NONE &&
RefCountedPtr<ServiceConfig> service_config;
if (service_config_error == GRPC_ERROR_NONE &&
!service_config_string.empty()) {
GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s",
this, service_config_string.c_str());
result.service_config = ServiceConfig::Create(
channel_args_, service_config_string, &result.service_config_error);
service_config = ServiceConfig::Create(
channel_args_, service_config_string, &service_config_error);
}
if (service_config_error != GRPC_ERROR_NONE) {
result.service_config = absl::UnavailableError(
absl::StrCat("failed to parse service config: ",
grpc_error_std_string(service_config_error)));
GRPC_ERROR_UNREF(service_config_error);
} else {
result.service_config = std::move(service_config);
}
}
absl::InlinedVector<grpc_arg, 1> new_args;
@ -344,7 +357,7 @@ void AresDnsResolver::OnResolvedLocked(grpc_error_handle error) {
}
result.args = grpc_channel_args_copy_and_add(channel_args_, new_args.data(),
new_args.size());
result_handler_->ReturnResult(std::move(result));
result_handler_->ReportResult(std::move(result));
addresses_.reset();
balancer_addresses_.reset();
// Reset backoff state so that we start from the beginning when the
@ -353,12 +366,14 @@ void AresDnsResolver::OnResolvedLocked(grpc_error_handle error) {
} else {
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", this,
grpc_error_std_string(error).c_str());
std::string error_message =
absl::StrCat("DNS resolution failed for ", name_to_resolve_);
result_handler_->ReturnError(grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(error_message.c_str(),
&error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
std::string error_message;
grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &error_message);
absl::Status status = absl::UnavailableError(absl::StrCat(
"DNS resolution failed for ", name_to_resolve_, ": ", error_message));
Result result;
result.addresses = status;
result.service_config = status;
result_handler_->ReportResult(std::move(result));
// Set retry timer.
// InvalidateNow to avoid getting stuck re-initializing this timer
// in a loop while draining the currently-held WorkSerializer.

@ -1,20 +1,18 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
@ -179,15 +177,16 @@ void NativeDnsResolver::OnResolvedLocked(grpc_error_handle error) {
return;
}
if (addresses_ != nullptr) {
Result result;
ServerAddressList addresses;
for (size_t i = 0; i < addresses_->naddrs; ++i) {
result.addresses.emplace_back(&addresses_->addrs[i].addr,
addresses_->addrs[i].len,
nullptr /* args */);
addresses.emplace_back(&addresses_->addrs[i].addr,
addresses_->addrs[i].len, nullptr /* args */);
}
grpc_resolved_addresses_destroy(addresses_);
Result result;
result.addresses = std::move(addresses);
result.args = grpc_channel_args_copy(channel_args_);
result_handler_->ReturnResult(std::move(result));
result_handler_->ReportResult(std::move(result));
// Reset backoff state so that we start from the beginning when the
// next request gets triggered.
backoff_.Reset();
@ -195,12 +194,12 @@ void NativeDnsResolver::OnResolvedLocked(grpc_error_handle error) {
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_std_string(error).c_str());
// Return transient error.
std::string error_message =
absl::StrCat("DNS resolution failed for service: ", name_to_resolve_);
result_handler_->ReturnError(grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(error_message.c_str(),
&error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
std::string error_message;
grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &error_message);
Result result;
result.addresses = absl::UnavailableError(absl::StrCat(
"DNS resolution failed for ", name_to_resolve_, ": ", error_message));
result_handler_->ReportResult(std::move(result));
// Set up for retry.
// InvalidateNow to avoid getting stuck re-initializing this timer
// in a loop while draining the currently-held WorkSerializer.

@ -142,23 +142,22 @@ void FakeResolver::MaybeSendResultLocked() {
if (!started_ || shutdown_) return;
if (return_failure_) {
// TODO(roth): Change resolver result generator to be able to inject
// the error to be returned.
result_handler_->ReturnError(grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver transient failure"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
// the error to be returned and to be able to independently set errors
// for addresses and service config.
Result result;
result.addresses = absl::UnavailableError("Resolver transient failure");
result.service_config = result.addresses.status();
result_handler_->ReportResult(std::move(result));
return_failure_ = false;
} else if (has_next_result_) {
Result result;
result.addresses = std::move(next_result_.addresses);
result.service_config = std::move(next_result_.service_config);
// TODO(roth): Use std::move() once grpc_error is converted to C++.
result.service_config_error = next_result_.service_config_error;
next_result_.service_config_error = GRPC_ERROR_NONE;
// When both next_results_ and channel_args_ contain an arg with the same
// name, only the one in next_results_ will be kept since next_results_ is
// before channel_args_.
result.args = grpc_channel_args_union(next_result_.args, channel_args_);
result_handler_->ReturnResult(std::move(result));
result_handler_->ReportResult(std::move(result));
has_next_result_ = false;
}
}

@ -73,7 +73,7 @@ void SockaddrResolver::StartLocked() {
// TODO(roth): Use std::move() once channel args is converted to C++.
result.args = channel_args_;
channel_args_ = nullptr;
result_handler_->ReturnResult(std::move(result));
result_handler_->ReportResult(std::move(result));
}
//

@ -35,6 +35,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/timeout_encoding.h"
namespace grpc_core {
@ -278,8 +279,7 @@ class XdsResolver : public Resolver {
void OnError(grpc_error_handle error);
void OnResourceDoesNotExist();
grpc_error_handle CreateServiceConfig(
RefCountedPtr<ServiceConfig>* service_config);
absl::StatusOr<RefCountedPtr<ServiceConfig>> CreateServiceConfig();
void GenerateResult();
void MaybeRemoveUnusedClusters();
@ -682,7 +682,12 @@ void XdsResolver::StartLocked() {
"Failed to create xds client -- channel will remain in "
"TRANSIENT_FAILURE: %s",
grpc_error_std_string(error).c_str());
result_handler_->ReturnError(error);
std::string error_message;
grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &error_message);
Result result;
result.service_config = absl::UnavailableError(
absl::StrCat("Failed to create XdsClient: ", error_message));
result_handler_->ReportResult(std::move(result));
return;
}
grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
@ -800,8 +805,9 @@ void XdsResolver::OnError(grpc_error_handle error) {
Result result;
grpc_arg new_arg = xds_client_->MakeChannelArg();
result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
result.service_config_error = error;
result_handler_->ReturnResult(std::move(result));
result.service_config = grpc_error_to_absl_status(error);
result_handler_->ReportResult(std::move(result));
GRPC_ERROR_UNREF(error);
}
void XdsResolver::OnResourceDoesNotExist() {
@ -814,15 +820,15 @@ void XdsResolver::OnResourceDoesNotExist() {
}
current_virtual_host_.routes.clear();
Result result;
result.service_config =
ServiceConfig::Create(args_, "{}", &result.service_config_error);
GPR_ASSERT(result.service_config != nullptr);
grpc_error_handle error = GRPC_ERROR_NONE;
result.service_config = ServiceConfig::Create(args_, "{}", &error);
GPR_ASSERT(*result.service_config != nullptr);
result.args = grpc_channel_args_copy(args_);
result_handler_->ReturnResult(std::move(result));
result_handler_->ReportResult(std::move(result));
}
grpc_error_handle XdsResolver::CreateServiceConfig(
RefCountedPtr<ServiceConfig>* service_config) {
absl::StatusOr<RefCountedPtr<ServiceConfig>>
XdsResolver::CreateServiceConfig() {
std::vector<std::string> clusters;
for (const auto& cluster : cluster_state_map_) {
clusters.push_back(
@ -849,8 +855,13 @@ grpc_error_handle XdsResolver::CreateServiceConfig(
"}");
std::string json = absl::StrJoin(config_parts, "");
grpc_error_handle error = GRPC_ERROR_NONE;
*service_config = ServiceConfig::Create(args_, json.c_str(), &error);
return error;
absl::StatusOr<RefCountedPtr<ServiceConfig>> result =
ServiceConfig::Create(args_, json.c_str(), &error);
if (error != GRPC_ERROR_NONE) {
result = grpc_error_to_absl_status(error);
GRPC_ERROR_UNREF(error);
}
return result;
}
void XdsResolver::GenerateResult() {
@ -865,15 +876,12 @@ void XdsResolver::GenerateResult() {
return;
}
Result result;
error = CreateServiceConfig(&result.service_config);
if (error != GRPC_ERROR_NONE) {
OnError(grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE));
return;
}
result.service_config = CreateServiceConfig();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
result.service_config->json_string().c_str());
result.service_config.ok()
? (*result.service_config)->json_string().c_str()
: result.service_config.status().ToString().c_str());
}
grpc_arg new_args[] = {
xds_client_->MakeChannelArg(),
@ -881,7 +889,7 @@ void XdsResolver::GenerateResult() {
};
result.args =
grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args));
result_handler_->ReturnResult(std::move(result));
result_handler_->ReportResult(std::move(result));
}
void XdsResolver::MaybeRemoveUnusedClusters() {

@ -65,10 +65,11 @@ class BinderResolverTest : public ::testing::Test {
explicit ResultHandler(const std::string& expected_binder_id)
: expect_result_(true), expected_binder_id_(expected_binder_id) {}
void ReturnResult(grpc_core::Resolver::Result result) override {
void ReportResult(grpc_core::Resolver::Result result) override {
EXPECT_TRUE(expect_result_);
ASSERT_TRUE(result.addresses.size() == 1);
grpc_core::ServerAddress addr = result.addresses[0];
ASSERT_TRUE(result.addresses.ok());
ASSERT_EQ(result.addresses->size(), 1);
grpc_core::ServerAddress addr = (*result.addresses)[0];
const struct sockaddr_un* un =
reinterpret_cast<const struct sockaddr_un*>(addr.address().addr);
EXPECT_EQ(addr.address().len,
@ -77,12 +78,8 @@ class BinderResolverTest : public ::testing::Test {
EXPECT_EQ(un->sun_path, expected_binder_id_);
}
void ReturnError(grpc_error_handle error) override {
GRPC_ERROR_UNREF(error);
}
private:
// Whether we expect ReturnResult function to be invoked
// Whether we expect ReportResult function to be invoked
bool expect_result_ = false;
std::string expected_binder_id_;

@ -113,31 +113,20 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
struct ResolverOutput {
grpc_core::Resolver::Result result;
grpc_error_handle error = GRPC_ERROR_NONE;
gpr_event ev;
ResolverOutput() { gpr_event_init(&ev); }
~ResolverOutput() { GRPC_ERROR_UNREF(error); }
};
void SetOutput(ResolverOutput* output) {
gpr_atm_rel_store(&output_, reinterpret_cast<gpr_atm>(output));
}
void ReturnResult(grpc_core::Resolver::Result result) override {
void ReportResult(grpc_core::Resolver::Result result) override {
ResolverOutput* output =
reinterpret_cast<ResolverOutput*>(gpr_atm_acq_load(&output_));
GPR_ASSERT(output != nullptr);
output->result = std::move(result);
output->error = GRPC_ERROR_NONE;
gpr_event_set(&output->ev, reinterpret_cast<void*>(1));
}
void ReturnError(grpc_error_handle error) override {
ResolverOutput* output =
reinterpret_cast<ResolverOutput*>(gpr_atm_acq_load(&output_));
GPR_ASSERT(output != nullptr);
output->error = error;
gpr_event_set(&output->ev, reinterpret_cast<void*>(1));
}
@ -180,15 +169,14 @@ int main(int argc, char** argv) {
resolver->StartLocked();
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(wait_loop(5, &output1.ev));
GPR_ASSERT(output1.result.addresses.empty());
GPR_ASSERT(output1.error != GRPC_ERROR_NONE);
GPR_ASSERT(!output1.result.addresses.ok());
ResultHandler::ResolverOutput output2;
result_handler->SetOutput(&output2);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(wait_loop(30, &output2.ev));
GPR_ASSERT(!output2.result.addresses.empty());
GPR_ASSERT(output2.error == GRPC_ERROR_NONE);
GPR_ASSERT(output2.result.addresses.ok());
GPR_ASSERT(!output2.result.addresses->empty());
}
grpc_shutdown();

@ -202,7 +202,7 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
state_ = state;
}
void ReturnResult(grpc_core::Resolver::Result /*result*/) override {
void ReportResult(grpc_core::Resolver::Result /*result*/) override {
GPR_ASSERT(result_cb_ != nullptr);
GPR_ASSERT(state_ != nullptr);
ResultCallback cb = result_cb_;
@ -212,12 +212,6 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
cb(state);
}
void ReturnError(grpc_error_handle error) override {
gpr_log(GPR_ERROR, "resolver returned error: %s",
grpc_error_std_string(error).c_str());
GPR_ASSERT(false);
}
private:
ResultCallback result_cb_ = nullptr;
OnResolutionCallbackArg* state_ = nullptr;

@ -32,8 +32,7 @@
static std::shared_ptr<grpc_core::WorkSerializer>* g_work_serializer;
class TestResultHandler : public grpc_core::Resolver::ResultHandler {
void ReturnResult(grpc_core::Resolver::Result /*result*/) override {}
void ReturnError(grpc_error_handle /*error*/) override {}
void ReportResult(grpc_core::Resolver::Result /*result*/) override {}
};
static void test_succeeds(grpc_core::ResolverFactory* factory,

@ -45,21 +45,20 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
ev_ = ev;
}
void ReturnResult(grpc_core::Resolver::Result actual) override {
void ReportResult(grpc_core::Resolver::Result actual) override {
GPR_ASSERT(ev_ != nullptr);
// We only check the addresses, because that's the only thing
// explicitly set by the test via
// FakeResolverResponseGenerator::SetResponse().
GPR_ASSERT(actual.addresses.size() == expected_.addresses.size());
for (size_t i = 0; i < expected_.addresses.size(); ++i) {
GPR_ASSERT(actual.addresses[i] == expected_.addresses[i]);
GPR_ASSERT(actual.addresses.ok());
GPR_ASSERT(actual.addresses->size() == expected_.addresses->size());
for (size_t i = 0; i < expected_.addresses->size(); ++i) {
GPR_ASSERT((*actual.addresses)[i] == (*expected_.addresses)[i]);
}
gpr_event_set(ev_, reinterpret_cast<void*>(1));
ev_ = nullptr;
}
void ReturnError(grpc_error_handle /*error*/) override {}
private:
grpc_core::Resolver::Result expected_;
gpr_event* ev_ = nullptr;
@ -89,7 +88,7 @@ static grpc_core::Resolver::Result create_new_resolver_result() {
static size_t test_counter = 0;
const size_t num_addresses = 2;
// Create address list.
grpc_core::Resolver::Result result;
grpc_core::ServerAddressList addresses;
for (size_t i = 0; i < num_addresses; ++i) {
std::string uri_string = absl::StrFormat("ipv4:127.0.0.1:100%" PRIuPTR,
test_counter * num_addresses + i);
@ -98,11 +97,12 @@ static grpc_core::Resolver::Result create_new_resolver_result() {
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(*uri, &address));
absl::InlinedVector<grpc_arg, 2> args_to_add;
result.addresses.emplace_back(
address.addr, address.len,
addresses.emplace_back(address.addr, address.len,
grpc_channel_args_copy_and_add(nullptr, nullptr, 0));
}
++test_counter;
grpc_core::Resolver::Result result;
result.addresses = std::move(addresses);
return result;
}

@ -32,11 +32,7 @@ static std::shared_ptr<grpc_core::WorkSerializer>* g_work_serializer;
class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
void ReturnResult(grpc_core::Resolver::Result /*result*/) override {}
void ReturnError(grpc_error_handle error) override {
GRPC_ERROR_UNREF(error);
}
void ReportResult(grpc_core::Resolver::Result /*result*/) override {}
};
static void test_succeeds(grpc_core::ResolverFactory* factory,

@ -293,6 +293,7 @@ class TestServer {
grpc_core::Resolver::Result BuildResolverResponse(
const std::vector<std::string>& addresses) {
grpc_core::Resolver::Result result;
result.addresses = grpc_core::ServerAddressList();
for (const auto& address_str : addresses) {
absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(address_str);
if (!uri.ok()) {
@ -302,7 +303,7 @@ grpc_core::Resolver::Result BuildResolverResponse(
}
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(*uri, &address));
result.addresses.emplace_back(address.addr, address.len, nullptr);
result.addresses->emplace_back(address.addr, address.len, nullptr);
}
return result;
}

@ -403,6 +403,7 @@ TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleChannels) {
grpc_core::Resolver::Result BuildResolverResult(
const std::vector<std::string>& addresses) {
grpc_core::Resolver::Result result;
result.addresses = grpc_core::ServerAddressList();
for (const auto& address_str : addresses) {
absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(address_str);
if (!uri.ok()) {
@ -412,7 +413,7 @@ grpc_core::Resolver::Result BuildResolverResult(
}
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(*uri, &address));
result.addresses.emplace_back(address.addr, address.len, nullptr);
result.addresses->emplace_back(address.addr, address.len, nullptr);
}
return result;
}

@ -447,11 +447,11 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
config->address().c_str());
auto uri = URI::Parse(config->address());
args.config.reset();
args.addresses.clear();
args.addresses = ServerAddressList();
if (uri.ok()) {
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(*uri, &address));
args.addresses.emplace_back(address, /*args=*/nullptr);
args.addresses->emplace_back(address, /*args=*/nullptr);
} else {
gpr_log(GPR_ERROR,
"%s: could not parse URI (%s), using empty address list",

@ -199,6 +199,7 @@ class FakeResolverResponseGeneratorWrapper {
std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
nullptr) {
grpc_core::Resolver::Result result;
result.addresses = grpc_core::ServerAddressList();
for (const int& port : ports) {
absl::StatusOr<grpc_core::URI> lb_uri = grpc_core::URI::Parse(
absl::StrCat(ipv6_only ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port));
@ -211,13 +212,14 @@ class FakeResolverResponseGeneratorWrapper {
if (attribute != nullptr) {
attributes[attribute_key] = attribute->Copy();
}
result.addresses.emplace_back(address.addr, address.len,
result.addresses->emplace_back(address.addr, address.len,
nullptr /* args */, std::move(attributes));
}
if (service_config_json != nullptr) {
grpc_error_handle error = GRPC_ERROR_NONE;
result.service_config = grpc_core::ServiceConfig::Create(
nullptr, service_config_json, &result.service_config_error);
GPR_ASSERT(result.service_config != nullptr);
nullptr, service_config_json, &error);
GPR_ASSERT(*result.service_config != nullptr);
}
return result;
}

@ -228,13 +228,13 @@ class FakeResolverResponseGeneratorWrapper {
static grpc_core::Resolver::Result BuildFakeResults(
absl::string_view service_config_json) {
grpc_core::Resolver::Result result;
result.service_config_error = GRPC_ERROR_NONE;
grpc_error_handle error = GRPC_ERROR_NONE;
result.service_config = grpc_core::ServiceConfig::Create(
result.args, service_config_json, &result.service_config_error);
EXPECT_EQ(result.service_config_error, GRPC_ERROR_NONE)
result.args, service_config_json, &error);
EXPECT_EQ(error, GRPC_ERROR_NONE)
<< "JSON: " << service_config_json
<< "Error: " << grpc_error_std_string(result.service_config_error);
EXPECT_NE(result.service_config, nullptr);
<< "Error: " << grpc_error_std_string(error);
EXPECT_NE(*result.service_config, nullptr);
return result;
}

@ -55,6 +55,7 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/cpp/client/secure_credentials.h"
#include "src/cpp/server/secure_server_credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@ -173,6 +174,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
grpc_core::Resolver::Result BuildFakeResults(const std::vector<int>& ports) {
grpc_core::Resolver::Result result;
result.addresses = grpc_core::ServerAddressList();
for (const int& port : ports) {
std::string lb_uri_str =
absl::StrCat(ipv6_only_ ? "ipv6:[::1]:" : "ipv4:127.0.0.1:", port);
@ -180,7 +182,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
GPR_ASSERT(lb_uri.ok());
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(*lb_uri, &address));
result.addresses.emplace_back(address.addr, address.len,
result.addresses->emplace_back(address.addr, address.len,
nullptr /* args */);
}
return result;
@ -195,16 +197,18 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
void SetNextResolutionValidServiceConfig(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result = BuildFakeResults(ports);
result.service_config = grpc_core::ServiceConfig::Create(
nullptr, "{}", &result.service_config_error);
grpc_error_handle error = GRPC_ERROR_NONE;
result.service_config =
grpc_core::ServiceConfig::Create(nullptr, "{}", &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
response_generator_->SetResponse(result);
}
void SetNextResolutionInvalidServiceConfig(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result = BuildFakeResults(ports);
result.service_config = grpc_core::ServiceConfig::Create(
nullptr, "{", &result.service_config_error);
result.service_config =
absl::InvalidArgumentError("error parsing service config");
response_generator_->SetResponse(result);
}
@ -212,8 +216,13 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
const char* svc_cfg) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result = BuildFakeResults(ports);
result.service_config = grpc_core::ServiceConfig::Create(
nullptr, svc_cfg, &result.service_config_error);
grpc_error_handle error = GRPC_ERROR_NONE;
result.service_config =
grpc_core::ServiceConfig::Create(nullptr, svc_cfg, &error);
if (error != GRPC_ERROR_NONE) {
result.service_config = grpc_error_to_absl_status(error);
GRPC_ERROR_UNREF(error);
}
response_generator_->SetResponse(result);
}

@ -145,12 +145,10 @@ class AssertFailureResultHandler : public grpc_core::Resolver::ResultHandler {
gpr_mu_unlock(args_->mu);
}
void ReturnResult(grpc_core::Resolver::Result /*result*/) override {
void ReportResult(grpc_core::Resolver::Result /*result*/) override {
GPR_ASSERT(false);
}
void ReturnError(grpc_error_handle /*error*/) override { GPR_ASSERT(false); }
private:
ArgsStruct* args_;
};

@ -264,19 +264,18 @@ void PollPollsetUntilRequestDone(ArgsStruct* args) {
}
void CheckServiceConfigResultLocked(const char* service_config_json,
grpc_error_handle service_config_error,
absl::Status service_config_error,
ArgsStruct* args) {
if (!args->expected_service_config_string.empty()) {
GPR_ASSERT(service_config_json != nullptr);
ASSERT_NE(service_config_json, nullptr);
EXPECT_EQ(service_config_json, args->expected_service_config_string);
}
if (args->expected_service_config_error.empty()) {
EXPECT_EQ(service_config_error, GRPC_ERROR_NONE);
EXPECT_TRUE(service_config_error.ok());
} else {
EXPECT_THAT(grpc_error_std_string(service_config_error),
EXPECT_THAT(service_config_error.ToString(),
testing::HasSubstr(args->expected_service_config_error));
}
GRPC_ERROR_UNREF(service_config_error);
}
void CheckLBPolicyResultLocked(const grpc_channel_args* channel_args,
@ -430,7 +429,7 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
explicit ResultHandler(ArgsStruct* args) : args_(args) {}
void ReturnResult(grpc_core::Resolver::Result result) override {
void ReportResult(grpc_core::Resolver::Result result) override {
CheckResult(result);
gpr_atm_rel_store(&args_->done_atm, 1);
gpr_mu_lock(args_->mu);
@ -439,12 +438,6 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
gpr_mu_unlock(args_->mu);
}
void ReturnError(grpc_error_handle error) override {
gpr_log(GPR_ERROR, "resolver returned error: %s",
grpc_error_std_string(error).c_str());
GPR_ASSERT(false);
}
virtual void CheckResult(const grpc_core::Resolver::Result& /*result*/) {}
protected:
@ -465,9 +458,10 @@ class CheckingResultHandler : public ResultHandler {
explicit CheckingResultHandler(ArgsStruct* args) : ResultHandler(args) {}
void CheckResult(const grpc_core::Resolver::Result& result) override {
ASSERT_TRUE(result.addresses.ok()) << result.addresses.status().ToString();
ArgsStruct* args = args_struct();
std::vector<GrpcLBAddress> found_lb_addrs;
AddActualAddresses(result.addresses, /*is_balancer=*/false,
AddActualAddresses(*result.addresses, /*is_balancer=*/false,
&found_lb_addrs);
const grpc_core::ServerAddressList* balancer_addresses =
grpc_core::FindGrpclbBalancerAddressesInChannelArgs(*result.args);
@ -478,7 +472,7 @@ class CheckingResultHandler : public ResultHandler {
gpr_log(GPR_INFO,
"found %" PRIdPTR " backend addresses and %" PRIdPTR
" balancer addresses",
result.addresses.size(),
result.addresses->size(),
balancer_addresses == nullptr ? 0L : balancer_addresses->size());
if (args->expected_addrs.size() != found_lb_addrs.size()) {
gpr_log(GPR_DEBUG,
@ -499,12 +493,17 @@ class CheckingResultHandler : public ResultHandler {
absl::GetFlag(FLAGS_do_ordered_address_comparison).c_str());
GPR_ASSERT(0);
}
if (!result.service_config.ok()) {
CheckServiceConfigResultLocked(nullptr, result.service_config.status(),
args);
} else {
const char* service_config_json =
result.service_config == nullptr
*result.service_config == nullptr
? nullptr
: result.service_config->json_string().c_str();
CheckServiceConfigResultLocked(
service_config_json, GRPC_ERROR_REF(result.service_config_error), args);
: (*result.service_config)->json_string().c_str();
CheckServiceConfigResultLocked(service_config_json, absl::OkStatus(),
args);
}
if (args->expected_service_config_string.empty()) {
CheckLBPolicyResultLocked(result.args, args);
}

@ -612,7 +612,7 @@ current_test_subprocess = subprocess.Popen([
'--target_name', 'ipv4-config-causing-fallback-to-tcp-inject-broken-nameservers.resolver-tests-version-4.grpctestingexp.',
'--do_ordered_address_comparison', 'False',
'--expected_addrs', '1.2.3.4:443,False',
'--expected_chosen_service_config', '{"loadBalancingPolicy":["round_robin"]}',
'--expected_chosen_service_config', '',
'--expected_service_config_error', 'field:loadBalancingPolicy error:type should be string',
'--expected_lb_policy', '',
'--enable_srv_queries', 'True',

@ -464,7 +464,7 @@ resolver_component_tests:
- expected_addrs:
- {address: '1.2.3.4:443', is_balancer: false}
do_ordered_address_comparison: false
expected_chosen_service_config: '{"loadBalancingPolicy":["round_robin"]}'
expected_chosen_service_config: null
expected_service_config_error: 'field:loadBalancingPolicy error:type should be string'
expected_lb_policy: null
enable_srv_queries: true

Loading…
Cancel
Save