diff --git a/BUILD b/BUILD
index 6cdc1a5104a..56a55a30509 100644
--- a/BUILD
+++ b/BUILD
@@ -1028,6 +1028,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/client_channel_channelz.cc",
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
+ "src/core/ext/filters/client_channel/config_selector.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/health/health_check_client.cc",
"src/core/ext/filters/client_channel/http_connect_handshaker.cc",
@@ -1056,6 +1057,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/client_channel.h",
"src/core/ext/filters/client_channel/client_channel_channelz.h",
"src/core/ext/filters/client_channel/client_channel_factory.h",
+ "src/core/ext/filters/client_channel/config_selector.h",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/global_subchannel_pool.h",
"src/core/ext/filters/client_channel/health/health_check_client.h",
diff --git a/BUILD.gn b/BUILD.gn
index 48a1d84e668..3834e5a0ab7 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -211,6 +211,8 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
+ "src/core/ext/filters/client_channel/config_selector.cc",
+ "src/core/ext/filters/client_channel/config_selector.h",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.h",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 777b3706903..b0de0ea2ece 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1328,6 +1328,7 @@ add_library(grpc
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
+ src/core/ext/filters/client_channel/config_selector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -2000,6 +2001,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
+ src/core/ext/filters/client_channel/config_selector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
diff --git a/Makefile b/Makefile
index f1be51c42dc..0cbae55f24e 100644
--- a/Makefile
+++ b/Makefile
@@ -3630,6 +3630,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
+ src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
@@ -4276,6 +4277,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
+ src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index a5552dfcdbb..4487cec9e65 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -377,6 +377,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel.h
- src/core/ext/filters/client_channel/client_channel_channelz.h
- src/core/ext/filters/client_channel/client_channel_factory.h
+ - src/core/ext/filters/client_channel/config_selector.h
- src/core/ext/filters/client_channel/connector.h
- src/core/ext/filters/client_channel/global_subchannel_pool.h
- src/core/ext/filters/client_channel/health/health_check_client.h
@@ -746,6 +747,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_channelz.cc
- src/core/ext/filters/client_channel/client_channel_factory.cc
- src/core/ext/filters/client_channel/client_channel_plugin.cc
+ - src/core/ext/filters/client_channel/config_selector.cc
- src/core/ext/filters/client_channel/global_subchannel_pool.cc
- src/core/ext/filters/client_channel/health/health_check_client.cc
- src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -1301,6 +1303,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel.h
- src/core/ext/filters/client_channel/client_channel_channelz.h
- src/core/ext/filters/client_channel/client_channel_factory.h
+ - src/core/ext/filters/client_channel/config_selector.h
- src/core/ext/filters/client_channel/connector.h
- src/core/ext/filters/client_channel/global_subchannel_pool.h
- src/core/ext/filters/client_channel/health/health_check_client.h
@@ -1606,6 +1609,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_channelz.cc
- src/core/ext/filters/client_channel/client_channel_factory.cc
- src/core/ext/filters/client_channel/client_channel_plugin.cc
+ - src/core/ext/filters/client_channel/config_selector.cc
- src/core/ext/filters/client_channel/global_subchannel_pool.cc
- src/core/ext/filters/client_channel/health/health_check_client.cc
- src/core/ext/filters/client_channel/http_connect_handshaker.cc
diff --git a/config.m4 b/config.m4
index 4f0fd8513d9..a6bf9a21b6c 100644
--- a/config.m4
+++ b/config.m4
@@ -45,6 +45,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
+ src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
diff --git a/config.w32 b/config.w32
index 093add492ad..03cf0684684 100644
--- a/config.w32
+++ b/config.w32
@@ -14,6 +14,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\client_channel_channelz.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_factory.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_plugin.cc " +
+ "src\\core\\ext\\filters\\client_channel\\config_selector.cc " +
"src\\core\\ext\\filters\\client_channel\\global_subchannel_pool.cc " +
"src\\core\\ext\\filters\\client_channel\\health\\health_check_client.cc " +
"src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 8851119c07f..f34c2cb79a7 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -227,6 +227,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel.h',
'src/core/ext/filters/client_channel/client_channel_channelz.h',
'src/core/ext/filters/client_channel/client_channel_factory.h',
+ 'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
@@ -687,6 +688,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel.h',
'src/core/ext/filters/client_channel/client_channel_channelz.h',
'src/core/ext/filters/client_channel/client_channel_factory.h',
+ 'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index e4ae503d0a2..43a47a09fa7 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -195,6 +195,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
+ 'src/core/ext/filters/client_channel/config_selector.cc',
+ 'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
@@ -1052,6 +1054,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel.h',
'src/core/ext/filters/client_channel/client_channel_channelz.h',
'src/core/ext/filters/client_channel/client_channel_factory.h',
+ 'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 66abcc3f64f..72ec55f0c86 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -117,6 +117,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.cc )
s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.h )
s.files += %w( src/core/ext/filters/client_channel/client_channel_plugin.cc )
+ s.files += %w( src/core/ext/filters/client_channel/config_selector.cc )
+ s.files += %w( src/core/ext/filters/client_channel/config_selector.h )
s.files += %w( src/core/ext/filters/client_channel/connector.h )
s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.cc )
s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.h )
diff --git a/grpc.gyp b/grpc.gyp
index f8e1d48c3cc..587e6ba8d94 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -440,6 +440,7 @@
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
+ 'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
@@ -948,6 +949,7 @@
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
+ 'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
diff --git a/package.xml b/package.xml
index 93b22c15c0b..7553fe27739 100644
--- a/package.xml
+++ b/package.xml
@@ -97,6 +97,8 @@
+
+
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index d8af25d8f2f..8533be0fe2b 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -40,6 +40,7 @@
#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
+#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
@@ -149,12 +150,16 @@ class ChannelData {
bool received_service_config_data() const {
return received_service_config_data_;
}
+ grpc_error* resolver_transient_failure_error() const {
+ return resolver_transient_failure_error_;
+ }
RefCountedPtr retry_throttle_data() const {
return retry_throttle_data_;
}
RefCountedPtr service_config() const {
return service_config_;
}
+ ConfigSelector* config_selector() const { return config_selector_.get(); }
WorkSerializer* work_serializer() const { return work_serializer_.get(); }
RefCountedPtr GetConnectedSubchannelInDataPlane(
@@ -234,6 +239,29 @@ class ChannelData {
Atomic done_{false};
};
+ class ChannelConfigHelper
+ : public ResolvingLoadBalancingPolicy::ChannelConfigHelper {
+ public:
+ explicit ChannelConfigHelper(ChannelData* chand) : chand_(chand) {}
+
+ ApplyServiceConfigResult ApplyServiceConfig(
+ const Resolver::Result& result) override;
+
+ void ApplyConfigSelector(
+ bool service_config_changed,
+ RefCountedPtr config_selector) override;
+
+ void ResolverTransientFailure(grpc_error* error) override;
+
+ private:
+ static void ProcessLbPolicy(
+ const Resolver::Result& resolver_result,
+ const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
+ RefCountedPtr* lb_policy_config);
+
+ ChannelData* chand_;
+ };
+
ChannelData(grpc_channel_element_args* args, grpc_error** error);
~ChannelData();
@@ -241,30 +269,20 @@ class ChannelData {
grpc_connectivity_state state, const char* reason,
std::unique_ptr picker);
- void UpdateServiceConfigLocked(
- RefCountedPtr retry_throttle_data,
- RefCountedPtr service_config);
+ void UpdateServiceConfigInDataPlaneLocked(
+ bool service_config_changed,
+ RefCountedPtr config_selector);
void CreateResolvingLoadBalancingPolicyLocked();
void DestroyResolvingLoadBalancingPolicyLocked();
- static bool ProcessResolverResultLocked(
- void* arg, const Resolver::Result& result,
- RefCountedPtr* lb_policy_config,
- grpc_error** service_config_error, bool* no_valid_service_config);
-
grpc_error* DoPingLocked(grpc_transport_op* op);
void StartTransportOpLocked(grpc_transport_op* op);
void TryToConnectLocked();
- void ProcessLbPolicy(
- const Resolver::Result& resolver_result,
- const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
- RefCountedPtr* lb_policy_config);
-
//
// Fields set at construction and never modified.
//
@@ -278,6 +296,7 @@ class ChannelData {
grpc_core::UniquePtr server_name_;
grpc_core::UniquePtr target_uri_;
channelz::ChannelNode* channelz_node_;
+ ChannelConfigHelper channel_config_helper_;
//
// Fields used in the data plane. Guarded by data_plane_mu.
@@ -286,9 +305,11 @@ class ChannelData {
std::unique_ptr picker_;
QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks.
// Data from service config.
+ grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE;
bool received_service_config_data_ = false;
RefCountedPtr retry_throttle_data_;
RefCountedPtr service_config_;
+ RefCountedPtr config_selector_;
//
// Fields used in the control plane. Guarded by work_serializer.
@@ -300,6 +321,7 @@ class ChannelData {
ConnectivityStateTracker state_tracker_;
grpc_core::UniquePtr health_check_service_name_;
RefCountedPtr saved_service_config_;
+ RefCountedPtr saved_config_selector_;
bool received_first_resolver_result_ = false;
// The number of SubchannelWrapper instances referencing a given Subchannel.
std::map subchannel_refcount_map_;
@@ -352,9 +374,6 @@ class CallData {
RefCountedPtr subchannel_call() { return subchannel_call_; }
- // Invoked by channel for queued picks once resolver results are available.
- void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);
-
// Invoked by channel for queued picks when the picker is updated.
static void PickSubchannel(void* arg, grpc_error* error);
@@ -742,13 +761,17 @@ class CallData {
void CreateSubchannelCall(grpc_call_element* elem);
// Invoked when a pick is completed, on both success or failure.
static void PickDone(void* arg, grpc_error* error);
- // Removes the call from the channel's list of queued picks.
- void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
- // Adds the call to the channel's list of queued picks.
- void AddCallToQueuedPicksLocked(grpc_call_element* elem);
+ // Removes the call from the channel's list of queued picks if present.
+ void MaybeRemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
+ // Adds the call to the channel's list of queued picks if not already present.
+ void MaybeAddCallToQueuedPicksLocked(grpc_call_element* elem);
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
- void ApplyServiceConfigToCallLocked(grpc_call_element* elem);
+ // If an error is returned, the error indicates the status with which
+ // the call should be failed.
+ grpc_error* ApplyServiceConfigToCallLocked(
+ grpc_call_element* elem, grpc_metadata_batch* initial_metadata);
+ void MaybeInvokeConfigSelectorCommitCallback();
// State for handling deadlines.
// The code in deadline_filter.c requires this to be the first field.
@@ -769,6 +792,7 @@ class CallData {
RefCountedPtr retry_throttle_data_;
const ClientChannelMethodParsedConfig* method_params_ = nullptr;
std::map call_attributes_;
+ std::function on_call_committed_;
RefCountedPtr subchannel_call_;
@@ -1335,6 +1359,180 @@ class ChannelData::ClientChannelControlHelper
ChannelData* chand_;
};
+//
+// ChannelData::ChannelConfigHelper
+//
+
+// Synchronous callback from ResolvingLoadBalancingPolicy to process a
+// resolver result update.
+ChannelData::ChannelConfigHelper::ApplyServiceConfigResult
+ChannelData::ChannelConfigHelper::ApplyServiceConfig(
+ const Resolver::Result& result) {
+ ApplyServiceConfigResult service_config_result;
+ RefCountedPtr service_config;
+ // If resolver did not return a service config or returned an invalid service
+ // config, we need a fallback service config.
+ if (result.service_config_error != GRPC_ERROR_NONE) {
+ // If the service config was invalid, then fallback to the saved service
+ // config. If there is no saved config either, use the default service
+ // config.
+ if (chand_->saved_service_config_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p: resolver returned invalid service config. "
+ "Continuing to use previous service config.",
+ chand_);
+ }
+ service_config = chand_->saved_service_config_;
+ } else if (chand_->default_service_config_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p: resolver returned invalid service config. Using "
+ "default service config provided by client API.",
+ chand_);
+ }
+ service_config = chand_->default_service_config_;
+ }
+ } else if (result.service_config == nullptr) {
+ if (chand_->default_service_config_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p: resolver returned no service config. Using default "
+ "service config provided by client API.",
+ chand_);
+ }
+ service_config = chand_->default_service_config_;
+ }
+ } else {
+ service_config = result.service_config;
+ }
+ service_config_result.service_config_error =
+ GRPC_ERROR_REF(result.service_config_error);
+ if (service_config == nullptr &&
+ result.service_config_error != GRPC_ERROR_NONE) {
+ service_config_result.no_valid_service_config = true;
+ return service_config_result;
+ }
+ // Process service config.
+ grpc_core::UniquePtr service_config_json;
+ const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
+ nullptr;
+ if (service_config != nullptr) {
+ parsed_service_config =
+ static_cast(
+ service_config->GetGlobalParsedConfig(
+ internal::ClientChannelServiceConfigParser::ParserIndex()));
+ }
+ // Check if the config has changed.
+ service_config_result.service_config_changed =
+ ((service_config == nullptr) !=
+ (chand_->saved_service_config_ == nullptr)) ||
+ (service_config != nullptr &&
+ service_config->json_string() !=
+ chand_->saved_service_config_->json_string());
+ if (service_config_result.service_config_changed) {
+ service_config_json.reset(gpr_strdup(
+ service_config != nullptr ? service_config->json_string().c_str()
+ : ""));
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p: resolver returned updated service config: \"%s\"",
+ chand_, service_config_json.get());
+ }
+ // Save health check service name.
+ if (service_config != nullptr) {
+ chand_->health_check_service_name_.reset(
+ gpr_strdup(parsed_service_config->health_check_service_name()));
+ } else {
+ chand_->health_check_service_name_.reset();
+ }
+ // Update health check service name used by existing subchannel wrappers.
+ for (auto* subchannel_wrapper : chand_->subchannel_wrappers_) {
+ subchannel_wrapper->UpdateHealthCheckServiceName(
+ grpc_core::UniquePtr(
+ gpr_strdup(chand_->health_check_service_name_.get())));
+ }
+ // Save service config.
+ chand_->saved_service_config_ = std::move(service_config);
+ }
+ // Find LB policy config.
+ ProcessLbPolicy(result, parsed_service_config,
+ &service_config_result.lb_policy_config);
+ grpc_core::UniquePtr lb_policy_name(
+ gpr_strdup((service_config_result.lb_policy_config)->name()));
+ // Swap out the data used by GetChannelInfo().
+ {
+ MutexLock lock(&chand_->info_mu_);
+ chand_->info_lb_policy_name_ = std::move(lb_policy_name);
+ if (service_config_json != nullptr) {
+ chand_->info_service_config_json_ = std::move(service_config_json);
+ }
+ }
+ // Return results.
+ return service_config_result;
+}
+
+void ChannelData::ChannelConfigHelper::ApplyConfigSelector(
+ bool service_config_changed,
+ RefCountedPtr config_selector) {
+ chand_->UpdateServiceConfigInDataPlaneLocked(service_config_changed,
+ std::move(config_selector));
+}
+
+void ChannelData::ChannelConfigHelper::ResolverTransientFailure(
+ grpc_error* error) {
+ MutexLock lock(&chand_->data_plane_mu_);
+ GRPC_ERROR_UNREF(chand_->resolver_transient_failure_error_);
+ chand_->resolver_transient_failure_error_ = error;
+}
+
+void ChannelData::ChannelConfigHelper::ProcessLbPolicy(
+ const Resolver::Result& resolver_result,
+ const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
+ RefCountedPtr* lb_policy_config) {
+ // Prefer the LB policy config found in the service config.
+ if (parsed_service_config != nullptr &&
+ parsed_service_config->parsed_lb_config() != nullptr) {
+ *lb_policy_config = parsed_service_config->parsed_lb_config();
+ return;
+ }
+ // Try the deprecated LB policy name from the service config.
+ // If not, try the setting from channel args.
+ const char* policy_name = nullptr;
+ if (parsed_service_config != nullptr &&
+ !parsed_service_config->parsed_deprecated_lb_policy().empty()) {
+ policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
+ } else {
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
+ policy_name = grpc_channel_arg_get_string(channel_arg);
+ }
+ // Use pick_first if nothing was specified and we didn't select grpclb
+ // above.
+ if (policy_name == nullptr) policy_name = "pick_first";
+ // Now that we have the policy name, construct an empty config for it.
+ Json config_json = Json::Array{Json::Object{
+ {policy_name, Json::Object{}},
+ }};
+ grpc_error* parse_error = GRPC_ERROR_NONE;
+ *lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
+ config_json, &parse_error);
+ // The policy name came from one of three places:
+ // - The deprecated loadBalancingPolicy field in the service config,
+ // in which case the code in ClientChannelServiceConfigParser
+ // already verified that the policy does not require a config.
+ // - One of the hard-coded values here, all of which are known to not
+ // require a config.
+ // - A channel arg, in which case the application did something that
+ // is a misuse of our API.
+ // In the first two cases, these assertions will always be true. In
+ // the last case, this is probably fine for now.
+ // TODO(roth): If the last case becomes a problem, add better error
+ // handling here.
+ GPR_ASSERT(*lb_policy_config != nullptr);
+ GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
+}
+
//
// ChannelData implementation
//
@@ -1393,6 +1591,7 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
client_channel_factory_(
ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
channelz_node_(GetChannelzNode(args->channel_args)),
+ channel_config_helper_(this),
work_serializer_(std::make_shared()),
interested_parties_(grpc_pollset_set_create()),
subchannel_pool_(GetSubchannelPool(args->channel_args)),
@@ -1461,6 +1660,7 @@ ChannelData::~ChannelData() {
}
DestroyResolvingLoadBalancingPolicyLocked();
grpc_channel_args_destroy(channel_args_);
+ GRPC_ERROR_UNREF(resolver_transient_failure_error_);
// Stop backup polling.
grpc_client_channel_stop_backup_polling(interested_parties_);
grpc_pollset_set_destroy(interested_parties_);
@@ -1475,6 +1675,7 @@ void ChannelData::UpdateStateAndPickerLocked(
if (picker_ == nullptr) {
health_check_service_name_.reset();
saved_service_config_.reset();
+ saved_config_selector_.reset();
received_first_resolver_result_ = false;
}
// Update connectivity state.
@@ -1497,9 +1698,11 @@ void ChannelData::UpdateStateAndPickerLocked(
// - refs to subchannel wrappers in the keys of pending_subchannel_updates_
// - ref stored in retry_throttle_data_
// - ref stored in service_config_
+ // - ref stored in config_selector_
// - ownership of the existing picker in picker_
RefCountedPtr retry_throttle_data_to_unref;
RefCountedPtr service_config_to_unref;
+ RefCountedPtr config_selector_to_unref;
{
MutexLock lock(&data_plane_mu_);
// Handle subchannel updates.
@@ -1524,6 +1727,7 @@ void ChannelData::UpdateStateAndPickerLocked(
// Note: We save the objects to unref until after the lock is released.
retry_throttle_data_to_unref = std::move(retry_throttle_data_);
service_config_to_unref = std::move(service_config_);
+ config_selector_to_unref = std::move(config_selector_);
}
// Re-process queued picks.
for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
@@ -1540,24 +1744,72 @@ void ChannelData::UpdateStateAndPickerLocked(
pending_subchannel_updates_.clear();
}
-void ChannelData::UpdateServiceConfigLocked(
- RefCountedPtr retry_throttle_data,
- RefCountedPtr service_config) {
+void ChannelData::UpdateServiceConfigInDataPlaneLocked(
+ bool service_config_changed,
+ RefCountedPtr config_selector) {
+ // Check if ConfigSelector has changed.
+ const bool config_selector_changed =
+ saved_config_selector_ != config_selector;
+ saved_config_selector_ = config_selector;
+ // We want to set the service config at least once, even if the
+ // resolver does not return a config, because that ensures that we
+ // disable retries if they are not enabled in the service config.
+ // TODO(roth): Consider removing the received_first_resolver_result_ check
+ // when we implement transparent retries.
+ if (!service_config_changed && !config_selector_changed &&
+ received_first_resolver_result_) {
+ return;
+ }
+ received_first_resolver_result_ = true;
+ // Get retry throttle data from service config.
+ RefCountedPtr retry_throttle_data;
+ if (saved_service_config_ != nullptr) {
+ const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
+ static_cast(
+ saved_service_config_->GetGlobalParsedConfig(
+ internal::ClientChannelServiceConfigParser::ParserIndex()));
+ if (parsed_service_config != nullptr) {
+ absl::optional
+ retry_throttle_config = parsed_service_config->retry_throttling();
+ if (retry_throttle_config.has_value()) {
+ retry_throttle_data =
+ internal::ServerRetryThrottleMap::GetDataForServer(
+ server_name_.get(),
+ retry_throttle_config.value().max_milli_tokens,
+ retry_throttle_config.value().milli_token_ratio);
+ }
+ }
+ }
+ // Create default config selector if not provided by resolver.
+ if (config_selector == nullptr) {
+ config_selector =
+ MakeRefCounted(saved_service_config_);
+ }
// Grab data plane lock to update service config.
//
// We defer unreffing the old values (and deallocating memory) until
// after releasing the lock to keep the critical section small.
+ RefCountedPtr service_config_to_unref = saved_service_config_;
+ RefCountedPtr config_selector_to_unref =
+ std::move(config_selector);
{
MutexLock lock(&data_plane_mu_);
+ GRPC_ERROR_UNREF(resolver_transient_failure_error_);
+ resolver_transient_failure_error_ = GRPC_ERROR_NONE;
// Update service config.
received_service_config_data_ = true;
// Old values will be unreffed after lock is released.
retry_throttle_data_.swap(retry_throttle_data);
- service_config_.swap(service_config);
- // Apply service config to queued picks.
+ service_config_.swap(service_config_to_unref);
+ config_selector_.swap(config_selector_to_unref);
+ // Re-process queued picks.
for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
- CallData* calld = static_cast(pick->elem->call_data);
- calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
+ grpc_call_element* elem = pick->elem;
+ CallData* calld = static_cast(elem->call_data);
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (calld->PickSubchannelLocked(elem, &error)) {
+ calld->AsyncPickDone(elem, error);
+ }
}
}
// Old values will be unreffed after lock is released when they go out
@@ -1574,7 +1826,7 @@ void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
grpc_core::UniquePtr target_uri(gpr_strdup(target_uri_.get()));
resolving_lb_policy_.reset(new ResolvingLoadBalancingPolicy(
std::move(lb_args), &grpc_client_channel_routing_trace,
- std::move(target_uri), ProcessResolverResultLocked, this));
+ std::move(target_uri), &channel_config_helper_));
grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
interested_parties_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
@@ -1591,180 +1843,6 @@ void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
}
}
-void ChannelData::ProcessLbPolicy(
- const Resolver::Result& resolver_result,
- const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
- RefCountedPtr* lb_policy_config) {
- // Prefer the LB policy config found in the service config.
- if (parsed_service_config != nullptr &&
- parsed_service_config->parsed_lb_config() != nullptr) {
- *lb_policy_config = parsed_service_config->parsed_lb_config();
- return;
- }
- // Try the deprecated LB policy name from the service config.
- // If not, try the setting from channel args.
- const char* policy_name = nullptr;
- if (parsed_service_config != nullptr &&
- !parsed_service_config->parsed_deprecated_lb_policy().empty()) {
- policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
- } else {
- const grpc_arg* channel_arg =
- grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
- policy_name = grpc_channel_arg_get_string(channel_arg);
- }
- // Use pick_first if nothing was specified and we didn't select grpclb
- // above.
- if (policy_name == nullptr) policy_name = "pick_first";
- // Now that we have the policy name, construct an empty config for it.
- Json config_json = Json::Array{Json::Object{
- {policy_name, Json::Object{}},
- }};
- grpc_error* parse_error = GRPC_ERROR_NONE;
- *lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
- config_json, &parse_error);
- // The policy name came from one of three places:
- // - The deprecated loadBalancingPolicy field in the service config,
- // in which case the code in ClientChannelServiceConfigParser
- // already verified that the policy does not require a config.
- // - One of the hard-coded values here, all of which are known to not
- // require a config.
- // - A channel arg, in which case the application did something that
- // is a misuse of our API.
- // In the first two cases, these assertions will always be true. In
- // the last case, this is probably fine for now.
- // TODO(roth): If the last case becomes a problem, add better error
- // handling here.
- GPR_ASSERT(*lb_policy_config != nullptr);
- GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
-}
-
-// Synchronous callback from ResolvingLoadBalancingPolicy to process a
-// resolver result update.
-bool ChannelData::ProcessResolverResultLocked(
- void* arg, const Resolver::Result& result,
- RefCountedPtr* lb_policy_config,
- grpc_error** service_config_error, bool* no_valid_service_config) {
- ChannelData* chand = static_cast(arg);
- RefCountedPtr service_config;
- // If resolver did not return a service config or returned an invalid service
- // config, we need a fallback service config.
- if (result.service_config_error != GRPC_ERROR_NONE) {
- // If the service config was invalid, then fallback to the saved service
- // config. If there is no saved config either, use the default service
- // config.
- if (chand->saved_service_config_ != nullptr) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO,
- "chand=%p: resolver returned invalid service config. "
- "Continuing to use previous service config.",
- chand);
- }
- service_config = chand->saved_service_config_;
- } else if (chand->default_service_config_ != nullptr) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO,
- "chand=%p: resolver returned invalid service config. Using "
- "default service config provided by client API.",
- chand);
- }
- service_config = chand->default_service_config_;
- }
- } else if (result.service_config == nullptr) {
- if (chand->default_service_config_ != nullptr) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO,
- "chand=%p: resolver returned no service config. Using default "
- "service config provided by client API.",
- chand);
- }
- service_config = chand->default_service_config_;
- }
- } else {
- service_config = result.service_config;
- }
- *service_config_error = GRPC_ERROR_REF(result.service_config_error);
- if (service_config == nullptr &&
- result.service_config_error != GRPC_ERROR_NONE) {
- *no_valid_service_config = true;
- return false;
- }
- // Process service config.
- grpc_core::UniquePtr service_config_json;
- const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
- nullptr;
- if (service_config != nullptr) {
- parsed_service_config =
- static_cast(
- service_config->GetGlobalParsedConfig(
- internal::ClientChannelServiceConfigParser::ParserIndex()));
- }
- // Check if the config has changed.
- const bool service_config_changed =
- ((service_config == nullptr) !=
- (chand->saved_service_config_ == nullptr)) ||
- (service_config != nullptr &&
- service_config->json_string() !=
- chand->saved_service_config_->json_string());
- if (service_config_changed) {
- service_config_json.reset(gpr_strdup(
- service_config != nullptr ? service_config->json_string().c_str()
- : ""));
- if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
- gpr_log(GPR_INFO,
- "chand=%p: resolver returned updated service config: \"%s\"",
- chand, service_config_json.get());
- }
- // Save health check service name.
- if (service_config != nullptr) {
- chand->health_check_service_name_.reset(
- gpr_strdup(parsed_service_config->health_check_service_name()));
- } else {
- chand->health_check_service_name_.reset();
- }
- // Update health check service name used by existing subchannel wrappers.
- for (auto* subchannel_wrapper : chand->subchannel_wrappers_) {
- subchannel_wrapper->UpdateHealthCheckServiceName(
- grpc_core::UniquePtr(
- gpr_strdup(chand->health_check_service_name_.get())));
- }
- // Save service config.
- chand->saved_service_config_ = std::move(service_config);
- }
- // We want to set the service config at least once. This should not really be
- // needed, but we are doing it as a defensive approach. This can be removed,
- // if we feel it is unnecessary.
- if (service_config_changed || !chand->received_first_resolver_result_) {
- chand->received_first_resolver_result_ = true;
- RefCountedPtr retry_throttle_data;
- if (parsed_service_config != nullptr) {
- absl::optional
- retry_throttle_config = parsed_service_config->retry_throttling();
- if (retry_throttle_config.has_value()) {
- retry_throttle_data =
- internal::ServerRetryThrottleMap::GetDataForServer(
- chand->server_name_.get(),
- retry_throttle_config.value().max_milli_tokens,
- retry_throttle_config.value().milli_token_ratio);
- }
- }
- chand->UpdateServiceConfigLocked(std::move(retry_throttle_data),
- chand->saved_service_config_);
- }
- chand->ProcessLbPolicy(result, parsed_service_config, lb_policy_config);
- grpc_core::UniquePtr lb_policy_name(
- gpr_strdup((*lb_policy_config)->name()));
- // Swap out the data used by GetChannelInfo().
- {
- MutexLock lock(&chand->info_mu_);
- chand->info_lb_policy_name_ = std::move(lb_policy_name);
- if (service_config_json != nullptr) {
- chand->info_service_config_json_ = std::move(service_config_json);
- }
- }
- // Return results.
- return service_config_changed;
-}
-
grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
if (state_tracker_.state() != GRPC_CHANNEL_READY) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
@@ -2807,6 +2885,7 @@ void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
}
// Received valid initial metadata, so commit the call.
calld->RetryCommit(elem, retry_state);
+ calld->MaybeInvokeConfigSelectorCommitCallback();
// Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
calld->InvokeRecvInitialMetadataCallback(batch_data, error);
@@ -2893,6 +2972,7 @@ void CallData::RecvMessageReady(void* arg, grpc_error* error) {
}
// Received a valid message, so commit the call.
calld->RetryCommit(elem, retry_state);
+ calld->MaybeInvokeConfigSelectorCommitCallback();
// Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
calld->InvokeRecvMessageCallback(batch_data, error);
@@ -3094,6 +3174,7 @@ void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
}
// Not retrying, so commit the call.
calld->RetryCommit(elem, retry_state);
+ calld->MaybeInvokeConfigSelectorCommitCallback();
// Run any necessary closures.
calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
}
@@ -3716,7 +3797,7 @@ class CallData::QueuedPickCanceller {
}
if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
// Remove pick from list of queued picks.
- calld->RemoveCallFromQueuedPicksLocked(self->elem_);
+ calld->MaybeRemoveCallFromQueuedPicksLocked(self->elem_);
// Fail pending batches on the call.
calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
YieldCallCombinerIfPendingBatchesFound);
@@ -3729,7 +3810,8 @@ class CallData::QueuedPickCanceller {
grpc_closure closure_;
};
-void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
+void CallData::MaybeRemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
+ if (!pick_queued_) return;
auto* chand = static_cast(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
@@ -3741,7 +3823,8 @@ void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
pick_canceller_ = nullptr;
}
-void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
+void CallData::MaybeAddCallToQueuedPicksLocked(grpc_call_element* elem) {
+ if (pick_queued_) return;
auto* chand = static_cast(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
@@ -3754,23 +3837,29 @@ void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
pick_canceller_ = new QueuedPickCanceller(elem);
}
-void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
+grpc_error* CallData::ApplyServiceConfigToCallLocked(
+ grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
ChannelData* chand = static_cast(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
chand, this);
}
+ ConfigSelector* config_selector = chand->config_selector();
auto service_config = chand->service_config();
if (service_config != nullptr) {
+ // Use the ConfigSelector to determine the config for the call.
+ ConfigSelector::CallConfig call_config =
+ config_selector->GetCallConfig({&path_, initial_metadata});
+ if (call_config.error != GRPC_ERROR_NONE) return call_config.error;
+ call_attributes_ = std::move(call_config.call_attributes);
+ on_call_committed_ = std::move(call_config.on_call_committed);
// Create a ServiceConfigCallData for the call. This stores a ref to the
// ServiceConfig and caches the right set of parsed configs to use for
// the call. The MethodConfig will store itself in the call context,
// so that it can be accessed by filters in the subchannel, and it
// will be cleaned up when the call ends.
- const auto* method_params_vector =
- service_config->GetMethodParsedConfigVector(path_);
auto* service_config_call_data = arena_->New(
- std::move(service_config), method_params_vector, call_context_);
+ std::move(service_config), call_config.method_configs, call_context_);
// Apply our own method params to the call.
method_params_ = static_cast(
service_config_call_data->GetMethodParsedConfig(
@@ -3812,16 +3901,13 @@ void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
enable_retries_ = false;
}
+ return GRPC_ERROR_NONE;
}
-void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
- ChannelData* chand = static_cast(elem->channel_data);
- // Apply service config data to the call only once, and only if the
- // channel has the data available.
- if (GPR_LIKELY(chand->received_service_config_data() &&
- !service_config_applied_)) {
- service_config_applied_ = true;
- ApplyServiceConfigToCallLocked(elem);
+void CallData::MaybeInvokeConfigSelectorCommitCallback() {
+ if (on_call_committed_ != nullptr) {
+ on_call_committed_();
+ on_call_committed_ = nullptr;
}
}
@@ -3882,11 +3968,45 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
GRPC_ERROR_NONE);
// Queue the pick, so that it will be attempted once the channel
// becomes connected.
- AddCallToQueuedPicksLocked(elem);
+ MaybeAddCallToQueuedPicksLocked(elem);
+ return false;
+ }
+ grpc_metadata_batch* initial_metadata_batch =
+ seen_send_initial_metadata_
+ ? &send_initial_metadata_
+ : pending_batches_[0]
+ .batch->payload->send_initial_metadata.send_initial_metadata;
+ // Grab initial metadata flags so that we can check later if the call has
+ // wait_for_ready enabled.
+ const uint32_t send_initial_metadata_flags =
+ seen_send_initial_metadata_ ? send_initial_metadata_flags_
+ : pending_batches_[0]
+ .batch->payload->send_initial_metadata
+ .send_initial_metadata_flags;
+ // Avoid picking if we haven't yet received service config data.
+ if (GPR_UNLIKELY(!chand->received_service_config_data())) {
+ // If the resolver returned transient failure before returning the
+ // first service config, fail any non-wait_for_ready calls.
+ grpc_error* resolver_error = chand->resolver_transient_failure_error();
+ if (resolver_error != GRPC_ERROR_NONE &&
+ (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) ==
+ 0) {
+ MaybeRemoveCallFromQueuedPicksLocked(elem);
+ *error = GRPC_ERROR_REF(resolver_error);
+ return true;
+ }
+ // Either the resolver has not yet returned a result, or it has
+ // returned transient failure but the call is wait_for_ready. In
+ // either case, queue the call.
+ MaybeAddCallToQueuedPicksLocked(elem);
return false;
}
- // Apply service config to call if needed.
- MaybeApplyServiceConfigToCallLocked(elem);
+ // Apply service config to call if not yet applied.
+ if (GPR_LIKELY(!service_config_applied_)) {
+ service_config_applied_ = true;
+ *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
+ if (*error != GRPC_ERROR_NONE) return true;
+ }
// If this is a retry, use the send_initial_metadata payload that
// we've cached; otherwise, use the pending batch. The
// send_initial_metadata batch will be the first pending batch in the
@@ -3899,20 +4019,8 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
// attempt) to the LB policy instead the one from the parent channel.
LoadBalancingPolicy::PickArgs pick_args;
pick_args.call_state = &lb_call_state_;
- Metadata initial_metadata(
- this,
- seen_send_initial_metadata_
- ? &send_initial_metadata_
- : pending_batches_[0]
- .batch->payload->send_initial_metadata.send_initial_metadata);
+ Metadata initial_metadata(this, initial_metadata_batch);
pick_args.initial_metadata = &initial_metadata;
- // Grab initial metadata flags so that we can check later if the call has
- // wait_for_ready enabled.
- const uint32_t send_initial_metadata_flags =
- seen_send_initial_metadata_ ? send_initial_metadata_flags_
- : pending_batches_[0]
- .batch->payload->send_initial_metadata
- .send_initial_metadata_flags;
// Attempt pick.
auto result = chand->picker()->Pick(pick_args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
@@ -3927,7 +4035,8 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
grpc_error* disconnect_error = chand->disconnect_error();
if (disconnect_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(result.error);
- if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
+ MaybeRemoveCallFromQueuedPicksLocked(elem);
+ MaybeInvokeConfigSelectorCommitCallback();
*error = GRPC_ERROR_REF(disconnect_error);
return true;
}
@@ -3948,8 +4057,9 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
"Failed to pick subchannel", &result.error, 1);
GRPC_ERROR_UNREF(result.error);
*error = new_error;
+ MaybeInvokeConfigSelectorCommitCallback();
}
- if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
+ MaybeRemoveCallFromQueuedPicksLocked(elem);
return !retried;
}
// If wait_for_ready is true, then queue to retry when we get a new
@@ -3958,22 +4068,24 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
}
// Fallthrough
case LoadBalancingPolicy::PickResult::PICK_QUEUE:
- if (!pick_queued_) AddCallToQueuedPicksLocked(elem);
+ MaybeAddCallToQueuedPicksLocked(elem);
return false;
default: // PICK_COMPLETE
- if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
+ MaybeRemoveCallFromQueuedPicksLocked(elem);
// Handle drops.
if (GPR_UNLIKELY(result.subchannel == nullptr)) {
result.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+ MaybeInvokeConfigSelectorCommitCallback();
} else {
// Grab a ref to the connected subchannel while we're still
// holding the data plane mutex.
connected_subchannel_ =
chand->GetConnectedSubchannelInDataPlane(result.subchannel.get());
GPR_ASSERT(connected_subchannel_ != nullptr);
+ if (retry_committed_) MaybeInvokeConfigSelectorCommitCallback();
}
lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
*error = result.error;
diff --git a/src/core/ext/filters/client_channel/config_selector.cc b/src/core/ext/filters/client_channel/config_selector.cc
new file mode 100644
index 00000000000..e5d2a3f0be3
--- /dev/null
+++ b/src/core/ext/filters/client_channel/config_selector.cc
@@ -0,0 +1,62 @@
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include
+
+#include "src/core/ext/filters/client_channel/config_selector.h"
+
+#include "src/core/lib/channel/channel_args.h"
+
+// Channel arg key for ConfigSelector.
+#define GRPC_ARG_CONFIG_SELECTOR "grpc.internal.config_selector"
+
+namespace grpc_core {
+
+namespace {
+
+void* ConfigSelectorArgCopy(void* p) {
+ ConfigSelector* config_selector = static_cast(p);
+ config_selector->Ref().release();
+ return p;
+}
+
+void ConfigSelectorArgDestroy(void* p) {
+ ConfigSelector* config_selector = static_cast(p);
+ config_selector->Unref();
+}
+
+int ConfigSelectorArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
+
+const grpc_arg_pointer_vtable kChannelArgVtable = {
+ ConfigSelectorArgCopy, ConfigSelectorArgDestroy, ConfigSelectorArgCmp};
+
+} // namespace
+
+grpc_arg ConfigSelector::MakeChannelArg() const {
+ return grpc_channel_arg_pointer_create(
+ const_cast(GRPC_ARG_CONFIG_SELECTOR),
+ const_cast(this), &kChannelArgVtable);
+}
+
+RefCountedPtr ConfigSelector::GetFromChannelArgs(
+ const grpc_channel_args& args) {
+ ConfigSelector* config_selector =
+ grpc_channel_args_find_pointer(&args,
+ GRPC_ARG_CONFIG_SELECTOR);
+ return config_selector != nullptr ? config_selector->Ref() : nullptr;
+}
+
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/config_selector.h b/src/core/ext/filters/client_channel/config_selector.h
new file mode 100644
index 00000000000..efd27341fa5
--- /dev/null
+++ b/src/core/ext/filters/client_channel/config_selector.h
@@ -0,0 +1,91 @@
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONFIG_SELECTOR_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONFIG_SELECTOR_H
+
+#include
+
+#include
+#include