Merge pull request #23051 from markdroth/xds_config_selector2

Add ConfigSelector API.
pull/23267/head
Mark D. Roth 5 years ago committed by GitHub
commit 4d541fc30a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 2
      BUILD.gn
  3. 2
      CMakeLists.txt
  4. 2
      Makefile
  5. 4
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 2
      grpc.gyp
  12. 2
      package.xml
  13. 588
      src/core/ext/filters/client_channel/client_channel.cc
  14. 62
      src/core/ext/filters/client_channel/config_selector.cc
  15. 91
      src/core/ext/filters/client_channel/config_selector.h
  16. 59
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  17. 57
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  18. 1
      src/python/grpcio/grpc_core_dependencies.py
  19. 2
      tools/doxygen/Doxyfile.c++.internal
  20. 2
      tools/doxygen/Doxyfile.core.internal

@ -1028,6 +1028,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/client_channel_channelz.cc",
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
"src/core/ext/filters/client_channel/config_selector.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/health/health_check_client.cc",
"src/core/ext/filters/client_channel/http_connect_handshaker.cc",
@ -1056,6 +1057,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/client_channel.h",
"src/core/ext/filters/client_channel/client_channel_channelz.h",
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/config_selector.h",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/global_subchannel_pool.h",
"src/core/ext/filters/client_channel/health/health_check_client.h",

@ -211,6 +211,8 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
"src/core/ext/filters/client_channel/config_selector.cc",
"src/core/ext/filters/client_channel/config_selector.h",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/global_subchannel_pool.cc",
"src/core/ext/filters/client_channel/global_subchannel_pool.h",

@ -1328,6 +1328,7 @@ add_library(grpc
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/config_selector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
@ -2000,6 +2001,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/client_channel_channelz.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/config_selector.cc
src/core/ext/filters/client_channel/global_subchannel_pool.cc
src/core/ext/filters/client_channel/health/health_check_client.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc

@ -3630,6 +3630,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
@ -4276,6 +4277,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \

@ -377,6 +377,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel.h
- src/core/ext/filters/client_channel/client_channel_channelz.h
- src/core/ext/filters/client_channel/client_channel_factory.h
- src/core/ext/filters/client_channel/config_selector.h
- src/core/ext/filters/client_channel/connector.h
- src/core/ext/filters/client_channel/global_subchannel_pool.h
- src/core/ext/filters/client_channel/health/health_check_client.h
@ -746,6 +747,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_channelz.cc
- src/core/ext/filters/client_channel/client_channel_factory.cc
- src/core/ext/filters/client_channel/client_channel_plugin.cc
- src/core/ext/filters/client_channel/config_selector.cc
- src/core/ext/filters/client_channel/global_subchannel_pool.cc
- src/core/ext/filters/client_channel/health/health_check_client.cc
- src/core/ext/filters/client_channel/http_connect_handshaker.cc
@ -1301,6 +1303,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel.h
- src/core/ext/filters/client_channel/client_channel_channelz.h
- src/core/ext/filters/client_channel/client_channel_factory.h
- src/core/ext/filters/client_channel/config_selector.h
- src/core/ext/filters/client_channel/connector.h
- src/core/ext/filters/client_channel/global_subchannel_pool.h
- src/core/ext/filters/client_channel/health/health_check_client.h
@ -1606,6 +1609,7 @@ libs:
- src/core/ext/filters/client_channel/client_channel_channelz.cc
- src/core/ext/filters/client_channel/client_channel_factory.cc
- src/core/ext/filters/client_channel/client_channel_plugin.cc
- src/core/ext/filters/client_channel/config_selector.cc
- src/core/ext/filters/client_channel/global_subchannel_pool.cc
- src/core/ext/filters/client_channel/health/health_check_client.cc
- src/core/ext/filters/client_channel/http_connect_handshaker.cc

@ -45,6 +45,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/client_channel_channelz.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \

@ -14,6 +14,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\client_channel_channelz.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_factory.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_plugin.cc " +
"src\\core\\ext\\filters\\client_channel\\config_selector.cc " +
"src\\core\\ext\\filters\\client_channel\\global_subchannel_pool.cc " +
"src\\core\\ext\\filters\\client_channel\\health\\health_check_client.cc " +
"src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " +

@ -227,6 +227,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel.h',
'src/core/ext/filters/client_channel/client_channel_channelz.h',
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
@ -687,6 +688,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel.h',
'src/core/ext/filters/client_channel/client_channel_channelz.h',
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',

@ -195,6 +195,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
@ -1052,6 +1054,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/client_channel.h',
'src/core/ext/filters/client_channel/client_channel_channelz.h',
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/config_selector.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',

@ -117,6 +117,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.cc )
s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.h )
s.files += %w( src/core/ext/filters/client_channel/client_channel_plugin.cc )
s.files += %w( src/core/ext/filters/client_channel/config_selector.cc )
s.files += %w( src/core/ext/filters/client_channel/config_selector.h )
s.files += %w( src/core/ext/filters/client_channel/connector.h )
s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.cc )
s.files += %w( src/core/ext/filters/client_channel/global_subchannel_pool.h )

@ -440,6 +440,7 @@
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
@ -948,6 +949,7 @@
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',

@ -97,6 +97,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_factory.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/config_selector.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/config_selector.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/connector.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/global_subchannel_pool.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/global_subchannel_pool.h" role="src" />

@ -40,6 +40,7 @@
#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
@ -149,12 +150,16 @@ class ChannelData {
bool received_service_config_data() const {
return received_service_config_data_;
}
grpc_error* resolver_transient_failure_error() const {
return resolver_transient_failure_error_;
}
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
return retry_throttle_data_;
}
RefCountedPtr<ServiceConfig> service_config() const {
return service_config_;
}
ConfigSelector* config_selector() const { return config_selector_.get(); }
WorkSerializer* work_serializer() const { return work_serializer_.get(); }
RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
@ -234,6 +239,29 @@ class ChannelData {
Atomic<bool> done_{false};
};
class ChannelConfigHelper
: public ResolvingLoadBalancingPolicy::ChannelConfigHelper {
public:
explicit ChannelConfigHelper(ChannelData* chand) : chand_(chand) {}
ApplyServiceConfigResult ApplyServiceConfig(
const Resolver::Result& result) override;
void ApplyConfigSelector(
bool service_config_changed,
RefCountedPtr<ConfigSelector> config_selector) override;
void ResolverTransientFailure(grpc_error* error) override;
private:
static void ProcessLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);
ChannelData* chand_;
};
ChannelData(grpc_channel_element_args* args, grpc_error** error);
~ChannelData();
@ -241,30 +269,20 @@ class ChannelData {
grpc_connectivity_state state, const char* reason,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker);
void UpdateServiceConfigLocked(
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
RefCountedPtr<ServiceConfig> service_config);
void UpdateServiceConfigInDataPlaneLocked(
bool service_config_changed,
RefCountedPtr<ConfigSelector> config_selector);
void CreateResolvingLoadBalancingPolicyLocked();
void DestroyResolvingLoadBalancingPolicyLocked();
static bool ProcessResolverResultLocked(
void* arg, const Resolver::Result& result,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error, bool* no_valid_service_config);
grpc_error* DoPingLocked(grpc_transport_op* op);
void StartTransportOpLocked(grpc_transport_op* op);
void TryToConnectLocked();
void ProcessLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);
//
// Fields set at construction and never modified.
//
@ -278,6 +296,7 @@ class ChannelData {
grpc_core::UniquePtr<char> server_name_;
grpc_core::UniquePtr<char> target_uri_;
channelz::ChannelNode* channelz_node_;
ChannelConfigHelper channel_config_helper_;
//
// Fields used in the data plane. Guarded by data_plane_mu.
@ -286,9 +305,11 @@ class ChannelData {
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_;
QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks.
// Data from service config.
grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE;
bool received_service_config_data_ = false;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
RefCountedPtr<ServiceConfig> service_config_;
RefCountedPtr<ConfigSelector> config_selector_;
//
// Fields used in the control plane. Guarded by work_serializer.
@ -300,6 +321,7 @@ class ChannelData {
ConnectivityStateTracker state_tracker_;
grpc_core::UniquePtr<char> health_check_service_name_;
RefCountedPtr<ServiceConfig> saved_service_config_;
RefCountedPtr<ConfigSelector> saved_config_selector_;
bool received_first_resolver_result_ = false;
// The number of SubchannelWrapper instances referencing a given Subchannel.
std::map<Subchannel*, int> subchannel_refcount_map_;
@ -352,9 +374,6 @@ class CallData {
RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }
// Invoked by channel for queued picks once resolver results are available.
void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);
// Invoked by channel for queued picks when the picker is updated.
static void PickSubchannel(void* arg, grpc_error* error);
@ -742,13 +761,17 @@ class CallData {
void CreateSubchannelCall(grpc_call_element* elem);
// Invoked when a pick is completed, on both success or failure.
static void PickDone(void* arg, grpc_error* error);
// Removes the call from the channel's list of queued picks.
void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
// Adds the call to the channel's list of queued picks.
void AddCallToQueuedPicksLocked(grpc_call_element* elem);
// Removes the call from the channel's list of queued picks if present.
void MaybeRemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
// Adds the call to the channel's list of queued picks if not already present.
void MaybeAddCallToQueuedPicksLocked(grpc_call_element* elem);
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
void ApplyServiceConfigToCallLocked(grpc_call_element* elem);
// If an error is returned, the error indicates the status with which
// the call should be failed.
grpc_error* ApplyServiceConfigToCallLocked(
grpc_call_element* elem, grpc_metadata_batch* initial_metadata);
void MaybeInvokeConfigSelectorCommitCallback();
// State for handling deadlines.
// The code in deadline_filter.c requires this to be the first field.
@ -769,6 +792,7 @@ class CallData {
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
const ClientChannelMethodParsedConfig* method_params_ = nullptr;
std::map<const char*, absl::string_view> call_attributes_;
std::function<void()> on_call_committed_;
RefCountedPtr<SubchannelCall> subchannel_call_;
@ -1335,6 +1359,180 @@ class ChannelData::ClientChannelControlHelper
ChannelData* chand_;
};
//
// ChannelData::ChannelConfigHelper
//
// Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update.
ChannelData::ChannelConfigHelper::ApplyServiceConfigResult
ChannelData::ChannelConfigHelper::ApplyServiceConfig(
const Resolver::Result& result) {
ApplyServiceConfigResult service_config_result;
RefCountedPtr<ServiceConfig> service_config;
// If resolver did not return a service config or returned an invalid service
// config, we need a fallback service config.
if (result.service_config_error != GRPC_ERROR_NONE) {
// If the service config was invalid, then fallback to the saved service
// config. If there is no saved config either, use the default service
// config.
if (chand_->saved_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned invalid service config. "
"Continuing to use previous service config.",
chand_);
}
service_config = chand_->saved_service_config_;
} else if (chand_->default_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned invalid service config. Using "
"default service config provided by client API.",
chand_);
}
service_config = chand_->default_service_config_;
}
} else if (result.service_config == nullptr) {
if (chand_->default_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned no service config. Using default "
"service config provided by client API.",
chand_);
}
service_config = chand_->default_service_config_;
}
} else {
service_config = result.service_config;
}
service_config_result.service_config_error =
GRPC_ERROR_REF(result.service_config_error);
if (service_config == nullptr &&
result.service_config_error != GRPC_ERROR_NONE) {
service_config_result.no_valid_service_config = true;
return service_config_result;
}
// Process service config.
grpc_core::UniquePtr<char> service_config_json;
const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
nullptr;
if (service_config != nullptr) {
parsed_service_config =
static_cast<const internal::ClientChannelGlobalParsedConfig*>(
service_config->GetGlobalParsedConfig(
internal::ClientChannelServiceConfigParser::ParserIndex()));
}
// Check if the config has changed.
service_config_result.service_config_changed =
((service_config == nullptr) !=
(chand_->saved_service_config_ == nullptr)) ||
(service_config != nullptr &&
service_config->json_string() !=
chand_->saved_service_config_->json_string());
if (service_config_result.service_config_changed) {
service_config_json.reset(gpr_strdup(
service_config != nullptr ? service_config->json_string().c_str()
: ""));
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned updated service config: \"%s\"",
chand_, service_config_json.get());
}
// Save health check service name.
if (service_config != nullptr) {
chand_->health_check_service_name_.reset(
gpr_strdup(parsed_service_config->health_check_service_name()));
} else {
chand_->health_check_service_name_.reset();
}
// Update health check service name used by existing subchannel wrappers.
for (auto* subchannel_wrapper : chand_->subchannel_wrappers_) {
subchannel_wrapper->UpdateHealthCheckServiceName(
grpc_core::UniquePtr<char>(
gpr_strdup(chand_->health_check_service_name_.get())));
}
// Save service config.
chand_->saved_service_config_ = std::move(service_config);
}
// Find LB policy config.
ProcessLbPolicy(result, parsed_service_config,
&service_config_result.lb_policy_config);
grpc_core::UniquePtr<char> lb_policy_name(
gpr_strdup((service_config_result.lb_policy_config)->name()));
// Swap out the data used by GetChannelInfo().
{
MutexLock lock(&chand_->info_mu_);
chand_->info_lb_policy_name_ = std::move(lb_policy_name);
if (service_config_json != nullptr) {
chand_->info_service_config_json_ = std::move(service_config_json);
}
}
// Return results.
return service_config_result;
}
void ChannelData::ChannelConfigHelper::ApplyConfigSelector(
bool service_config_changed,
RefCountedPtr<ConfigSelector> config_selector) {
chand_->UpdateServiceConfigInDataPlaneLocked(service_config_changed,
std::move(config_selector));
}
void ChannelData::ChannelConfigHelper::ResolverTransientFailure(
grpc_error* error) {
MutexLock lock(&chand_->data_plane_mu_);
GRPC_ERROR_UNREF(chand_->resolver_transient_failure_error_);
chand_->resolver_transient_failure_error_ = error;
}
void ChannelData::ChannelConfigHelper::ProcessLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
// Prefer the LB policy config found in the service config.
if (parsed_service_config != nullptr &&
parsed_service_config->parsed_lb_config() != nullptr) {
*lb_policy_config = parsed_service_config->parsed_lb_config();
return;
}
// Try the deprecated LB policy name from the service config.
// If not, try the setting from channel args.
const char* policy_name = nullptr;
if (parsed_service_config != nullptr &&
!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
} else {
const grpc_arg* channel_arg =
grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
policy_name = grpc_channel_arg_get_string(channel_arg);
}
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (policy_name == nullptr) policy_name = "pick_first";
// Now that we have the policy name, construct an empty config for it.
Json config_json = Json::Array{Json::Object{
{policy_name, Json::Object{}},
}};
grpc_error* parse_error = GRPC_ERROR_NONE;
*lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
config_json, &parse_error);
// The policy name came from one of three places:
// - The deprecated loadBalancingPolicy field in the service config,
// in which case the code in ClientChannelServiceConfigParser
// already verified that the policy does not require a config.
// - One of the hard-coded values here, all of which are known to not
// require a config.
// - A channel arg, in which case the application did something that
// is a misuse of our API.
// In the first two cases, these assertions will always be true. In
// the last case, this is probably fine for now.
// TODO(roth): If the last case becomes a problem, add better error
// handling here.
GPR_ASSERT(*lb_policy_config != nullptr);
GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
}
//
// ChannelData implementation
//
@ -1393,6 +1591,7 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
client_channel_factory_(
ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
channelz_node_(GetChannelzNode(args->channel_args)),
channel_config_helper_(this),
work_serializer_(std::make_shared<WorkSerializer>()),
interested_parties_(grpc_pollset_set_create()),
subchannel_pool_(GetSubchannelPool(args->channel_args)),
@ -1461,6 +1660,7 @@ ChannelData::~ChannelData() {
}
DestroyResolvingLoadBalancingPolicyLocked();
grpc_channel_args_destroy(channel_args_);
GRPC_ERROR_UNREF(resolver_transient_failure_error_);
// Stop backup polling.
grpc_client_channel_stop_backup_polling(interested_parties_);
grpc_pollset_set_destroy(interested_parties_);
@ -1475,6 +1675,7 @@ void ChannelData::UpdateStateAndPickerLocked(
if (picker_ == nullptr) {
health_check_service_name_.reset();
saved_service_config_.reset();
saved_config_selector_.reset();
received_first_resolver_result_ = false;
}
// Update connectivity state.
@ -1497,9 +1698,11 @@ void ChannelData::UpdateStateAndPickerLocked(
// - refs to subchannel wrappers in the keys of pending_subchannel_updates_
// - ref stored in retry_throttle_data_
// - ref stored in service_config_
// - ref stored in config_selector_
// - ownership of the existing picker in picker_
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_to_unref;
RefCountedPtr<ServiceConfig> service_config_to_unref;
RefCountedPtr<ConfigSelector> config_selector_to_unref;
{
MutexLock lock(&data_plane_mu_);
// Handle subchannel updates.
@ -1524,6 +1727,7 @@ void ChannelData::UpdateStateAndPickerLocked(
// Note: We save the objects to unref until after the lock is released.
retry_throttle_data_to_unref = std::move(retry_throttle_data_);
service_config_to_unref = std::move(service_config_);
config_selector_to_unref = std::move(config_selector_);
}
// Re-process queued picks.
for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
@ -1540,24 +1744,72 @@ void ChannelData::UpdateStateAndPickerLocked(
pending_subchannel_updates_.clear();
}
void ChannelData::UpdateServiceConfigLocked(
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
RefCountedPtr<ServiceConfig> service_config) {
void ChannelData::UpdateServiceConfigInDataPlaneLocked(
bool service_config_changed,
RefCountedPtr<ConfigSelector> config_selector) {
// Check if ConfigSelector has changed.
const bool config_selector_changed =
saved_config_selector_ != config_selector;
saved_config_selector_ = config_selector;
// We want to set the service config at least once, even if the
// resolver does not return a config, because that ensures that we
// disable retries if they are not enabled in the service config.
// TODO(roth): Consider removing the received_first_resolver_result_ check
// when we implement transparent retries.
if (!service_config_changed && !config_selector_changed &&
received_first_resolver_result_) {
return;
}
received_first_resolver_result_ = true;
// Get retry throttle data from service config.
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
if (saved_service_config_ != nullptr) {
const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
static_cast<const internal::ClientChannelGlobalParsedConfig*>(
saved_service_config_->GetGlobalParsedConfig(
internal::ClientChannelServiceConfigParser::ParserIndex()));
if (parsed_service_config != nullptr) {
absl::optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
retry_throttle_config = parsed_service_config->retry_throttling();
if (retry_throttle_config.has_value()) {
retry_throttle_data =
internal::ServerRetryThrottleMap::GetDataForServer(
server_name_.get(),
retry_throttle_config.value().max_milli_tokens,
retry_throttle_config.value().milli_token_ratio);
}
}
}
// Create default config selector if not provided by resolver.
if (config_selector == nullptr) {
config_selector =
MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
}
// Grab data plane lock to update service config.
//
// We defer unreffing the old values (and deallocating memory) until
// after releasing the lock to keep the critical section small.
RefCountedPtr<ServiceConfig> service_config_to_unref = saved_service_config_;
RefCountedPtr<ConfigSelector> config_selector_to_unref =
std::move(config_selector);
{
MutexLock lock(&data_plane_mu_);
GRPC_ERROR_UNREF(resolver_transient_failure_error_);
resolver_transient_failure_error_ = GRPC_ERROR_NONE;
// Update service config.
received_service_config_data_ = true;
// Old values will be unreffed after lock is released.
retry_throttle_data_.swap(retry_throttle_data);
service_config_.swap(service_config);
// Apply service config to queued picks.
service_config_.swap(service_config_to_unref);
config_selector_.swap(config_selector_to_unref);
// Re-process queued picks.
for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
CallData* calld = static_cast<CallData*>(pick->elem->call_data);
calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
grpc_call_element* elem = pick->elem;
CallData* calld = static_cast<CallData*>(elem->call_data);
grpc_error* error = GRPC_ERROR_NONE;
if (calld->PickSubchannelLocked(elem, &error)) {
calld->AsyncPickDone(elem, error);
}
}
}
// Old values will be unreffed after lock is released when they go out
@ -1574,7 +1826,7 @@ void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
grpc_core::UniquePtr<char> target_uri(gpr_strdup(target_uri_.get()));
resolving_lb_policy_.reset(new ResolvingLoadBalancingPolicy(
std::move(lb_args), &grpc_client_channel_routing_trace,
std::move(target_uri), ProcessResolverResultLocked, this));
std::move(target_uri), &channel_config_helper_));
grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
interested_parties_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
@ -1591,180 +1843,6 @@ void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
}
}
void ChannelData::ProcessLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
// Prefer the LB policy config found in the service config.
if (parsed_service_config != nullptr &&
parsed_service_config->parsed_lb_config() != nullptr) {
*lb_policy_config = parsed_service_config->parsed_lb_config();
return;
}
// Try the deprecated LB policy name from the service config.
// If not, try the setting from channel args.
const char* policy_name = nullptr;
if (parsed_service_config != nullptr &&
!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
} else {
const grpc_arg* channel_arg =
grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
policy_name = grpc_channel_arg_get_string(channel_arg);
}
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (policy_name == nullptr) policy_name = "pick_first";
// Now that we have the policy name, construct an empty config for it.
Json config_json = Json::Array{Json::Object{
{policy_name, Json::Object{}},
}};
grpc_error* parse_error = GRPC_ERROR_NONE;
*lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
config_json, &parse_error);
// The policy name came from one of three places:
// - The deprecated loadBalancingPolicy field in the service config,
// in which case the code in ClientChannelServiceConfigParser
// already verified that the policy does not require a config.
// - One of the hard-coded values here, all of which are known to not
// require a config.
// - A channel arg, in which case the application did something that
// is a misuse of our API.
// In the first two cases, these assertions will always be true. In
// the last case, this is probably fine for now.
// TODO(roth): If the last case becomes a problem, add better error
// handling here.
GPR_ASSERT(*lb_policy_config != nullptr);
GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
}
// Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
void* arg, const Resolver::Result& result,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error, bool* no_valid_service_config) {
ChannelData* chand = static_cast<ChannelData*>(arg);
RefCountedPtr<ServiceConfig> service_config;
// If resolver did not return a service config or returned an invalid service
// config, we need a fallback service config.
if (result.service_config_error != GRPC_ERROR_NONE) {
// If the service config was invalid, then fallback to the saved service
// config. If there is no saved config either, use the default service
// config.
if (chand->saved_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned invalid service config. "
"Continuing to use previous service config.",
chand);
}
service_config = chand->saved_service_config_;
} else if (chand->default_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned invalid service config. Using "
"default service config provided by client API.",
chand);
}
service_config = chand->default_service_config_;
}
} else if (result.service_config == nullptr) {
if (chand->default_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned no service config. Using default "
"service config provided by client API.",
chand);
}
service_config = chand->default_service_config_;
}
} else {
service_config = result.service_config;
}
*service_config_error = GRPC_ERROR_REF(result.service_config_error);
if (service_config == nullptr &&
result.service_config_error != GRPC_ERROR_NONE) {
*no_valid_service_config = true;
return false;
}
// Process service config.
grpc_core::UniquePtr<char> service_config_json;
const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
nullptr;
if (service_config != nullptr) {
parsed_service_config =
static_cast<const internal::ClientChannelGlobalParsedConfig*>(
service_config->GetGlobalParsedConfig(
internal::ClientChannelServiceConfigParser::ParserIndex()));
}
// Check if the config has changed.
const bool service_config_changed =
((service_config == nullptr) !=
(chand->saved_service_config_ == nullptr)) ||
(service_config != nullptr &&
service_config->json_string() !=
chand->saved_service_config_->json_string());
if (service_config_changed) {
service_config_json.reset(gpr_strdup(
service_config != nullptr ? service_config->json_string().c_str()
: ""));
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned updated service config: \"%s\"",
chand, service_config_json.get());
}
// Save health check service name.
if (service_config != nullptr) {
chand->health_check_service_name_.reset(
gpr_strdup(parsed_service_config->health_check_service_name()));
} else {
chand->health_check_service_name_.reset();
}
// Update health check service name used by existing subchannel wrappers.
for (auto* subchannel_wrapper : chand->subchannel_wrappers_) {
subchannel_wrapper->UpdateHealthCheckServiceName(
grpc_core::UniquePtr<char>(
gpr_strdup(chand->health_check_service_name_.get())));
}
// Save service config.
chand->saved_service_config_ = std::move(service_config);
}
// We want to set the service config at least once. This should not really be
// needed, but we are doing it as a defensive approach. This can be removed,
// if we feel it is unnecessary.
if (service_config_changed || !chand->received_first_resolver_result_) {
chand->received_first_resolver_result_ = true;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
if (parsed_service_config != nullptr) {
absl::optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
retry_throttle_config = parsed_service_config->retry_throttling();
if (retry_throttle_config.has_value()) {
retry_throttle_data =
internal::ServerRetryThrottleMap::GetDataForServer(
chand->server_name_.get(),
retry_throttle_config.value().max_milli_tokens,
retry_throttle_config.value().milli_token_ratio);
}
}
chand->UpdateServiceConfigLocked(std::move(retry_throttle_data),
chand->saved_service_config_);
}
chand->ProcessLbPolicy(result, parsed_service_config, lb_policy_config);
grpc_core::UniquePtr<char> lb_policy_name(
gpr_strdup((*lb_policy_config)->name()));
// Swap out the data used by GetChannelInfo().
{
MutexLock lock(&chand->info_mu_);
chand->info_lb_policy_name_ = std::move(lb_policy_name);
if (service_config_json != nullptr) {
chand->info_service_config_json_ = std::move(service_config_json);
}
}
// Return results.
return service_config_changed;
}
grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
if (state_tracker_.state() != GRPC_CHANNEL_READY) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
@ -2807,6 +2885,7 @@ void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
}
// Received valid initial metadata, so commit the call.
calld->RetryCommit(elem, retry_state);
calld->MaybeInvokeConfigSelectorCommitCallback();
// Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
calld->InvokeRecvInitialMetadataCallback(batch_data, error);
@ -2893,6 +2972,7 @@ void CallData::RecvMessageReady(void* arg, grpc_error* error) {
}
// Received a valid message, so commit the call.
calld->RetryCommit(elem, retry_state);
calld->MaybeInvokeConfigSelectorCommitCallback();
// Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
calld->InvokeRecvMessageCallback(batch_data, error);
@ -3094,6 +3174,7 @@ void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
}
// Not retrying, so commit the call.
calld->RetryCommit(elem, retry_state);
calld->MaybeInvokeConfigSelectorCommitCallback();
// Run any necessary closures.
calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
}
@ -3716,7 +3797,7 @@ class CallData::QueuedPickCanceller {
}
if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
// Remove pick from list of queued picks.
calld->RemoveCallFromQueuedPicksLocked(self->elem_);
calld->MaybeRemoveCallFromQueuedPicksLocked(self->elem_);
// Fail pending batches on the call.
calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
YieldCallCombinerIfPendingBatchesFound);
@ -3729,7 +3810,8 @@ class CallData::QueuedPickCanceller {
grpc_closure closure_;
};
void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
void CallData::MaybeRemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
if (!pick_queued_) return;
auto* chand = static_cast<ChannelData*>(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
@ -3741,7 +3823,8 @@ void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
pick_canceller_ = nullptr;
}
void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
void CallData::MaybeAddCallToQueuedPicksLocked(grpc_call_element* elem) {
if (pick_queued_) return;
auto* chand = static_cast<ChannelData*>(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
@ -3754,23 +3837,29 @@ void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
pick_canceller_ = new QueuedPickCanceller(elem);
}
void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
grpc_error* CallData::ApplyServiceConfigToCallLocked(
grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
chand, this);
}
ConfigSelector* config_selector = chand->config_selector();
auto service_config = chand->service_config();
if (service_config != nullptr) {
// Use the ConfigSelector to determine the config for the call.
ConfigSelector::CallConfig call_config =
config_selector->GetCallConfig({&path_, initial_metadata});
if (call_config.error != GRPC_ERROR_NONE) return call_config.error;
call_attributes_ = std::move(call_config.call_attributes);
on_call_committed_ = std::move(call_config.on_call_committed);
// Create a ServiceConfigCallData for the call. This stores a ref to the
// ServiceConfig and caches the right set of parsed configs to use for
// the call. The MethodConfig will store itself in the call context,
// so that it can be accessed by filters in the subchannel, and it
// will be cleaned up when the call ends.
const auto* method_params_vector =
service_config->GetMethodParsedConfigVector(path_);
auto* service_config_call_data = arena_->New<ServiceConfigCallData>(
std::move(service_config), method_params_vector, call_context_);
std::move(service_config), call_config.method_configs, call_context_);
// Apply our own method params to the call.
method_params_ = static_cast<ClientChannelMethodParsedConfig*>(
service_config_call_data->GetMethodParsedConfig(
@ -3812,16 +3901,13 @@ void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
enable_retries_ = false;
}
return GRPC_ERROR_NONE;
}
void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Apply service config data to the call only once, and only if the
// channel has the data available.
if (GPR_LIKELY(chand->received_service_config_data() &&
!service_config_applied_)) {
service_config_applied_ = true;
ApplyServiceConfigToCallLocked(elem);
void CallData::MaybeInvokeConfigSelectorCommitCallback() {
if (on_call_committed_ != nullptr) {
on_call_committed_();
on_call_committed_ = nullptr;
}
}
@ -3882,11 +3968,45 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
GRPC_ERROR_NONE);
// Queue the pick, so that it will be attempted once the channel
// becomes connected.
AddCallToQueuedPicksLocked(elem);
MaybeAddCallToQueuedPicksLocked(elem);
return false;
}
grpc_metadata_batch* initial_metadata_batch =
seen_send_initial_metadata_
? &send_initial_metadata_
: pending_batches_[0]
.batch->payload->send_initial_metadata.send_initial_metadata;
// Grab initial metadata flags so that we can check later if the call has
// wait_for_ready enabled.
const uint32_t send_initial_metadata_flags =
seen_send_initial_metadata_ ? send_initial_metadata_flags_
: pending_batches_[0]
.batch->payload->send_initial_metadata
.send_initial_metadata_flags;
// Avoid picking if we haven't yet received service config data.
if (GPR_UNLIKELY(!chand->received_service_config_data())) {
// If the resolver returned transient failure before returning the
// first service config, fail any non-wait_for_ready calls.
grpc_error* resolver_error = chand->resolver_transient_failure_error();
if (resolver_error != GRPC_ERROR_NONE &&
(send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) ==
0) {
MaybeRemoveCallFromQueuedPicksLocked(elem);
*error = GRPC_ERROR_REF(resolver_error);
return true;
}
// Either the resolver has not yet returned a result, or it has
// returned transient failure but the call is wait_for_ready. In
// either case, queue the call.
MaybeAddCallToQueuedPicksLocked(elem);
return false;
}
// Apply service config to call if needed.
MaybeApplyServiceConfigToCallLocked(elem);
// Apply service config to call if not yet applied.
if (GPR_LIKELY(!service_config_applied_)) {
service_config_applied_ = true;
*error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
if (*error != GRPC_ERROR_NONE) return true;
}
// If this is a retry, use the send_initial_metadata payload that
// we've cached; otherwise, use the pending batch. The
// send_initial_metadata batch will be the first pending batch in the
@ -3899,20 +4019,8 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
// attempt) to the LB policy instead the one from the parent channel.
LoadBalancingPolicy::PickArgs pick_args;
pick_args.call_state = &lb_call_state_;
Metadata initial_metadata(
this,
seen_send_initial_metadata_
? &send_initial_metadata_
: pending_batches_[0]
.batch->payload->send_initial_metadata.send_initial_metadata);
Metadata initial_metadata(this, initial_metadata_batch);
pick_args.initial_metadata = &initial_metadata;
// Grab initial metadata flags so that we can check later if the call has
// wait_for_ready enabled.
const uint32_t send_initial_metadata_flags =
seen_send_initial_metadata_ ? send_initial_metadata_flags_
: pending_batches_[0]
.batch->payload->send_initial_metadata
.send_initial_metadata_flags;
// Attempt pick.
auto result = chand->picker()->Pick(pick_args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
@ -3927,7 +4035,8 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
grpc_error* disconnect_error = chand->disconnect_error();
if (disconnect_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(result.error);
if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
MaybeRemoveCallFromQueuedPicksLocked(elem);
MaybeInvokeConfigSelectorCommitCallback();
*error = GRPC_ERROR_REF(disconnect_error);
return true;
}
@ -3948,8 +4057,9 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
"Failed to pick subchannel", &result.error, 1);
GRPC_ERROR_UNREF(result.error);
*error = new_error;
MaybeInvokeConfigSelectorCommitCallback();
}
if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
MaybeRemoveCallFromQueuedPicksLocked(elem);
return !retried;
}
// If wait_for_ready is true, then queue to retry when we get a new
@ -3958,22 +4068,24 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
}
// Fallthrough
case LoadBalancingPolicy::PickResult::PICK_QUEUE:
if (!pick_queued_) AddCallToQueuedPicksLocked(elem);
MaybeAddCallToQueuedPicksLocked(elem);
return false;
default: // PICK_COMPLETE
if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
MaybeRemoveCallFromQueuedPicksLocked(elem);
// Handle drops.
if (GPR_UNLIKELY(result.subchannel == nullptr)) {
result.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
MaybeInvokeConfigSelectorCommitCallback();
} else {
// Grab a ref to the connected subchannel while we're still
// holding the data plane mutex.
connected_subchannel_ =
chand->GetConnectedSubchannelInDataPlane(result.subchannel.get());
GPR_ASSERT(connected_subchannel_ != nullptr);
if (retry_committed_) MaybeInvokeConfigSelectorCommitCallback();
}
lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
*error = result.error;

@ -0,0 +1,62 @@
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/lib/channel/channel_args.h"
// Channel arg key for ConfigSelector.
#define GRPC_ARG_CONFIG_SELECTOR "grpc.internal.config_selector"
namespace grpc_core {
namespace {
void* ConfigSelectorArgCopy(void* p) {
ConfigSelector* config_selector = static_cast<ConfigSelector*>(p);
config_selector->Ref().release();
return p;
}
void ConfigSelectorArgDestroy(void* p) {
ConfigSelector* config_selector = static_cast<ConfigSelector*>(p);
config_selector->Unref();
}
int ConfigSelectorArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
const grpc_arg_pointer_vtable kChannelArgVtable = {
ConfigSelectorArgCopy, ConfigSelectorArgDestroy, ConfigSelectorArgCmp};
} // namespace
grpc_arg ConfigSelector::MakeChannelArg() const {
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CONFIG_SELECTOR),
const_cast<ConfigSelector*>(this), &kChannelArgVtable);
}
RefCountedPtr<ConfigSelector> ConfigSelector::GetFromChannelArgs(
const grpc_channel_args& args) {
ConfigSelector* config_selector =
grpc_channel_args_find_pointer<ConfigSelector>(&args,
GRPC_ARG_CONFIG_SELECTOR);
return config_selector != nullptr ? config_selector->Ref() : nullptr;
}
} // namespace grpc_core

@ -0,0 +1,91 @@
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONFIG_SELECTOR_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONFIG_SELECTOR_H
#include <grpc/support/port_platform.h>
#include <functional>
#include <map>
#include "absl/strings/string_view.h"
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/impl/codegen/slice.h>
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/service_config_parser.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc_core {
// Internal API used to allow resolver implementations to override
// MethodConfig and provide input to LB policies on a per-call basis.
class ConfigSelector : public RefCounted<ConfigSelector> {
public:
struct GetCallConfigArgs {
grpc_slice* path;
grpc_metadata_batch* initial_metadata;
};
struct CallConfig {
// Can be set to indicate the call should be failed.
grpc_error* error = GRPC_ERROR_NONE;
// The per-method parsed configs that will be passed to
// ServiceConfigCallData.
const ServiceConfigParser::ParsedConfigVector* method_configs = nullptr;
// Call attributes that will be accessible to LB policy implementations.
std::map<const char*, absl::string_view> call_attributes;
// A callback that, if set, will be invoked when the call is
// committed (i.e., when we know that we will never again need to
// ask the picker for a subchannel for this call).
std::function<void()> on_call_committed;
};
virtual ~ConfigSelector() = default;
virtual CallConfig GetCallConfig(GetCallConfigArgs args) = 0;
grpc_arg MakeChannelArg() const;
static RefCountedPtr<ConfigSelector> GetFromChannelArgs(
const grpc_channel_args& args);
};
// Default ConfigSelector that gets the MethodConfig from the service config.
class DefaultConfigSelector : public ConfigSelector {
public:
explicit DefaultConfigSelector(RefCountedPtr<ServiceConfig> service_config)
: service_config_(std::move(service_config)) {}
CallConfig GetCallConfig(GetCallConfigArgs args) override {
CallConfig call_config;
if (service_config_ != nullptr) {
call_config.method_configs =
service_config_->GetMethodParsedConfigVector(*args.path);
}
return call_config;
}
private:
RefCountedPtr<ServiceConfig> service_config_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONFIG_SELECTOR_H */

@ -145,14 +145,12 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, grpc_core::UniquePtr<char> target_uri,
ProcessResolverResultCallback process_resolver_result,
void* process_resolver_result_user_data)
ChannelConfigHelper* helper)
: LoadBalancingPolicy(std::move(args)),
tracer_(tracer),
target_uri_(std::move(target_uri)),
process_resolver_result_(process_resolver_result),
process_resolver_result_user_data_(process_resolver_result_user_data) {
GPR_ASSERT(process_resolver_result != nullptr);
helper_(helper) {
GPR_ASSERT(helper_ != nullptr);
resolver_ = ResolverRegistry::CreateResolver(
target_uri_.get(), args.args, interested_parties(), work_serializer(),
absl::make_unique<ResolverResultHandler>(Ref()));
@ -214,6 +212,7 @@ void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
if (lb_policy_ == nullptr) {
grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolver transient failure", &error, 1);
helper_->ResolverTransientFailure(GRPC_ERROR_REF(state_error));
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::make_unique<TransientFailurePicker>(state_error));
@ -304,45 +303,51 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
TraceStringVector trace_strings;
const bool resolution_contains_addresses = result.addresses.size() > 0;
// Process the resolver result.
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
bool service_config_changed = false;
std::string service_config_error_string;
if (process_resolver_result_ != nullptr) {
grpc_error* service_config_error = GRPC_ERROR_NONE;
bool no_valid_service_config = false;
service_config_changed = process_resolver_result_(
process_resolver_result_user_data_, result, &lb_policy_config,
&service_config_error, &no_valid_service_config);
if (service_config_error != GRPC_ERROR_NONE) {
service_config_error_string = grpc_error_string(service_config_error);
if (no_valid_service_config) {
ChannelConfigHelper::ApplyServiceConfigResult service_config_result;
if (helper_ != nullptr) {
service_config_result = helper_->ApplyServiceConfig(result);
if (service_config_result.service_config_error != GRPC_ERROR_NONE) {
if (service_config_result.no_valid_service_config) {
// We received an invalid service config and we don't have a
// fallback service config.
OnResolverError(service_config_error);
} else {
GRPC_ERROR_UNREF(service_config_error);
OnResolverError(service_config_result.service_config_error);
service_config_result.service_config_error = GRPC_ERROR_NONE;
}
}
} else {
lb_policy_config = child_lb_config_;
service_config_result.lb_policy_config = child_lb_config_;
}
if (lb_policy_config != nullptr) {
// Before we send the args to the LB policy, grab the ConfigSelector for
// later use.
RefCountedPtr<ConfigSelector> config_selector =
ConfigSelector::GetFromChannelArgs(*result.args);
// Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
std::move(result));
if (service_config_result.lb_policy_config != nullptr) {
CreateOrUpdateLbPolicyLocked(
std::move(service_config_result.lb_policy_config), std::move(result));
}
// Apply ConfigSelector to channel.
// This needs to happen after the LB policy has been updated, since
// the ConfigSelector may need the LB policy to know about new
// destinations before it can send RPCs to those destinations.
if (helper_ != nullptr) {
helper_->ApplyConfigSelector(service_config_result.service_config_changed,
std::move(config_selector));
}
// Add channel trace event.
if (service_config_changed) {
if (service_config_result.service_config_changed) {
// TODO(ncteisen): might be worth somehow including a snippet of the
// config in the trace, at the risk of bloating the trace logs.
trace_strings.push_back("Service config changed");
}
if (!service_config_error_string.empty()) {
trace_strings.push_back(service_config_error_string.c_str());
if (service_config_result.service_config_error != GRPC_ERROR_NONE) {
trace_strings.push_back(
grpc_error_string(service_config_result.service_config_error));
}
MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
&trace_strings);
ConcatenateAndAddChannelTraceLocked(trace_strings);
GRPC_ERROR_UNREF(service_config_result.service_config_error);
}
} // namespace grpc_core

@ -23,6 +23,7 @@
#include "absl/container/inlined_vector.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/resolver.h"
@ -52,22 +53,37 @@ namespace grpc_core {
// child LB policy and config to use.
class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
public:
// Synchronous callback that takes the resolver result and sets
// lb_policy_config to point to the right data.
// Returns true if the service config has changed since the last result.
// If the returned no_valid_service_config is true, that means that we
// don't have a valid service config to use, and we should set the channel
// to be in TRANSIENT_FAILURE.
typedef bool (*ProcessResolverResultCallback)(
void* user_data, const Resolver::Result& result,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error, bool* no_valid_service_config);
// If error is set when this returns, then construction failed, and
// the caller may not use the new object.
ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, grpc_core::UniquePtr<char> target_uri,
ProcessResolverResultCallback process_resolver_result,
void* process_resolver_result_user_data);
class ChannelConfigHelper {
public:
struct ApplyServiceConfigResult {
// Set to true if the service config has changed since the last result.
bool service_config_changed = false;
// Set to true if we don't have a valid service config to use.
// This tells the ResolvingLoadBalancingPolicy to put the channel
// into TRANSIENT_FAILURE.
bool no_valid_service_config = false;
// A service config parsing error occurred.
grpc_error* service_config_error = GRPC_ERROR_NONE;
// The LB policy config to use.
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
};
// Applies the service config to the channel.
virtual ApplyServiceConfigResult ApplyServiceConfig(
const Resolver::Result& result) = 0;
// Applies the ConfigSelector to the channel.
virtual void ApplyConfigSelector(
bool service_config_changed,
RefCountedPtr<ConfigSelector> config_selector) = 0;
// Indicates a resolver transient failure.
virtual void ResolverTransientFailure(grpc_error* error) = 0;
};
ResolvingLoadBalancingPolicy(Args args, TraceFlag* tracer,
grpc_core::UniquePtr<char> target_uri,
ChannelConfigHelper* helper);
virtual const char* name() const override { return "resolving_lb"; }
@ -105,15 +121,16 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// Passed in from caller at construction time.
TraceFlag* tracer_;
grpc_core::UniquePtr<char> target_uri_;
ProcessResolverResultCallback process_resolver_result_ = nullptr;
void* process_resolver_result_user_data_ = nullptr;
grpc_core::UniquePtr<char> child_policy_name_;
RefCountedPtr<LoadBalancingPolicy::Config> child_lb_config_;
ChannelConfigHelper* helper_;
// Resolver and associated state.
OrphanablePtr<Resolver> resolver_;
bool previous_resolution_contained_addresses_ = false;
// Determined by resolver results.
grpc_core::UniquePtr<char> child_policy_name_;
RefCountedPtr<LoadBalancingPolicy::Config> child_lb_config_;
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
};

@ -23,6 +23,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/client_channel_channelz.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/config_selector.cc',
'src/core/ext/filters/client_channel/global_subchannel_pool.cc',
'src/core/ext/filters/client_channel/health/health_check_client.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',

@ -1081,6 +1081,8 @@ src/core/ext/filters/client_channel/client_channel_channelz.h \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_factory.h \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/config_selector.h \
src/core/ext/filters/client_channel/connector.h \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.h \

@ -878,6 +878,8 @@ src/core/ext/filters/client_channel/client_channel_channelz.h \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_factory.h \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/config_selector.cc \
src/core/ext/filters/client_channel/config_selector.h \
src/core/ext/filters/client_channel/connector.h \
src/core/ext/filters/client_channel/global_subchannel_pool.cc \
src/core/ext/filters/client_channel/global_subchannel_pool.h \

Loading…
Cancel
Save