From 133918ec70afb9596733a2f5758127d3e4592f7a Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 18 Jun 2020 12:45:02 -0700 Subject: [PATCH] Add ConfigSelector API. --- BUILD | 2 + BUILD.gn | 2 + CMakeLists.txt | 2 + Makefile | 2 + build_autogenerated.yaml | 4 + config.m4 | 1 + config.w32 | 1 + gRPC-C++.podspec | 2 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + grpc.gyp | 2 + package.xml | 2 + .../filters/client_channel/client_channel.cc | 588 +++++++++++------- .../filters/client_channel/config_selector.cc | 62 ++ .../filters/client_channel/config_selector.h | 91 +++ .../client_channel/resolving_lb_policy.cc | 61 +- .../client_channel/resolving_lb_policy.h | 57 +- src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.c++.internal | 2 + tools/doxygen/Doxyfile.core.internal | 2 + 20 files changed, 603 insertions(+), 286 deletions(-) create mode 100644 src/core/ext/filters/client_channel/config_selector.cc create mode 100644 src/core/ext/filters/client_channel/config_selector.h diff --git a/BUILD b/BUILD index 99750d060cc..48c1c1cffb7 100644 --- a/BUILD +++ b/BUILD @@ -1028,6 +1028,7 @@ grpc_cc_library( "src/core/ext/filters/client_channel/client_channel_channelz.cc", "src/core/ext/filters/client_channel/client_channel_factory.cc", "src/core/ext/filters/client_channel/client_channel_plugin.cc", + "src/core/ext/filters/client_channel/config_selector.cc", "src/core/ext/filters/client_channel/global_subchannel_pool.cc", "src/core/ext/filters/client_channel/health/health_check_client.cc", "src/core/ext/filters/client_channel/http_connect_handshaker.cc", @@ -1055,6 +1056,7 @@ grpc_cc_library( "src/core/ext/filters/client_channel/client_channel.h", "src/core/ext/filters/client_channel/client_channel_channelz.h", "src/core/ext/filters/client_channel/client_channel_factory.h", + "src/core/ext/filters/client_channel/config_selector.h", "src/core/ext/filters/client_channel/connector.h", "src/core/ext/filters/client_channel/global_subchannel_pool.h", "src/core/ext/filters/client_channel/health/health_check_client.h", diff --git a/BUILD.gn b/BUILD.gn index 378250dd2c0..2d9c2593a5a 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -211,6 +211,8 @@ config("grpc_config") { "src/core/ext/filters/client_channel/client_channel_factory.cc", "src/core/ext/filters/client_channel/client_channel_factory.h", "src/core/ext/filters/client_channel/client_channel_plugin.cc", + "src/core/ext/filters/client_channel/config_selector.cc", + "src/core/ext/filters/client_channel/config_selector.h", "src/core/ext/filters/client_channel/connector.h", "src/core/ext/filters/client_channel/global_subchannel_pool.cc", "src/core/ext/filters/client_channel/global_subchannel_pool.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 4e0c695a5bc..3666c7cdcb1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1328,6 +1328,7 @@ add_library(grpc src/core/ext/filters/client_channel/client_channel_channelz.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc + src/core/ext/filters/client_channel/config_selector.cc src/core/ext/filters/client_channel/global_subchannel_pool.cc src/core/ext/filters/client_channel/health/health_check_client.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -1999,6 +2000,7 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/client_channel_channelz.cc src/core/ext/filters/client_channel/client_channel_factory.cc src/core/ext/filters/client_channel/client_channel_plugin.cc + src/core/ext/filters/client_channel/config_selector.cc src/core/ext/filters/client_channel/global_subchannel_pool.cc src/core/ext/filters/client_channel/health/health_check_client.cc src/core/ext/filters/client_channel/http_connect_handshaker.cc diff --git a/Makefile b/Makefile index 83c7600a698..a57b45bfd02 100644 --- a/Makefile +++ b/Makefile @@ -3630,6 +3630,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ + src/core/ext/filters/client_channel/config_selector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ @@ -4275,6 +4276,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ + src/core/ext/filters/client_channel/config_selector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 5f09c7b3e73..e71ab653e9e 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -377,6 +377,7 @@ libs: - src/core/ext/filters/client_channel/client_channel.h - src/core/ext/filters/client_channel/client_channel_channelz.h - src/core/ext/filters/client_channel/client_channel_factory.h + - src/core/ext/filters/client_channel/config_selector.h - src/core/ext/filters/client_channel/connector.h - src/core/ext/filters/client_channel/global_subchannel_pool.h - src/core/ext/filters/client_channel/health/health_check_client.h @@ -746,6 +747,7 @@ libs: - src/core/ext/filters/client_channel/client_channel_channelz.cc - src/core/ext/filters/client_channel/client_channel_factory.cc - src/core/ext/filters/client_channel/client_channel_plugin.cc + - src/core/ext/filters/client_channel/config_selector.cc - src/core/ext/filters/client_channel/global_subchannel_pool.cc - src/core/ext/filters/client_channel/health/health_check_client.cc - src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -1300,6 +1302,7 @@ libs: - src/core/ext/filters/client_channel/client_channel.h - src/core/ext/filters/client_channel/client_channel_channelz.h - src/core/ext/filters/client_channel/client_channel_factory.h + - src/core/ext/filters/client_channel/config_selector.h - src/core/ext/filters/client_channel/connector.h - src/core/ext/filters/client_channel/global_subchannel_pool.h - src/core/ext/filters/client_channel/health/health_check_client.h @@ -1605,6 +1608,7 @@ libs: - src/core/ext/filters/client_channel/client_channel_channelz.cc - src/core/ext/filters/client_channel/client_channel_factory.cc - src/core/ext/filters/client_channel/client_channel_plugin.cc + - src/core/ext/filters/client_channel/config_selector.cc - src/core/ext/filters/client_channel/global_subchannel_pool.cc - src/core/ext/filters/client_channel/health/health_check_client.cc - src/core/ext/filters/client_channel/http_connect_handshaker.cc diff --git a/config.m4 b/config.m4 index cd1019abcd9..5ee9f1c8d58 100644 --- a/config.m4 +++ b/config.m4 @@ -45,6 +45,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/client_channel_channelz.cc \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ + src/core/ext/filters/client_channel/config_selector.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/health/health_check_client.cc \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ diff --git a/config.w32 b/config.w32 index 8109f9103af..1c533a33302 100644 --- a/config.w32 +++ b/config.w32 @@ -14,6 +14,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\client_channel_channelz.cc " + "src\\core\\ext\\filters\\client_channel\\client_channel_factory.cc " + "src\\core\\ext\\filters\\client_channel\\client_channel_plugin.cc " + + "src\\core\\ext\\filters\\client_channel\\config_selector.cc " + "src\\core\\ext\\filters\\client_channel\\global_subchannel_pool.cc " + "src\\core\\ext\\filters\\client_channel\\health\\health_check_client.cc " + "src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 8851119c07f..f34c2cb79a7 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -227,6 +227,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/client_channel.h', 'src/core/ext/filters/client_channel/client_channel_channelz.h', 'src/core/ext/filters/client_channel/client_channel_factory.h', + 'src/core/ext/filters/client_channel/config_selector.h', 'src/core/ext/filters/client_channel/connector.h', 'src/core/ext/filters/client_channel/global_subchannel_pool.h', 'src/core/ext/filters/client_channel/health/health_check_client.h', @@ -687,6 +688,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/client_channel.h', 'src/core/ext/filters/client_channel/client_channel_channelz.h', 'src/core/ext/filters/client_channel/client_channel_factory.h', + 'src/core/ext/filters/client_channel/config_selector.h', 'src/core/ext/filters/client_channel/connector.h', 'src/core/ext/filters/client_channel/global_subchannel_pool.h', 'src/core/ext/filters/client_channel/health/health_check_client.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 695768422c2..aaf5609a9c3 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -195,6 +195,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_factory.h', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/config_selector.cc', + 'src/core/ext/filters/client_channel/config_selector.h', 'src/core/ext/filters/client_channel/connector.h', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.h', @@ -1051,6 +1053,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/client_channel.h', 'src/core/ext/filters/client_channel/client_channel_channelz.h', 'src/core/ext/filters/client_channel/client_channel_factory.h', + 'src/core/ext/filters/client_channel/config_selector.h', 'src/core/ext/filters/client_channel/connector.h', 'src/core/ext/filters/client_channel/global_subchannel_pool.h', 'src/core/ext/filters/client_channel/health/health_check_client.h', diff --git a/grpc.gemspec b/grpc.gemspec index af04b9d9de4..881f55a81c2 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -117,6 +117,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.cc ) s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.h ) s.files += %w( src/core/ext/filters/client_channel/client_channel_plugin.cc ) + s.files += %w( src/core/ext/filters/client_channel/config_selector.cc ) + s.files += %w( src/core/ext/filters/client_channel/config_selector.h ) s.files += %w( src/core/ext/filters/client_channel/connector.h ) s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.cc ) s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.h ) diff --git a/grpc.gyp b/grpc.gyp index 4022227f5e3..e8bed76e891 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -440,6 +440,7 @@ 'src/core/ext/filters/client_channel/client_channel_channelz.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/config_selector.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/health/health_check_client.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', @@ -947,6 +948,7 @@ 'src/core/ext/filters/client_channel/client_channel_channelz.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/config_selector.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/health/health_check_client.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', diff --git a/package.xml b/package.xml index 0166a778789..262c66a6ac3 100644 --- a/package.xml +++ b/package.xml @@ -97,6 +97,8 @@ + + diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index d8af25d8f2f..8533be0fe2b 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -40,6 +40,7 @@ #include "src/core/ext/filters/client_channel/backend_metric.h" #include "src/core/ext/filters/client_channel/backup_poller.h" +#include "src/core/ext/filters/client_channel/config_selector.h" #include "src/core/ext/filters/client_channel/global_subchannel_pool.h" #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" @@ -149,12 +150,16 @@ class ChannelData { bool received_service_config_data() const { return received_service_config_data_; } + grpc_error* resolver_transient_failure_error() const { + return resolver_transient_failure_error_; + } RefCountedPtr retry_throttle_data() const { return retry_throttle_data_; } RefCountedPtr service_config() const { return service_config_; } + ConfigSelector* config_selector() const { return config_selector_.get(); } WorkSerializer* work_serializer() const { return work_serializer_.get(); } RefCountedPtr GetConnectedSubchannelInDataPlane( @@ -234,6 +239,29 @@ class ChannelData { Atomic done_{false}; }; + class ChannelConfigHelper + : public ResolvingLoadBalancingPolicy::ChannelConfigHelper { + public: + explicit ChannelConfigHelper(ChannelData* chand) : chand_(chand) {} + + ApplyServiceConfigResult ApplyServiceConfig( + const Resolver::Result& result) override; + + void ApplyConfigSelector( + bool service_config_changed, + RefCountedPtr config_selector) override; + + void ResolverTransientFailure(grpc_error* error) override; + + private: + static void ProcessLbPolicy( + const Resolver::Result& resolver_result, + const internal::ClientChannelGlobalParsedConfig* parsed_service_config, + RefCountedPtr* lb_policy_config); + + ChannelData* chand_; + }; + ChannelData(grpc_channel_element_args* args, grpc_error** error); ~ChannelData(); @@ -241,30 +269,20 @@ class ChannelData { grpc_connectivity_state state, const char* reason, std::unique_ptr picker); - void UpdateServiceConfigLocked( - RefCountedPtr retry_throttle_data, - RefCountedPtr service_config); + void UpdateServiceConfigInDataPlaneLocked( + bool service_config_changed, + RefCountedPtr config_selector); void CreateResolvingLoadBalancingPolicyLocked(); void DestroyResolvingLoadBalancingPolicyLocked(); - static bool ProcessResolverResultLocked( - void* arg, const Resolver::Result& result, - RefCountedPtr* lb_policy_config, - grpc_error** service_config_error, bool* no_valid_service_config); - grpc_error* DoPingLocked(grpc_transport_op* op); void StartTransportOpLocked(grpc_transport_op* op); void TryToConnectLocked(); - void ProcessLbPolicy( - const Resolver::Result& resolver_result, - const internal::ClientChannelGlobalParsedConfig* parsed_service_config, - RefCountedPtr* lb_policy_config); - // // Fields set at construction and never modified. // @@ -278,6 +296,7 @@ class ChannelData { grpc_core::UniquePtr server_name_; grpc_core::UniquePtr target_uri_; channelz::ChannelNode* channelz_node_; + ChannelConfigHelper channel_config_helper_; // // Fields used in the data plane. Guarded by data_plane_mu. @@ -286,9 +305,11 @@ class ChannelData { std::unique_ptr picker_; QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks. // Data from service config. + grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE; bool received_service_config_data_ = false; RefCountedPtr retry_throttle_data_; RefCountedPtr service_config_; + RefCountedPtr config_selector_; // // Fields used in the control plane. Guarded by work_serializer. @@ -300,6 +321,7 @@ class ChannelData { ConnectivityStateTracker state_tracker_; grpc_core::UniquePtr health_check_service_name_; RefCountedPtr saved_service_config_; + RefCountedPtr saved_config_selector_; bool received_first_resolver_result_ = false; // The number of SubchannelWrapper instances referencing a given Subchannel. std::map subchannel_refcount_map_; @@ -352,9 +374,6 @@ class CallData { RefCountedPtr subchannel_call() { return subchannel_call_; } - // Invoked by channel for queued picks once resolver results are available. - void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem); - // Invoked by channel for queued picks when the picker is updated. static void PickSubchannel(void* arg, grpc_error* error); @@ -742,13 +761,17 @@ class CallData { void CreateSubchannelCall(grpc_call_element* elem); // Invoked when a pick is completed, on both success or failure. static void PickDone(void* arg, grpc_error* error); - // Removes the call from the channel's list of queued picks. - void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem); - // Adds the call to the channel's list of queued picks. - void AddCallToQueuedPicksLocked(grpc_call_element* elem); + // Removes the call from the channel's list of queued picks if present. + void MaybeRemoveCallFromQueuedPicksLocked(grpc_call_element* elem); + // Adds the call to the channel's list of queued picks if not already present. + void MaybeAddCallToQueuedPicksLocked(grpc_call_element* elem); // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. - void ApplyServiceConfigToCallLocked(grpc_call_element* elem); + // If an error is returned, the error indicates the status with which + // the call should be failed. + grpc_error* ApplyServiceConfigToCallLocked( + grpc_call_element* elem, grpc_metadata_batch* initial_metadata); + void MaybeInvokeConfigSelectorCommitCallback(); // State for handling deadlines. // The code in deadline_filter.c requires this to be the first field. @@ -769,6 +792,7 @@ class CallData { RefCountedPtr retry_throttle_data_; const ClientChannelMethodParsedConfig* method_params_ = nullptr; std::map call_attributes_; + std::function on_call_committed_; RefCountedPtr subchannel_call_; @@ -1335,6 +1359,180 @@ class ChannelData::ClientChannelControlHelper ChannelData* chand_; }; +// +// ChannelData::ChannelConfigHelper +// + +// Synchronous callback from ResolvingLoadBalancingPolicy to process a +// resolver result update. +ChannelData::ChannelConfigHelper::ApplyServiceConfigResult +ChannelData::ChannelConfigHelper::ApplyServiceConfig( + const Resolver::Result& result) { + ApplyServiceConfigResult service_config_result; + RefCountedPtr service_config; + // If resolver did not return a service config or returned an invalid service + // config, we need a fallback service config. + if (result.service_config_error != GRPC_ERROR_NONE) { + // If the service config was invalid, then fallback to the saved service + // config. If there is no saved config either, use the default service + // config. + if (chand_->saved_service_config_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: resolver returned invalid service config. " + "Continuing to use previous service config.", + chand_); + } + service_config = chand_->saved_service_config_; + } else if (chand_->default_service_config_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: resolver returned invalid service config. Using " + "default service config provided by client API.", + chand_); + } + service_config = chand_->default_service_config_; + } + } else if (result.service_config == nullptr) { + if (chand_->default_service_config_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: resolver returned no service config. Using default " + "service config provided by client API.", + chand_); + } + service_config = chand_->default_service_config_; + } + } else { + service_config = result.service_config; + } + service_config_result.service_config_error = + GRPC_ERROR_REF(result.service_config_error); + if (service_config == nullptr && + result.service_config_error != GRPC_ERROR_NONE) { + service_config_result.no_valid_service_config = true; + return service_config_result; + } + // Process service config. + grpc_core::UniquePtr service_config_json; + const internal::ClientChannelGlobalParsedConfig* parsed_service_config = + nullptr; + if (service_config != nullptr) { + parsed_service_config = + static_cast( + service_config->GetGlobalParsedConfig( + internal::ClientChannelServiceConfigParser::ParserIndex())); + } + // Check if the config has changed. + service_config_result.service_config_changed = + ((service_config == nullptr) != + (chand_->saved_service_config_ == nullptr)) || + (service_config != nullptr && + service_config->json_string() != + chand_->saved_service_config_->json_string()); + if (service_config_result.service_config_changed) { + service_config_json.reset(gpr_strdup( + service_config != nullptr ? service_config->json_string().c_str() + : "")); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: resolver returned updated service config: \"%s\"", + chand_, service_config_json.get()); + } + // Save health check service name. + if (service_config != nullptr) { + chand_->health_check_service_name_.reset( + gpr_strdup(parsed_service_config->health_check_service_name())); + } else { + chand_->health_check_service_name_.reset(); + } + // Update health check service name used by existing subchannel wrappers. + for (auto* subchannel_wrapper : chand_->subchannel_wrappers_) { + subchannel_wrapper->UpdateHealthCheckServiceName( + grpc_core::UniquePtr( + gpr_strdup(chand_->health_check_service_name_.get()))); + } + // Save service config. + chand_->saved_service_config_ = std::move(service_config); + } + // Find LB policy config. + ProcessLbPolicy(result, parsed_service_config, + &service_config_result.lb_policy_config); + grpc_core::UniquePtr lb_policy_name( + gpr_strdup((service_config_result.lb_policy_config)->name())); + // Swap out the data used by GetChannelInfo(). + { + MutexLock lock(&chand_->info_mu_); + chand_->info_lb_policy_name_ = std::move(lb_policy_name); + if (service_config_json != nullptr) { + chand_->info_service_config_json_ = std::move(service_config_json); + } + } + // Return results. + return service_config_result; +} + +void ChannelData::ChannelConfigHelper::ApplyConfigSelector( + bool service_config_changed, + RefCountedPtr config_selector) { + chand_->UpdateServiceConfigInDataPlaneLocked(service_config_changed, + std::move(config_selector)); +} + +void ChannelData::ChannelConfigHelper::ResolverTransientFailure( + grpc_error* error) { + MutexLock lock(&chand_->data_plane_mu_); + GRPC_ERROR_UNREF(chand_->resolver_transient_failure_error_); + chand_->resolver_transient_failure_error_ = error; +} + +void ChannelData::ChannelConfigHelper::ProcessLbPolicy( + const Resolver::Result& resolver_result, + const internal::ClientChannelGlobalParsedConfig* parsed_service_config, + RefCountedPtr* lb_policy_config) { + // Prefer the LB policy config found in the service config. + if (parsed_service_config != nullptr && + parsed_service_config->parsed_lb_config() != nullptr) { + *lb_policy_config = parsed_service_config->parsed_lb_config(); + return; + } + // Try the deprecated LB policy name from the service config. + // If not, try the setting from channel args. + const char* policy_name = nullptr; + if (parsed_service_config != nullptr && + !parsed_service_config->parsed_deprecated_lb_policy().empty()) { + policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str(); + } else { + const grpc_arg* channel_arg = + grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME); + policy_name = grpc_channel_arg_get_string(channel_arg); + } + // Use pick_first if nothing was specified and we didn't select grpclb + // above. + if (policy_name == nullptr) policy_name = "pick_first"; + // Now that we have the policy name, construct an empty config for it. + Json config_json = Json::Array{Json::Object{ + {policy_name, Json::Object{}}, + }}; + grpc_error* parse_error = GRPC_ERROR_NONE; + *lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( + config_json, &parse_error); + // The policy name came from one of three places: + // - The deprecated loadBalancingPolicy field in the service config, + // in which case the code in ClientChannelServiceConfigParser + // already verified that the policy does not require a config. + // - One of the hard-coded values here, all of which are known to not + // require a config. + // - A channel arg, in which case the application did something that + // is a misuse of our API. + // In the first two cases, these assertions will always be true. In + // the last case, this is probably fine for now. + // TODO(roth): If the last case becomes a problem, add better error + // handling here. + GPR_ASSERT(*lb_policy_config != nullptr); + GPR_ASSERT(parse_error == GRPC_ERROR_NONE); +} + // // ChannelData implementation // @@ -1393,6 +1591,7 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) client_channel_factory_( ClientChannelFactory::GetFromChannelArgs(args->channel_args)), channelz_node_(GetChannelzNode(args->channel_args)), + channel_config_helper_(this), work_serializer_(std::make_shared()), interested_parties_(grpc_pollset_set_create()), subchannel_pool_(GetSubchannelPool(args->channel_args)), @@ -1461,6 +1660,7 @@ ChannelData::~ChannelData() { } DestroyResolvingLoadBalancingPolicyLocked(); grpc_channel_args_destroy(channel_args_); + GRPC_ERROR_UNREF(resolver_transient_failure_error_); // Stop backup polling. grpc_client_channel_stop_backup_polling(interested_parties_); grpc_pollset_set_destroy(interested_parties_); @@ -1475,6 +1675,7 @@ void ChannelData::UpdateStateAndPickerLocked( if (picker_ == nullptr) { health_check_service_name_.reset(); saved_service_config_.reset(); + saved_config_selector_.reset(); received_first_resolver_result_ = false; } // Update connectivity state. @@ -1497,9 +1698,11 @@ void ChannelData::UpdateStateAndPickerLocked( // - refs to subchannel wrappers in the keys of pending_subchannel_updates_ // - ref stored in retry_throttle_data_ // - ref stored in service_config_ + // - ref stored in config_selector_ // - ownership of the existing picker in picker_ RefCountedPtr retry_throttle_data_to_unref; RefCountedPtr service_config_to_unref; + RefCountedPtr config_selector_to_unref; { MutexLock lock(&data_plane_mu_); // Handle subchannel updates. @@ -1524,6 +1727,7 @@ void ChannelData::UpdateStateAndPickerLocked( // Note: We save the objects to unref until after the lock is released. retry_throttle_data_to_unref = std::move(retry_throttle_data_); service_config_to_unref = std::move(service_config_); + config_selector_to_unref = std::move(config_selector_); } // Re-process queued picks. for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) { @@ -1540,24 +1744,72 @@ void ChannelData::UpdateStateAndPickerLocked( pending_subchannel_updates_.clear(); } -void ChannelData::UpdateServiceConfigLocked( - RefCountedPtr retry_throttle_data, - RefCountedPtr service_config) { +void ChannelData::UpdateServiceConfigInDataPlaneLocked( + bool service_config_changed, + RefCountedPtr config_selector) { + // Check if ConfigSelector has changed. + const bool config_selector_changed = + saved_config_selector_ != config_selector; + saved_config_selector_ = config_selector; + // We want to set the service config at least once, even if the + // resolver does not return a config, because that ensures that we + // disable retries if they are not enabled in the service config. + // TODO(roth): Consider removing the received_first_resolver_result_ check + // when we implement transparent retries. + if (!service_config_changed && !config_selector_changed && + received_first_resolver_result_) { + return; + } + received_first_resolver_result_ = true; + // Get retry throttle data from service config. + RefCountedPtr retry_throttle_data; + if (saved_service_config_ != nullptr) { + const internal::ClientChannelGlobalParsedConfig* parsed_service_config = + static_cast( + saved_service_config_->GetGlobalParsedConfig( + internal::ClientChannelServiceConfigParser::ParserIndex())); + if (parsed_service_config != nullptr) { + absl::optional + retry_throttle_config = parsed_service_config->retry_throttling(); + if (retry_throttle_config.has_value()) { + retry_throttle_data = + internal::ServerRetryThrottleMap::GetDataForServer( + server_name_.get(), + retry_throttle_config.value().max_milli_tokens, + retry_throttle_config.value().milli_token_ratio); + } + } + } + // Create default config selector if not provided by resolver. + if (config_selector == nullptr) { + config_selector = + MakeRefCounted(saved_service_config_); + } // Grab data plane lock to update service config. // // We defer unreffing the old values (and deallocating memory) until // after releasing the lock to keep the critical section small. + RefCountedPtr service_config_to_unref = saved_service_config_; + RefCountedPtr config_selector_to_unref = + std::move(config_selector); { MutexLock lock(&data_plane_mu_); + GRPC_ERROR_UNREF(resolver_transient_failure_error_); + resolver_transient_failure_error_ = GRPC_ERROR_NONE; // Update service config. received_service_config_data_ = true; // Old values will be unreffed after lock is released. retry_throttle_data_.swap(retry_throttle_data); - service_config_.swap(service_config); - // Apply service config to queued picks. + service_config_.swap(service_config_to_unref); + config_selector_.swap(config_selector_to_unref); + // Re-process queued picks. for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) { - CallData* calld = static_cast(pick->elem->call_data); - calld->MaybeApplyServiceConfigToCallLocked(pick->elem); + grpc_call_element* elem = pick->elem; + CallData* calld = static_cast(elem->call_data); + grpc_error* error = GRPC_ERROR_NONE; + if (calld->PickSubchannelLocked(elem, &error)) { + calld->AsyncPickDone(elem, error); + } } } // Old values will be unreffed after lock is released when they go out @@ -1574,7 +1826,7 @@ void ChannelData::CreateResolvingLoadBalancingPolicyLocked() { grpc_core::UniquePtr target_uri(gpr_strdup(target_uri_.get())); resolving_lb_policy_.reset(new ResolvingLoadBalancingPolicy( std::move(lb_args), &grpc_client_channel_routing_trace, - std::move(target_uri), ProcessResolverResultLocked, this)); + std::move(target_uri), &channel_config_helper_)); grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(), interested_parties_); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { @@ -1591,180 +1843,6 @@ void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() { } } -void ChannelData::ProcessLbPolicy( - const Resolver::Result& resolver_result, - const internal::ClientChannelGlobalParsedConfig* parsed_service_config, - RefCountedPtr* lb_policy_config) { - // Prefer the LB policy config found in the service config. - if (parsed_service_config != nullptr && - parsed_service_config->parsed_lb_config() != nullptr) { - *lb_policy_config = parsed_service_config->parsed_lb_config(); - return; - } - // Try the deprecated LB policy name from the service config. - // If not, try the setting from channel args. - const char* policy_name = nullptr; - if (parsed_service_config != nullptr && - !parsed_service_config->parsed_deprecated_lb_policy().empty()) { - policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str(); - } else { - const grpc_arg* channel_arg = - grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME); - policy_name = grpc_channel_arg_get_string(channel_arg); - } - // Use pick_first if nothing was specified and we didn't select grpclb - // above. - if (policy_name == nullptr) policy_name = "pick_first"; - // Now that we have the policy name, construct an empty config for it. - Json config_json = Json::Array{Json::Object{ - {policy_name, Json::Object{}}, - }}; - grpc_error* parse_error = GRPC_ERROR_NONE; - *lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - config_json, &parse_error); - // The policy name came from one of three places: - // - The deprecated loadBalancingPolicy field in the service config, - // in which case the code in ClientChannelServiceConfigParser - // already verified that the policy does not require a config. - // - One of the hard-coded values here, all of which are known to not - // require a config. - // - A channel arg, in which case the application did something that - // is a misuse of our API. - // In the first two cases, these assertions will always be true. In - // the last case, this is probably fine for now. - // TODO(roth): If the last case becomes a problem, add better error - // handling here. - GPR_ASSERT(*lb_policy_config != nullptr); - GPR_ASSERT(parse_error == GRPC_ERROR_NONE); -} - -// Synchronous callback from ResolvingLoadBalancingPolicy to process a -// resolver result update. -bool ChannelData::ProcessResolverResultLocked( - void* arg, const Resolver::Result& result, - RefCountedPtr* lb_policy_config, - grpc_error** service_config_error, bool* no_valid_service_config) { - ChannelData* chand = static_cast(arg); - RefCountedPtr service_config; - // If resolver did not return a service config or returned an invalid service - // config, we need a fallback service config. - if (result.service_config_error != GRPC_ERROR_NONE) { - // If the service config was invalid, then fallback to the saved service - // config. If there is no saved config either, use the default service - // config. - if (chand->saved_service_config_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: resolver returned invalid service config. " - "Continuing to use previous service config.", - chand); - } - service_config = chand->saved_service_config_; - } else if (chand->default_service_config_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: resolver returned invalid service config. Using " - "default service config provided by client API.", - chand); - } - service_config = chand->default_service_config_; - } - } else if (result.service_config == nullptr) { - if (chand->default_service_config_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: resolver returned no service config. Using default " - "service config provided by client API.", - chand); - } - service_config = chand->default_service_config_; - } - } else { - service_config = result.service_config; - } - *service_config_error = GRPC_ERROR_REF(result.service_config_error); - if (service_config == nullptr && - result.service_config_error != GRPC_ERROR_NONE) { - *no_valid_service_config = true; - return false; - } - // Process service config. - grpc_core::UniquePtr service_config_json; - const internal::ClientChannelGlobalParsedConfig* parsed_service_config = - nullptr; - if (service_config != nullptr) { - parsed_service_config = - static_cast( - service_config->GetGlobalParsedConfig( - internal::ClientChannelServiceConfigParser::ParserIndex())); - } - // Check if the config has changed. - const bool service_config_changed = - ((service_config == nullptr) != - (chand->saved_service_config_ == nullptr)) || - (service_config != nullptr && - service_config->json_string() != - chand->saved_service_config_->json_string()); - if (service_config_changed) { - service_config_json.reset(gpr_strdup( - service_config != nullptr ? service_config->json_string().c_str() - : "")); - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: resolver returned updated service config: \"%s\"", - chand, service_config_json.get()); - } - // Save health check service name. - if (service_config != nullptr) { - chand->health_check_service_name_.reset( - gpr_strdup(parsed_service_config->health_check_service_name())); - } else { - chand->health_check_service_name_.reset(); - } - // Update health check service name used by existing subchannel wrappers. - for (auto* subchannel_wrapper : chand->subchannel_wrappers_) { - subchannel_wrapper->UpdateHealthCheckServiceName( - grpc_core::UniquePtr( - gpr_strdup(chand->health_check_service_name_.get()))); - } - // Save service config. - chand->saved_service_config_ = std::move(service_config); - } - // We want to set the service config at least once. This should not really be - // needed, but we are doing it as a defensive approach. This can be removed, - // if we feel it is unnecessary. - if (service_config_changed || !chand->received_first_resolver_result_) { - chand->received_first_resolver_result_ = true; - RefCountedPtr retry_throttle_data; - if (parsed_service_config != nullptr) { - absl::optional - retry_throttle_config = parsed_service_config->retry_throttling(); - if (retry_throttle_config.has_value()) { - retry_throttle_data = - internal::ServerRetryThrottleMap::GetDataForServer( - chand->server_name_.get(), - retry_throttle_config.value().max_milli_tokens, - retry_throttle_config.value().milli_token_ratio); - } - } - chand->UpdateServiceConfigLocked(std::move(retry_throttle_data), - chand->saved_service_config_); - } - chand->ProcessLbPolicy(result, parsed_service_config, lb_policy_config); - grpc_core::UniquePtr lb_policy_name( - gpr_strdup((*lb_policy_config)->name())); - // Swap out the data used by GetChannelInfo(). - { - MutexLock lock(&chand->info_mu_); - chand->info_lb_policy_name_ = std::move(lb_policy_name); - if (service_config_json != nullptr) { - chand->info_service_config_json_ = std::move(service_config_json); - } - } - // Return results. - return service_config_changed; -} - grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { if (state_tracker_.state() != GRPC_CHANNEL_READY) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected"); @@ -2807,6 +2885,7 @@ void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) { } // Received valid initial metadata, so commit the call. calld->RetryCommit(elem, retry_state); + calld->MaybeInvokeConfigSelectorCommitCallback(); // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. calld->InvokeRecvInitialMetadataCallback(batch_data, error); @@ -2893,6 +2972,7 @@ void CallData::RecvMessageReady(void* arg, grpc_error* error) { } // Received a valid message, so commit the call. calld->RetryCommit(elem, retry_state); + calld->MaybeInvokeConfigSelectorCommitCallback(); // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. calld->InvokeRecvMessageCallback(batch_data, error); @@ -3094,6 +3174,7 @@ void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) { } // Not retrying, so commit the call. calld->RetryCommit(elem, retry_state); + calld->MaybeInvokeConfigSelectorCommitCallback(); // Run any necessary closures. calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error)); } @@ -3716,7 +3797,7 @@ class CallData::QueuedPickCanceller { } if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) { // Remove pick from list of queued picks. - calld->RemoveCallFromQueuedPicksLocked(self->elem_); + calld->MaybeRemoveCallFromQueuedPicksLocked(self->elem_); // Fail pending batches on the call. calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error), YieldCallCombinerIfPendingBatchesFound); @@ -3729,7 +3810,8 @@ class CallData::QueuedPickCanceller { grpc_closure closure_; }; -void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) { +void CallData::MaybeRemoveCallFromQueuedPicksLocked(grpc_call_element* elem) { + if (!pick_queued_) return; auto* chand = static_cast(elem->channel_data); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list", @@ -3741,7 +3823,8 @@ void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) { pick_canceller_ = nullptr; } -void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) { +void CallData::MaybeAddCallToQueuedPicksLocked(grpc_call_element* elem) { + if (pick_queued_) return; auto* chand = static_cast(elem->channel_data); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand, @@ -3754,23 +3837,29 @@ void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) { pick_canceller_ = new QueuedPickCanceller(elem); } -void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) { +grpc_error* CallData::ApplyServiceConfigToCallLocked( + grpc_call_element* elem, grpc_metadata_batch* initial_metadata) { ChannelData* chand = static_cast(elem->channel_data); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call", chand, this); } + ConfigSelector* config_selector = chand->config_selector(); auto service_config = chand->service_config(); if (service_config != nullptr) { + // Use the ConfigSelector to determine the config for the call. + ConfigSelector::CallConfig call_config = + config_selector->GetCallConfig({&path_, initial_metadata}); + if (call_config.error != GRPC_ERROR_NONE) return call_config.error; + call_attributes_ = std::move(call_config.call_attributes); + on_call_committed_ = std::move(call_config.on_call_committed); // Create a ServiceConfigCallData for the call. This stores a ref to the // ServiceConfig and caches the right set of parsed configs to use for // the call. The MethodConfig will store itself in the call context, // so that it can be accessed by filters in the subchannel, and it // will be cleaned up when the call ends. - const auto* method_params_vector = - service_config->GetMethodParsedConfigVector(path_); auto* service_config_call_data = arena_->New( - std::move(service_config), method_params_vector, call_context_); + std::move(service_config), call_config.method_configs, call_context_); // Apply our own method params to the call. method_params_ = static_cast( service_config_call_data->GetMethodParsedConfig( @@ -3812,16 +3901,13 @@ void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) { if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) { enable_retries_ = false; } + return GRPC_ERROR_NONE; } -void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) { - ChannelData* chand = static_cast(elem->channel_data); - // Apply service config data to the call only once, and only if the - // channel has the data available. - if (GPR_LIKELY(chand->received_service_config_data() && - !service_config_applied_)) { - service_config_applied_ = true; - ApplyServiceConfigToCallLocked(elem); +void CallData::MaybeInvokeConfigSelectorCommitCallback() { + if (on_call_committed_ != nullptr) { + on_call_committed_(); + on_call_committed_ = nullptr; } } @@ -3882,11 +3968,45 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, GRPC_ERROR_NONE); // Queue the pick, so that it will be attempted once the channel // becomes connected. - AddCallToQueuedPicksLocked(elem); + MaybeAddCallToQueuedPicksLocked(elem); + return false; + } + grpc_metadata_batch* initial_metadata_batch = + seen_send_initial_metadata_ + ? &send_initial_metadata_ + : pending_batches_[0] + .batch->payload->send_initial_metadata.send_initial_metadata; + // Grab initial metadata flags so that we can check later if the call has + // wait_for_ready enabled. + const uint32_t send_initial_metadata_flags = + seen_send_initial_metadata_ ? send_initial_metadata_flags_ + : pending_batches_[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + // Avoid picking if we haven't yet received service config data. + if (GPR_UNLIKELY(!chand->received_service_config_data())) { + // If the resolver returned transient failure before returning the + // first service config, fail any non-wait_for_ready calls. + grpc_error* resolver_error = chand->resolver_transient_failure_error(); + if (resolver_error != GRPC_ERROR_NONE && + (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) == + 0) { + MaybeRemoveCallFromQueuedPicksLocked(elem); + *error = GRPC_ERROR_REF(resolver_error); + return true; + } + // Either the resolver has not yet returned a result, or it has + // returned transient failure but the call is wait_for_ready. In + // either case, queue the call. + MaybeAddCallToQueuedPicksLocked(elem); return false; } - // Apply service config to call if needed. - MaybeApplyServiceConfigToCallLocked(elem); + // Apply service config to call if not yet applied. + if (GPR_LIKELY(!service_config_applied_)) { + service_config_applied_ = true; + *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch); + if (*error != GRPC_ERROR_NONE) return true; + } // If this is a retry, use the send_initial_metadata payload that // we've cached; otherwise, use the pending batch. The // send_initial_metadata batch will be the first pending batch in the @@ -3899,20 +4019,8 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, // attempt) to the LB policy instead the one from the parent channel. LoadBalancingPolicy::PickArgs pick_args; pick_args.call_state = &lb_call_state_; - Metadata initial_metadata( - this, - seen_send_initial_metadata_ - ? &send_initial_metadata_ - : pending_batches_[0] - .batch->payload->send_initial_metadata.send_initial_metadata); + Metadata initial_metadata(this, initial_metadata_batch); pick_args.initial_metadata = &initial_metadata; - // Grab initial metadata flags so that we can check later if the call has - // wait_for_ready enabled. - const uint32_t send_initial_metadata_flags = - seen_send_initial_metadata_ ? send_initial_metadata_flags_ - : pending_batches_[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; // Attempt pick. auto result = chand->picker()->Pick(pick_args); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { @@ -3927,7 +4035,8 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, grpc_error* disconnect_error = chand->disconnect_error(); if (disconnect_error != GRPC_ERROR_NONE) { GRPC_ERROR_UNREF(result.error); - if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem); + MaybeRemoveCallFromQueuedPicksLocked(elem); + MaybeInvokeConfigSelectorCommitCallback(); *error = GRPC_ERROR_REF(disconnect_error); return true; } @@ -3948,8 +4057,9 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, "Failed to pick subchannel", &result.error, 1); GRPC_ERROR_UNREF(result.error); *error = new_error; + MaybeInvokeConfigSelectorCommitCallback(); } - if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem); + MaybeRemoveCallFromQueuedPicksLocked(elem); return !retried; } // If wait_for_ready is true, then queue to retry when we get a new @@ -3958,22 +4068,24 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, } // Fallthrough case LoadBalancingPolicy::PickResult::PICK_QUEUE: - if (!pick_queued_) AddCallToQueuedPicksLocked(elem); + MaybeAddCallToQueuedPicksLocked(elem); return false; default: // PICK_COMPLETE - if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem); + MaybeRemoveCallFromQueuedPicksLocked(elem); // Handle drops. if (GPR_UNLIKELY(result.subchannel == nullptr)) { result.error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + MaybeInvokeConfigSelectorCommitCallback(); } else { // Grab a ref to the connected subchannel while we're still // holding the data plane mutex. connected_subchannel_ = chand->GetConnectedSubchannelInDataPlane(result.subchannel.get()); GPR_ASSERT(connected_subchannel_ != nullptr); + if (retry_committed_) MaybeInvokeConfigSelectorCommitCallback(); } lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready; *error = result.error; diff --git a/src/core/ext/filters/client_channel/config_selector.cc b/src/core/ext/filters/client_channel/config_selector.cc new file mode 100644 index 00000000000..e5d2a3f0be3 --- /dev/null +++ b/src/core/ext/filters/client_channel/config_selector.cc @@ -0,0 +1,62 @@ +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include "src/core/ext/filters/client_channel/config_selector.h" + +#include "src/core/lib/channel/channel_args.h" + +// Channel arg key for ConfigSelector. +#define GRPC_ARG_CONFIG_SELECTOR "grpc.internal.config_selector" + +namespace grpc_core { + +namespace { + +void* ConfigSelectorArgCopy(void* p) { + ConfigSelector* config_selector = static_cast(p); + config_selector->Ref().release(); + return p; +} + +void ConfigSelectorArgDestroy(void* p) { + ConfigSelector* config_selector = static_cast(p); + config_selector->Unref(); +} + +int ConfigSelectorArgCmp(void* p, void* q) { return GPR_ICMP(p, q); } + +const grpc_arg_pointer_vtable kChannelArgVtable = { + ConfigSelectorArgCopy, ConfigSelectorArgDestroy, ConfigSelectorArgCmp}; + +} // namespace + +grpc_arg ConfigSelector::MakeChannelArg() const { + return grpc_channel_arg_pointer_create( + const_cast(GRPC_ARG_CONFIG_SELECTOR), + const_cast(this), &kChannelArgVtable); +} + +RefCountedPtr ConfigSelector::GetFromChannelArgs( + const grpc_channel_args& args) { + ConfigSelector* config_selector = + grpc_channel_args_find_pointer(&args, + GRPC_ARG_CONFIG_SELECTOR); + return config_selector != nullptr ? config_selector->Ref() : nullptr; +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/config_selector.h b/src/core/ext/filters/client_channel/config_selector.h new file mode 100644 index 00000000000..efd27341fa5 --- /dev/null +++ b/src/core/ext/filters/client_channel/config_selector.h @@ -0,0 +1,91 @@ +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONFIG_SELECTOR_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONFIG_SELECTOR_H + +#include + +#include +#include + +#include "absl/strings/string_view.h" + +#include +#include + +#include "src/core/ext/filters/client_channel/service_config.h" +#include "src/core/ext/filters/client_channel/service_config_parser.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/transport/metadata_batch.h" + +namespace grpc_core { + +// Internal API used to allow resolver implementations to override +// MethodConfig and provide input to LB policies on a per-call basis. +class ConfigSelector : public RefCounted { + public: + struct GetCallConfigArgs { + grpc_slice* path; + grpc_metadata_batch* initial_metadata; + }; + + struct CallConfig { + // Can be set to indicate the call should be failed. + grpc_error* error = GRPC_ERROR_NONE; + // The per-method parsed configs that will be passed to + // ServiceConfigCallData. + const ServiceConfigParser::ParsedConfigVector* method_configs = nullptr; + // Call attributes that will be accessible to LB policy implementations. + std::map call_attributes; + // A callback that, if set, will be invoked when the call is + // committed (i.e., when we know that we will never again need to + // ask the picker for a subchannel for this call). + std::function on_call_committed; + }; + + virtual ~ConfigSelector() = default; + + virtual CallConfig GetCallConfig(GetCallConfigArgs args) = 0; + + grpc_arg MakeChannelArg() const; + static RefCountedPtr GetFromChannelArgs( + const grpc_channel_args& args); +}; + +// Default ConfigSelector that gets the MethodConfig from the service config. +class DefaultConfigSelector : public ConfigSelector { + public: + explicit DefaultConfigSelector(RefCountedPtr service_config) + : service_config_(std::move(service_config)) {} + + CallConfig GetCallConfig(GetCallConfigArgs args) override { + CallConfig call_config; + if (service_config_ != nullptr) { + call_config.method_configs = + service_config_->GetMethodParsedConfigVector(*args.path); + } + return call_config; + } + + private: + RefCountedPtr service_config_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONFIG_SELECTOR_H */ diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index 74a5c29968b..356123c50ae 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -145,14 +145,12 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy( Args args, TraceFlag* tracer, grpc_core::UniquePtr target_uri, - ProcessResolverResultCallback process_resolver_result, - void* process_resolver_result_user_data) + ChannelConfigHelper* helper) : LoadBalancingPolicy(std::move(args)), tracer_(tracer), target_uri_(std::move(target_uri)), - process_resolver_result_(process_resolver_result), - process_resolver_result_user_data_(process_resolver_result_user_data) { - GPR_ASSERT(process_resolver_result != nullptr); + helper_(helper) { + GPR_ASSERT(helper_ != nullptr); resolver_ = ResolverRegistry::CreateResolver( target_uri_.get(), args.args, interested_parties(), work_serializer(), absl::make_unique(Ref())); @@ -214,6 +212,7 @@ void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) { if (lb_policy_ == nullptr) { grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Resolver transient failure", &error, 1); + helper_->ResolverTransientFailure(GRPC_ERROR_REF(state_error)); channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, absl::make_unique(state_error)); @@ -304,45 +303,51 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( TraceStringVector trace_strings; const bool resolution_contains_addresses = result.addresses.size() > 0; // Process the resolver result. - RefCountedPtr lb_policy_config; - bool service_config_changed = false; - std::string service_config_error_string; - if (process_resolver_result_ != nullptr) { - grpc_error* service_config_error = GRPC_ERROR_NONE; - bool no_valid_service_config = false; - service_config_changed = process_resolver_result_( - process_resolver_result_user_data_, result, &lb_policy_config, - &service_config_error, &no_valid_service_config); - if (service_config_error != GRPC_ERROR_NONE) { - service_config_error_string = grpc_error_string(service_config_error); - if (no_valid_service_config) { + ChannelConfigHelper::ApplyServiceConfigResult service_config_result; + if (helper_ != nullptr) { + service_config_result = helper_->ApplyServiceConfig(result); + if (service_config_result.service_config_error != GRPC_ERROR_NONE) { + if (service_config_result.no_valid_service_config) { // We received an invalid service config and we don't have a // fallback service config. - OnResolverError(service_config_error); - } else { - GRPC_ERROR_UNREF(service_config_error); + OnResolverError(service_config_result.service_config_error); + service_config_result.service_config_error = GRPC_ERROR_NONE; } } } else { - lb_policy_config = child_lb_config_; + service_config_result.lb_policy_config = child_lb_config_; } - if (lb_policy_config != nullptr) { - // Create or update LB policy, as needed. - CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config), - std::move(result)); + // Before we send the args to the LB policy, grab the ConfigSelector for + // later use. + RefCountedPtr config_selector = + ConfigSelector::GetFromChannelArgs(*result.args); + // Create or update LB policy, as needed. + if (service_config_result.lb_policy_config != nullptr) { + CreateOrUpdateLbPolicyLocked( + std::move(service_config_result.lb_policy_config), std::move(result)); + } + // Apply ConfigSelector to channel. + // This needs to happen after the LB policy has been updated, since + // the ConfigSelector may need the LB policy to know about new + // destinations before it can send RPCs to those destinations. + if (helper_ != nullptr) { + helper_->ApplyConfigSelector(service_config_result.service_config_changed, + std::move(config_selector)); } // Add channel trace event. - if (service_config_changed) { + if (service_config_result.service_config_changed) { // TODO(ncteisen): might be worth somehow including a snippet of the // config in the trace, at the risk of bloating the trace logs. trace_strings.push_back("Service config changed"); } - if (!service_config_error_string.empty()) { - trace_strings.push_back(service_config_error_string.c_str()); + if (service_config_result.service_config_error != GRPC_ERROR_NONE) { + trace_strings.push_back( + grpc_error_string(service_config_result.service_config_error)); } MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses, &trace_strings); ConcatenateAndAddChannelTraceLocked(trace_strings); + GRPC_ERROR_UNREF(service_config_result.service_config_error); } } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.h b/src/core/ext/filters/client_channel/resolving_lb_policy.h index 39815e28039..98ea37fbb64 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.h +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.h @@ -23,6 +23,7 @@ #include "absl/container/inlined_vector.h" +#include "src/core/ext/filters/client_channel/config_selector.h" #include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/resolver.h" @@ -52,22 +53,37 @@ namespace grpc_core { // child LB policy and config to use. class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { public: - // Synchronous callback that takes the resolver result and sets - // lb_policy_config to point to the right data. - // Returns true if the service config has changed since the last result. - // If the returned no_valid_service_config is true, that means that we - // don't have a valid service config to use, and we should set the channel - // to be in TRANSIENT_FAILURE. - typedef bool (*ProcessResolverResultCallback)( - void* user_data, const Resolver::Result& result, - RefCountedPtr* lb_policy_config, - grpc_error** service_config_error, bool* no_valid_service_config); - // If error is set when this returns, then construction failed, and - // the caller may not use the new object. - ResolvingLoadBalancingPolicy( - Args args, TraceFlag* tracer, grpc_core::UniquePtr target_uri, - ProcessResolverResultCallback process_resolver_result, - void* process_resolver_result_user_data); + class ChannelConfigHelper { + public: + struct ApplyServiceConfigResult { + // Set to true if the service config has changed since the last result. + bool service_config_changed = false; + // Set to true if we don't have a valid service config to use. + // This tells the ResolvingLoadBalancingPolicy to put the channel + // into TRANSIENT_FAILURE. + bool no_valid_service_config = false; + // A service config parsing error occurred. + grpc_error* service_config_error = GRPC_ERROR_NONE; + // The LB policy config to use. + RefCountedPtr lb_policy_config; + }; + + // Applies the service config to the channel. + virtual ApplyServiceConfigResult ApplyServiceConfig( + const Resolver::Result& result) = 0; + + // Applies the ConfigSelector to the channel. + virtual void ApplyConfigSelector( + bool service_config_changed, + RefCountedPtr config_selector) = 0; + + // Indicates a resolver transient failure. + virtual void ResolverTransientFailure(grpc_error* error) = 0; + }; + + ResolvingLoadBalancingPolicy(Args args, TraceFlag* tracer, + grpc_core::UniquePtr target_uri, + ChannelConfigHelper* helper); virtual const char* name() const override { return "resolving_lb"; } @@ -105,15 +121,16 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { // Passed in from caller at construction time. TraceFlag* tracer_; grpc_core::UniquePtr target_uri_; - ProcessResolverResultCallback process_resolver_result_ = nullptr; - void* process_resolver_result_user_data_ = nullptr; - grpc_core::UniquePtr child_policy_name_; - RefCountedPtr child_lb_config_; + ChannelConfigHelper* helper_; // Resolver and associated state. OrphanablePtr resolver_; bool previous_resolution_contained_addresses_ = false; + // Determined by resolver results. + grpc_core::UniquePtr child_policy_name_; + RefCountedPtr child_lb_config_; + // Child LB policy. OrphanablePtr lb_policy_; }; diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 37bed16955f..a25f10239f6 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -23,6 +23,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/client_channel_channelz.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', 'src/core/ext/filters/client_channel/client_channel_plugin.cc', + 'src/core/ext/filters/client_channel/config_selector.cc', 'src/core/ext/filters/client_channel/global_subchannel_pool.cc', 'src/core/ext/filters/client_channel/health/health_check_client.cc', 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index e69d345dfbb..1a81817d3a9 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1081,6 +1081,8 @@ src/core/ext/filters/client_channel/client_channel_channelz.h \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_factory.h \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ +src/core/ext/filters/client_channel/config_selector.cc \ +src/core/ext/filters/client_channel/config_selector.h \ src/core/ext/filters/client_channel/connector.h \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index bff4f23c4a4..a4443ebd080 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -878,6 +878,8 @@ src/core/ext/filters/client_channel/client_channel_channelz.h \ src/core/ext/filters/client_channel/client_channel_factory.cc \ src/core/ext/filters/client_channel/client_channel_factory.h \ src/core/ext/filters/client_channel/client_channel_plugin.cc \ +src/core/ext/filters/client_channel/config_selector.cc \ +src/core/ext/filters/client_channel/config_selector.h \ src/core/ext/filters/client_channel/connector.h \ src/core/ext/filters/client_channel/global_subchannel_pool.cc \ src/core/ext/filters/client_channel/global_subchannel_pool.h \