Reviewer comments

reviewable/pr18946/r3
Yash Tibrewal 6 years ago
parent 8030e12624
commit fb8973722b
  1. 147
      src/core/ext/filters/client_channel/client_channel.cc
  2. 10
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  3. 106
      src/core/ext/filters/client_channel/resolver_result_parsing.cc
  4. 68
      src/core/ext/filters/client_channel/resolver_result_parsing.h
  5. 11
      src/core/ext/filters/client_channel/resolving_lb_policy.cc

@ -67,7 +67,6 @@
#include "src/core/lib/transport/status_metadata.h"
using grpc_core::internal::ClientChannelMethodParsedObject;
using grpc_core::internal::ProcessedResolverResult;
using grpc_core::internal::ServerRetryThrottleData;
//
@ -233,6 +232,12 @@ class ChannelData {
static void TryToConnectLocked(void* arg, grpc_error* error_ignored);
void ProcessLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedObject* parsed_object,
UniquePtr<char>* lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config);
//
// Fields set at construction and never modified.
//
@ -242,6 +247,7 @@ class ChannelData {
grpc_channel_stack* owning_stack_;
ClientChannelFactory* client_channel_factory_;
UniquePtr<char> server_name_;
RefCountedPtr<ServiceConfig> default_service_config_;
// Initialized shortly after construction.
channelz::ClientChannelNode* channelz_node_ = nullptr;
@ -266,7 +272,7 @@ class ChannelData {
grpc_connectivity_state_tracker state_tracker_;
ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
UniquePtr<char> health_check_service_name_;
bool saved_service_config_ = false;
RefCountedPtr<ServiceConfig> saved_service_config_;
//
// Fields accessed from both data plane and control plane combiners.
@ -1068,6 +1074,17 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
"filter");
return;
}
// Get default service config
const char* service_config_json = grpc_channel_arg_get_string(
grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
if (service_config_json != nullptr) {
*error = GRPC_ERROR_NONE;
default_service_config_ = ServiceConfig::Create(service_config_json, error);
if (*error != GRPC_ERROR_NONE) {
default_service_config_.reset();
return;
}
}
grpc_uri* uri = grpc_uri_parse(server_uri, true);
if (uri != nullptr && uri->path[0] != '\0') {
server_name_.reset(
@ -1130,6 +1147,56 @@ ChannelData::~ChannelData() {
gpr_mu_destroy(&info_mu_);
}
void ChannelData::ProcessLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedObject* parsed_object,
UniquePtr<char>* lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config) {
// Prefer the LB policy name found in the service config.
if (parsed_object != nullptr) {
if (parsed_object->parsed_lb_config() != nullptr) {
(*lb_policy_name)
.reset(gpr_strdup(parsed_object->parsed_lb_config()->name()));
*lb_policy_config = parsed_object->parsed_lb_config();
} else {
(*lb_policy_name)
.reset(gpr_strdup(parsed_object->parsed_deprecated_lb_policy()));
}
}
// Otherwise, find the LB policy name set by the client API.
if (*lb_policy_name == nullptr) {
const grpc_arg* channel_arg =
grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
(*lb_policy_name)
.reset(gpr_strdup(grpc_channel_arg_get_string(channel_arg)));
}
// Special case: If at least one balancer address is present, we use
// the grpclb policy, regardless of what the resolver has returned.
bool found_balancer_address = false;
for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
const ServerAddress& address = resolver_result.addresses[i];
if (address.IsBalancer()) {
found_balancer_address = true;
break;
}
}
if (found_balancer_address) {
if (*lb_policy_name != nullptr &&
strcmp((*lb_policy_name).get(), "grpclb") != 0) {
gpr_log(GPR_INFO,
"resolver requested LB policy %s but provided at least one "
"balancer address -- forcing use of grpclb LB policy",
(*lb_policy_name).get());
}
(*lb_policy_name).reset(gpr_strdup("grpclb"));
}
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (*lb_policy_name == nullptr) {
(*lb_policy_name).reset(gpr_strdup("pick_first"));
}
}
// Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
@ -1137,41 +1204,73 @@ bool ChannelData::ProcessResolverResultLocked(
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error) {
ChannelData* chand = static_cast<ChannelData*>(arg);
ProcessedResolverResult resolver_result(result, chand->saved_service_config_);
*service_config_error = resolver_result.service_config_error();
if (*service_config_error != GRPC_ERROR_NONE) {
// We got an invalid service config. If we had a service config previously,
// we will continue using it.
RefCountedPtr<ServiceConfig> service_config;
// If resolver did not return a service config or returned an invalid service
// config, we need a fallback service config
if (result.service_config_error != GRPC_ERROR_NONE) {
// If the service config was invalid, then prefer using the saved service
// config, otherwise use the default service config provided by the client
// API
if (chand->saved_service_config_ != nullptr) {
service_config = chand->saved_service_config_;
} else if (chand->default_service_config_ != nullptr) {
service_config = chand->default_service_config_;
}
} else if (result.service_config == nullptr) {
// We got no service config. Fallback to default service config
if (chand->default_service_config_ != nullptr) {
service_config = chand->default_service_config_;
}
} else {
service_config = result.service_config;
}
*service_config_error = GRPC_ERROR_REF(result.service_config_error);
if (service_config == nullptr &&
result.service_config_error != GRPC_ERROR_NONE) {
return false;
}
chand->saved_service_config_ = true;
char* service_config_json = gpr_strdup(resolver_result.service_config_json());
char* service_config_json = nullptr;
// Process service config.
const internal::ClientChannelGlobalParsedObject* parsed_object = nullptr;
if (service_config != nullptr) {
parsed_object =
static_cast<const internal::ClientChannelGlobalParsedObject*>(
service_config->GetParsedGlobalServiceConfigObject(
internal::ClientChannelServiceConfigParser::ParserIndex()));
service_config_json = gpr_strdup(service_config->service_config_json());
}
bool service_config_changed = service_config != chand->saved_service_config_;
if (service_config_changed) {
chand->saved_service_config_ = std::move(service_config);
Optional<internal::ClientChannelGlobalParsedObject::RetryThrottling>
retry_throttle_data;
if (parsed_object != nullptr) {
chand->health_check_service_name_.reset(
gpr_strdup(parsed_object->health_check_service_name()));
retry_throttle_data = parsed_object->retry_throttling();
} else {
chand->health_check_service_name_.reset();
}
// Create service config setter to update channel state in the data
// plane combiner. Destroys itself when done.
New<ServiceConfigSetter>(chand, retry_throttle_data,
chand->saved_service_config_);
}
UniquePtr<char> processed_lb_policy_name;
chand->ProcessLbPolicy(result, parsed_object, &processed_lb_policy_name,
lb_policy_config);
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
chand, service_config_json);
}
chand->health_check_service_name_.reset(
gpr_strdup(resolver_result.health_check_service_name()));
// Create service config setter to update channel state in the data
// plane combiner. Destroys itself when done.
New<ServiceConfigSetter>(chand, resolver_result.retry_throttle_data(),
resolver_result.service_config());
// Swap out the data used by GetChannelInfo().
bool service_config_changed;
{
MutexLock lock(&chand->info_mu_);
chand->info_lb_policy_name_ = resolver_result.lb_policy_name();
service_config_changed =
((service_config_json == nullptr) !=
(chand->info_service_config_json_ == nullptr)) ||
(service_config_json != nullptr &&
strcmp(service_config_json, chand->info_service_config_json_.get()) !=
0);
chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
chand->info_service_config_json_.reset(service_config_json);
}
// Return results.
*lb_policy_name = chand->info_lb_policy_name_.get();
*lb_policy_config = resolver_result.lb_policy_config();
return service_config_changed;
}

@ -251,7 +251,7 @@ char* ChooseServiceConfig(char* service_config_choice_json,
continue;
}
grpc_json* service_config_json = nullptr;
bool not_choice = false; // has this choice been rejected?
bool selected = true; // has this choice been rejected?
for (grpc_json* field = choice->child; field != nullptr;
field = field->next) {
// Check client language, if specified.
@ -260,7 +260,7 @@ char* ChooseServiceConfig(char* service_config_choice_json,
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:clientLanguage error:should be of type array"));
} else if (!ValueInJsonArray(field, "c++")) {
not_choice = true;
selected = false;
}
}
// Check client hostname, if specified.
@ -272,7 +272,7 @@ char* ChooseServiceConfig(char* service_config_choice_json,
}
char* hostname = grpc_gethostname();
if (hostname == nullptr || !ValueInJsonArray(field, hostname)) {
not_choice = true;
selected = false;
}
}
// Check percentage, if specified.
@ -290,7 +290,7 @@ char* ChooseServiceConfig(char* service_config_choice_json,
continue;
}
if (random_pct > percentage || percentage == 0) {
not_choice = true;
selected = false;
}
}
// Save service config.
@ -303,7 +303,7 @@ char* ChooseServiceConfig(char* service_config_choice_json,
}
}
}
if (!found_choice && !not_choice && service_config_json != nullptr) {
if (!found_choice && selected && service_config_json != nullptr) {
service_config = grpc_json_dump_to_string(service_config_json, 0);
found_choice = true;
}

@ -58,112 +58,6 @@ void ClientChannelServiceConfigParser::Register() {
New<ClientChannelServiceConfigParser>()));
}
ProcessedResolverResult::ProcessedResolverResult(
const Resolver::Result& resolver_result, bool saved_service_config)
: service_config_(resolver_result.service_config) {
// If resolver did not return a service config or returned an invalid service
// config, we need a fallback service config
if (service_config_ == nullptr) {
// If the service config was invalid, then prefer using the saved service
// config, otherwise use the default service config provided by the client
// API
if (resolver_result.service_config_error != GRPC_ERROR_NONE &&
saved_service_config) {
// Return the service config error to client channel, so that it continues
// using the existing service config.
service_config_error_ =
GRPC_ERROR_REF(resolver_result.service_config_error);
} else {
// Either no service config or an invalid service config was received.
const char* service_config_json =
grpc_channel_arg_get_string(grpc_channel_args_find(
resolver_result.args, GRPC_ARG_SERVICE_CONFIG));
if (service_config_json != nullptr) {
service_config_ =
ServiceConfig::Create(service_config_json, &service_config_error_);
} else {
// We could not find a fallback service config, save the service config
// error.
service_config_error_ =
GRPC_ERROR_REF(resolver_result.service_config_error);
}
}
} else {
service_config_error_ =
GRPC_ERROR_REF(resolver_result.service_config_error);
}
if (service_config_error_ != GRPC_ERROR_NONE) {
// We got an invalid service config. Don't process any further.
return;
}
// Process service config.
const ClientChannelGlobalParsedObject* parsed_object = nullptr;
if (service_config_ != nullptr) {
parsed_object = static_cast<const ClientChannelGlobalParsedObject*>(
service_config_->GetParsedGlobalServiceConfigObject(
ClientChannelServiceConfigParser::ParserIndex()));
ProcessServiceConfig(resolver_result, parsed_object);
}
ProcessLbPolicy(resolver_result, parsed_object);
}
void ProcessedResolverResult::ProcessServiceConfig(
const Resolver::Result& resolver_result,
const ClientChannelGlobalParsedObject* parsed_object) {
health_check_service_name_ = parsed_object->health_check_service_name();
service_config_json_ = service_config_->service_config_json();
if (parsed_object != nullptr) {
retry_throttle_data_ = parsed_object->retry_throttling();
}
}
void ProcessedResolverResult::ProcessLbPolicy(
const Resolver::Result& resolver_result,
const ClientChannelGlobalParsedObject* parsed_object) {
// Prefer the LB policy name found in the service config.
if (parsed_object != nullptr) {
if (parsed_object->parsed_lb_config() != nullptr) {
lb_policy_name_.reset(
gpr_strdup(parsed_object->parsed_lb_config()->name()));
lb_policy_config_ = parsed_object->parsed_lb_config();
} else {
lb_policy_name_.reset(
gpr_strdup(parsed_object->parsed_deprecated_lb_policy()));
}
}
// Otherwise, find the LB policy name set by the client API.
if (lb_policy_name_ == nullptr) {
const grpc_arg* channel_arg =
grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
lb_policy_name_.reset(gpr_strdup(grpc_channel_arg_get_string(channel_arg)));
}
// Special case: If at least one balancer address is present, we use
// the grpclb policy, regardless of what the resolver has returned.
bool found_balancer_address = false;
for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
const ServerAddress& address = resolver_result.addresses[i];
if (address.IsBalancer()) {
found_balancer_address = true;
break;
}
}
if (found_balancer_address) {
if (lb_policy_name_ != nullptr &&
strcmp(lb_policy_name_.get(), "grpclb") != 0) {
gpr_log(GPR_INFO,
"resolver requested LB policy %s but provided at least one "
"balancer address -- forcing use of grpclb LB policy",
lb_policy_name_.get());
}
lb_policy_name_.reset(gpr_strdup("grpclb"));
}
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name_ == nullptr) {
lb_policy_name_.reset(gpr_strdup("pick_first"));
}
}
namespace {
// Parses a JSON field of the form generated for a google.proto.Duration

@ -118,74 +118,6 @@ class ClientChannelServiceConfigParser : public ServiceConfig::Parser {
static void Register();
};
// TODO(yashykt): It would be cleaner to move this logic to the client_channel
// code. A container of processed fields from the resolver result. Simplifies
// the usage of resolver result.
class ProcessedResolverResult {
public:
// Processes the resolver result and populates the relative members
// for later consumption.
ProcessedResolverResult(const Resolver::Result& resolver_result,
bool saved_service_config);
~ProcessedResolverResult() { GRPC_ERROR_UNREF(service_config_error_); }
// Getters. Any managed object's ownership is transferred.
const char* service_config_json() { return service_config_json_; }
RefCountedPtr<ServiceConfig> service_config() { return service_config_; }
UniquePtr<char> lb_policy_name() { return std::move(lb_policy_name_); }
RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config() {
return lb_policy_config_;
}
Optional<ClientChannelGlobalParsedObject::RetryThrottling>
retry_throttle_data() {
return retry_throttle_data_;
}
const char* health_check_service_name() { return health_check_service_name_; }
grpc_error* service_config_error() {
grpc_error* return_error = service_config_error_;
service_config_error_ = GRPC_ERROR_NONE;
return return_error;
}
private:
// Finds the service config; extracts LB config and (maybe) retry throttle
// params from it.
void ProcessServiceConfig(
const Resolver::Result& resolver_result,
const ClientChannelGlobalParsedObject* parsed_object);
// Extracts the LB policy.
void ProcessLbPolicy(const Resolver::Result& resolver_result,
const ClientChannelGlobalParsedObject* parsed_object);
// Parses the service config. Intended to be used by
// ServiceConfig::ParseGlobalParams.
static void ParseServiceConfig(const grpc_json* field,
ProcessedResolverResult* parsing_state);
// Parses the LB config from service config.
void ParseLbConfigFromServiceConfig(const grpc_json* field);
// Parses the retry throttle parameters from service config.
void ParseRetryThrottleParamsFromServiceConfig(const grpc_json* field);
// Service config.
const char* service_config_json_ = nullptr;
RefCountedPtr<ServiceConfig> service_config_;
grpc_error* service_config_error_ = GRPC_ERROR_NONE;
// LB policy.
UniquePtr<char> lb_policy_name_;
RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config_;
// Retry throttle data.
Optional<ClientChannelGlobalParsedObject::RetryThrottling>
retry_throttle_data_;
const char* health_check_service_name_ = nullptr;
};
} // namespace internal
} // namespace grpc_core

@ -538,7 +538,16 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
process_resolver_result_user_data_, result, &lb_policy_name,
&lb_policy_config, &service_config_error);
if (service_config_error != GRPC_ERROR_NONE) {
return OnResolverError(service_config_error);
if (channelz_node() != nullptr) {
trace_strings.push_back(
gpr_strdup(grpc_error_string(service_config_error)));
}
if (lb_policy_name == nullptr) {
// Use an empty lb_policy_name as an indicator that we received an
// invalid service config and we don't have a fallback service config.
return OnResolverError(service_config_error);
}
GRPC_ERROR_UNREF(service_config_error);
}
} else {
lb_policy_name = child_policy_name_.get();

Loading…
Cancel
Save