Eliminate ResolvingLoadBalancingPolicy.

pull/24806/head
Mark D. Roth 4 years ago
parent 660a860cff
commit 0e13934764
  1. 2
      BUILD
  2. 2
      BUILD.gn
  3. 2
      CMakeLists.txt
  4. 2
      Makefile
  5. 4
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 2
      grpc.gyp
  12. 2
      package.xml
  13. 556
      src/core/ext/filters/client_channel/client_channel.cc
  14. 2
      src/core/ext/filters/client_channel/lb_policy.cc
  15. 355
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  16. 138
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  17. 1
      src/python/grpcio/grpc_core_dependencies.py
  18. 2
      tools/doxygen/Doxyfile.c++.internal
  19. 2
      tools/doxygen/Doxyfile.core.internal

@ -1082,7 +1082,6 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/resolver.cc",
"src/core/ext/filters/client_channel/resolver_registry.cc",
"src/core/ext/filters/client_channel/resolver_result_parsing.cc",
"src/core/ext/filters/client_channel/resolving_lb_policy.cc",
"src/core/ext/filters/client_channel/retry_throttle.cc",
"src/core/ext/filters/client_channel/server_address.cc",
"src/core/ext/filters/client_channel/service_config.cc",
@ -1114,7 +1113,6 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/resolver_factory.h",
"src/core/ext/filters/client_channel/resolver_registry.h",
"src/core/ext/filters/client_channel/resolver_result_parsing.h",
"src/core/ext/filters/client_channel/resolving_lb_policy.h",
"src/core/ext/filters/client_channel/retry_throttle.h",
"src/core/ext/filters/client_channel/server_address.h",
"src/core/ext/filters/client_channel/service_config.h",

@ -295,8 +295,6 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/resolver_registry.h",
"src/core/ext/filters/client_channel/resolver_result_parsing.cc",
"src/core/ext/filters/client_channel/resolver_result_parsing.h",
"src/core/ext/filters/client_channel/resolving_lb_policy.cc",
"src/core/ext/filters/client_channel/resolving_lb_policy.h",
"src/core/ext/filters/client_channel/retry_throttle.cc",
"src/core/ext/filters/client_channel/retry_throttle.h",
"src/core/ext/filters/client_channel/server_address.cc",

@ -1484,7 +1484,6 @@ add_library(grpc
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
src/core/ext/filters/client_channel/resolver_registry.cc
src/core/ext/filters/client_channel/resolver_result_parsing.cc
src/core/ext/filters/client_channel/resolving_lb_policy.cc
src/core/ext/filters/client_channel/retry_throttle.cc
src/core/ext/filters/client_channel/server_address.cc
src/core/ext/filters/client_channel/service_config.cc
@ -2279,7 +2278,6 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
src/core/ext/filters/client_channel/resolver_registry.cc
src/core/ext/filters/client_channel/resolver_result_parsing.cc
src/core/ext/filters/client_channel/resolving_lb_policy.cc
src/core/ext/filters/client_channel/retry_throttle.cc
src/core/ext/filters/client_channel/server_address.cc
src/core/ext/filters/client_channel/service_config.cc

@ -1888,7 +1888,6 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
src/core/ext/filters/client_channel/resolver_registry.cc \
src/core/ext/filters/client_channel/resolver_result_parsing.cc \
src/core/ext/filters/client_channel/resolving_lb_policy.cc \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/server_address.cc \
src/core/ext/filters/client_channel/service_config.cc \
@ -2539,7 +2538,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/client_channel/resolver_registry.cc \
src/core/ext/filters/client_channel/resolver_result_parsing.cc \
src/core/ext/filters/client_channel/resolving_lb_policy.cc \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/server_address.cc \
src/core/ext/filters/client_channel/service_config.cc \

@ -420,7 +420,6 @@ libs:
- src/core/ext/filters/client_channel/resolver_factory.h
- src/core/ext/filters/client_channel/resolver_registry.h
- src/core/ext/filters/client_channel/resolver_result_parsing.h
- src/core/ext/filters/client_channel/resolving_lb_policy.h
- src/core/ext/filters/client_channel/retry_throttle.h
- src/core/ext/filters/client_channel/server_address.h
- src/core/ext/filters/client_channel/service_config.h
@ -915,7 +914,6 @@ libs:
- src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
- src/core/ext/filters/client_channel/resolver_registry.cc
- src/core/ext/filters/client_channel/resolver_result_parsing.cc
- src/core/ext/filters/client_channel/resolving_lb_policy.cc
- src/core/ext/filters/client_channel/retry_throttle.cc
- src/core/ext/filters/client_channel/server_address.cc
- src/core/ext/filters/client_channel/service_config.cc
@ -1587,7 +1585,6 @@ libs:
- src/core/ext/filters/client_channel/resolver_factory.h
- src/core/ext/filters/client_channel/resolver_registry.h
- src/core/ext/filters/client_channel/resolver_result_parsing.h
- src/core/ext/filters/client_channel/resolving_lb_policy.h
- src/core/ext/filters/client_channel/retry_throttle.h
- src/core/ext/filters/client_channel/server_address.h
- src/core/ext/filters/client_channel/service_config.h
@ -1841,7 +1838,6 @@ libs:
- src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
- src/core/ext/filters/client_channel/resolver_registry.cc
- src/core/ext/filters/client_channel/resolver_result_parsing.cc
- src/core/ext/filters/client_channel/resolving_lb_policy.cc
- src/core/ext/filters/client_channel/retry_throttle.cc
- src/core/ext/filters/client_channel/server_address.cc
- src/core/ext/filters/client_channel/service_config.cc

@ -90,7 +90,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
src/core/ext/filters/client_channel/resolver_registry.cc \
src/core/ext/filters/client_channel/resolver_result_parsing.cc \
src/core/ext/filters/client_channel/resolving_lb_policy.cc \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/server_address.cc \
src/core/ext/filters/client_channel/service_config.cc \

@ -57,7 +57,6 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\resolver\\xds\\xds_resolver.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver_registry.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver_result_parsing.cc " +
"src\\core\\ext\\filters\\client_channel\\resolving_lb_policy.cc " +
"src\\core\\ext\\filters\\client_channel\\retry_throttle.cc " +
"src\\core\\ext\\filters\\client_channel\\server_address.cc " +
"src\\core\\ext\\filters\\client_channel\\service_config.cc " +

@ -233,7 +233,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.h',
'src/core/ext/filters/client_channel/resolver_result_parsing.h',
'src/core/ext/filters/client_channel/resolving_lb_policy.h',
'src/core/ext/filters/client_channel/retry_throttle.h',
'src/core/ext/filters/client_channel/server_address.h',
'src/core/ext/filters/client_channel/service_config.h',
@ -843,7 +842,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.h',
'src/core/ext/filters/client_channel/resolver_result_parsing.h',
'src/core/ext/filters/client_channel/resolving_lb_policy.h',
'src/core/ext/filters/client_channel/retry_throttle.h',
'src/core/ext/filters/client_channel/server_address.h',
'src/core/ext/filters/client_channel/service_config.h',

@ -274,8 +274,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver_registry.h',
'src/core/ext/filters/client_channel/resolver_result_parsing.cc',
'src/core/ext/filters/client_channel/resolver_result_parsing.h',
'src/core/ext/filters/client_channel/resolving_lb_policy.cc',
'src/core/ext/filters/client_channel/resolving_lb_policy.h',
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/retry_throttle.h',
'src/core/ext/filters/client_channel/server_address.cc',
@ -1374,7 +1372,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.h',
'src/core/ext/filters/client_channel/resolver_result_parsing.h',
'src/core/ext/filters/client_channel/resolving_lb_policy.h',
'src/core/ext/filters/client_channel/retry_throttle.h',
'src/core/ext/filters/client_channel/server_address.h',
'src/core/ext/filters/client_channel/service_config.h',

@ -191,8 +191,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/resolver_registry.h )
s.files += %w( src/core/ext/filters/client_channel/resolver_result_parsing.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver_result_parsing.h )
s.files += %w( src/core/ext/filters/client_channel/resolving_lb_policy.cc )
s.files += %w( src/core/ext/filters/client_channel/resolving_lb_policy.h )
s.files += %w( src/core/ext/filters/client_channel/retry_throttle.cc )
s.files += %w( src/core/ext/filters/client_channel/retry_throttle.h )
s.files += %w( src/core/ext/filters/client_channel/server_address.cc )

@ -501,7 +501,6 @@
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc',
'src/core/ext/filters/client_channel/resolver_registry.cc',
'src/core/ext/filters/client_channel/resolver_result_parsing.cc',
'src/core/ext/filters/client_channel/resolving_lb_policy.cc',
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/server_address.cc',
'src/core/ext/filters/client_channel/service_config.cc',
@ -1125,7 +1124,6 @@
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/client_channel/resolver_registry.cc',
'src/core/ext/filters/client_channel/resolver_result_parsing.cc',
'src/core/ext/filters/client_channel/resolving_lb_policy.cc',
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/server_address.cc',
'src/core/ext/filters/client_channel/service_config.cc',

@ -171,8 +171,6 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver_registry.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver_result_parsing.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver_result_parsing.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolving_lb_policy.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolving_lb_policy.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/retry_throttle.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/retry_throttle.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/server_address.cc" role="src" />

@ -30,6 +30,7 @@
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include <grpc/support/alloc.h>
@ -45,12 +46,12 @@
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/service_config_call_data.h"
@ -78,9 +79,6 @@
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"
using grpc_core::internal::ClientChannelMethodParsedConfig;
using grpc_core::internal::ServerRetryThrottleData;
//
// Client channel filter
//
@ -105,6 +103,9 @@ using grpc_core::internal::ServerRetryThrottleData;
namespace grpc_core {
using internal::ClientChannelMethodParsedConfig;
using internal::ServerRetryThrottleData;
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
@ -236,34 +237,48 @@ class ChannelData {
Atomic<bool> done_{false};
};
class ChannelConfigHelper
: public ResolvingLoadBalancingPolicy::ChannelConfigHelper {
class ResolverResultHandler : public Resolver::ResultHandler {
public:
explicit ChannelConfigHelper(ChannelData* chand) : chand_(chand) {}
explicit ResolverResultHandler(ChannelData* chand) : chand_(chand) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
}
ChooseServiceConfigResult ChooseServiceConfig(
const Resolver::Result& result) override;
~ResolverResultHandler() override {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
}
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
}
void StartUsingServiceConfigForCalls() override;
void ReturnResult(Resolver::Result result) override {
chand_->OnResolverResultChangedLocked(std::move(result));
}
void ResolverTransientFailure(grpc_error* error) override;
void ReturnError(grpc_error* error) override {
chand_->OnResolverError(error);
}
private:
static void ChooseLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);
ChannelData* chand_;
};
ChannelData(grpc_channel_element_args* args, grpc_error** error);
~ChannelData();
void OnResolverResultChangedLocked(Resolver::Result result);
void OnResolverError(grpc_error* error);
void CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const grpc_channel_args& args);
void UpdateStateAndPickerLocked(
grpc_connectivity_state state, const absl::Status& status,
const char* reason,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker);
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker,
grpc_error* resolver_transient_failure_error = GRPC_ERROR_NONE);
void UpdateServiceConfigInControlPlaneLocked(
RefCountedPtr<ServiceConfig> service_config,
@ -273,9 +288,8 @@ class ChannelData {
void UpdateServiceConfigInDataPlaneLocked();
void CreateResolvingLoadBalancingPolicyLocked();
void DestroyResolvingLoadBalancingPolicyLocked();
void CreateResolverLocked();
void DestroyResolverAndLbPolicyLocked();
grpc_error* DoPingLocked(grpc_transport_op* op);
@ -293,10 +307,9 @@ class ChannelData {
ClientChannelFactory* client_channel_factory_;
const grpc_channel_args* channel_args_;
RefCountedPtr<ServiceConfig> default_service_config_;
grpc_core::UniquePtr<char> server_name_;
grpc_core::UniquePtr<char> target_uri_;
UniquePtr<char> server_name_;
UniquePtr<char> target_uri_;
channelz::ChannelNode* channelz_node_;
ChannelConfigHelper channel_config_helper_;
//
// Fields used in the data plane. Guarded by data_plane_mu.
@ -316,12 +329,14 @@ class ChannelData {
//
std::shared_ptr<WorkSerializer> work_serializer_;
grpc_pollset_set* interested_parties_;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
ConnectivityStateTracker state_tracker_;
grpc_core::UniquePtr<char> health_check_service_name_;
OrphanablePtr<Resolver> resolver_;
bool previous_resolution_contained_addresses_ = false;
RefCountedPtr<ServiceConfig> saved_service_config_;
RefCountedPtr<ConfigSelector> saved_config_selector_;
UniquePtr<char> health_check_service_name_;
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
// The number of SubchannelWrapper instances referencing a given Subchannel.
std::map<Subchannel*, int> subchannel_refcount_map_;
// The set of SubchannelWrappers that currently exist.
@ -346,8 +361,8 @@ class ChannelData {
// synchronously via get_channel_info().
//
gpr_mu info_mu_;
grpc_core::UniquePtr<char> info_lb_policy_name_;
grpc_core::UniquePtr<char> info_service_config_json_;
UniquePtr<char> info_lb_policy_name_;
UniquePtr<char> info_service_config_json_;
//
// Fields guarded by a mutex, since they need to be accessed
@ -399,8 +414,8 @@ class CallData {
grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
calld_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
linked_mdelem->md = grpc_mdelem_from_slices(
grpc_core::ExternallyManagedSlice(key.data(), key.size()),
grpc_core::ExternallyManagedSlice(value.data(), value.size()));
ExternallyManagedSlice(key.data(), key.size()),
ExternallyManagedSlice(value.data(), value.size()));
GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) ==
GRPC_ERROR_NONE);
}
@ -893,7 +908,7 @@ class CallData {
class ChannelData::SubchannelWrapper : public SubchannelInterface {
public:
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
grpc_core::UniquePtr<char> health_check_service_name)
UniquePtr<char> health_check_service_name)
: SubchannelInterface(
GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
? "SubchannelWrapper"
@ -959,8 +974,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
initial_state);
subchannel_->WatchConnectivityState(
initial_state,
grpc_core::UniquePtr<char>(
gpr_strdup(health_check_service_name_.get())),
UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
watcher_wrapper));
}
@ -986,8 +1000,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
}
void UpdateHealthCheckServiceName(
grpc_core::UniquePtr<char> health_check_service_name) {
void UpdateHealthCheckServiceName(UniquePtr<char> health_check_service_name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: subchannel wrapper %p: updating health check service "
@ -1013,8 +1026,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
watcher_wrapper = replacement;
subchannel_->WatchConnectivityState(
replacement->last_seen_state(),
grpc_core::UniquePtr<char>(
gpr_strdup(health_check_service_name.get())),
UniquePtr<char>(gpr_strdup(health_check_service_name.get())),
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
replacement));
}
@ -1113,7 +1125,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
}
ConnectivityStateChange state_change = PopConnectivityStateChange();
absl::optional<absl::Cord> keepalive_throttling =
state_change.status.GetPayload(grpc_core::kKeepaliveThrottlingKey);
state_change.status.GetPayload(kKeepaliveThrottlingKey);
if (keepalive_throttling.has_value()) {
int new_keepalive_time = -1;
if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
@ -1178,7 +1190,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
ChannelData* chand_;
Subchannel* subchannel_;
grpc_core::UniquePtr<char> health_check_service_name_;
UniquePtr<char> health_check_service_name_;
// Maps from the address of the watcher passed to us by the LB policy
// to the address of the WrapperWatcher that we passed to the underlying
// subchannel. This is needed so that when the LB policy calls
@ -1367,10 +1379,11 @@ class ChannelData::ClientChannelControlHelper
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) override {
if (chand_->resolver_ == nullptr) return nullptr; // Shutting down.
// Determine health check service name.
bool inhibit_health_checking = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
grpc_core::UniquePtr<char> health_check_service_name;
UniquePtr<char> health_check_service_name;
if (!inhibit_health_checking) {
health_check_service_name.reset(
gpr_strdup(chand_->health_check_service_name_.get()));
@ -1410,6 +1423,7 @@ class ChannelData::ClientChannelControlHelper
void UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) override {
if (chand_->resolver_ == nullptr) return; // Shutting down.
grpc_error* disconnect_error = chand_->disconnect_error();
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
const char* extra = disconnect_error == GRPC_ERROR_NONE
@ -1426,11 +1440,17 @@ class ChannelData::ClientChannelControlHelper
}
}
// No-op -- we should never get this from ResolvingLoadBalancingPolicy.
void RequestReresolution() override {}
void RequestReresolution() override {
if (chand_->resolver_ == nullptr) return; // Shutting down.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
}
chand_->resolver_->RequestReresolutionLocked();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
if (chand_->resolver_ == nullptr) return; // Shutting down.
if (chand_->channelz_node_ != nullptr) {
chand_->channelz_node_->AddTraceEvent(
ConvertSeverityEnum(severity),
@ -1449,139 +1469,6 @@ class ChannelData::ClientChannelControlHelper
ChannelData* chand_;
};
//
// ChannelData::ChannelConfigHelper
//
ChannelData::ChannelConfigHelper::ChooseServiceConfigResult
ChannelData::ChannelConfigHelper::ChooseServiceConfig(
const Resolver::Result& result) {
ChooseServiceConfigResult service_config_result;
RefCountedPtr<ServiceConfig> service_config;
RefCountedPtr<ConfigSelector> config_selector;
if (result.service_config_error != GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
chand_, grpc_error_string(result.service_config_error));
}
// If the service config was invalid, then fallback to the
// previously returned service config.
if (chand_->saved_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned invalid service config. "
"Continuing to use previous service config.",
chand_);
}
service_config = chand_->saved_service_config_;
config_selector = chand_->saved_config_selector_;
} else {
// No previously returned config, so put the channel into
// TRANSIENT_FAILURE.
service_config_result.no_valid_service_config = true;
return service_config_result;
}
} else if (result.service_config == nullptr) {
// Resolver did not return any service config.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned no service config. Using default "
"service config for channel.",
chand_);
}
service_config = chand_->default_service_config_;
} else {
// Use ServiceConfig and ConfigSelector returned by resolver.
service_config = result.service_config;
config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
}
GPR_ASSERT(service_config != nullptr);
// Extract global config for client channel.
const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
static_cast<const internal::ClientChannelGlobalParsedConfig*>(
service_config->GetGlobalParsedConfig(
internal::ClientChannelServiceConfigParser::ParserIndex()));
// Find LB policy config.
ChooseLbPolicy(result, parsed_service_config,
&service_config_result.lb_policy_config);
// Check if the ServiceConfig has changed.
const bool service_config_changed =
chand_->saved_service_config_ == nullptr ||
service_config->json_string() !=
chand_->saved_service_config_->json_string();
// Check if the ConfigSelector has changed.
const bool config_selector_changed = !ConfigSelector::Equals(
chand_->saved_config_selector_.get(), config_selector.get());
// Indicate a change if either the ServiceConfig or ConfigSelector have
// changed.
service_config_result.service_config_changed =
service_config_changed || config_selector_changed;
// If it has, apply the global parameters now.
if (service_config_result.service_config_changed) {
chand_->UpdateServiceConfigInControlPlaneLocked(
std::move(service_config), std::move(config_selector),
parsed_service_config, service_config_result.lb_policy_config->name());
}
// Return results.
return service_config_result;
}
void ChannelData::ChannelConfigHelper::StartUsingServiceConfigForCalls() {
chand_->UpdateServiceConfigInDataPlaneLocked();
}
void ChannelData::ChannelConfigHelper::ResolverTransientFailure(
grpc_error* error) {
MutexLock lock(&chand_->data_plane_mu_);
GRPC_ERROR_UNREF(chand_->resolver_transient_failure_error_);
chand_->resolver_transient_failure_error_ = error;
}
void ChannelData::ChannelConfigHelper::ChooseLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
// Prefer the LB policy config found in the service config.
if (parsed_service_config->parsed_lb_config() != nullptr) {
*lb_policy_config = parsed_service_config->parsed_lb_config();
return;
}
// Try the deprecated LB policy name from the service config.
// If not, try the setting from channel args.
const char* policy_name = nullptr;
if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
} else {
const grpc_arg* channel_arg =
grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
policy_name = grpc_channel_arg_get_string(channel_arg);
}
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (policy_name == nullptr) policy_name = "pick_first";
// Now that we have the policy name, construct an empty config for it.
Json config_json = Json::Array{Json::Object{
{policy_name, Json::Object{}},
}};
grpc_error* parse_error = GRPC_ERROR_NONE;
*lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
config_json, &parse_error);
// The policy name came from one of three places:
// - The deprecated loadBalancingPolicy field in the service config,
// in which case the code in ClientChannelServiceConfigParser
// already verified that the policy does not require a config.
// - One of the hard-coded values here, all of which are known to not
// require a config.
// - A channel arg, in which case the application did something that
// is a misuse of our API.
// In the first two cases, these assertions will always be true. In
// the last case, this is probably fine for now.
// TODO(roth): If the last case becomes a problem, add better error
// handling here.
GPR_ASSERT(*lb_policy_config != nullptr);
GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
}
//
// ChannelData implementation
//
@ -1640,11 +1527,10 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
client_channel_factory_(
ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
channelz_node_(GetChannelzNode(args->channel_args)),
channel_config_helper_(this),
work_serializer_(std::make_shared<WorkSerializer>()),
interested_parties_(grpc_pollset_set_create()),
subchannel_pool_(GetSubchannelPool(args->channel_args)),
state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
subchannel_pool_(GetSubchannelPool(args->channel_args)),
disconnect_error_(GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
@ -1715,7 +1601,7 @@ ChannelData::~ChannelData() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
}
DestroyResolvingLoadBalancingPolicyLocked();
DestroyResolverAndLbPolicyLocked();
grpc_channel_args_destroy(channel_args_);
GRPC_ERROR_UNREF(resolver_transient_failure_error_);
// Stop backup polling.
@ -1725,10 +1611,247 @@ ChannelData::~ChannelData() {
gpr_mu_destroy(&info_mu_);
}
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
// Prefer the LB policy config found in the service config.
if (parsed_service_config->parsed_lb_config() != nullptr) {
return parsed_service_config->parsed_lb_config();
}
// Try the deprecated LB policy name from the service config.
// If not, try the setting from channel args.
const char* policy_name = nullptr;
if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
} else {
const grpc_arg* channel_arg =
grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
policy_name = grpc_channel_arg_get_string(channel_arg);
}
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (policy_name == nullptr) policy_name = "pick_first";
// Now that we have the policy name, construct an empty config for it.
Json config_json = Json::Array{Json::Object{
{policy_name, Json::Object{}},
}};
grpc_error* parse_error = GRPC_ERROR_NONE;
auto lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
config_json, &parse_error);
// The policy name came from one of three places:
// - The deprecated loadBalancingPolicy field in the service config,
// in which case the code in ClientChannelServiceConfigParser
// already verified that the policy does not require a config.
// - One of the hard-coded values here, all of which are known to not
// require a config.
// - A channel arg, in which case the application did something that
// is a misuse of our API.
// In the first two cases, these assertions will always be true. In
// the last case, this is probably fine for now.
// TODO(roth): If the last case becomes a problem, add better error
// handling here.
GPR_ASSERT(lb_policy_config != nullptr);
GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
return lb_policy_config;
}
void ChannelData::OnResolverResultChangedLocked(Resolver::Result result) {
// Handle race conditions.
if (resolver_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
}
// We only want to trace the address resolution in the follow cases:
// (a) Address resolution resulted in service config change.
// (b) Address resolution that causes number of backends to go from
// zero to non-zero.
// (c) Address resolution that causes number of backends to go from
// non-zero to zero.
// (d) Address resolution that causes a new LB policy to be created.
//
// We track a list of strings to eventually be concatenated and traced.
absl::InlinedVector<const char*, 3> trace_strings;
if (result.addresses.empty() && previous_resolution_contained_addresses_) {
trace_strings.push_back("Address list became empty");
} else if (!result.addresses.empty() &&
!previous_resolution_contained_addresses_) {
trace_strings.push_back("Address list became non-empty");
}
previous_resolution_contained_addresses_ = !result.addresses.empty();
// The result of grpc_error_string() is owned by the error itself.
// We're storing that string in trace_strings, so we need to make sure
// that the error lives until we're done with the string.
grpc_error* service_config_error =
GRPC_ERROR_REF(result.service_config_error);
if (service_config_error != GRPC_ERROR_NONE) {
trace_strings.push_back(grpc_error_string(service_config_error));
}
// Choose the service config.
RefCountedPtr<ServiceConfig> service_config;
RefCountedPtr<ConfigSelector> config_selector;
if (service_config_error != GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
this, grpc_error_string(service_config_error));
}
// If the service config was invalid, then fallback to the
// previously returned service config.
if (saved_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned invalid service config. "
"Continuing to use previous service config.",
this);
}
service_config = saved_service_config_;
config_selector = saved_config_selector_;
} else {
// We received an invalid service config and we don't have a
// previous service config to fall back to. Put the channel into
// TRANSIENT_FAILURE.
OnResolverError(GRPC_ERROR_REF(service_config_error));
trace_strings.push_back("no valid service config");
}
} else if (result.service_config == nullptr) {
// Resolver did not return any service config.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned no service config. Using default "
"service config for channel.",
this);
}
service_config = default_service_config_;
} else {
// Use ServiceConfig and ConfigSelector returned by resolver.
service_config = result.service_config;
config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
}
if (service_config != nullptr) {
// Extract global config for client channel.
const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
static_cast<const internal::ClientChannelGlobalParsedConfig*>(
service_config->GetGlobalParsedConfig(
internal::ClientChannelServiceConfigParser::ParserIndex()));
// Choose LB policy config.
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
ChooseLbPolicy(result, parsed_service_config);
// Check if the ServiceConfig has changed.
const bool service_config_changed =
saved_service_config_ == nullptr ||
service_config->json_string() != saved_service_config_->json_string();
// Check if the ConfigSelector has changed.
const bool config_selector_changed = !ConfigSelector::Equals(
saved_config_selector_.get(), config_selector.get());
// If either has changed, apply the global parameters now.
if (service_config_changed || config_selector_changed) {
// Update service config in control plane.
UpdateServiceConfigInControlPlaneLocked(
std::move(service_config), std::move(config_selector),
parsed_service_config, lb_policy_config->name());
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
}
// Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
std::move(result));
if (service_config_changed || config_selector_changed) {
// Start using new service config for calls.
// This needs to happen after the LB policy has been updated, since
// the ConfigSelector may need the LB policy to know about new
// destinations before it can send RPCs to those destinations.
UpdateServiceConfigInDataPlaneLocked();
// TODO(ncteisen): might be worth somehow including a snippet of the
// config in the trace, at the risk of bloating the trace logs.
trace_strings.push_back("Service config changed");
}
}
// Add channel trace event.
if (!trace_strings.empty()) {
std::string message =
absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
if (channelz_node_ != nullptr) {
channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
grpc_slice_from_cpp_string(message));
}
}
GRPC_ERROR_UNREF(service_config_error);
}
void ChannelData::OnResolverError(grpc_error* error) {
if (resolver_ == nullptr) {
GRPC_ERROR_UNREF(error);
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
grpc_error_string(error));
}
// If we already have an LB policy from a previous resolution
// result, then we continue to let it set the connectivity state.
// Otherwise, we go into TRANSIENT_FAILURE.
if (lb_policy_ == nullptr) {
grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolver transient failure", &error, 1);
UpdateStateAndPickerLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
"resolver failure",
absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
GRPC_ERROR_REF(state_error)),
state_error);
}
GRPC_ERROR_UNREF(error);
}
void ChannelData::CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result) {
// Construct update.
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses = std::move(result.addresses);
update_args.config = std::move(lb_policy_config);
// Remove the config selector from channel args so that we're not holding
// unnecessary refs that cause it to be destroyed somewhere other than in the
// WorkSerializer.
const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
update_args.args =
grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
// Create policy if needed.
if (lb_policy_ == nullptr) {
lb_policy_ = CreateLbPolicyLocked(*update_args.args);
}
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
lb_policy_.get());
}
lb_policy_->UpdateLocked(std::move(update_args));
}
// Creates a new LB policy.
OrphanablePtr<LoadBalancingPolicy> ChannelData::CreateLbPolicyLocked(
const grpc_channel_args& args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer_;
lb_policy_args.channel_control_helper =
absl::make_unique<ClientChannelControlHelper>(this);
lb_policy_args.args = &args;
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_client_channel_routing_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
lb_policy.get());
}
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties_);
return lb_policy;
}
void ChannelData::UpdateStateAndPickerLocked(
grpc_connectivity_state state, const absl::Status& status,
const char* reason,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker,
grpc_error* resolver_transient_failure_error) {
// Clean the control plane when entering IDLE.
if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
health_check_service_name_.reset();
@ -1762,6 +1885,9 @@ void ChannelData::UpdateStateAndPickerLocked(
RefCountedPtr<ConfigSelector> config_selector_to_unref;
{
MutexLock lock(&data_plane_mu_);
// Update resolver transient failure.
GRPC_ERROR_UNREF(resolver_transient_failure_error_);
resolver_transient_failure_error_ = resolver_transient_failure_error;
// Handle subchannel updates.
for (auto& p : pending_subchannel_updates_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
@ -1806,7 +1932,7 @@ void ChannelData::UpdateServiceConfigInControlPlaneLocked(
RefCountedPtr<ConfigSelector> config_selector,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
const char* lb_policy_name) {
grpc_core::UniquePtr<char> service_config_json(
UniquePtr<char> service_config_json(
gpr_strdup(service_config->json_string().c_str()));
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
@ -1826,12 +1952,11 @@ void ChannelData::UpdateServiceConfigInControlPlaneLocked(
// Update health check service name used by existing subchannel wrappers.
for (auto* subchannel_wrapper : subchannel_wrappers_) {
subchannel_wrapper->UpdateHealthCheckServiceName(
grpc_core::UniquePtr<char>(
gpr_strdup(health_check_service_name_.get())));
UniquePtr<char>(gpr_strdup(health_check_service_name_.get())));
}
}
// Swap out the data used by GetChannelInfo().
grpc_core::UniquePtr<char> lb_policy_name_owned(gpr_strdup(lb_policy_name));
UniquePtr<char> lb_policy_name_owned(gpr_strdup(lb_policy_name));
{
MutexLock lock(&info_mu_);
info_lb_policy_name_ = std::move(lb_policy_name_owned);
@ -1899,30 +2024,41 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
// of scope.
}
void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
// Instantiate resolving LB policy.
LoadBalancingPolicy::Args lb_args;
lb_args.work_serializer = work_serializer_;
lb_args.channel_control_helper =
absl::make_unique<ClientChannelControlHelper>(this);
lb_args.args = channel_args_;
grpc_core::UniquePtr<char> target_uri(gpr_strdup(target_uri_.get()));
resolving_lb_policy_.reset(new ResolvingLoadBalancingPolicy(
std::move(lb_args), &grpc_client_channel_routing_trace,
std::move(target_uri), &channel_config_helper_));
grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
interested_parties_);
void ChannelData::CreateResolverLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: starting name resolution", this);
}
resolver_ = ResolverRegistry::CreateResolver(
target_uri_.get(), channel_args_, interested_parties_, work_serializer_,
absl::make_unique<ResolverResultHandler>(this));
// Since the validity of the args was checked when the channel was created,
// CreateResolver() must return a non-null result.
GPR_ASSERT(resolver_ != nullptr);
UpdateStateAndPickerLocked(
GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
absl::make_unique<LoadBalancingPolicy::QueuePicker>(nullptr));
resolver_->StartLocked();
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
resolving_lb_policy_.get());
gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
}
}
void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
if (resolving_lb_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
interested_parties_);
resolving_lb_policy_.reset();
void ChannelData::DestroyResolverAndLbPolicyLocked() {
if (resolver_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
resolver_.get());
}
resolver_.reset();
if (lb_policy_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
interested_parties_);
lb_policy_.reset();
}
}
}
@ -1972,8 +2108,8 @@ void ChannelData::StartTransportOpLocked(grpc_transport_op* op) {
}
// Reset backoff.
if (op->reset_connect_backoff) {
if (resolving_lb_policy_ != nullptr) {
resolving_lb_policy_->ResetBackoffLocked();
if (lb_policy_ != nullptr) {
lb_policy_->ResetBackoffLocked();
}
}
// Disconnect or enter IDLE.
@ -1982,7 +2118,7 @@ void ChannelData::StartTransportOpLocked(grpc_transport_op* op) {
gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
grpc_error_string(op->disconnect_with_error));
}
DestroyResolvingLoadBalancingPolicyLocked();
DestroyResolverAndLbPolicyLocked();
intptr_t value;
if (grpc_error_get_int(op->disconnect_with_error,
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
@ -2071,10 +2207,10 @@ ChannelData::GetConnectedSubchannelInDataPlane(
}
void ChannelData::TryToConnectLocked() {
if (resolving_lb_policy_ != nullptr) {
resolving_lb_policy_->ExitIdleLocked();
} else {
CreateResolvingLoadBalancingPolicyLocked();
if (lb_policy_ != nullptr) {
lb_policy_->ExitIdleLocked();
} else if (resolver_ == nullptr) {
CreateResolverLocked();
}
GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
}

@ -105,7 +105,7 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
// 2. We are currently running in the data plane mutex, but we
// need to bounce into the control plane work_serializer to call
// ExitIdleLocked().
if (!exit_idle_called_) {
if (!exit_idle_called_ && parent_ != nullptr) {
exit_idle_called_ = true;
auto* parent = parent_->Ref().release(); // ref held by lambda.
ExecCtx::Run(DEBUG_LOCATION,

@ -1,355 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"
namespace grpc_core {
//
// ResolvingLoadBalancingPolicy::ResolverResultHandler
//
class ResolvingLoadBalancingPolicy::ResolverResultHandler
: public Resolver::ResultHandler {
public:
explicit ResolverResultHandler(
RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
: parent_(std::move(parent)) {}
~ResolverResultHandler() override {
if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
parent_.get());
}
}
void ReturnResult(Resolver::Result result) override {
parent_->OnResolverResultChangedLocked(std::move(result));
}
void ReturnError(grpc_error* error) override {
parent_->OnResolverError(error);
}
private:
RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
};
//
// ResolvingLoadBalancingPolicy::ResolvingControlHelper
//
class ResolvingLoadBalancingPolicy::ResolvingControlHelper
: public LoadBalancingPolicy::ChannelControlHelper {
public:
explicit ResolvingControlHelper(
RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
: parent_(std::move(parent)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) override {
if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
return parent_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override {
if (parent_->resolver_ == nullptr) return; // Shutting down.
parent_->channel_control_helper()->UpdateState(state, status,
std::move(picker));
}
void RequestReresolution() override {
if (parent_->resolver_ == nullptr) return; // Shutting down.
if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
parent_.get());
}
parent_->resolver_->RequestReresolutionLocked();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
if (parent_->resolver_ == nullptr) return; // Shutting down.
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
private:
RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
};
//
// ResolvingLoadBalancingPolicy
//
ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, grpc_core::UniquePtr<char> target_uri,
ChannelConfigHelper* helper)
: LoadBalancingPolicy(std::move(args)),
tracer_(tracer),
target_uri_(std::move(target_uri)),
helper_(helper) {
GPR_ASSERT(helper_ != nullptr);
resolver_ = ResolverRegistry::CreateResolver(
target_uri_.get(), args.args, interested_parties(), work_serializer(),
absl::make_unique<ResolverResultHandler>(Ref()));
// Since the validity of args has been checked when create the channel,
// CreateResolver() must return a non-null result.
GPR_ASSERT(resolver_ != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
}
channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(Ref()));
resolver_->StartLocked();
}
ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
GPR_ASSERT(resolver_ == nullptr);
GPR_ASSERT(lb_policy_ == nullptr);
}
void ResolvingLoadBalancingPolicy::ShutdownLocked() {
if (resolver_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down resolver=%p", this,
resolver_.get());
}
resolver_.reset();
if (lb_policy_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
interested_parties());
lb_policy_.reset();
}
}
}
void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
if (lb_policy_ != nullptr) lb_policy_->ExitIdleLocked();
}
void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
if (resolver_ != nullptr) {
resolver_->ResetBackoffLocked();
resolver_->RequestReresolutionLocked();
}
if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
}
void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
if (resolver_ == nullptr) {
GRPC_ERROR_UNREF(error);
return;
}
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
grpc_error_string(error));
}
// If we already have an LB policy from a previous resolution
// result, then we continue to let it set the connectivity state.
// Otherwise, we go into TRANSIENT_FAILURE.
if (lb_policy_ == nullptr) {
grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolver transient failure", &error, 1);
helper_->ResolverTransientFailure(GRPC_ERROR_REF(state_error));
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
absl::make_unique<TransientFailurePicker>(state_error));
}
GRPC_ERROR_UNREF(error);
}
void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result) {
// Construct update.
UpdateArgs update_args;
update_args.addresses = std::move(result.addresses);
update_args.config = std::move(lb_policy_config);
// Remove the config selector from channel args so that we're not holding
// unnecessary refs that cause it to be destroyed somewhere other than in the
// WorkSerializer.
const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
update_args.args =
grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
// Create policy if needed.
if (lb_policy_ == nullptr) {
lb_policy_ = CreateLbPolicyLocked(*update_args.args);
}
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: Updating child policy %p", this,
lb_policy_.get());
}
lb_policy_->UpdateLocked(std::move(update_args));
}
// Creates a new LB policy.
OrphanablePtr<LoadBalancingPolicy>
ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
const grpc_channel_args& args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.channel_control_helper =
absl::make_unique<ResolvingControlHelper>(Ref());
lb_policy_args.args = &args;
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), tracer_);
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy %p", this,
lb_policy.get());
}
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
bool resolution_contains_addresses, TraceStringVector* trace_strings) {
if (!resolution_contains_addresses &&
previous_resolution_contained_addresses_) {
trace_strings->push_back("Address list became empty");
} else if (resolution_contains_addresses &&
!previous_resolution_contained_addresses_) {
trace_strings->push_back("Address list became non-empty");
}
previous_resolution_contained_addresses_ = resolution_contains_addresses;
}
void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
const TraceStringVector& trace_strings) const {
if (!trace_strings.empty()) {
std::string message =
absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO,
message);
}
}
void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
Resolver::Result result) {
// Handle race conditions.
if (resolver_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
}
// We only want to trace the address resolution in the follow cases:
// (a) Address resolution resulted in service config change.
// (b) Address resolution that causes number of backends to go from
// zero to non-zero.
// (c) Address resolution that causes number of backends to go from
// non-zero to zero.
// (d) Address resolution that causes a new LB policy to be created.
//
// We track a list of strings to eventually be concatenated and traced.
TraceStringVector trace_strings;
MaybeAddTraceMessagesForAddressChangesLocked(!result.addresses.empty(),
&trace_strings);
// The result of grpc_error_string() is owned by the error itself.
// We're storing that string in trace_strings, so we need to make sure
// that the error lives until we're done with the string.
grpc_error* service_config_error =
GRPC_ERROR_REF(result.service_config_error);
if (service_config_error != GRPC_ERROR_NONE) {
trace_strings.push_back(grpc_error_string(service_config_error));
}
// Choose the service config.
ChannelConfigHelper::ChooseServiceConfigResult service_config_result;
if (helper_ != nullptr) {
service_config_result = helper_->ChooseServiceConfig(result);
} else {
service_config_result.lb_policy_config = child_lb_config_;
}
if (service_config_result.no_valid_service_config) {
// We received an invalid service config and we don't have a
// previous service config to fall back to.
OnResolverError(GRPC_ERROR_REF(service_config_error));
trace_strings.push_back("no valid service config");
} else {
// Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked(
std::move(service_config_result.lb_policy_config), std::move(result));
if (service_config_result.service_config_changed) {
// Tell channel to start using new service config for calls.
// This needs to happen after the LB policy has been updated, since
// the ConfigSelector may need the LB policy to know about new
// destinations before it can send RPCs to those destinations.
if (helper_ != nullptr) helper_->StartUsingServiceConfigForCalls();
// TODO(ncteisen): might be worth somehow including a snippet of the
// config in the trace, at the risk of bloating the trace logs.
trace_strings.push_back("Service config changed");
}
}
// Add channel trace event.
ConcatenateAndAddChannelTraceLocked(trace_strings);
GRPC_ERROR_UNREF(service_config_error);
}
} // namespace grpc_core

@ -1,138 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVING_LB_POLICY_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVING_LB_POLICY_H
#include <grpc/support/port_platform.h>
#include "absl/container/inlined_vector.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc_core {
// An LB policy that wraps a resolver and a child LB policy to make use
// of the addresses returned by the resolver.
//
// When used in the client_channel code, the resolver will attempt to
// fetch the service config, and the child LB policy name and config
// will be determined based on the service config.
//
// When used in an LB policy implementation that needs to do another
// round of resolution before creating a child policy, the resolver does
// not fetch the service config, and the caller must pre-determine the
// child LB policy and config to use.
class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
public:
class ChannelConfigHelper {
public:
struct ChooseServiceConfigResult {
// Set to true if the service config has changed since the last result.
bool service_config_changed = false;
// Set to true if we don't have a valid service config to use.
// This tells the ResolvingLoadBalancingPolicy to put the channel
// into TRANSIENT_FAILURE.
bool no_valid_service_config = false;
// The LB policy config to use.
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
};
virtual ~ChannelConfigHelper() = default;
// Chooses the service config for the channel.
virtual ChooseServiceConfigResult ChooseServiceConfig(
const Resolver::Result& result) = 0;
// Starts using the service config for calls.
virtual void StartUsingServiceConfigForCalls() = 0;
// Indicates a resolver transient failure.
virtual void ResolverTransientFailure(grpc_error* error) = 0;
};
ResolvingLoadBalancingPolicy(Args args, TraceFlag* tracer,
grpc_core::UniquePtr<char> target_uri,
ChannelConfigHelper* helper);
const char* name() const override { return "resolving_lb"; }
// No-op -- should never get updates from the channel.
// TODO(roth): Need to support updating child LB policy's config for xds
// use case.
void UpdateLocked(UpdateArgs /*args*/) override {}
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
using TraceStringVector = absl::InlinedVector<const char*, 3>;
class ResolverResultHandler;
class ResolvingControlHelper;
~ResolvingLoadBalancingPolicy() override;
void ShutdownLocked() override;
void OnResolverError(grpc_error* error);
void CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const grpc_channel_args& args);
void MaybeAddTraceMessagesForAddressChangesLocked(
bool resolution_contains_addresses, TraceStringVector* trace_strings);
void ConcatenateAndAddChannelTraceLocked(
const TraceStringVector& trace_strings) const;
void OnResolverResultChangedLocked(Resolver::Result result);
// Passed in from caller at construction time.
TraceFlag* tracer_;
grpc_core::UniquePtr<char> target_uri_;
ChannelConfigHelper* helper_;
// Resolver and associated state.
OrphanablePtr<Resolver> resolver_;
bool previous_resolution_contained_addresses_ = false;
// Determined by resolver results.
grpc_core::UniquePtr<char> child_policy_name_;
RefCountedPtr<LoadBalancingPolicy::Config> child_lb_config_;
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVING_LB_POLICY_H */

@ -66,7 +66,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc',
'src/core/ext/filters/client_channel/resolver_registry.cc',
'src/core/ext/filters/client_channel/resolver_result_parsing.cc',
'src/core/ext/filters/client_channel/resolving_lb_policy.cc',
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/server_address.cc',
'src/core/ext/filters/client_channel/service_config.cc',

@ -1125,8 +1125,6 @@ src/core/ext/filters/client_channel/resolver_registry.cc \
src/core/ext/filters/client_channel/resolver_registry.h \
src/core/ext/filters/client_channel/resolver_result_parsing.cc \
src/core/ext/filters/client_channel/resolver_result_parsing.h \
src/core/ext/filters/client_channel/resolving_lb_policy.cc \
src/core/ext/filters/client_channel/resolving_lb_policy.h \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/retry_throttle.h \
src/core/ext/filters/client_channel/server_address.cc \

@ -955,8 +955,6 @@ src/core/ext/filters/client_channel/resolver_registry.cc \
src/core/ext/filters/client_channel/resolver_registry.h \
src/core/ext/filters/client_channel/resolver_result_parsing.cc \
src/core/ext/filters/client_channel/resolver_result_parsing.h \
src/core/ext/filters/client_channel/resolving_lb_policy.cc \
src/core/ext/filters/client_channel/resolving_lb_policy.h \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/retry_throttle.h \
src/core/ext/filters/client_channel/server_address.cc \

Loading…
Cancel
Save