Reviewer comments

reviewable/pr18746/r3
Yash Tibrewal 6 years ago
parent baa52808f1
commit 70839d966f
  1. 28
      src/core/ext/filters/client_channel/client_channel.cc
  2. 1
      src/core/ext/filters/client_channel/client_channel_plugin.cc
  3. 59
      src/core/ext/filters/client_channel/lb_policy.cc
  4. 18
      src/core/ext/filters/client_channel/lb_policy.h
  5. 8
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  6. 5
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  7. 5
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  8. 15
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  9. 9
      src/core/ext/filters/client_channel/lb_policy_factory.h
  10. 91
      src/core/ext/filters/client_channel/lb_policy_registry.cc
  11. 8
      src/core/ext/filters/client_channel/lb_policy_registry.h
  12. 102
      src/core/ext/filters/client_channel/resolver_result_parsing.cc
  13. 19
      src/core/ext/filters/client_channel/resolver_result_parsing.h
  14. 2
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  15. 3
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  16. 162
      src/core/ext/filters/client_channel/service_config.h
  17. 1
      src/core/ext/filters/client_channel/subchannel.cc
  18. 3
      src/core/ext/filters/message_size/message_size_filter.cc
  19. 13
      test/core/client_channel/service_config_test.cc

@ -223,7 +223,7 @@ class ChannelData {
~ChannelData();
static bool ProcessResolverResultLocked(
void* arg, Resolver::Result* result, const char** lb_policy_name,
void* arg, const Resolver::Result& result, const char** lb_policy_name,
const ParsedLoadBalancingConfig** lb_policy_config,
const HealthCheckParsedObject** health_check);
@ -252,8 +252,8 @@ class ChannelData {
QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks.
// Data from service config.
bool received_service_config_data_ = false;
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
grpc_core::RefCountedPtr<grpc_core::ServiceConfig> service_config_;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
RefCountedPtr<ServiceConfig> service_config_;
//
// Fields used in the control plane. Guarded by combiner.
@ -619,7 +619,7 @@ class CallData {
grpc_call_context_element* call_context_;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
RefCountedPtr<grpc_core::ServiceConfig> service_config_;
RefCountedPtr<ServiceConfig> service_config_;
const ClientChannelMethodParsedObject* method_params_ = nullptr;
RefCountedPtr<SubchannelCall> subchannel_call_;
@ -1110,11 +1110,11 @@ ChannelData::~ChannelData() {
// Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
void* arg, Resolver::Result* result, const char** lb_policy_name,
void* arg, const Resolver::Result& result, const char** lb_policy_name,
const ParsedLoadBalancingConfig** lb_policy_config,
const grpc_core::HealthCheckParsedObject** health_check) {
const HealthCheckParsedObject** health_check) {
ChannelData* chand = static_cast<ChannelData*>(arg);
ProcessedResolverResult resolver_result(result, chand->enable_retries_);
ProcessedResolverResult resolver_result(result);
const char* service_config_json = resolver_result.service_config_json();
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
@ -1854,7 +1854,7 @@ void CallData::DoRetry(grpc_call_element* elem,
// Compute backoff delay.
grpc_millis next_attempt_time;
if (server_pushback_ms >= 0) {
next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
last_attempt_got_server_pushback_ = true;
} else {
if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
@ -1871,7 +1871,7 @@ void CallData::DoRetry(grpc_call_element* elem,
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
this, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
this, next_attempt_time - ExecCtx::Get()->Now());
}
// Schedule retry after computed delay.
GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem,
@ -3079,22 +3079,22 @@ void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
chand, this);
}
if (chand->service_config() != nullptr) {
service_config_ = chand->service_config();
if (service_config_ != nullptr) {
// Store a ref to the service_config in CallData. Also, save pointers to the
// ServiceConfig and ServiceConfigObjectsVector (for this call) in the
// call_context so that all future filters can access it.
service_config_ = chand->service_config();
call_context_[GRPC_SERVICE_CONFIG].value = &service_config_;
const auto* method_params_vector_ptr =
chand->service_config()->GetMethodServiceConfigObjectsVector(path_);
service_config_->GetMethodServiceConfigObjectsVector(path_);
if (method_params_vector_ptr != nullptr) {
method_params_ = static_cast<ClientChannelMethodParsedObject*>(
((*method_params_vector_ptr)
[grpc_core::internal::ClientChannelServiceConfigParser::
[internal::ClientChannelServiceConfigParser::
client_channel_service_config_parser_index()])
.get());
call_context_[GRPC_SERVICE_CONFIG_METHOD_PARAMS].value =
const_cast<grpc_core::ServiceConfig::ServiceConfigObjectsVector*>(
const_cast<ServiceConfig::ServiceConfigObjectsVector*>(
method_params_vector_ptr);
}
}

@ -53,7 +53,6 @@ static bool append_filter(grpc_channel_stack_builder* builder, void* arg) {
void grpc_service_config_register_parsers() {
grpc_core::internal::ClientChannelServiceConfigParser::Register();
grpc_core::MessageSizeParser::Register();
grpc_core::HealthCheckParser::Register();
}

@ -62,65 +62,6 @@ void LoadBalancingPolicy::ShutdownAndUnrefLocked(void* arg,
policy->Unref();
}
grpc_json* LoadBalancingPolicy::ParseLoadBalancingConfig(
const grpc_json* lb_config_array, const char* field_name,
grpc_error** error) {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
char* error_msg;
if (lb_config_array == nullptr || lb_config_array->type != GRPC_JSON_ARRAY) {
gpr_asprintf(&error_msg, "field:%s error:type should be array", field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
}
// Find the first LB policy that this client supports.
for (const grpc_json* lb_config = lb_config_array->child;
lb_config != nullptr; lb_config = lb_config->next) {
if (lb_config->type != GRPC_JSON_OBJECT) {
gpr_asprintf(&error_msg,
"field:%s error:child entry should be of type object",
field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
}
grpc_json* policy = nullptr;
for (grpc_json* field = lb_config->child; field != nullptr;
field = field->next) {
if (field->key == nullptr || field->type != GRPC_JSON_OBJECT) {
gpr_asprintf(&error_msg,
"field:%s error:child entry should be of type object",
field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
}
if (policy != nullptr) {
gpr_asprintf(&error_msg, "field:%s error:oneOf violation", field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
} // Violate "oneof" type.
policy = field;
}
if (policy == nullptr) {
gpr_asprintf(&error_msg, "field:%s error:no policy found in child entry",
field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
}
// If we support this policy, then select it.
if (LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(policy->key)) {
return policy;
}
}
gpr_asprintf(&error_msg, "field:%s error:No known policy", field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
}
//
// LoadBalancingPolicy::UpdateArgs
//

@ -36,7 +36,17 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount;
namespace grpc_core {
class ParsedLoadBalancingConfig;
/// Interface for parsed forms of load balancing configs found in a service
/// config.
class ParsedLoadBalancingConfig {
public:
virtual ~ParsedLoadBalancingConfig() = default;
// Returns the load balancing policy name
virtual const char* name() const GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS;
};
/// Interface for load balancing policies.
///
@ -272,12 +282,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
void Orphan() override;
/// Returns the JSON node of policy (with both policy name and config content)
/// given the JSON node of a LoadBalancingConfig array.
static grpc_json* ParseLoadBalancingConfig(const grpc_json* lb_config_array,
const char* field_name,
grpc_error** error);
// A picker that returns PICK_QUEUE for all picks.
// Also calls the parent LB policy's ExitIdleLocked() method when the
// first pick is seen.

@ -1814,8 +1814,10 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
UniquePtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
GPR_DEBUG_ASSERT(json != nullptr);
GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
if (json == nullptr) {
return UniquePtr<ParsedLoadBalancingConfig>(
New<ParsedGrpcLbConfig>(nullptr));
}
InlinedVector<grpc_error*, 2> error_list;
UniquePtr<ParsedLoadBalancingConfig> child_policy = nullptr;
@ -1830,7 +1832,7 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
}
grpc_error* parse_error = GRPC_ERROR_NONE;
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
field, "childPolicy", &parse_error);
field, &parse_error);
if (parse_error != GRPC_ERROR_NONE) {
error_list.push_back(parse_error);
}

@ -548,8 +548,9 @@ class PickFirstFactory : public LoadBalancingPolicyFactory {
UniquePtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(json != nullptr);
GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
if (json != nullptr) {
GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
}
return UniquePtr<ParsedLoadBalancingConfig>(New<ParsedPickFirstConfig>());
}
};

@ -525,8 +525,9 @@ class RoundRobinFactory : public LoadBalancingPolicyFactory {
UniquePtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(json != nullptr);
GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
if (json != nullptr) {
GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
}
return UniquePtr<ParsedLoadBalancingConfig>(New<ParsedRoundRobinConfig>());
}
};

@ -1667,7 +1667,16 @@ class XdsFactory : public LoadBalancingPolicyFactory {
UniquePtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(json != nullptr);
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json == nullptr) {
// xds was mentioned as a policy in the deprecated loadBalancingPolicy
// field.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:Xds Parser has required field - "
"balancerName. Please use loadBalancingConfig instead of the "
"deprecated loadBalancingPolicy");
return nullptr;
}
GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
InlinedVector<grpc_error*, 3> error_list;
@ -1697,7 +1706,7 @@ class XdsFactory : public LoadBalancingPolicyFactory {
}
grpc_error* parse_error = GRPC_ERROR_NONE;
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
field, "childPolicy", &parse_error);
field, &parse_error);
if (child_policy == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
error_list.push_back(parse_error);
@ -1710,7 +1719,7 @@ class XdsFactory : public LoadBalancingPolicyFactory {
}
grpc_error* parse_error = GRPC_ERROR_NONE;
fallback_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
field, "fallbackPolicy", &parse_error);
field, &parse_error);
if (fallback_policy == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
error_list.push_back(parse_error);

@ -27,15 +27,6 @@
namespace grpc_core {
class ParsedLoadBalancingConfig {
public:
virtual ~ParsedLoadBalancingConfig() = default;
virtual const char* name() const GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS;
};
class LoadBalancingPolicyFactory {
public:
/// Returns a new LB policy instance.

@ -99,25 +99,87 @@ bool LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(const char* name) {
return g_state->GetLoadBalancingPolicyFactory(name) != nullptr;
}
namespace {
// Returns the JSON node of policy (with both policy name and config content)
// given the JSON node of a LoadBalancingConfig array.
grpc_json* ParseLoadBalancingConfigHelper(const grpc_json* lb_config_array,
grpc_error** error) {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
char* error_msg;
if (lb_config_array == nullptr || lb_config_array->type != GRPC_JSON_ARRAY) {
gpr_asprintf(&error_msg, "field:%s error:type should be array",
lb_config_array->key);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
}
const char* field_name = lb_config_array->key;
// Find the first LB policy that this client supports.
for (const grpc_json* lb_config = lb_config_array->child;
lb_config != nullptr; lb_config = lb_config->next) {
if (lb_config->type != GRPC_JSON_OBJECT) {
gpr_asprintf(&error_msg,
"field:%s error:child entry should be of type object",
field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
}
grpc_json* policy = nullptr;
for (grpc_json* field = lb_config->child; field != nullptr;
field = field->next) {
if (field->key == nullptr || field->type != GRPC_JSON_OBJECT) {
gpr_asprintf(&error_msg,
"field:%s error:child entry should be of type object",
field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
}
if (policy != nullptr) {
gpr_asprintf(&error_msg, "field:%s error:oneOf violation", field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
} // Violate "oneof" type.
policy = field;
}
if (policy == nullptr) {
gpr_asprintf(&error_msg, "field:%s error:no policy found in child entry",
field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
}
// If we support this policy, then select it.
if (LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(policy->key)) {
return policy;
}
}
gpr_asprintf(&error_msg, "field:%s error:No known policy", field_name);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
return nullptr;
}
} // namespace
UniquePtr<ParsedLoadBalancingConfig>
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(const grpc_json* json,
const char* field_name,
grpc_error** error) {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
GPR_ASSERT(g_state != nullptr);
const grpc_json* policy =
LoadBalancingPolicy::ParseLoadBalancingConfig(json, field_name, error);
const grpc_json* policy = ParseLoadBalancingConfigHelper(json, error);
if (policy == nullptr) {
return nullptr;
} else {
GPR_DEBUG_ASSERT(*error == GRPC_ERROR_NONE);
GPR_DEBUG_ASSERT(*error == GRPC_ERROR_NONE && json != nullptr);
// Find factory.
LoadBalancingPolicyFactory* factory =
g_state->GetLoadBalancingPolicyFactory(policy->key);
if (factory == nullptr) {
char* msg;
gpr_asprintf(&msg, "field:%s error:Factory not found to create policy",
field_name);
json->key);
*error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
return nullptr;
@ -127,4 +189,23 @@ LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(const grpc_json* json,
}
}
grpc_error* LoadBalancingPolicyRegistry::ParseDeprecatedLoadBalancingPolicy(
const grpc_json* json) {
GPR_DEBUG_ASSERT(g_state != nullptr);
GPR_DEBUG_ASSERT(json != nullptr);
if (json->type != GRPC_JSON_STRING) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:type should be string");
}
auto* factory = g_state->GetLoadBalancingPolicyFactory(json->value);
if (factory == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:Unknown lb policy");
}
grpc_error* error = GRPC_ERROR_NONE;
// Check if the load balancing policy allows an empty config
factory->ParseLoadBalancingConfig(nullptr, &error);
return error;
}
} // namespace grpc_core

@ -52,8 +52,14 @@ class LoadBalancingPolicyRegistry {
/// registry.
static bool LoadBalancingPolicyExists(const char* name);
/// Returns a parsed object of the load balancing policy to be used from a
/// LoadBalancingConfig array \a json. \a field_name specifies
static UniquePtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
const grpc_json* json, const char* field_name, grpc_error** error);
const grpc_json* json, grpc_error** error);
/// Validates if the deprecated loadBalancingPolicy field can be parsed.
/// Returns GRPC_ERROR_NONE if successfully parsed.
static grpc_error* ParseDeprecatedLoadBalancingPolicy(const grpc_json* json);
};
} // namespace grpc_core

@ -59,13 +59,13 @@ void ClientChannelServiceConfigParser::Register() {
}
ProcessedResolverResult::ProcessedResolverResult(
Resolver::Result* resolver_result, bool parse_retry)
: service_config_(resolver_result->service_config) {
const Resolver::Result& resolver_result)
: service_config_(resolver_result.service_config) {
// If resolver did not return a service config, use the default
// specified via the client API.
if (service_config_ == nullptr) {
const char* service_config_json = grpc_channel_arg_get_string(
grpc_channel_args_find(resolver_result->args, GRPC_ARG_SERVICE_CONFIG));
grpc_channel_args_find(resolver_result.args, GRPC_ARG_SERVICE_CONFIG));
if (service_config_json != nullptr) {
grpc_error* error = GRPC_ERROR_NONE;
service_config_ = ServiceConfig::Create(service_config_json, &error);
@ -74,60 +74,62 @@ ProcessedResolverResult::ProcessedResolverResult(
}
}
// Process service config.
ProcessServiceConfig(*resolver_result, parse_retry);
// If no LB config was found above, just find the LB policy name then.
if (lb_policy_name_ == nullptr) ProcessLbPolicyName(*resolver_result);
ProcessServiceConfig(resolver_result);
ProcessLbPolicy(resolver_result);
}
void ProcessedResolverResult::ProcessServiceConfig(
const Resolver::Result& resolver_result, bool parse_retry) {
const Resolver::Result& resolver_result) {
if (service_config_ == nullptr) return;
health_check_ = static_cast<HealthCheckParsedObject*>(
service_config_->GetParsedGlobalServiceConfigObject(
HealthCheckParser::ParserIndex()));
service_config_json_ = service_config_->service_config_json();
auto* parsed_object = static_cast<ClientChannelGlobalParsedObject*>(
auto* parsed_object = static_cast<const ClientChannelGlobalParsedObject*>(
service_config_->GetParsedGlobalServiceConfigObject(
ClientChannelServiceConfigParser::
client_channel_service_config_parser_index()));
if (!parsed_object) {
return;
}
if (parse_retry) {
const grpc_arg* channel_arg =
grpc_channel_args_find(resolver_result.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(channel_arg);
GPR_ASSERT(server_uri != nullptr);
grpc_uri* uri = grpc_uri_parse(server_uri, true);
GPR_ASSERT(uri->path[0] != '\0');
server_name_ = uri->path[0] == '/' ? uri->path + 1 : uri->path;
if (parsed_object->retry_throttling().has_value()) {
retry_throttle_data_ =
grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
server_name_,
parsed_object->retry_throttling().value().max_milli_tokens,
parsed_object->retry_throttling().value().milli_token_ratio);
}
grpc_uri_destroy(uri);
}
if (parsed_object->parsed_lb_config()) {
lb_policy_name_.reset(
gpr_strdup(parsed_object->parsed_lb_config()->name()));
lb_policy_config_ = parsed_object->parsed_lb_config();
} else {
lb_policy_name_.reset(
gpr_strdup(parsed_object->parsed_deprecated_lb_policy()));
const grpc_arg* channel_arg =
grpc_channel_args_find(resolver_result.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(channel_arg);
GPR_ASSERT(server_uri != nullptr);
grpc_uri* uri = grpc_uri_parse(server_uri, true);
GPR_ASSERT(uri->path[0] != '\0');
if (parsed_object->retry_throttling().has_value()) {
char* server_name = uri->path[0] == '/' ? uri->path + 1 : uri->path;
retry_throttle_data_ = internal::ServerRetryThrottleMap::GetDataForServer(
server_name, parsed_object->retry_throttling().value().max_milli_tokens,
parsed_object->retry_throttling().value().milli_token_ratio);
}
grpc_uri_destroy(uri);
}
void ProcessedResolverResult::ProcessLbPolicyName(
void ProcessedResolverResult::ProcessLbPolicy(
const Resolver::Result& resolver_result) {
// Prefer the LB policy name found in the service config.
if (lb_policy_name_ != nullptr) {
char* lb_policy_name = lb_policy_name_.get();
for (size_t i = 0; i < strlen(lb_policy_name); ++i) {
lb_policy_name[i] = tolower(lb_policy_name[i]);
if (service_config_ != nullptr) {
auto* parsed_object = static_cast<const ClientChannelGlobalParsedObject*>(
service_config_->GetParsedGlobalServiceConfigObject(
ClientChannelServiceConfigParser::
client_channel_service_config_parser_index()));
if (parsed_object != nullptr) {
if (parsed_object->parsed_lb_config()) {
lb_policy_name_.reset(
gpr_strdup(parsed_object->parsed_lb_config()->name()));
lb_policy_config_ = parsed_object->parsed_lb_config();
} else {
lb_policy_name_.reset(
gpr_strdup(parsed_object->parsed_deprecated_lb_policy()));
if (lb_policy_name_ != nullptr) {
char* lb_policy_name = lb_policy_name_.get();
for (size_t i = 0; i < strlen(lb_policy_name); ++i) {
lb_policy_name[i] = tolower(lb_policy_name[i]);
}
}
}
}
}
// Otherwise, find the LB policy name set by the client API.
@ -342,8 +344,7 @@ ClientChannelServiceConfigParser::ParseGlobalParams(const grpc_json* json,
InlinedVector<grpc_error*, 4> error_list;
UniquePtr<ParsedLoadBalancingConfig> parsed_lb_config;
const char* lb_policy_name = nullptr;
grpc_core::Optional<ClientChannelGlobalParsedObject::RetryThrottling>
retry_throttling;
Optional<ClientChannelGlobalParsedObject::RetryThrottling> retry_throttling;
for (grpc_json* field = json->child; field != nullptr; field = field->next) {
if (field->key == nullptr) {
continue; // Not the LB config global parameter
@ -356,8 +357,8 @@ ClientChannelServiceConfigParser::ParseGlobalParams(const grpc_json* json,
} else {
grpc_error* parse_error = GRPC_ERROR_NONE;
parsed_lb_config =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
field, "loadBalancingConfig", &parse_error);
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(field,
&parse_error);
if (parsed_lb_config == nullptr) {
error_list.push_back(parse_error);
}
@ -375,12 +376,15 @@ ClientChannelServiceConfigParser::ParseGlobalParams(const grpc_json* json,
field->value)) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:Unknown lb policy"));
} else if (strcmp(field->value, "xds_experimental") == 0) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:xds not supported with this "
"field. Please use loadBalancingConfig"));
} else {
lb_policy_name = field->value;
grpc_error* parsing_error =
LoadBalancingPolicyRegistry::ParseDeprecatedLoadBalancingPolicy(
field);
if (parsing_error != GRPC_ERROR_NONE) {
error_list.push_back(parsing_error);
} else {
lb_policy_name = field->value;
}
}
}
// Parse retry throttling
@ -392,8 +396,8 @@ ClientChannelServiceConfigParser::ParseGlobalParams(const grpc_json* json,
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:retryThrottling error:Duplicate entry"));
} else {
grpc_core::Optional<int> max_milli_tokens(false, 0);
grpc_core::Optional<int> milli_token_ratio(false, 0);
Optional<int> max_milli_tokens(false, 0);
Optional<int> milli_token_ratio(false, 0);
for (grpc_json* sub_field = field->child; sub_field != nullptr;
sub_field = sub_field->next) {
if (sub_field->key == nullptr) continue;

@ -47,12 +47,12 @@ class ClientChannelGlobalParsedObject : public ServiceConfigParsedObject {
ClientChannelGlobalParsedObject(
UniquePtr<ParsedLoadBalancingConfig> parsed_lb_config,
const char* parsed_deprecated_lb_policy,
const grpc_core::Optional<RetryThrottling>& retry_throttling)
const Optional<RetryThrottling>& retry_throttling)
: parsed_lb_config_(std::move(parsed_lb_config)),
parsed_deprecated_lb_policy_(parsed_deprecated_lb_policy),
retry_throttling_(retry_throttling) {}
grpc_core::Optional<RetryThrottling> retry_throttling() const {
Optional<RetryThrottling> retry_throttling() const {
return retry_throttling_;
}
@ -67,7 +67,7 @@ class ClientChannelGlobalParsedObject : public ServiceConfigParsedObject {
private:
UniquePtr<ParsedLoadBalancingConfig> parsed_lb_config_;
const char* parsed_deprecated_lb_policy_ = nullptr;
grpc_core::Optional<RetryThrottling> retry_throttling_;
Optional<RetryThrottling> retry_throttling_;
};
class ClientChannelMethodParsedObject : public ServiceConfigParsedObject {
@ -116,9 +116,8 @@ class ClientChannelServiceConfigParser : public ServiceConfigParser {
class ProcessedResolverResult {
public:
// Processes the resolver result and populates the relative members
// for later consumption. Tries to parse retry parameters only if parse_retry
// is true.
ProcessedResolverResult(Resolver::Result* resolver_result, bool parse_retry);
// for later consumption.
ProcessedResolverResult(const Resolver::Result& resolver_result);
// Getters. Any managed object's ownership is transferred.
const char* service_config_json() { return service_config_json_; }
@ -137,11 +136,10 @@ class ProcessedResolverResult {
private:
// Finds the service config; extracts LB config and (maybe) retry throttle
// params from it.
void ProcessServiceConfig(const Resolver::Result& resolver_result,
bool parse_retry);
void ProcessServiceConfig(const Resolver::Result& resolver_result);
// Finds the LB policy name (when no LB config was found).
void ProcessLbPolicyName(const Resolver::Result& resolver_result);
// Extracts the LB policy.
void ProcessLbPolicy(const Resolver::Result& resolver_result);
// Parses the service config. Intended to be used by
// ServiceConfig::ParseGlobalParams.
@ -159,7 +157,6 @@ class ProcessedResolverResult {
UniquePtr<char> lb_policy_name_;
const ParsedLoadBalancingConfig* lb_policy_config_ = nullptr;
// Retry throttle data.
char* server_name_ = nullptr;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
const HealthCheckParsedObject* health_check_ = nullptr;
};

@ -539,7 +539,7 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
bool service_config_changed = false;
if (process_resolver_result_ != nullptr) {
service_config_changed = process_resolver_result_(
process_resolver_result_user_data_, &result, &lb_policy_name,
process_resolver_result_user_data_, result, &lb_policy_name,
&lb_policy_config, &health_check);
} else {
lb_policy_name = child_policy_name_.get();

@ -66,7 +66,8 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// lb_policy_name and lb_policy_config to point to the right data.
// Returns true if the service config has changed since the last result.
typedef bool (*ProcessResolverResultCallback)(
void* user_data, Resolver::Result* result, const char** lb_policy_name,
void* user_data, const Resolver::Result& result,
const char** lb_policy_name,
const ParsedLoadBalancingConfig** lb_policy_config,
const HealthCheckParsedObject** health_check);
// If error is set when this returns, then construction failed, and

@ -100,33 +100,6 @@ class ServiceConfig : public RefCounted<ServiceConfig> {
const char* service_config_json() const { return service_config_json_.get(); }
/// Invokes \a process_json() for each global parameter in the service
/// config. \a arg is passed as the second argument to \a process_json().
template <typename T>
using ProcessJson = void (*)(const grpc_json*, T*);
template <typename T>
void ParseGlobalParams(ProcessJson<T> process_json, T* arg) const;
/// Creates a method config table based on the data in \a json.
/// The table's keys are request paths. The table's value type is
/// returned by \a create_value(), based on data parsed from the JSON tree.
/// Returns null on error.
template <typename T>
using CreateValue = RefCountedPtr<T> (*)(const grpc_json* method_config_json);
template <typename T>
RefCountedPtr<SliceHashTable<RefCountedPtr<T>>> CreateMethodConfigTable(
CreateValue<T> create_value) const;
/// A helper function for looking up values in the table returned by
/// \a CreateMethodConfigTable().
/// Gets the method config for the specified \a path, which should be of
/// the form "/service/method".
/// Returns null if the method has no config.
/// Caller does NOT own a reference to the result.
template <typename T>
static RefCountedPtr<T> MethodConfigTableLookup(
const SliceHashTable<RefCountedPtr<T>>& table, const grpc_slice& path);
/// Retrieves the parsed global service config object at index \a index.
ServiceConfigParsedObject* GetParsedGlobalServiceConfigObject(size_t index) {
GPR_DEBUG_ASSERT(index < parsed_global_service_config_objects_.size());
@ -190,14 +163,6 @@ class ServiceConfig : public RefCounted<ServiceConfig> {
static UniquePtr<char> ParseJsonMethodName(grpc_json* json,
grpc_error** error);
// Parses the method config from \a json. Adds an entry to \a entries for
// each name found, incrementing \a idx for each entry added.
// Returns false on error.
template <typename T>
static bool ParseJsonMethodConfig(
grpc_json* json, CreateValue<T> create_value,
typename SliceHashTable<RefCountedPtr<T>>::Entry* entries, size_t* idx);
grpc_error* ParseJsonMethodConfigToServiceConfigObjectsTable(
const grpc_json* json,
SliceHashTable<const ServiceConfigObjectsVector*>::Entry* entries,
@ -220,133 +185,6 @@ class ServiceConfig : public RefCounted<ServiceConfig> {
service_config_objects_vectors_storage_;
};
//
// implementation -- no user-serviceable parts below
//
template <typename T>
void ServiceConfig::ParseGlobalParams(ProcessJson<T> process_json,
T* arg) const {
if (json_tree_->type != GRPC_JSON_OBJECT || json_tree_->key != nullptr) {
return;
}
for (grpc_json* field = json_tree_->child; field != nullptr;
field = field->next) {
if (field->key == nullptr) return;
if (strcmp(field->key, "methodConfig") == 0) continue;
process_json(field, arg);
}
}
template <typename T>
bool ServiceConfig::ParseJsonMethodConfig(
grpc_json* json, CreateValue<T> create_value,
typename SliceHashTable<RefCountedPtr<T>>::Entry* entries, size_t* idx) {
// Construct value.
RefCountedPtr<T> method_config = create_value(json);
if (method_config == nullptr) return false;
// Construct list of paths.
InlinedVector<UniquePtr<char>, 10> paths;
for (grpc_json* child = json->child; child != nullptr; child = child->next) {
if (child->key == nullptr) continue;
if (strcmp(child->key, "name") == 0) {
if (child->type != GRPC_JSON_ARRAY) return false;
for (grpc_json* name = child->child; name != nullptr; name = name->next) {
grpc_error* error = GRPC_ERROR_NONE;
UniquePtr<char> path = ParseJsonMethodName(name, &error);
// We are not reporting the error here.
GRPC_ERROR_UNREF(error);
if (path == nullptr) return false;
paths.push_back(std::move(path));
}
}
}
if (paths.size() == 0) return false; // No names specified.
// Add entry for each path.
for (size_t i = 0; i < paths.size(); ++i) {
entries[*idx].key = grpc_slice_from_copied_string(paths[i].get());
entries[*idx].value = method_config; // Takes a new ref.
++*idx;
}
// Success.
return true;
}
template <typename T>
RefCountedPtr<SliceHashTable<RefCountedPtr<T>>>
ServiceConfig::CreateMethodConfigTable(CreateValue<T> create_value) const {
// Traverse parsed JSON tree.
if (json_tree_->type != GRPC_JSON_OBJECT || json_tree_->key != nullptr) {
return nullptr;
}
size_t num_entries = 0;
typename SliceHashTable<RefCountedPtr<T>>::Entry* entries = nullptr;
for (grpc_json* field = json_tree_->child; field != nullptr;
field = field->next) {
if (field->key == nullptr) return nullptr;
if (strcmp(field->key, "methodConfig") == 0) {
if (entries != nullptr) return nullptr; // Duplicate.
if (field->type != GRPC_JSON_ARRAY) return nullptr;
// Find number of entries.
for (grpc_json* method = field->child; method != nullptr;
method = method->next) {
int count = CountNamesInMethodConfig(method);
if (count <= 0) return nullptr;
num_entries += static_cast<size_t>(count);
}
// Populate method config table entries.
entries = static_cast<typename SliceHashTable<RefCountedPtr<T>>::Entry*>(
gpr_zalloc(num_entries *
sizeof(typename SliceHashTable<RefCountedPtr<T>>::Entry)));
size_t idx = 0;
for (grpc_json* method = field->child; method != nullptr;
method = method->next) {
if (!ParseJsonMethodConfig(method, create_value, entries, &idx)) {
for (size_t i = 0; i < idx; ++i) {
grpc_slice_unref_internal(entries[i].key);
entries[i].value.reset();
}
gpr_free(entries);
return nullptr;
}
}
GPR_ASSERT(idx == num_entries);
}
}
// Instantiate method config table.
RefCountedPtr<SliceHashTable<RefCountedPtr<T>>> method_config_table;
if (entries != nullptr) {
method_config_table =
SliceHashTable<RefCountedPtr<T>>::Create(num_entries, entries, nullptr);
gpr_free(entries);
}
return method_config_table;
}
template <typename T>
RefCountedPtr<T> ServiceConfig::MethodConfigTableLookup(
const SliceHashTable<RefCountedPtr<T>>& table, const grpc_slice& path) {
const RefCountedPtr<T>* value = table.Get(path);
// If we didn't find a match for the path, try looking for a wildcard
// entry (i.e., change "/service/method" to "/service/*").
if (value == nullptr) {
char* path_str = grpc_slice_to_c_string(path);
const char* sep = strrchr(path_str, '/') + 1;
const size_t len = (size_t)(sep - path_str);
char* buf = (char*)gpr_malloc(len + 2); // '*' and NUL
memcpy(buf, path_str, len);
buf[len] = '*';
buf[len + 1] = '\0';
grpc_slice wildcard_path = grpc_slice_from_copied_string(buf);
gpr_free(buf);
value = table.Get(wildcard_path);
grpc_slice_unref_internal(wildcard_path);
gpr_free(path_str);
if (value == nullptr) return nullptr;
}
return RefCountedPtr<T>(*value);
}
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVICE_CONFIG_H */

@ -565,7 +565,6 @@ Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector,
"subchannel");
grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE,
"subchannel");
if (health_check != nullptr) {
health_check_service_name_ =
UniquePtr<char>(gpr_strdup(health_check->service_name()));

@ -291,7 +291,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
// Destructor for channel_data.
static void destroy_channel_elem(grpc_channel_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
chand->svc_cfg.reset();
chand->~channel_data();
}
const grpc_channel_filter grpc_message_size_filter = {
@ -355,6 +355,7 @@ void grpc_message_size_filter_init(void) {
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_message_size_filter, nullptr);
grpc_core::MessageSizeParser::Register();
}
void grpc_message_size_filter_shutdown(void) {}

@ -546,12 +546,13 @@ TEST_F(ClientChannelParserTest, LoadBalancingPolicyXdsNotAllowed) {
auto svc_cfg = ServiceConfig::Create(test_json, &error);
gpr_log(GPR_ERROR, "%s", grpc_error_string(error));
ASSERT_TRUE(error != GRPC_ERROR_NONE);
std::regex e(
std::string("(Service config parsing "
"error)(.*)(referenced_errors)(.*)(Global "
"Params)(.*)(referenced_errors)(.*)(Client channel global "
"parser)(.*)(referenced_errors)(.*)(field:"
"loadBalancingPolicy error:xds not supported)"));
std::regex e(std::string(
"(Service config parsing "
"error)(.*)(referenced_errors)(.*)(Global "
"Params)(.*)(referenced_errors)(.*)(Client channel global "
"parser)(.*)(referenced_errors)(.*)(field:loadBalancingPolicy error:Xds "
"Parser has required field - balancerName. Please use "
"loadBalancingConfig instead of the deprecated loadBalancingPolicy)"));
std::smatch match;
std::string s(grpc_error_string(error));
EXPECT_TRUE(std::regex_search(s, match, e));

Loading…
Cancel
Save