diff --git a/BUILD b/BUILD index 561329e33f8..2ab28868869 100644 --- a/BUILD +++ b/BUILD @@ -1082,7 +1082,6 @@ grpc_cc_library( "src/core/ext/filters/client_channel/resolver.cc", "src/core/ext/filters/client_channel/resolver_registry.cc", "src/core/ext/filters/client_channel/resolver_result_parsing.cc", - "src/core/ext/filters/client_channel/resolving_lb_policy.cc", "src/core/ext/filters/client_channel/retry_throttle.cc", "src/core/ext/filters/client_channel/server_address.cc", "src/core/ext/filters/client_channel/service_config.cc", @@ -1114,7 +1113,6 @@ grpc_cc_library( "src/core/ext/filters/client_channel/resolver_factory.h", "src/core/ext/filters/client_channel/resolver_registry.h", "src/core/ext/filters/client_channel/resolver_result_parsing.h", - "src/core/ext/filters/client_channel/resolving_lb_policy.h", "src/core/ext/filters/client_channel/retry_throttle.h", "src/core/ext/filters/client_channel/server_address.h", "src/core/ext/filters/client_channel/service_config.h", diff --git a/BUILD.gn b/BUILD.gn index ade80c7c1e2..abd600035f6 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -295,8 +295,6 @@ config("grpc_config") { "src/core/ext/filters/client_channel/resolver_registry.h", "src/core/ext/filters/client_channel/resolver_result_parsing.cc", "src/core/ext/filters/client_channel/resolver_result_parsing.h", - "src/core/ext/filters/client_channel/resolving_lb_policy.cc", - "src/core/ext/filters/client_channel/resolving_lb_policy.h", "src/core/ext/filters/client_channel/retry_throttle.cc", "src/core/ext/filters/client_channel/retry_throttle.h", "src/core/ext/filters/client_channel/server_address.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 66090c1fea2..a9c651e4eb5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1491,7 +1491,6 @@ add_library(grpc src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc src/core/ext/filters/client_channel/resolver_registry.cc src/core/ext/filters/client_channel/resolver_result_parsing.cc - src/core/ext/filters/client_channel/resolving_lb_policy.cc src/core/ext/filters/client_channel/retry_throttle.cc src/core/ext/filters/client_channel/server_address.cc src/core/ext/filters/client_channel/service_config.cc @@ -2287,7 +2286,6 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc src/core/ext/filters/client_channel/resolver_registry.cc src/core/ext/filters/client_channel/resolver_result_parsing.cc - src/core/ext/filters/client_channel/resolving_lb_policy.cc src/core/ext/filters/client_channel/retry_throttle.cc src/core/ext/filters/client_channel/server_address.cc src/core/ext/filters/client_channel/service_config.cc diff --git a/Makefile b/Makefile index 27308c8fc84..4e779e6f52d 100644 --- a/Makefile +++ b/Makefile @@ -1920,7 +1920,6 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ - src/core/ext/filters/client_channel/resolving_lb_policy.cc \ src/core/ext/filters/client_channel/retry_throttle.cc \ src/core/ext/filters/client_channel/server_address.cc \ src/core/ext/filters/client_channel/service_config.cc \ @@ -2572,7 +2571,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ - src/core/ext/filters/client_channel/resolving_lb_policy.cc \ src/core/ext/filters/client_channel/retry_throttle.cc \ src/core/ext/filters/client_channel/server_address.cc \ src/core/ext/filters/client_channel/service_config.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index e945677e959..bb68f4631a6 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -420,7 +420,6 @@ libs: - src/core/ext/filters/client_channel/resolver_factory.h - src/core/ext/filters/client_channel/resolver_registry.h - src/core/ext/filters/client_channel/resolver_result_parsing.h - - src/core/ext/filters/client_channel/resolving_lb_policy.h - src/core/ext/filters/client_channel/retry_throttle.h - src/core/ext/filters/client_channel/server_address.h - src/core/ext/filters/client_channel/service_config.h @@ -916,7 +915,6 @@ libs: - src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc - src/core/ext/filters/client_channel/resolver_registry.cc - src/core/ext/filters/client_channel/resolver_result_parsing.cc - - src/core/ext/filters/client_channel/resolving_lb_policy.cc - src/core/ext/filters/client_channel/retry_throttle.cc - src/core/ext/filters/client_channel/server_address.cc - src/core/ext/filters/client_channel/service_config.cc @@ -1589,7 +1587,6 @@ libs: - src/core/ext/filters/client_channel/resolver_factory.h - src/core/ext/filters/client_channel/resolver_registry.h - src/core/ext/filters/client_channel/resolver_result_parsing.h - - src/core/ext/filters/client_channel/resolving_lb_policy.h - src/core/ext/filters/client_channel/retry_throttle.h - src/core/ext/filters/client_channel/server_address.h - src/core/ext/filters/client_channel/service_config.h @@ -1843,7 +1840,6 @@ libs: - src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc - src/core/ext/filters/client_channel/resolver_registry.cc - src/core/ext/filters/client_channel/resolver_result_parsing.cc - - src/core/ext/filters/client_channel/resolving_lb_policy.cc - src/core/ext/filters/client_channel/retry_throttle.cc - src/core/ext/filters/client_channel/server_address.cc - src/core/ext/filters/client_channel/service_config.cc diff --git a/config.m4 b/config.m4 index aabd536cccc..a397d3ac8c3 100644 --- a/config.m4 +++ b/config.m4 @@ -90,7 +90,6 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ - src/core/ext/filters/client_channel/resolving_lb_policy.cc \ src/core/ext/filters/client_channel/retry_throttle.cc \ src/core/ext/filters/client_channel/server_address.cc \ src/core/ext/filters/client_channel/service_config.cc \ diff --git a/config.w32 b/config.w32 index 5070ec056af..973a1ff6496 100644 --- a/config.w32 +++ b/config.w32 @@ -57,7 +57,6 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\resolver\\xds\\xds_resolver.cc " + "src\\core\\ext\\filters\\client_channel\\resolver_registry.cc " + "src\\core\\ext\\filters\\client_channel\\resolver_result_parsing.cc " + - "src\\core\\ext\\filters\\client_channel\\resolving_lb_policy.cc " + "src\\core\\ext\\filters\\client_channel\\retry_throttle.cc " + "src\\core\\ext\\filters\\client_channel\\server_address.cc " + "src\\core\\ext\\filters\\client_channel\\service_config.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index bf79b1b74dd..8ebdd7c187d 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -233,7 +233,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/resolver_factory.h', 'src/core/ext/filters/client_channel/resolver_registry.h', 'src/core/ext/filters/client_channel/resolver_result_parsing.h', - 'src/core/ext/filters/client_channel/resolving_lb_policy.h', 'src/core/ext/filters/client_channel/retry_throttle.h', 'src/core/ext/filters/client_channel/server_address.h', 'src/core/ext/filters/client_channel/service_config.h', @@ -844,7 +843,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/resolver_factory.h', 'src/core/ext/filters/client_channel/resolver_registry.h', 'src/core/ext/filters/client_channel/resolver_result_parsing.h', - 'src/core/ext/filters/client_channel/resolving_lb_policy.h', 'src/core/ext/filters/client_channel/retry_throttle.h', 'src/core/ext/filters/client_channel/server_address.h', 'src/core/ext/filters/client_channel/service_config.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index d1c08a3d2b3..f5a666e15f0 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -274,8 +274,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/resolver_registry.h', 'src/core/ext/filters/client_channel/resolver_result_parsing.cc', 'src/core/ext/filters/client_channel/resolver_result_parsing.h', - 'src/core/ext/filters/client_channel/resolving_lb_policy.cc', - 'src/core/ext/filters/client_channel/resolving_lb_policy.h', 'src/core/ext/filters/client_channel/retry_throttle.cc', 'src/core/ext/filters/client_channel/retry_throttle.h', 'src/core/ext/filters/client_channel/server_address.cc', @@ -1376,7 +1374,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/resolver_factory.h', 'src/core/ext/filters/client_channel/resolver_registry.h', 'src/core/ext/filters/client_channel/resolver_result_parsing.h', - 'src/core/ext/filters/client_channel/resolving_lb_policy.h', 'src/core/ext/filters/client_channel/retry_throttle.h', 'src/core/ext/filters/client_channel/server_address.h', 'src/core/ext/filters/client_channel/service_config.h', diff --git a/grpc.gemspec b/grpc.gemspec index 9888b99cdfa..24d607a0dbe 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -191,8 +191,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/resolver_registry.h ) s.files += %w( src/core/ext/filters/client_channel/resolver_result_parsing.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver_result_parsing.h ) - s.files += %w( src/core/ext/filters/client_channel/resolving_lb_policy.cc ) - s.files += %w( src/core/ext/filters/client_channel/resolving_lb_policy.h ) s.files += %w( src/core/ext/filters/client_channel/retry_throttle.cc ) s.files += %w( src/core/ext/filters/client_channel/retry_throttle.h ) s.files += %w( src/core/ext/filters/client_channel/server_address.cc ) diff --git a/grpc.gyp b/grpc.gyp index b7337b95ee8..25739933388 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -501,7 +501,6 @@ 'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc', 'src/core/ext/filters/client_channel/resolver_registry.cc', 'src/core/ext/filters/client_channel/resolver_result_parsing.cc', - 'src/core/ext/filters/client_channel/resolving_lb_policy.cc', 'src/core/ext/filters/client_channel/retry_throttle.cc', 'src/core/ext/filters/client_channel/server_address.cc', 'src/core/ext/filters/client_channel/service_config.cc', @@ -1126,7 +1125,6 @@ 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc', 'src/core/ext/filters/client_channel/resolver_registry.cc', 'src/core/ext/filters/client_channel/resolver_result_parsing.cc', - 'src/core/ext/filters/client_channel/resolving_lb_policy.cc', 'src/core/ext/filters/client_channel/retry_throttle.cc', 'src/core/ext/filters/client_channel/server_address.cc', 'src/core/ext/filters/client_channel/service_config.cc', diff --git a/package.xml b/package.xml index f0581816534..2ec60d6e5df 100644 --- a/package.xml +++ b/package.xml @@ -171,8 +171,6 @@ - - diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 39f3aadce16..2569932891f 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -30,6 +30,7 @@ #include "absl/strings/numbers.h" #include "absl/strings/str_cat.h" +#include "absl/strings/str_join.h" #include "absl/strings/string_view.h" #include @@ -45,12 +46,12 @@ #include "src/core/ext/filters/client_channel/config_selector.h" #include "src/core/ext/filters/client_channel/global_subchannel_pool.h" #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" +#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/local_subchannel_pool.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/resolver_result_parsing.h" -#include "src/core/ext/filters/client_channel/resolving_lb_policy.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" #include "src/core/ext/filters/client_channel/service_config.h" #include "src/core/ext/filters/client_channel/service_config_call_data.h" @@ -78,9 +79,6 @@ #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/status_metadata.h" -using grpc_core::internal::ClientChannelMethodParsedConfig; -using grpc_core::internal::ServerRetryThrottleData; - // // Client channel filter // @@ -105,6 +103,9 @@ using grpc_core::internal::ServerRetryThrottleData; namespace grpc_core { +using internal::ClientChannelMethodParsedConfig; +using internal::ServerRetryThrottleData; + TraceFlag grpc_client_channel_call_trace(false, "client_channel_call"); TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing"); @@ -236,34 +237,48 @@ class ChannelData { Atomic done_{false}; }; - class ChannelConfigHelper - : public ResolvingLoadBalancingPolicy::ChannelConfigHelper { + class ResolverResultHandler : public Resolver::ResultHandler { public: - explicit ChannelConfigHelper(ChannelData* chand) : chand_(chand) {} + explicit ResolverResultHandler(ChannelData* chand) : chand_(chand) { + GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler"); + } - ChooseServiceConfigResult ChooseServiceConfig( - const Resolver::Result& result) override; + ~ResolverResultHandler() override { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_); + } + GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler"); + } - void StartUsingServiceConfigForCalls() override; + void ReturnResult(Resolver::Result result) override { + chand_->OnResolverResultChangedLocked(std::move(result)); + } - void ResolverTransientFailure(grpc_error* error) override; + void ReturnError(grpc_error* error) override { + chand_->OnResolverError(error); + } private: - static void ChooseLbPolicy( - const Resolver::Result& resolver_result, - const internal::ClientChannelGlobalParsedConfig* parsed_service_config, - RefCountedPtr* lb_policy_config); - ChannelData* chand_; }; ChannelData(grpc_channel_element_args* args, grpc_error** error); ~ChannelData(); + void OnResolverResultChangedLocked(Resolver::Result result); + void OnResolverError(grpc_error* error); + + void CreateOrUpdateLbPolicyLocked( + RefCountedPtr lb_policy_config, + Resolver::Result result); + OrphanablePtr CreateLbPolicyLocked( + const grpc_channel_args& args); + void UpdateStateAndPickerLocked( grpc_connectivity_state state, const absl::Status& status, const char* reason, - std::unique_ptr picker); + std::unique_ptr picker, + grpc_error* resolver_transient_failure_error = GRPC_ERROR_NONE); void UpdateServiceConfigInControlPlaneLocked( RefCountedPtr service_config, @@ -273,9 +288,8 @@ class ChannelData { void UpdateServiceConfigInDataPlaneLocked(); - void CreateResolvingLoadBalancingPolicyLocked(); - - void DestroyResolvingLoadBalancingPolicyLocked(); + void CreateResolverLocked(); + void DestroyResolverAndLbPolicyLocked(); grpc_error* DoPingLocked(grpc_transport_op* op); @@ -293,10 +307,9 @@ class ChannelData { ClientChannelFactory* client_channel_factory_; const grpc_channel_args* channel_args_; RefCountedPtr default_service_config_; - grpc_core::UniquePtr server_name_; - grpc_core::UniquePtr target_uri_; + UniquePtr server_name_; + UniquePtr target_uri_; channelz::ChannelNode* channelz_node_; - ChannelConfigHelper channel_config_helper_; // // Fields used in the data plane. Guarded by data_plane_mu. @@ -316,12 +329,14 @@ class ChannelData { // std::shared_ptr work_serializer_; grpc_pollset_set* interested_parties_; - RefCountedPtr subchannel_pool_; - OrphanablePtr resolving_lb_policy_; ConnectivityStateTracker state_tracker_; - grpc_core::UniquePtr health_check_service_name_; + OrphanablePtr resolver_; + bool previous_resolution_contained_addresses_ = false; RefCountedPtr saved_service_config_; RefCountedPtr saved_config_selector_; + UniquePtr health_check_service_name_; + OrphanablePtr lb_policy_; + RefCountedPtr subchannel_pool_; // The number of SubchannelWrapper instances referencing a given Subchannel. std::map subchannel_refcount_map_; // The set of SubchannelWrappers that currently exist. @@ -346,8 +361,8 @@ class ChannelData { // synchronously via get_channel_info(). // gpr_mu info_mu_; - grpc_core::UniquePtr info_lb_policy_name_; - grpc_core::UniquePtr info_service_config_json_; + UniquePtr info_lb_policy_name_; + UniquePtr info_service_config_json_; // // Fields guarded by a mutex, since they need to be accessed @@ -399,8 +414,8 @@ class CallData { grpc_linked_mdelem* linked_mdelem = static_cast( calld_->arena_->Alloc(sizeof(grpc_linked_mdelem))); linked_mdelem->md = grpc_mdelem_from_slices( - grpc_core::ExternallyManagedSlice(key.data(), key.size()), - grpc_core::ExternallyManagedSlice(value.data(), value.size())); + ExternallyManagedSlice(key.data(), key.size()), + ExternallyManagedSlice(value.data(), value.size())); GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) == GRPC_ERROR_NONE); } @@ -893,7 +908,7 @@ class CallData { class ChannelData::SubchannelWrapper : public SubchannelInterface { public: SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, - grpc_core::UniquePtr health_check_service_name) + UniquePtr health_check_service_name) : SubchannelInterface( GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace) ? "SubchannelWrapper" @@ -959,8 +974,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { initial_state); subchannel_->WatchConnectivityState( initial_state, - grpc_core::UniquePtr( - gpr_strdup(health_check_service_name_.get())), + UniquePtr(gpr_strdup(health_check_service_name_.get())), RefCountedPtr( watcher_wrapper)); } @@ -986,8 +1000,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { subchannel_->ThrottleKeepaliveTime(new_keepalive_time); } - void UpdateHealthCheckServiceName( - grpc_core::UniquePtr health_check_service_name) { + void UpdateHealthCheckServiceName(UniquePtr health_check_service_name) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p: subchannel wrapper %p: updating health check service " @@ -1013,8 +1026,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { watcher_wrapper = replacement; subchannel_->WatchConnectivityState( replacement->last_seen_state(), - grpc_core::UniquePtr( - gpr_strdup(health_check_service_name.get())), + UniquePtr(gpr_strdup(health_check_service_name.get())), RefCountedPtr( replacement)); } @@ -1113,7 +1125,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { } ConnectivityStateChange state_change = PopConnectivityStateChange(); absl::optional keepalive_throttling = - state_change.status.GetPayload(grpc_core::kKeepaliveThrottlingKey); + state_change.status.GetPayload(kKeepaliveThrottlingKey); if (keepalive_throttling.has_value()) { int new_keepalive_time = -1; if (absl::SimpleAtoi(std::string(keepalive_throttling.value()), @@ -1178,7 +1190,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { ChannelData* chand_; Subchannel* subchannel_; - grpc_core::UniquePtr health_check_service_name_; + UniquePtr health_check_service_name_; // Maps from the address of the watcher passed to us by the LB policy // to the address of the WrapperWatcher that we passed to the underlying // subchannel. This is needed so that when the LB policy calls @@ -1367,10 +1379,11 @@ class ChannelData::ClientChannelControlHelper RefCountedPtr CreateSubchannel( ServerAddress address, const grpc_channel_args& args) override { + if (chand_->resolver_ == nullptr) return nullptr; // Shutting down. // Determine health check service name. bool inhibit_health_checking = grpc_channel_arg_get_bool( grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); - grpc_core::UniquePtr health_check_service_name; + UniquePtr health_check_service_name; if (!inhibit_health_checking) { health_check_service_name.reset( gpr_strdup(chand_->health_check_service_name_.get())); @@ -1410,6 +1423,7 @@ class ChannelData::ClientChannelControlHelper void UpdateState( grpc_connectivity_state state, const absl::Status& status, std::unique_ptr picker) override { + if (chand_->resolver_ == nullptr) return; // Shutting down. grpc_error* disconnect_error = chand_->disconnect_error(); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { const char* extra = disconnect_error == GRPC_ERROR_NONE @@ -1426,11 +1440,17 @@ class ChannelData::ClientChannelControlHelper } } - // No-op -- we should never get this from ResolvingLoadBalancingPolicy. - void RequestReresolution() override {} + void RequestReresolution() override { + if (chand_->resolver_ == nullptr) return; // Shutting down. + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_); + } + chand_->resolver_->RequestReresolutionLocked(); + } void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { + if (chand_->resolver_ == nullptr) return; // Shutting down. if (chand_->channelz_node_ != nullptr) { chand_->channelz_node_->AddTraceEvent( ConvertSeverityEnum(severity), @@ -1449,139 +1469,6 @@ class ChannelData::ClientChannelControlHelper ChannelData* chand_; }; -// -// ChannelData::ChannelConfigHelper -// - -ChannelData::ChannelConfigHelper::ChooseServiceConfigResult -ChannelData::ChannelConfigHelper::ChooseServiceConfig( - const Resolver::Result& result) { - ChooseServiceConfigResult service_config_result; - RefCountedPtr service_config; - RefCountedPtr config_selector; - if (result.service_config_error != GRPC_ERROR_NONE) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s", - chand_, grpc_error_string(result.service_config_error)); - } - // If the service config was invalid, then fallback to the - // previously returned service config. - if (chand_->saved_service_config_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: resolver returned invalid service config. " - "Continuing to use previous service config.", - chand_); - } - service_config = chand_->saved_service_config_; - config_selector = chand_->saved_config_selector_; - } else { - // No previously returned config, so put the channel into - // TRANSIENT_FAILURE. - service_config_result.no_valid_service_config = true; - return service_config_result; - } - } else if (result.service_config == nullptr) { - // Resolver did not return any service config. - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: resolver returned no service config. Using default " - "service config for channel.", - chand_); - } - service_config = chand_->default_service_config_; - } else { - // Use ServiceConfig and ConfigSelector returned by resolver. - service_config = result.service_config; - config_selector = ConfigSelector::GetFromChannelArgs(*result.args); - } - GPR_ASSERT(service_config != nullptr); - // Extract global config for client channel. - const internal::ClientChannelGlobalParsedConfig* parsed_service_config = - static_cast( - service_config->GetGlobalParsedConfig( - internal::ClientChannelServiceConfigParser::ParserIndex())); - // Find LB policy config. - ChooseLbPolicy(result, parsed_service_config, - &service_config_result.lb_policy_config); - // Check if the ServiceConfig has changed. - const bool service_config_changed = - chand_->saved_service_config_ == nullptr || - service_config->json_string() != - chand_->saved_service_config_->json_string(); - // Check if the ConfigSelector has changed. - const bool config_selector_changed = !ConfigSelector::Equals( - chand_->saved_config_selector_.get(), config_selector.get()); - // Indicate a change if either the ServiceConfig or ConfigSelector have - // changed. - service_config_result.service_config_changed = - service_config_changed || config_selector_changed; - // If it has, apply the global parameters now. - if (service_config_result.service_config_changed) { - chand_->UpdateServiceConfigInControlPlaneLocked( - std::move(service_config), std::move(config_selector), - parsed_service_config, service_config_result.lb_policy_config->name()); - } - // Return results. - return service_config_result; -} - -void ChannelData::ChannelConfigHelper::StartUsingServiceConfigForCalls() { - chand_->UpdateServiceConfigInDataPlaneLocked(); -} - -void ChannelData::ChannelConfigHelper::ResolverTransientFailure( - grpc_error* error) { - MutexLock lock(&chand_->data_plane_mu_); - GRPC_ERROR_UNREF(chand_->resolver_transient_failure_error_); - chand_->resolver_transient_failure_error_ = error; -} - -void ChannelData::ChannelConfigHelper::ChooseLbPolicy( - const Resolver::Result& resolver_result, - const internal::ClientChannelGlobalParsedConfig* parsed_service_config, - RefCountedPtr* lb_policy_config) { - // Prefer the LB policy config found in the service config. - if (parsed_service_config->parsed_lb_config() != nullptr) { - *lb_policy_config = parsed_service_config->parsed_lb_config(); - return; - } - // Try the deprecated LB policy name from the service config. - // If not, try the setting from channel args. - const char* policy_name = nullptr; - if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) { - policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str(); - } else { - const grpc_arg* channel_arg = - grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME); - policy_name = grpc_channel_arg_get_string(channel_arg); - } - // Use pick_first if nothing was specified and we didn't select grpclb - // above. - if (policy_name == nullptr) policy_name = "pick_first"; - // Now that we have the policy name, construct an empty config for it. - Json config_json = Json::Array{Json::Object{ - {policy_name, Json::Object{}}, - }}; - grpc_error* parse_error = GRPC_ERROR_NONE; - *lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - config_json, &parse_error); - // The policy name came from one of three places: - // - The deprecated loadBalancingPolicy field in the service config, - // in which case the code in ClientChannelServiceConfigParser - // already verified that the policy does not require a config. - // - One of the hard-coded values here, all of which are known to not - // require a config. - // - A channel arg, in which case the application did something that - // is a misuse of our API. - // In the first two cases, these assertions will always be true. In - // the last case, this is probably fine for now. - // TODO(roth): If the last case becomes a problem, add better error - // handling here. - GPR_ASSERT(*lb_policy_config != nullptr); - GPR_ASSERT(parse_error == GRPC_ERROR_NONE); -} - // // ChannelData implementation // @@ -1640,11 +1527,10 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) client_channel_factory_( ClientChannelFactory::GetFromChannelArgs(args->channel_args)), channelz_node_(GetChannelzNode(args->channel_args)), - channel_config_helper_(this), work_serializer_(std::make_shared()), interested_parties_(grpc_pollset_set_create()), - subchannel_pool_(GetSubchannelPool(args->channel_args)), state_tracker_("client_channel", GRPC_CHANNEL_IDLE), + subchannel_pool_(GetSubchannelPool(args->channel_args)), disconnect_error_(GRPC_ERROR_NONE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p", @@ -1715,7 +1601,7 @@ ChannelData::~ChannelData() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p: destroying channel", this); } - DestroyResolvingLoadBalancingPolicyLocked(); + DestroyResolverAndLbPolicyLocked(); grpc_channel_args_destroy(channel_args_); GRPC_ERROR_UNREF(resolver_transient_failure_error_); // Stop backup polling. @@ -1725,10 +1611,247 @@ ChannelData::~ChannelData() { gpr_mu_destroy(&info_mu_); } +RefCountedPtr ChooseLbPolicy( + const Resolver::Result& resolver_result, + const internal::ClientChannelGlobalParsedConfig* parsed_service_config) { + // Prefer the LB policy config found in the service config. + if (parsed_service_config->parsed_lb_config() != nullptr) { + return parsed_service_config->parsed_lb_config(); + } + // Try the deprecated LB policy name from the service config. + // If not, try the setting from channel args. + const char* policy_name = nullptr; + if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) { + policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str(); + } else { + const grpc_arg* channel_arg = + grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME); + policy_name = grpc_channel_arg_get_string(channel_arg); + } + // Use pick_first if nothing was specified and we didn't select grpclb + // above. + if (policy_name == nullptr) policy_name = "pick_first"; + // Now that we have the policy name, construct an empty config for it. + Json config_json = Json::Array{Json::Object{ + {policy_name, Json::Object{}}, + }}; + grpc_error* parse_error = GRPC_ERROR_NONE; + auto lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( + config_json, &parse_error); + // The policy name came from one of three places: + // - The deprecated loadBalancingPolicy field in the service config, + // in which case the code in ClientChannelServiceConfigParser + // already verified that the policy does not require a config. + // - One of the hard-coded values here, all of which are known to not + // require a config. + // - A channel arg, in which case the application did something that + // is a misuse of our API. + // In the first two cases, these assertions will always be true. In + // the last case, this is probably fine for now. + // TODO(roth): If the last case becomes a problem, add better error + // handling here. + GPR_ASSERT(lb_policy_config != nullptr); + GPR_ASSERT(parse_error == GRPC_ERROR_NONE); + return lb_policy_config; +} + +void ChannelData::OnResolverResultChangedLocked(Resolver::Result result) { + // Handle race conditions. + if (resolver_ == nullptr) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: got resolver result", this); + } + // We only want to trace the address resolution in the follow cases: + // (a) Address resolution resulted in service config change. + // (b) Address resolution that causes number of backends to go from + // zero to non-zero. + // (c) Address resolution that causes number of backends to go from + // non-zero to zero. + // (d) Address resolution that causes a new LB policy to be created. + // + // We track a list of strings to eventually be concatenated and traced. + absl::InlinedVector trace_strings; + if (result.addresses.empty() && previous_resolution_contained_addresses_) { + trace_strings.push_back("Address list became empty"); + } else if (!result.addresses.empty() && + !previous_resolution_contained_addresses_) { + trace_strings.push_back("Address list became non-empty"); + } + previous_resolution_contained_addresses_ = !result.addresses.empty(); + // The result of grpc_error_string() is owned by the error itself. + // We're storing that string in trace_strings, so we need to make sure + // that the error lives until we're done with the string. + grpc_error* service_config_error = + GRPC_ERROR_REF(result.service_config_error); + if (service_config_error != GRPC_ERROR_NONE) { + trace_strings.push_back(grpc_error_string(service_config_error)); + } + // Choose the service config. + RefCountedPtr service_config; + RefCountedPtr config_selector; + if (service_config_error != GRPC_ERROR_NONE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s", + this, grpc_error_string(service_config_error)); + } + // If the service config was invalid, then fallback to the + // previously returned service config. + if (saved_service_config_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: resolver returned invalid service config. " + "Continuing to use previous service config.", + this); + } + service_config = saved_service_config_; + config_selector = saved_config_selector_; + } else { + // We received an invalid service config and we don't have a + // previous service config to fall back to. Put the channel into + // TRANSIENT_FAILURE. + OnResolverError(GRPC_ERROR_REF(service_config_error)); + trace_strings.push_back("no valid service config"); + } + } else if (result.service_config == nullptr) { + // Resolver did not return any service config. + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: resolver returned no service config. Using default " + "service config for channel.", + this); + } + service_config = default_service_config_; + } else { + // Use ServiceConfig and ConfigSelector returned by resolver. + service_config = result.service_config; + config_selector = ConfigSelector::GetFromChannelArgs(*result.args); + } + if (service_config != nullptr) { + // Extract global config for client channel. + const internal::ClientChannelGlobalParsedConfig* parsed_service_config = + static_cast( + service_config->GetGlobalParsedConfig( + internal::ClientChannelServiceConfigParser::ParserIndex())); + // Choose LB policy config. + RefCountedPtr lb_policy_config = + ChooseLbPolicy(result, parsed_service_config); + // Check if the ServiceConfig has changed. + const bool service_config_changed = + saved_service_config_ == nullptr || + service_config->json_string() != saved_service_config_->json_string(); + // Check if the ConfigSelector has changed. + const bool config_selector_changed = !ConfigSelector::Equals( + saved_config_selector_.get(), config_selector.get()); + // If either has changed, apply the global parameters now. + if (service_config_changed || config_selector_changed) { + // Update service config in control plane. + UpdateServiceConfigInControlPlaneLocked( + std::move(service_config), std::move(config_selector), + parsed_service_config, lb_policy_config->name()); + } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: service config not changed", this); + } + // Create or update LB policy, as needed. + CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config), + std::move(result)); + if (service_config_changed || config_selector_changed) { + // Start using new service config for calls. + // This needs to happen after the LB policy has been updated, since + // the ConfigSelector may need the LB policy to know about new + // destinations before it can send RPCs to those destinations. + UpdateServiceConfigInDataPlaneLocked(); + // TODO(ncteisen): might be worth somehow including a snippet of the + // config in the trace, at the risk of bloating the trace logs. + trace_strings.push_back("Service config changed"); + } + } + // Add channel trace event. + if (!trace_strings.empty()) { + std::string message = + absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", ")); + if (channelz_node_ != nullptr) { + channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info, + grpc_slice_from_cpp_string(message)); + } + } + GRPC_ERROR_UNREF(service_config_error); +} + +void ChannelData::OnResolverError(grpc_error* error) { + if (resolver_ == nullptr) { + GRPC_ERROR_UNREF(error); + return; + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this, + grpc_error_string(error)); + } + // If we already have an LB policy from a previous resolution + // result, then we continue to let it set the connectivity state. + // Otherwise, we go into TRANSIENT_FAILURE. + if (lb_policy_ == nullptr) { + grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Resolver transient failure", &error, 1); + UpdateStateAndPickerLocked( + GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error), + "resolver failure", + absl::make_unique( + GRPC_ERROR_REF(state_error)), + state_error); + } + GRPC_ERROR_UNREF(error); +} + +void ChannelData::CreateOrUpdateLbPolicyLocked( + RefCountedPtr lb_policy_config, + Resolver::Result result) { + // Construct update. + LoadBalancingPolicy::UpdateArgs update_args; + update_args.addresses = std::move(result.addresses); + update_args.config = std::move(lb_policy_config); + // Remove the config selector from channel args so that we're not holding + // unnecessary refs that cause it to be destroyed somewhere other than in the + // WorkSerializer. + const char* arg_name = GRPC_ARG_CONFIG_SELECTOR; + update_args.args = + grpc_channel_args_copy_and_remove(result.args, &arg_name, 1); + // Create policy if needed. + if (lb_policy_ == nullptr) { + lb_policy_ = CreateLbPolicyLocked(*update_args.args); + } + // Update the policy. + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this, + lb_policy_.get()); + } + lb_policy_->UpdateLocked(std::move(update_args)); +} + +// Creates a new LB policy. +OrphanablePtr ChannelData::CreateLbPolicyLocked( + const grpc_channel_args& args) { + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.work_serializer = work_serializer_; + lb_policy_args.channel_control_helper = + absl::make_unique(this); + lb_policy_args.args = &args; + OrphanablePtr lb_policy = + MakeOrphanable(std::move(lb_policy_args), + &grpc_client_channel_routing_trace); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this, + lb_policy.get()); + } + grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), + interested_parties_); + return lb_policy; +} + void ChannelData::UpdateStateAndPickerLocked( grpc_connectivity_state state, const absl::Status& status, const char* reason, - std::unique_ptr picker) { + std::unique_ptr picker, + grpc_error* resolver_transient_failure_error) { // Clean the control plane when entering IDLE. if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) { health_check_service_name_.reset(); @@ -1762,6 +1885,9 @@ void ChannelData::UpdateStateAndPickerLocked( RefCountedPtr config_selector_to_unref; { MutexLock lock(&data_plane_mu_); + // Update resolver transient failure. + GRPC_ERROR_UNREF(resolver_transient_failure_error_); + resolver_transient_failure_error_ = resolver_transient_failure_error; // Handle subchannel updates. for (auto& p : pending_subchannel_updates_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { @@ -1806,7 +1932,7 @@ void ChannelData::UpdateServiceConfigInControlPlaneLocked( RefCountedPtr config_selector, const internal::ClientChannelGlobalParsedConfig* parsed_service_config, const char* lb_policy_name) { - grpc_core::UniquePtr service_config_json( + UniquePtr service_config_json( gpr_strdup(service_config->json_string().c_str())); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, @@ -1826,12 +1952,11 @@ void ChannelData::UpdateServiceConfigInControlPlaneLocked( // Update health check service name used by existing subchannel wrappers. for (auto* subchannel_wrapper : subchannel_wrappers_) { subchannel_wrapper->UpdateHealthCheckServiceName( - grpc_core::UniquePtr( - gpr_strdup(health_check_service_name_.get()))); + UniquePtr(gpr_strdup(health_check_service_name_.get()))); } } // Swap out the data used by GetChannelInfo(). - grpc_core::UniquePtr lb_policy_name_owned(gpr_strdup(lb_policy_name)); + UniquePtr lb_policy_name_owned(gpr_strdup(lb_policy_name)); { MutexLock lock(&info_mu_); info_lb_policy_name_ = std::move(lb_policy_name_owned); @@ -1899,30 +2024,41 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() { // of scope. } -void ChannelData::CreateResolvingLoadBalancingPolicyLocked() { - // Instantiate resolving LB policy. - LoadBalancingPolicy::Args lb_args; - lb_args.work_serializer = work_serializer_; - lb_args.channel_control_helper = - absl::make_unique(this); - lb_args.args = channel_args_; - grpc_core::UniquePtr target_uri(gpr_strdup(target_uri_.get())); - resolving_lb_policy_.reset(new ResolvingLoadBalancingPolicy( - std::move(lb_args), &grpc_client_channel_routing_trace, - std::move(target_uri), &channel_config_helper_)); - grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(), - interested_parties_); +void ChannelData::CreateResolverLocked() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: starting name resolution", this); + } + resolver_ = ResolverRegistry::CreateResolver( + target_uri_.get(), channel_args_, interested_parties_, work_serializer_, + absl::make_unique(this)); + // Since the validity of the args was checked when the channel was created, + // CreateResolver() must return a non-null result. + GPR_ASSERT(resolver_ != nullptr); + UpdateStateAndPickerLocked( + GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving", + absl::make_unique(nullptr)); + resolver_->StartLocked(); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this, - resolving_lb_policy_.get()); + gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get()); } } -void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() { - if (resolving_lb_policy_ != nullptr) { - grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(), - interested_parties_); - resolving_lb_policy_.reset(); +void ChannelData::DestroyResolverAndLbPolicyLocked() { + if (resolver_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this, + resolver_.get()); + } + resolver_.reset(); + if (lb_policy_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this, + lb_policy_.get()); + } + grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), + interested_parties_); + lb_policy_.reset(); + } } } @@ -1972,8 +2108,8 @@ void ChannelData::StartTransportOpLocked(grpc_transport_op* op) { } // Reset backoff. if (op->reset_connect_backoff) { - if (resolving_lb_policy_ != nullptr) { - resolving_lb_policy_->ResetBackoffLocked(); + if (lb_policy_ != nullptr) { + lb_policy_->ResetBackoffLocked(); } } // Disconnect or enter IDLE. @@ -1982,7 +2118,7 @@ void ChannelData::StartTransportOpLocked(grpc_transport_op* op) { gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this, grpc_error_string(op->disconnect_with_error)); } - DestroyResolvingLoadBalancingPolicyLocked(); + DestroyResolverAndLbPolicyLocked(); intptr_t value; if (grpc_error_get_int(op->disconnect_with_error, GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) && @@ -2071,10 +2207,10 @@ ChannelData::GetConnectedSubchannelInDataPlane( } void ChannelData::TryToConnectLocked() { - if (resolving_lb_policy_ != nullptr) { - resolving_lb_policy_->ExitIdleLocked(); - } else { - CreateResolvingLoadBalancingPolicyLocked(); + if (lb_policy_ != nullptr) { + lb_policy_->ExitIdleLocked(); + } else if (resolver_ == nullptr) { + CreateResolverLocked(); } GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect"); } diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 53c61eecd0c..4ef8cbb8b23 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -105,7 +105,7 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick( // 2. We are currently running in the data plane mutex, but we // need to bounce into the control plane work_serializer to call // ExitIdleLocked(). - if (!exit_idle_called_) { + if (!exit_idle_called_ && parent_ != nullptr) { exit_idle_called_ = true; auto* parent = parent_->Ref().release(); // ref held by lambda. ExecCtx::Run(DEBUG_LOCATION, diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc deleted file mode 100644 index 889c20ca85c..00000000000 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ /dev/null @@ -1,355 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include "src/core/ext/filters/client_channel/resolving_lb_policy.h" - -#include -#include -#include -#include -#include - -#include "absl/strings/str_cat.h" -#include "absl/strings/str_join.h" - -#include -#include -#include -#include - -#include "src/core/ext/filters/client_channel/backup_poller.h" -#include "src/core/ext/filters/client_channel/http_connect_handshaker.h" -#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" -#include "src/core/ext/filters/client_channel/lb_policy_registry.h" -#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" -#include "src/core/ext/filters/client_channel/resolver_registry.h" -#include "src/core/ext/filters/client_channel/retry_throttle.h" -#include "src/core/ext/filters/client_channel/server_address.h" -#include "src/core/ext/filters/client_channel/service_config.h" -#include "src/core/ext/filters/client_channel/subchannel.h" -#include "src/core/ext/filters/deadline/deadline_filter.h" -#include "src/core/lib/backoff/backoff.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/connected_channel.h" -#include "src/core/lib/channel/status_util.h" -#include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/manual_constructor.h" -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/iomgr/polling_entity.h" -#include "src/core/lib/profiling/timers.h" -#include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/surface/channel.h" -#include "src/core/lib/transport/connectivity_state.h" -#include "src/core/lib/transport/error_utils.h" -#include "src/core/lib/transport/metadata.h" -#include "src/core/lib/transport/metadata_batch.h" -#include "src/core/lib/transport/static_metadata.h" -#include "src/core/lib/transport/status_metadata.h" - -namespace grpc_core { - -// -// ResolvingLoadBalancingPolicy::ResolverResultHandler -// - -class ResolvingLoadBalancingPolicy::ResolverResultHandler - : public Resolver::ResultHandler { - public: - explicit ResolverResultHandler( - RefCountedPtr parent) - : parent_(std::move(parent)) {} - - ~ResolverResultHandler() override { - if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) { - gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete", - parent_.get()); - } - } - - void ReturnResult(Resolver::Result result) override { - parent_->OnResolverResultChangedLocked(std::move(result)); - } - - void ReturnError(grpc_error* error) override { - parent_->OnResolverError(error); - } - - private: - RefCountedPtr parent_; -}; - -// -// ResolvingLoadBalancingPolicy::ResolvingControlHelper -// - -class ResolvingLoadBalancingPolicy::ResolvingControlHelper - : public LoadBalancingPolicy::ChannelControlHelper { - public: - explicit ResolvingControlHelper( - RefCountedPtr parent) - : parent_(std::move(parent)) {} - - RefCountedPtr CreateSubchannel( - ServerAddress address, const grpc_channel_args& args) override { - if (parent_->resolver_ == nullptr) return nullptr; // Shutting down. - return parent_->channel_control_helper()->CreateSubchannel( - std::move(address), args); - } - - void UpdateState(grpc_connectivity_state state, const absl::Status& status, - std::unique_ptr picker) override { - if (parent_->resolver_ == nullptr) return; // Shutting down. - parent_->channel_control_helper()->UpdateState(state, status, - std::move(picker)); - } - - void RequestReresolution() override { - if (parent_->resolver_ == nullptr) return; // Shutting down. - if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) { - gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving", - parent_.get()); - } - parent_->resolver_->RequestReresolutionLocked(); - } - - void AddTraceEvent(TraceSeverity severity, - absl::string_view message) override { - if (parent_->resolver_ == nullptr) return; // Shutting down. - parent_->channel_control_helper()->AddTraceEvent(severity, message); - } - - private: - RefCountedPtr parent_; -}; - -// -// ResolvingLoadBalancingPolicy -// - -ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy( - Args args, TraceFlag* tracer, grpc_core::UniquePtr target_uri, - ChannelConfigHelper* helper) - : LoadBalancingPolicy(std::move(args)), - tracer_(tracer), - target_uri_(std::move(target_uri)), - helper_(helper) { - GPR_ASSERT(helper_ != nullptr); - resolver_ = ResolverRegistry::CreateResolver( - target_uri_.get(), args.args, interested_parties(), work_serializer(), - absl::make_unique(Ref())); - // Since the validity of args has been checked when create the channel, - // CreateResolver() must return a non-null result. - GPR_ASSERT(resolver_ != nullptr); - if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this); - } - channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(), - absl::make_unique(Ref())); - resolver_->StartLocked(); -} - -ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() { - GPR_ASSERT(resolver_ == nullptr); - GPR_ASSERT(lb_policy_ == nullptr); -} - -void ResolvingLoadBalancingPolicy::ShutdownLocked() { - if (resolver_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: shutting down resolver=%p", this, - resolver_.get()); - } - resolver_.reset(); - if (lb_policy_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this, - lb_policy_.get()); - } - grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), - interested_parties()); - lb_policy_.reset(); - } - } -} - -void ResolvingLoadBalancingPolicy::ExitIdleLocked() { - if (lb_policy_ != nullptr) lb_policy_->ExitIdleLocked(); -} - -void ResolvingLoadBalancingPolicy::ResetBackoffLocked() { - if (resolver_ != nullptr) { - resolver_->ResetBackoffLocked(); - resolver_->RequestReresolutionLocked(); - } - if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked(); -} - -void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) { - if (resolver_ == nullptr) { - GRPC_ERROR_UNREF(error); - return; - } - if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this, - grpc_error_string(error)); - } - // If we already have an LB policy from a previous resolution - // result, then we continue to let it set the connectivity state. - // Otherwise, we go into TRANSIENT_FAILURE. - if (lb_policy_ == nullptr) { - grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Resolver transient failure", &error, 1); - helper_->ResolverTransientFailure(GRPC_ERROR_REF(state_error)); - channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error), - absl::make_unique(state_error)); - } - GRPC_ERROR_UNREF(error); -} - -void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked( - RefCountedPtr lb_policy_config, - Resolver::Result result) { - // Construct update. - UpdateArgs update_args; - update_args.addresses = std::move(result.addresses); - update_args.config = std::move(lb_policy_config); - // Remove the config selector from channel args so that we're not holding - // unnecessary refs that cause it to be destroyed somewhere other than in the - // WorkSerializer. - const char* arg_name = GRPC_ARG_CONFIG_SELECTOR; - update_args.args = - grpc_channel_args_copy_and_remove(result.args, &arg_name, 1); - // Create policy if needed. - if (lb_policy_ == nullptr) { - lb_policy_ = CreateLbPolicyLocked(*update_args.args); - } - // Update the policy. - if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: Updating child policy %p", this, - lb_policy_.get()); - } - lb_policy_->UpdateLocked(std::move(update_args)); -} - -// Creates a new LB policy. -OrphanablePtr -ResolvingLoadBalancingPolicy::CreateLbPolicyLocked( - const grpc_channel_args& args) { - LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.work_serializer = work_serializer(); - lb_policy_args.channel_control_helper = - absl::make_unique(Ref()); - lb_policy_args.args = &args; - OrphanablePtr lb_policy = - MakeOrphanable(std::move(lb_policy_args), tracer_); - if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy %p", this, - lb_policy.get()); - } - grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), - interested_parties()); - return lb_policy; -} - -void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked( - bool resolution_contains_addresses, TraceStringVector* trace_strings) { - if (!resolution_contains_addresses && - previous_resolution_contained_addresses_) { - trace_strings->push_back("Address list became empty"); - } else if (resolution_contains_addresses && - !previous_resolution_contained_addresses_) { - trace_strings->push_back("Address list became non-empty"); - } - previous_resolution_contained_addresses_ = resolution_contains_addresses; -} - -void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked( - const TraceStringVector& trace_strings) const { - if (!trace_strings.empty()) { - std::string message = - absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", ")); - channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO, - message); - } -} - -void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( - Resolver::Result result) { - // Handle race conditions. - if (resolver_ == nullptr) return; - if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this); - } - // We only want to trace the address resolution in the follow cases: - // (a) Address resolution resulted in service config change. - // (b) Address resolution that causes number of backends to go from - // zero to non-zero. - // (c) Address resolution that causes number of backends to go from - // non-zero to zero. - // (d) Address resolution that causes a new LB policy to be created. - // - // We track a list of strings to eventually be concatenated and traced. - TraceStringVector trace_strings; - MaybeAddTraceMessagesForAddressChangesLocked(!result.addresses.empty(), - &trace_strings); - // The result of grpc_error_string() is owned by the error itself. - // We're storing that string in trace_strings, so we need to make sure - // that the error lives until we're done with the string. - grpc_error* service_config_error = - GRPC_ERROR_REF(result.service_config_error); - if (service_config_error != GRPC_ERROR_NONE) { - trace_strings.push_back(grpc_error_string(service_config_error)); - } - // Choose the service config. - ChannelConfigHelper::ChooseServiceConfigResult service_config_result; - if (helper_ != nullptr) { - service_config_result = helper_->ChooseServiceConfig(result); - } else { - service_config_result.lb_policy_config = child_lb_config_; - } - if (service_config_result.no_valid_service_config) { - // We received an invalid service config and we don't have a - // previous service config to fall back to. - OnResolverError(GRPC_ERROR_REF(service_config_error)); - trace_strings.push_back("no valid service config"); - } else { - // Create or update LB policy, as needed. - CreateOrUpdateLbPolicyLocked( - std::move(service_config_result.lb_policy_config), std::move(result)); - if (service_config_result.service_config_changed) { - // Tell channel to start using new service config for calls. - // This needs to happen after the LB policy has been updated, since - // the ConfigSelector may need the LB policy to know about new - // destinations before it can send RPCs to those destinations. - if (helper_ != nullptr) helper_->StartUsingServiceConfigForCalls(); - // TODO(ncteisen): might be worth somehow including a snippet of the - // config in the trace, at the risk of bloating the trace logs. - trace_strings.push_back("Service config changed"); - } - } - // Add channel trace event. - ConcatenateAndAddChannelTraceLocked(trace_strings); - GRPC_ERROR_UNREF(service_config_error); -} - -} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.h b/src/core/ext/filters/client_channel/resolving_lb_policy.h deleted file mode 100644 index dd592be0faa..00000000000 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.h +++ /dev/null @@ -1,138 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVING_LB_POLICY_H -#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVING_LB_POLICY_H - -#include - -#include "absl/container/inlined_vector.h" - -#include "src/core/ext/filters/client_channel/config_selector.h" -#include "src/core/ext/filters/client_channel/lb_policy.h" -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" -#include "src/core/ext/filters/client_channel/resolver.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/debug/trace.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/iomgr/call_combiner.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/polling_entity.h" -#include "src/core/lib/iomgr/pollset_set.h" -#include "src/core/lib/transport/connectivity_state.h" -#include "src/core/lib/transport/metadata_batch.h" - -namespace grpc_core { - -// An LB policy that wraps a resolver and a child LB policy to make use -// of the addresses returned by the resolver. -// -// When used in the client_channel code, the resolver will attempt to -// fetch the service config, and the child LB policy name and config -// will be determined based on the service config. -// -// When used in an LB policy implementation that needs to do another -// round of resolution before creating a child policy, the resolver does -// not fetch the service config, and the caller must pre-determine the -// child LB policy and config to use. -class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { - public: - class ChannelConfigHelper { - public: - struct ChooseServiceConfigResult { - // Set to true if the service config has changed since the last result. - bool service_config_changed = false; - // Set to true if we don't have a valid service config to use. - // This tells the ResolvingLoadBalancingPolicy to put the channel - // into TRANSIENT_FAILURE. - bool no_valid_service_config = false; - // The LB policy config to use. - RefCountedPtr lb_policy_config; - }; - - virtual ~ChannelConfigHelper() = default; - - // Chooses the service config for the channel. - virtual ChooseServiceConfigResult ChooseServiceConfig( - const Resolver::Result& result) = 0; - - // Starts using the service config for calls. - virtual void StartUsingServiceConfigForCalls() = 0; - - // Indicates a resolver transient failure. - virtual void ResolverTransientFailure(grpc_error* error) = 0; - }; - - ResolvingLoadBalancingPolicy(Args args, TraceFlag* tracer, - grpc_core::UniquePtr target_uri, - ChannelConfigHelper* helper); - - const char* name() const override { return "resolving_lb"; } - - // No-op -- should never get updates from the channel. - // TODO(roth): Need to support updating child LB policy's config for xds - // use case. - void UpdateLocked(UpdateArgs /*args*/) override {} - - void ExitIdleLocked() override; - - void ResetBackoffLocked() override; - - private: - using TraceStringVector = absl::InlinedVector; - - class ResolverResultHandler; - class ResolvingControlHelper; - - ~ResolvingLoadBalancingPolicy() override; - - void ShutdownLocked() override; - - void OnResolverError(grpc_error* error); - void CreateOrUpdateLbPolicyLocked( - RefCountedPtr lb_policy_config, - Resolver::Result result); - OrphanablePtr CreateLbPolicyLocked( - const grpc_channel_args& args); - void MaybeAddTraceMessagesForAddressChangesLocked( - bool resolution_contains_addresses, TraceStringVector* trace_strings); - void ConcatenateAndAddChannelTraceLocked( - const TraceStringVector& trace_strings) const; - void OnResolverResultChangedLocked(Resolver::Result result); - - // Passed in from caller at construction time. - TraceFlag* tracer_; - grpc_core::UniquePtr target_uri_; - ChannelConfigHelper* helper_; - - // Resolver and associated state. - OrphanablePtr resolver_; - bool previous_resolution_contained_addresses_ = false; - - // Determined by resolver results. - grpc_core::UniquePtr child_policy_name_; - RefCountedPtr child_lb_config_; - - // Child LB policy. - OrphanablePtr lb_policy_; -}; - -} // namespace grpc_core - -#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVING_LB_POLICY_H */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 13422cf5f60..d9ec88e4df2 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -66,7 +66,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc', 'src/core/ext/filters/client_channel/resolver_registry.cc', 'src/core/ext/filters/client_channel/resolver_result_parsing.cc', - 'src/core/ext/filters/client_channel/resolving_lb_policy.cc', 'src/core/ext/filters/client_channel/retry_throttle.cc', 'src/core/ext/filters/client_channel/server_address.cc', 'src/core/ext/filters/client_channel/service_config.cc', diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index ebb64f0ce66..6446254f6cf 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1125,8 +1125,6 @@ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_registry.h \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.h \ -src/core/ext/filters/client_channel/resolving_lb_policy.cc \ -src/core/ext/filters/client_channel/resolving_lb_policy.h \ src/core/ext/filters/client_channel/retry_throttle.cc \ src/core/ext/filters/client_channel/retry_throttle.h \ src/core/ext/filters/client_channel/server_address.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index d5135c5a7a8..97aaed7a40d 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -955,8 +955,6 @@ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_registry.h \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.h \ -src/core/ext/filters/client_channel/resolving_lb_policy.cc \ -src/core/ext/filters/client_channel/resolving_lb_policy.h \ src/core/ext/filters/client_channel/retry_throttle.cc \ src/core/ext/filters/client_channel/retry_throttle.h \ src/core/ext/filters/client_channel/server_address.cc \