Always construct LB policy config, even when only the policy name is specified.

pull/22100/head
Mark D. Roth 5 years ago
parent 389d9baa6f
commit 48b9cd992d
  1. 65
      src/core/ext/filters/client_channel/client_channel.cc
  2. 21
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  3. 12
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  4. 9
      test/core/util/test_lb_policies.cc

@ -239,9 +239,9 @@ class ChannelData {
void DestroyResolvingLoadBalancingPolicyLocked(); void DestroyResolvingLoadBalancingPolicyLocked();
static bool ProcessResolverResultLocked( static bool ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name, void* arg, const Resolver::Result& result,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config, RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error); grpc_error** service_config_error, bool* no_valid_service_config);
grpc_error* DoPingLocked(grpc_transport_op* op); grpc_error* DoPingLocked(grpc_transport_op* op);
@ -252,7 +252,6 @@ class ChannelData {
void ProcessLbPolicy( void ProcessLbPolicy(
const Resolver::Result& resolver_result, const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config, const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
grpc_core::UniquePtr<char>* lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config); RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);
// //
@ -1620,24 +1619,23 @@ void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
void ChannelData::ProcessLbPolicy( void ChannelData::ProcessLbPolicy(
const Resolver::Result& resolver_result, const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config, const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
grpc_core::UniquePtr<char>* lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) { RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
// Prefer the LB policy name found in the service config. // Prefer the LB policy config found in the service config.
if (parsed_service_config != nullptr && if (parsed_service_config != nullptr &&
parsed_service_config->parsed_lb_config() != nullptr) { parsed_service_config->parsed_lb_config() != nullptr) {
lb_policy_name->reset(
gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
*lb_policy_config = parsed_service_config->parsed_lb_config(); *lb_policy_config = parsed_service_config->parsed_lb_config();
return; return;
} }
const char* local_policy_name = nullptr; // Try the deprecated LB policy name from the service config.
// If not, try the setting from channel args.
const char* policy_name = nullptr;
if (parsed_service_config != nullptr && if (parsed_service_config != nullptr &&
parsed_service_config->parsed_deprecated_lb_policy() != nullptr) { parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
local_policy_name = parsed_service_config->parsed_deprecated_lb_policy(); policy_name = parsed_service_config->parsed_deprecated_lb_policy();
} else { } else {
const grpc_arg* channel_arg = const grpc_arg* channel_arg =
grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME); grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
local_policy_name = grpc_channel_arg_get_string(channel_arg); policy_name = grpc_channel_arg_get_string(channel_arg);
} }
// Special case: If at least one balancer address is present, we use // Special case: If at least one balancer address is present, we use
// the grpclb policy, regardless of what the resolver has returned. // the grpclb policy, regardless of what the resolver has returned.
@ -1650,27 +1648,46 @@ void ChannelData::ProcessLbPolicy(
} }
} }
if (found_balancer_address) { if (found_balancer_address) {
if (local_policy_name != nullptr && if (policy_name != nullptr && strcmp(policy_name, "grpclb") != 0) {
strcmp(local_policy_name, "grpclb") != 0) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"resolver requested LB policy %s but provided at least one " "resolver requested LB policy %s but provided at least one "
"balancer address -- forcing use of grpclb LB policy", "balancer address -- forcing use of grpclb LB policy",
local_policy_name); policy_name);
} }
local_policy_name = "grpclb"; policy_name = "grpclb";
} }
// Use pick_first if nothing was specified and we didn't select grpclb // Use pick_first if nothing was specified and we didn't select grpclb
// above. // above.
lb_policy_name->reset(gpr_strdup( if (policy_name == nullptr) policy_name = "pick_first";
local_policy_name == nullptr ? "pick_first" : local_policy_name)); // Now that we have the policy name, construct an empty config for it.
Json config_json = Json::Array{Json::Object{
{policy_name, Json::Object{}},
}};
grpc_error* parse_error = GRPC_ERROR_NONE;
*lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
config_json, &parse_error);
// The policy name came from one of three places:
// - The deprecated loadBalancingPolicy field in the service config,
// in which case the code in ClientChannelServiceConfigParser
// already verified that the policy does not require a config.
// - One of the hard-coded values here, all of which are known to not
// require a config.
// - A channel arg, in which case the application did something that
// is a misuse of our API.
// In the first two cases, these assertions will always be true. In
// the last case, this is probably fine for now.
// TODO(roth): If the last case becomes a problem, add better error
// handling here.
GPR_ASSERT(*lb_policy_config != nullptr);
GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
} }
// Synchronous callback from ResolvingLoadBalancingPolicy to process a // Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update. // resolver result update.
bool ChannelData::ProcessResolverResultLocked( bool ChannelData::ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name, void* arg, const Resolver::Result& result,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config, RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error) { grpc_error** service_config_error, bool* no_valid_service_config) {
ChannelData* chand = static_cast<ChannelData*>(arg); ChannelData* chand = static_cast<ChannelData*>(arg);
RefCountedPtr<ServiceConfig> service_config; RefCountedPtr<ServiceConfig> service_config;
// If resolver did not return a service config or returned an invalid service // If resolver did not return a service config or returned an invalid service
@ -1680,13 +1697,13 @@ bool ChannelData::ProcessResolverResultLocked(
// config. If there is no saved config either, use the default service // config. If there is no saved config either, use the default service
// config. // config.
if (chand->saved_service_config_ != nullptr) { if (chand->saved_service_config_ != nullptr) {
service_config = chand->saved_service_config_;
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p: resolver returned invalid service config. " "chand=%p: resolver returned invalid service config. "
"Continuing to use previous service config.", "Continuing to use previous service config.",
chand); chand);
} }
service_config = chand->saved_service_config_;
} else if (chand->default_service_config_ != nullptr) { } else if (chand->default_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
@ -1712,6 +1729,7 @@ bool ChannelData::ProcessResolverResultLocked(
*service_config_error = GRPC_ERROR_REF(result.service_config_error); *service_config_error = GRPC_ERROR_REF(result.service_config_error);
if (service_config == nullptr && if (service_config == nullptr &&
result.service_config_error != GRPC_ERROR_NONE) { result.service_config_error != GRPC_ERROR_NONE) {
*no_valid_service_config = true;
return false; return false;
} }
// Process service config. // Process service config.
@ -1776,19 +1794,18 @@ bool ChannelData::ProcessResolverResultLocked(
chand->UpdateServiceConfigLocked(std::move(retry_throttle_data), chand->UpdateServiceConfigLocked(std::move(retry_throttle_data),
chand->saved_service_config_); chand->saved_service_config_);
} }
grpc_core::UniquePtr<char> processed_lb_policy_name; chand->ProcessLbPolicy(result, parsed_service_config, lb_policy_config);
chand->ProcessLbPolicy(result, parsed_service_config, grpc_core::UniquePtr<char> lb_policy_name(
&processed_lb_policy_name, lb_policy_config); gpr_strdup((*lb_policy_config)->name()));
// Swap out the data used by GetChannelInfo(). // Swap out the data used by GetChannelInfo().
{ {
MutexLock lock(&chand->info_mu_); MutexLock lock(&chand->info_mu_);
chand->info_lb_policy_name_ = std::move(processed_lb_policy_name); chand->info_lb_policy_name_ = std::move(lb_policy_name);
if (service_config_json != nullptr) { if (service_config_json != nullptr) {
chand->info_service_config_json_ = std::move(service_config_json); chand->info_service_config_json_ = std::move(service_config_json);
} }
} }
// Return results. // Return results.
*lb_policy_name = chand->info_lb_policy_name_.get();
return service_config_changed; return service_config_changed;
} }

@ -268,7 +268,6 @@ void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
} }
void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked( void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
const char* lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config, RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result, TraceStringVector* trace_strings) { Resolver::Result result, TraceStringVector* trace_strings) {
// If the child policy name changes, we need to create a new child // If the child policy name changes, we need to create a new child
@ -320,6 +319,7 @@ void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
// that was there before, which will be immediately shut down) // that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper // and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY. // when the new child transitions into state READY.
const char* lb_policy_name = lb_policy_config->name();
const bool create_policy = const bool create_policy =
// case 1 // case 1
lb_policy_ == nullptr || lb_policy_ == nullptr ||
@ -451,34 +451,33 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
TraceStringVector trace_strings; TraceStringVector trace_strings;
const bool resolution_contains_addresses = result.addresses.size() > 0; const bool resolution_contains_addresses = result.addresses.size() > 0;
// Process the resolver result. // Process the resolver result.
const char* lb_policy_name = nullptr;
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config; RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
bool service_config_changed = false; bool service_config_changed = false;
char* service_config_error_string = nullptr; char* service_config_error_string = nullptr;
if (process_resolver_result_ != nullptr) { if (process_resolver_result_ != nullptr) {
grpc_error* service_config_error = GRPC_ERROR_NONE; grpc_error* service_config_error = GRPC_ERROR_NONE;
bool no_valid_service_config = false;
service_config_changed = process_resolver_result_( service_config_changed = process_resolver_result_(
process_resolver_result_user_data_, result, &lb_policy_name, process_resolver_result_user_data_, result, &lb_policy_config,
&lb_policy_config, &service_config_error); &service_config_error, &no_valid_service_config);
if (service_config_error != GRPC_ERROR_NONE) { if (service_config_error != GRPC_ERROR_NONE) {
service_config_error_string = service_config_error_string =
gpr_strdup(grpc_error_string(service_config_error)); gpr_strdup(grpc_error_string(service_config_error));
if (lb_policy_name == nullptr) { if (no_valid_service_config) {
// Use an empty lb_policy_name as an indicator that we received an // We received an invalid service config and we don't have a
// invalid service config and we don't have a fallback service config. // fallback service config.
OnResolverError(service_config_error); OnResolverError(service_config_error);
} else { } else {
GRPC_ERROR_UNREF(service_config_error); GRPC_ERROR_UNREF(service_config_error);
} }
} }
} else { } else {
lb_policy_name = child_policy_name_.get();
lb_policy_config = child_lb_config_; lb_policy_config = child_lb_config_;
} }
if (lb_policy_name != nullptr) { if (lb_policy_config != nullptr) {
// Create or update LB policy, as needed. // Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked(lb_policy_name, lb_policy_config, CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config), std::move(result),
std::move(result), &trace_strings); &trace_strings);
} }
// Add channel trace event. // Add channel trace event.
if (service_config_changed) { if (service_config_changed) {

@ -52,16 +52,15 @@ namespace grpc_core {
class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
public: public:
// Synchronous callback that takes the resolver result and sets // Synchronous callback that takes the resolver result and sets
// lb_policy_name and lb_policy_config to point to the right data. // lb_policy_config to point to the right data.
// Returns true if the service config has changed since the last result. // Returns true if the service config has changed since the last result.
// If the returned service_config_error is not none and lb_policy_name is // If the returned no_valid_service_config is true, that means that we
// empty, it means that we don't have a valid service config to use, and we // don't have a valid service config to use, and we should set the channel
// should set the channel to be in TRANSIENT_FAILURE. // to be in TRANSIENT_FAILURE.
typedef bool (*ProcessResolverResultCallback)( typedef bool (*ProcessResolverResultCallback)(
void* user_data, const Resolver::Result& result, void* user_data, const Resolver::Result& result,
const char** lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config, RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error); grpc_error** service_config_error, bool* no_valid_service_config);
// If error is set when this returns, then construction failed, and // If error is set when this returns, then construction failed, and
// the caller may not use the new object. // the caller may not use the new object.
ResolvingLoadBalancingPolicy( ResolvingLoadBalancingPolicy(
@ -92,7 +91,6 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
void OnResolverError(grpc_error* error); void OnResolverError(grpc_error* error);
void CreateOrUpdateLbPolicyLocked( void CreateOrUpdateLbPolicyLocked(
const char* lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config, RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result, TraceStringVector* trace_strings); Resolver::Result result, TraceStringVector* trace_strings);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked( OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(

@ -209,6 +209,13 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
} }
}; };
class InterceptTrailingConfig : public LoadBalancingPolicy::Config {
public:
const char* name() const override {
return kInterceptRecvTrailingMetadataLbPolicyName;
}
};
class InterceptTrailingFactory : public LoadBalancingPolicyFactory { class InterceptTrailingFactory : public LoadBalancingPolicyFactory {
public: public:
explicit InterceptTrailingFactory(InterceptRecvTrailingMetadataCallback cb, explicit InterceptTrailingFactory(InterceptRecvTrailingMetadataCallback cb,
@ -227,7 +234,7 @@ class InterceptTrailingFactory : public LoadBalancingPolicyFactory {
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& /*json*/, grpc_error** /*error*/) const override { const Json& /*json*/, grpc_error** /*error*/) const override {
return nullptr; return MakeRefCounted<InterceptTrailingConfig>();
} }
private: private:

Loading…
Cancel
Save