LB policies: convert to new JSON API (#30468)

* Declarative JSON parser

* Automated change: Fix sanity tests

* fix

* shrinking stuff a little

* static vtables

* separate fns

* simpler?

* make maps work

* windows fixes

* Automated change: Fix sanity tests

* simplify code

* Automated change: Fix sanity tests

* vtable-test

* dont always create vec/map impls for every type

* comments

* make error consistent

* move method private

* progress

* durations!

* Automated change: Fix sanity tests

* fix

* fix

* fix

* Automated change: Fix sanity tests

* post-load

* Automated change: Fix sanity tests

* document JsonPostLoad() and add static_assert

* don't copy field names, to avoid length limitations

* use absl::Status

* accept either string or number for numeric values

* add test for direct data member of another struct type

* remove unused method

* add support for retaining part of the JSON wirthout processing

* update test for changes in Json::Parse() API

* add absl::optional support

* Automated change: Fix sanity tests

* fix tests, improve error messages, and add overload to parse to existing object

* remove overload of LoadFromJson()

* change special case for Json to instead use Json::Object

* fix build

* improve error structure, add missing types, and improve tests

* clang-format

* Automated change: Fix sanity tests

* update grpclb parsing

* fix build

* convert outlier_detection LB config

* convert priority LB policy

* convert ring_hash LB config

* add LoadJsonObjectField(), add LoadFromJson() overload that takes an ErrorList parameter, and add tests for parsing bare top-level types

* fix msan

* Automated change: Fix sanity tests

* WIP on RLS parsing conversion

* fix error message

* Automated change: Fix sanity tests

* fixed RLS parser tests

* fix error prefix in OD, priority, and ring_hash policies

* convert weighted_target policy

* convert xds_cluster_manager

* convert cds policy

* convert xds_cluster_resolver

* convert xds_cluster_impl

* Automated change: Fix sanity tests

* fix test

* fix xds_cluster_impl drop config parsing

* Automated change: Fix sanity tests

* attempt to fix mac build

* work around gcc6 problem

* Automated change: Fix sanity tests

* fix build

* add mechanism to conditionally disable individual fields

* fix build

* Automated change: Fix sanity tests

* fix move assignment operator

* fix build

* Automated change: Fix sanity tests

* fix build and simplify RLS duplicate key check

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* fix build

* iwyu

* fix build

* fix sanity

* add LoadRefCountedFromJson() and use it to eliminate some moves

* fix build

* Automated change: Fix sanity tests

Co-authored-by: Craig Tiller <craig.tiller@gmail.com>
Co-authored-by: ctiller <ctiller@users.noreply.github.com>
Co-authored-by: Craig Tiller <ctiller@google.com>
Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/31049/head^2
Mark D. Roth 3 years ago committed by GitHub
parent 5e0165bc02
commit e475e165a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      BUILD
  2. 90
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  3. 174
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  4. 10
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h
  5. 169
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  6. 68
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  7. 14
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
  8. 762
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  9. 127
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  10. 38
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  11. 253
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  12. 145
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  13. 357
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  14. 10
      src/core/lib/json/json_object_loader.cc
  15. 25
      src/core/lib/json/json_object_loader.h
  16. 294
      test/core/client_channel/rls_lb_config_parser_test.cc
  17. 4
      test/core/client_channel/service_config_test.cc
  18. 37
      test/core/json/json_object_loader_test.cc
  19. 4
      test/cpp/end2end/xds/xds_rls_end2end_test.cc

22
BUILD

@ -4346,6 +4346,8 @@ grpc_cc_library(
"grpc_trace",
"iomgr_timer",
"json",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
@ -4412,7 +4414,8 @@ grpc_cc_library(
"grpc_trace",
"iomgr_timer",
"json",
"json_util",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
@ -4845,6 +4848,8 @@ grpc_cc_library(
"grpc_trace",
"grpc_xds_client",
"json",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
@ -4914,6 +4919,7 @@ grpc_cc_library(
"grpc_trace",
"grpc_xds_client",
"json",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
@ -4956,6 +4962,7 @@ grpc_cc_library(
"grpc_trace",
"grpc_xds_client",
"json",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
@ -4996,6 +5003,8 @@ grpc_cc_library(
"grpc_trace",
"iomgr_timer",
"json",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
@ -5119,6 +5128,8 @@ grpc_cc_library(
"grpc_lb_subchannel_list",
"grpc_trace",
"json",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
@ -5173,6 +5184,9 @@ grpc_cc_library(
language = "c++",
deps = [
"gpr_platform",
"json",
"json_args",
"json_object_loader",
"time",
],
)
@ -5207,7 +5221,6 @@ grpc_cc_library(
"iomgr_fwd",
"iomgr_timer",
"json",
"json_util",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
@ -5249,6 +5262,8 @@ grpc_cc_library(
"grpc_trace",
"iomgr_timer",
"json",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
@ -5288,6 +5303,8 @@ grpc_cc_library(
"grpc_lb_address_filtering",
"grpc_trace",
"json",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
@ -7786,6 +7803,7 @@ grpc_cc_library(
"json",
"json_args",
"no_destruct",
"ref_counted_ptr",
"time",
],
)

@ -124,6 +124,8 @@
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
@ -164,10 +166,46 @@ constexpr absl::string_view kGrpclb = "grpclb";
class GrpcLbConfig : public LoadBalancingPolicy::Config {
public:
GrpcLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
std::string service_name)
: child_policy_(std::move(child_policy)),
service_name_(std::move(service_name)) {}
GrpcLbConfig() = default;
GrpcLbConfig(const GrpcLbConfig&) = delete;
GrpcLbConfig& operator=(const GrpcLbConfig&) = delete;
GrpcLbConfig(GrpcLbConfig&& other) = delete;
GrpcLbConfig& operator=(GrpcLbConfig&& other) = delete;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<GrpcLbConfig>()
// Note: "childPolicy" field requires custom parsing, so
// it's handled in JsonPostLoad() instead.
.OptionalField("serviceName", &GrpcLbConfig::service_name_)
.Finish();
return loader;
}
void JsonPostLoad(const Json& json, const JsonArgs&, ErrorList* errors) {
ScopedField field(errors, ".childPolicy");
Json child_policy_config_json_tmp;
const Json* child_policy_config_json;
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
child_policy_config_json_tmp = Json::Array{Json::Object{
{"round_robin", Json::Object()},
}};
child_policy_config_json = &child_policy_config_json_tmp;
} else {
child_policy_config_json = &it->second;
}
auto child_policy_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
*child_policy_config_json);
if (!child_policy_config.ok()) {
errors->AddError(child_policy_config.status().message());
return;
}
child_policy_ = std::move(*child_policy_config);
}
absl::string_view name() const override { return kGrpclb; }
@ -1867,48 +1905,8 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& json) const override {
if (json.type() == Json::Type::JSON_NULL) {
return MakeRefCounted<GrpcLbConfig>(nullptr, "");
}
std::vector<std::string> error_list;
std::string service_name;
auto it = json.object_value().find("serviceName");
if (it != json.object_value().end()) {
const Json& service_name_json = it->second;
if (service_name_json.type() != Json::Type::STRING) {
error_list.emplace_back(
"field:serviceName error:type should be string");
} else {
service_name = service_name_json.string_value();
}
}
Json child_policy_config_json_tmp;
const Json* child_policy_config_json;
it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
child_policy_config_json_tmp = Json::Array{Json::Object{
{"round_robin", Json::Object()},
}};
child_policy_config_json = &child_policy_config_json_tmp;
} else {
child_policy_config_json = &it->second;
}
auto child_policy_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
*child_policy_config_json);
if (!child_policy_config.ok()) {
error_list.emplace_back(
absl::StrCat("error parsing childPolicy field: ",
child_policy_config.status().message()));
}
if (error_list.empty()) {
return MakeRefCounted<GrpcLbConfig>(std::move(*child_policy_config),
std::move(service_name));
} else {
return absl::InvalidArgumentError(
absl::StrCat("errors parsing grpclb LB policy config: [",
absl::StrJoin(error_list, "; "), "]"));
}
return LoadRefCountedFromJson<GrpcLbConfig>(
json, JsonArgs(), "errors validating grpclb LB policy config");
}
};

@ -36,8 +36,6 @@
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/variant.h"
@ -63,7 +61,6 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_util.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
@ -1035,95 +1032,34 @@ class OutlierDetectionLbFactory : public LoadBalancingPolicyFactory {
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
}
std::vector<std::string> errors;
std::vector<grpc_error_handle> error_list;
// Outlier detection config
ErrorList errors;
OutlierDetectionConfig outlier_detection_config;
auto it = json.object_value().find("successRateEjection");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::OBJECT) {
errors.emplace_back(
"field:successRateEjection error:type must be object");
} else {
OutlierDetectionConfig::SuccessRateEjection success_config;
const Json::Object& object = it->second.object_value();
ParseJsonObjectField(object, "stdevFactor",
&success_config.stdev_factor, &error_list,
/*required=*/false);
ParseJsonObjectField(object, "enforcementPercentage",
&success_config.enforcement_percentage,
&error_list, /*required=*/false);
ParseJsonObjectField(object, "minimumHosts",
&success_config.minimum_hosts, &error_list,
/*required=*/false);
ParseJsonObjectField(object, "requestVolume",
&success_config.request_volume, &error_list,
/*required=*/false);
outlier_detection_config.success_rate_ejection = success_config;
}
}
it = json.object_value().find("failurePercentageEjection");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::OBJECT) {
errors.emplace_back(
"field:successRateEjection error:type must be object");
} else {
OutlierDetectionConfig::FailurePercentageEjection failure_config;
const Json::Object& object = it->second.object_value();
ParseJsonObjectField(object, "threshold", &failure_config.threshold,
&error_list, /*required=*/false);
ParseJsonObjectField(object, "enforcementPercentage",
&failure_config.enforcement_percentage,
&error_list, /*required=*/false);
ParseJsonObjectField(object, "minimumHosts",
&failure_config.minimum_hosts, &error_list,
/*required=*/false);
ParseJsonObjectField(object, "requestVolume",
&failure_config.request_volume, &error_list,
/*required=*/false);
outlier_detection_config.failure_percentage_ejection = failure_config;
}
}
ParseJsonObjectFieldAsDuration(json.object_value(), "interval",
&outlier_detection_config.interval,
&error_list, /*required=*/false);
ParseJsonObjectFieldAsDuration(json.object_value(), "baseEjectionTime",
&outlier_detection_config.base_ejection_time,
&error_list, /*required=*/false);
if (!ParseJsonObjectFieldAsDuration(
json.object_value(), "maxEjectionTime",
&outlier_detection_config.max_ejection_time, &error_list,
/*required=*/false)) {
outlier_detection_config.max_ejection_time = std::max(
outlier_detection_config.base_ejection_time, Duration::Seconds(300));
}
ParseJsonObjectField(json.object_value(), "maxEjectionPercent",
&outlier_detection_config.max_ejection_percent,
&error_list, /*required=*/false);
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors.emplace_back("field:childPolicy error:required field missing");
} else {
auto child_policy_config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!child_policy_config.ok()) {
errors.emplace_back(
absl::StrCat("error parsing childPolicy field: ",
child_policy_config.status().message()));
} else {
child_policy = std::move(*child_policy_config);
{
ScopedField field(&errors, "[\"outlier_detection_experimental\"]");
outlier_detection_config =
LoadFromJson<OutlierDetectionConfig>(json, JsonArgs(), &errors);
// Parse childPolicy manually.
{
ScopedField field(&errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors.AddError("field not present");
} else {
auto child_policy_config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!child_policy_config.ok()) {
errors.AddError(child_policy_config.status().message());
} else {
child_policy = std::move(*child_policy_config);
}
}
}
}
for (auto& error : error_list) {
errors.emplace_back(grpc_error_std_string(error));
GRPC_ERROR_UNREF(error);
}
if (!errors.empty()) {
return absl::InvalidArgumentError(
absl::StrCat("outlier_detection_experimental LB policy config: [",
absl::StrJoin(errors, "; "), "]"));
if (!errors.ok()) {
return errors.status(
"errors validating outlier_detection LB policy config");
}
return MakeRefCounted<OutlierDetectionLbConfig>(outlier_detection_config,
std::move(child_policy));
@ -1132,6 +1068,68 @@ class OutlierDetectionLbFactory : public LoadBalancingPolicyFactory {
} // namespace
//
// OutlierDetectionConfig
//
const JsonLoaderInterface*
OutlierDetectionConfig::SuccessRateEjection::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<SuccessRateEjection>()
.OptionalField("stdevFactor", &SuccessRateEjection::stdev_factor)
.OptionalField("enforcementPercentage",
&SuccessRateEjection::enforcement_percentage)
.OptionalField("minimumHosts", &SuccessRateEjection::minimum_hosts)
.OptionalField("requestVolume", &SuccessRateEjection::request_volume)
.Finish();
return loader;
}
const JsonLoaderInterface*
OutlierDetectionConfig::FailurePercentageEjection::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<FailurePercentageEjection>()
.OptionalField("threshold", &FailurePercentageEjection::threshold)
.OptionalField("enforcementPercentage",
&FailurePercentageEjection::enforcement_percentage)
.OptionalField("minimumHosts",
&FailurePercentageEjection::minimum_hosts)
.OptionalField("requestVolume",
&FailurePercentageEjection::request_volume)
.Finish();
return loader;
}
const JsonLoaderInterface* OutlierDetectionConfig::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<OutlierDetectionConfig>()
.OptionalField("interval", &OutlierDetectionConfig::interval)
.OptionalField("baseEjectionTime",
&OutlierDetectionConfig::base_ejection_time)
.OptionalField("maxEjectionTime",
&OutlierDetectionConfig::max_ejection_time)
.OptionalField("maxEjectionPercent",
&OutlierDetectionConfig::max_ejection_percent)
.OptionalField("successRateEjection",
&OutlierDetectionConfig::success_rate_ejection)
.OptionalField("failurePercentageEjection",
&OutlierDetectionConfig::failure_percentage_ejection)
.Finish();
return loader;
}
void OutlierDetectionConfig::JsonPostLoad(const Json& json, const JsonArgs&,
ErrorList* /*errors*/) {
if (json.object_value().find("maxEjectionTime") ==
json.object_value().end()) {
max_ejection_time = std::max(base_ejection_time, Duration::Seconds(300));
}
}
//
// Plugin registration
//
void RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder* builder) {
if (XdsOutlierDetectionEnabled()) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(

@ -24,6 +24,9 @@
#include "absl/types/optional.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
namespace grpc_core {
@ -46,6 +49,8 @@ struct OutlierDetectionConfig {
minimum_hosts == other.minimum_hosts &&
request_volume == other.request_volume;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
};
struct FailurePercentageEjection {
uint32_t threshold = 85;
@ -59,6 +64,8 @@ struct OutlierDetectionConfig {
minimum_hosts == other.minimum_hosts &&
request_volume == other.request_volume;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
};
absl::optional<SuccessRateEjection> success_rate_ejection;
absl::optional<FailurePercentageEjection> failure_percentage_ejection;
@ -71,6 +78,9 @@ struct OutlierDetectionConfig {
success_rate_ejection == other.success_rate_ejection &&
failure_percentage_ejection == other.failure_percentage_ejection;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&, ErrorList* errors);
};
} // namespace grpc_core

@ -17,11 +17,11 @@
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <stddef.h>
#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
@ -55,6 +55,8 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
@ -85,11 +87,18 @@ class PriorityLbConfig : public LoadBalancingPolicy::Config {
struct PriorityLbChild {
RefCountedPtr<LoadBalancingPolicy::Config> config;
bool ignore_reresolution_requests = false;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&, ErrorList* errors);
};
PriorityLbConfig(std::map<std::string, PriorityLbChild> children,
std::vector<std::string> priorities)
: children_(std::move(children)), priorities_(std::move(priorities)) {}
PriorityLbConfig() = default;
PriorityLbConfig(const PriorityLbConfig&) = delete;
PriorityLbConfig& operator=(const PriorityLbConfig&) = delete;
PriorityLbConfig(PriorityLbConfig&& other) = delete;
PriorityLbConfig& operator=(PriorityLbConfig&& other) = delete;
absl::string_view name() const override { return kPriority; }
@ -98,9 +107,12 @@ class PriorityLbConfig : public LoadBalancingPolicy::Config {
}
const std::vector<std::string>& priorities() const { return priorities_; }
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&, ErrorList* errors);
private:
const std::map<std::string, PriorityLbChild> children_;
const std::vector<std::string> priorities_;
std::map<std::string, PriorityLbChild> children_;
std::vector<std::string> priorities_;
};
// priority LB policy.
@ -851,6 +863,61 @@ void PriorityLb::ChildPriority::Helper::AddTraceEvent(
// factory
//
const JsonLoaderInterface* PriorityLbConfig::PriorityLbChild::JsonLoader(
const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<PriorityLbChild>()
// Note: The "config" field requires custom parsing, so it's
// handled in JsonPostLoad() instead of here.
.OptionalField("ignore_reresolution_requests",
&PriorityLbChild::ignore_reresolution_requests)
.Finish();
return loader;
}
void PriorityLbConfig::PriorityLbChild::JsonPostLoad(const Json& json,
const JsonArgs&,
ErrorList* errors) {
ScopedField field(errors, ".config");
auto it = json.object_value().find("config");
if (it == json.object_value().end()) {
errors->AddError("field not present");
return;
}
auto lb_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
it->second);
if (!lb_config.ok()) {
errors->AddError(lb_config.status().message());
return;
}
config = std::move(*lb_config);
}
const JsonLoaderInterface* PriorityLbConfig::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<PriorityLbConfig>()
.Field("children", &PriorityLbConfig::children_)
.Field("priorities", &PriorityLbConfig::priorities_)
.Finish();
return loader;
}
void PriorityLbConfig::JsonPostLoad(const Json& /*json*/, const JsonArgs&,
ErrorList* errors) {
std::set<std::string> unknown_priorities;
for (const std::string& priority : priorities_) {
if (children_.find(priority) == children_.end()) {
unknown_priorities.insert(priority);
}
}
if (!unknown_priorities.empty()) {
errors->AddError(absl::StrCat("unknown priorit(ies): [",
absl::StrJoin(unknown_priorities, ", "),
"]"));
}
}
class PriorityLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
@ -870,94 +937,8 @@ class PriorityLbFactory : public LoadBalancingPolicyFactory {
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
}
std::vector<std::string> errors;
// Children.
std::map<std::string, PriorityLbConfig::PriorityLbChild> children;
auto it = json.object_value().find("children");
if (it == json.object_value().end()) {
errors.emplace_back("field:children error:required field missing");
} else if (it->second.type() != Json::Type::OBJECT) {
errors.emplace_back("field:children error:type should be object");
} else {
const Json::Object& object = it->second.object_value();
for (const auto& p : object) {
const std::string& child_name = p.first;
const Json& element = p.second;
if (element.type() != Json::Type::OBJECT) {
errors.emplace_back(absl::StrCat("field:children key:", child_name,
" error:should be type object"));
} else {
auto it2 = element.object_value().find("config");
if (it2 == element.object_value().end()) {
errors.emplace_back(absl::StrCat("field:children key:", child_name,
" error:missing 'config' field"));
} else {
bool ignore_resolution_requests = false;
// If present, ignore_reresolution_requests must be of type
// boolean.
auto it3 =
element.object_value().find("ignore_reresolution_requests");
if (it3 != element.object_value().end()) {
if (it3->second.type() == Json::Type::JSON_TRUE) {
ignore_resolution_requests = true;
} else if (it3->second.type() != Json::Type::JSON_FALSE) {
errors.emplace_back(
absl::StrCat("field:children key:", child_name,
" field:ignore_reresolution_requests:should "
"be type boolean"));
}
}
auto config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it2->second);
if (!config.ok()) {
errors.emplace_back(
absl::StrCat("field:children key:", child_name, ": ",
config.status().message()));
} else {
children[child_name].config = std::move(*config);
children[child_name].ignore_reresolution_requests =
ignore_resolution_requests;
}
}
}
}
}
// Priorities.
std::vector<std::string> priorities;
it = json.object_value().find("priorities");
if (it == json.object_value().end()) {
errors.emplace_back("field:priorities error:required field missing");
} else if (it->second.type() != Json::Type::ARRAY) {
errors.emplace_back("field:priorities error:type should be array");
} else {
const Json::Array& array = it->second.array_value();
for (size_t i = 0; i < array.size(); ++i) {
const Json& element = array[i];
if (element.type() != Json::Type::STRING) {
errors.emplace_back(absl::StrCat("field:priorities element:", i,
" error:should be type string"));
} else if (children.find(element.string_value()) == children.end()) {
errors.emplace_back(absl::StrCat("field:priorities element:", i,
" error:unknown child '",
element.string_value(), "'"));
} else {
priorities.emplace_back(element.string_value());
}
}
if (priorities.size() != children.size()) {
errors.emplace_back(absl::StrCat(
"field:priorities error:priorities size (", priorities.size(),
") != children size (", children.size(), ")"));
}
}
if (!errors.empty()) {
return absl::InvalidArgumentError(
absl::StrCat("priority_experimental LB policy config: [",
absl::StrJoin(errors, "; "), "]"));
}
return MakeRefCounted<PriorityLbConfig>(std::move(children),
std::move(priorities));
return LoadRefCountedFromJson<PriorityLbConfig>(
json, JsonArgs(), "errors validating priority LB policy config");
}
};

@ -24,7 +24,6 @@
#include <algorithm>
#include <atomic>
#include <cmath>
#include <map>
#include <memory>
#include <string>
#include <utility>
@ -38,7 +37,6 @@
#include "absl/status/statusor.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
@ -54,7 +52,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -82,49 +79,35 @@ UniqueTypeName RequestHashAttributeName() {
}
// Helper Parser method
absl::StatusOr<RingHashConfig> ParseRingHashLbConfig(const Json& json) {
if (json.type() != Json::Type::OBJECT) {
return absl::InvalidArgumentError(
"ring_hash_experimental should be of type object");
}
RingHashConfig config;
std::vector<std::string> errors;
const Json::Object& ring_hash = json.object_value();
auto ring_hash_it = ring_hash.find("min_ring_size");
if (ring_hash_it != ring_hash.end()) {
if (ring_hash_it->second.type() != Json::Type::NUMBER) {
errors.emplace_back(
"field:min_ring_size error: should be of type number");
} else {
config.min_ring_size = gpr_parse_nonnegative_int(
ring_hash_it->second.string_value().c_str());
const JsonLoaderInterface* RingHashConfig::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<RingHashConfig>()
.OptionalField("min_ring_size", &RingHashConfig::min_ring_size)
.OptionalField("max_ring_size", &RingHashConfig::max_ring_size)
.Finish();
return loader;
}
void RingHashConfig::JsonPostLoad(const Json&, const JsonArgs&,
ErrorList* errors) {
{
ScopedField field(errors, ".min_ring_size");
if (!errors->FieldHasErrors() &&
(min_ring_size == 0 || min_ring_size > 8388608)) {
errors->AddError("must be in the range [1, 8388608]");
}
}
ring_hash_it = ring_hash.find("max_ring_size");
if (ring_hash_it != ring_hash.end()) {
if (ring_hash_it->second.type() != Json::Type::NUMBER) {
errors.emplace_back(
"field:max_ring_size error: should be of type number");
} else {
config.max_ring_size = gpr_parse_nonnegative_int(
ring_hash_it->second.string_value().c_str());
{
ScopedField field(errors, ".max_ring_size");
if (!errors->FieldHasErrors() &&
(max_ring_size == 0 || max_ring_size > 8388608)) {
errors->AddError("must be in the range [1, 8388608]");
}
}
if (config.min_ring_size == 0 || config.min_ring_size > 8388608 ||
config.max_ring_size == 0 || config.max_ring_size > 8388608 ||
config.min_ring_size > config.max_ring_size) {
errors.emplace_back(
"field:max_ring_size and or min_ring_size error: "
"values need to be in the range of 1 to 8388608 "
"and max_ring_size cannot be smaller than "
"min_ring_size");
}
if (!errors.empty()) {
return absl::InvalidArgumentError(
absl::StrCat("errors parsing ring hash LB config: [",
absl::StrJoin(errors, "; "), "]"));
if (min_ring_size > max_ring_size) {
errors->AddError("max_ring_size cannot be smaller than min_ring_size");
}
return config;
}
namespace {
@ -890,7 +873,8 @@ class RingHashFactory : public LoadBalancingPolicyFactory {
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& json) const override {
auto config = ParseRingHashLbConfig(json);
auto config = LoadFromJson<RingHashConfig>(
json, JsonArgs(), "errors validating ring_hash LB policy config");
if (!config.ok()) return config.status();
return MakeRefCounted<RingHashLbConfig>(config->min_ring_size,
config->max_ring_size);

@ -19,12 +19,12 @@
#include <grpc/support/port_platform.h>
#include <stdlib.h>
#include "absl/status/statusor.h"
#include <stdint.h>
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
namespace grpc_core {
@ -33,10 +33,12 @@ UniqueTypeName RequestHashAttributeName();
// Helper Parsing method to parse ring hash policy configs; for example, ring
// hash size validity.
struct RingHashConfig {
size_t min_ring_size = 1024;
size_t max_ring_size = 8388608;
uint64_t min_ring_size = 1024;
uint64_t max_ring_size = 8388608;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&, ErrorList* errors);
};
absl::StatusOr<RingHashConfig> ParseRingHashLbConfig(const Json& json);
} // namespace grpc_core

@ -82,7 +82,8 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_util.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
@ -142,25 +143,24 @@ class RlsLbConfig : public LoadBalancingPolicy::Config {
struct RouteLookupConfig {
KeyBuilderMap key_builder_map;
std::string lookup_service;
Duration lookup_service_timeout;
Duration max_age;
Duration stale_age;
Duration lookup_service_timeout = kDefaultLookupServiceTimeout;
Duration max_age = kMaxMaxAge;
Duration stale_age = kMaxMaxAge;
int64_t cache_size_bytes = 0;
std::string default_target;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs& args,
ErrorList* errors);
};
RlsLbConfig(RouteLookupConfig route_lookup_config,
std::string rls_channel_service_config, Json child_policy_config,
std::string child_policy_config_target_field_name,
RefCountedPtr<LoadBalancingPolicy::Config>
default_child_policy_parsed_config)
: route_lookup_config_(std::move(route_lookup_config)),
rls_channel_service_config_(std::move(rls_channel_service_config)),
child_policy_config_(std::move(child_policy_config)),
child_policy_config_target_field_name_(
std::move(child_policy_config_target_field_name)),
default_child_policy_parsed_config_(
std::move(default_child_policy_parsed_config)) {}
RlsLbConfig() = default;
RlsLbConfig(const RlsLbConfig&) = delete;
RlsLbConfig& operator=(const RlsLbConfig&) = delete;
RlsLbConfig(RlsLbConfig&& other) = delete;
RlsLbConfig& operator=(RlsLbConfig&& other) = delete;
absl::string_view name() const override { return kRls; }
@ -193,6 +193,9 @@ class RlsLbConfig : public LoadBalancingPolicy::Config {
return default_child_policy_parsed_config_;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&, ErrorList* errors);
private:
RouteLookupConfig route_lookup_config_;
std::string rls_channel_service_config_;
@ -742,28 +745,32 @@ void RlsLb::ChildPolicyWrapper::Orphan() {
picker_.reset();
}
grpc_error_handle InsertOrUpdateChildPolicyField(const std::string& field,
const std::string& value,
Json* config) {
bool InsertOrUpdateChildPolicyField(const std::string& field,
const std::string& value, Json* config,
ErrorList* errors) {
if (config->type() != Json::Type::ARRAY) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"child policy configuration is not an array");
errors->AddError("is not an array");
return false;
}
std::vector<grpc_error_handle> error_list;
for (Json& child_json : *config->mutable_array()) {
bool success = true;
for (size_t i = 0; i < config->array_value().size(); ++i) {
Json& child_json = (*config->mutable_array())[i];
ScopedField json_field(errors, absl::StrCat("[", i, "]"));
if (child_json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"child policy item is not an object"));
errors->AddError("is not an object");
success = false;
} else {
Json::Object& child = *child_json.mutable_object();
if (child.size() != 1) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"child policy item contains more than one field"));
errors->AddError("child policy object contains more than one field");
success = false;
} else {
ScopedField json_field(
errors, absl::StrCat("[\"", child.begin()->first, "\"]"));
Json& child_config_json = child.begin()->second;
if (child_config_json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"child policy item config is not an object"));
errors->AddError("child policy config is not an object");
success = false;
} else {
Json::Object& child_config = *child_config_json.mutable_object();
child_config[field] = Json(value);
@ -771,18 +778,15 @@ grpc_error_handle InsertOrUpdateChildPolicyField(const std::string& field,
}
}
}
return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
absl::StrCat("errors when inserting field \"", field,
"\" for child policy"),
&error_list);
return success;
}
void RlsLb::ChildPolicyWrapper::StartUpdate() {
Json child_policy_config = lb_policy_->config_->child_policy_config();
grpc_error_handle error = InsertOrUpdateChildPolicyField(
ErrorList errors;
GPR_ASSERT(InsertOrUpdateChildPolicyField(
lb_policy_->config_->child_policy_config_target_field_name(), target_,
&child_policy_config);
GPR_ASSERT(GRPC_ERROR_IS_NONE(error));
&child_policy_config, &errors));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(
GPR_INFO,
@ -2143,346 +2147,338 @@ void RlsLb::UpdatePickerLocked() {
// RlsLbFactory
//
grpc_error_handle ParseJsonHeaders(size_t idx, const Json& json,
std::string* key,
std::vector<std::string>* headers) {
if (json.type() != Json::Type::OBJECT) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"field:headers index:", idx, " error:type should be OBJECT"));
}
std::vector<grpc_error_handle> error_list;
// requiredMatch must not be present.
if (json.object_value().find("requiredMatch") != json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:requiredMatch error:must not be present"));
}
// Find key.
if (ParseJsonObjectField(json.object_value(), "key", key, &error_list) &&
key->empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:key error:must be non-empty"));
}
// Find headers.
const Json::Array* headers_json = nullptr;
ParseJsonObjectField(json.object_value(), "names", &headers_json,
&error_list);
if (headers_json != nullptr) {
if (headers_json->empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:names error:list is empty"));
} else {
size_t name_idx = 0;
for (const Json& name_json : *headers_json) {
if (name_json.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"field:names index:", name_idx, " error:type should be STRING")));
} else if (name_json.string_value().empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("field:names index:", name_idx,
" error:header name must be non-empty")));
} else {
headers->push_back(name_json.string_value());
struct GrpcKeyBuilder {
struct Name {
std::string service;
std::string method;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
static const auto* loader = JsonObjectLoader<Name>()
.Field("service", &Name::service)
.OptionalField("method", &Name::method)
.Finish();
return loader;
}
};
struct NameMatcher {
std::string key;
std::vector<std::string> names;
absl::optional<bool> required_match;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<NameMatcher>()
.Field("key", &NameMatcher::key)
.Field("names", &NameMatcher::names)
.OptionalField("requiredMatch", &NameMatcher::required_match)
.Finish();
return loader;
}
void JsonPostLoad(const Json&, const JsonArgs&, ErrorList* errors) {
// key must be non-empty.
{
ScopedField field(errors, ".key");
if (!errors->FieldHasErrors() && key.empty()) {
errors->AddError("must be non-empty");
}
++name_idx;
}
}
}
return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
absl::StrCat("field:headers index:", idx), &error_list);
}
std::string ParseJsonMethodName(size_t idx, const Json& json,
grpc_error_handle* error) {
if (json.type() != Json::Type::OBJECT) {
*error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"field:names index:", idx, " error:type should be OBJECT"));
return "";
}
std::vector<grpc_error_handle> error_list;
// Find service name.
absl::string_view service_name;
ParseJsonObjectField(json.object_value(), "service", &service_name,
&error_list);
// Find method name.
absl::string_view method_name;
ParseJsonObjectField(json.object_value(), "method", &method_name, &error_list,
/*required=*/false);
// Return error, if any.
*error = GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
absl::StrCat("field:names index:", idx), &error_list);
// Construct path.
return absl::StrCat("/", service_name, "/", method_name);
}
grpc_error_handle ParseGrpcKeybuilder(
size_t idx, const Json& json, RlsLbConfig::KeyBuilderMap* key_builder_map) {
if (json.type() != Json::Type::OBJECT) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"field:grpc_keybuilders index:", idx, " error:type should be OBJECT"));
}
std::vector<grpc_error_handle> error_list;
// Parse names.
std::set<std::string> names;
const Json::Array* names_array = nullptr;
if (ParseJsonObjectField(json.object_value(), "names", &names_array,
&error_list)) {
if (names_array->empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:names error:list is empty"));
} else {
size_t name_idx = 0;
for (const Json& name_json : *names_array) {
grpc_error_handle child_error = GRPC_ERROR_NONE;
std::string name =
ParseJsonMethodName(name_idx++, name_json, &child_error);
if (!GRPC_ERROR_IS_NONE(child_error)) {
error_list.push_back(child_error);
} else {
bool inserted = names.insert(name).second;
if (!inserted) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("field:names error:duplicate entry for ", name)));
// List of header names must be non-empty.
{
ScopedField field(errors, ".names");
if (!errors->FieldHasErrors() && names.empty()) {
errors->AddError("must be non-empty");
}
// Individual header names must be non-empty.
for (size_t i = 0; i < names.size(); ++i) {
ScopedField field(errors, absl::StrCat("[", i, "]"));
if (!errors->FieldHasErrors() && names[i].empty()) {
errors->AddError("must be non-empty");
}
}
}
}
}
// Helper function to check for duplicate keys.
std::set<std::string> all_keys;
auto duplicate_key_check_func = [&all_keys,
&error_list](const std::string& key) {
auto it = all_keys.find(key);
if (it != all_keys.end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("key \"", key, "\" listed multiple times")));
} else {
all_keys.insert(key);
// requiredMatch must not be present.
{
ScopedField field(errors, ".requiredMatch");
if (required_match.has_value()) {
errors->AddError("must not be present");
}
}
}
};
// Parse headers.
RlsLbConfig::KeyBuilder key_builder;
const Json::Array* headers_array = nullptr;
ParseJsonObjectField(json.object_value(), "headers", &headers_array,
&error_list, /*required=*/false);
if (headers_array != nullptr) {
size_t header_idx = 0;
for (const Json& header_json : *headers_array) {
std::string key;
std::vector<std::string> headers;
grpc_error_handle child_error =
ParseJsonHeaders(header_idx++, header_json, &key, &headers);
if (!GRPC_ERROR_IS_NONE(child_error)) {
error_list.push_back(child_error);
} else {
duplicate_key_check_func(key);
key_builder.header_keys.emplace(key, std::move(headers));
}
struct ExtraKeys {
absl::optional<std::string> host_key;
absl::optional<std::string> service_key;
absl::optional<std::string> method_key;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<ExtraKeys>()
.OptionalField("host", &ExtraKeys::host_key)
.OptionalField("service", &ExtraKeys::service_key)
.OptionalField("method", &ExtraKeys::method_key)
.Finish();
return loader;
}
void JsonPostLoad(const Json&, const JsonArgs&, ErrorList* errors) {
auto check_field = [&](const std::string& field_name,
absl::optional<std::string>* struct_field) {
ScopedField field(errors, absl::StrCat(".", field_name));
if (struct_field->has_value() && (*struct_field)->empty()) {
errors->AddError("must be non-empty if set");
}
};
check_field("host", &host_key);
check_field("service", &service_key);
check_field("method", &method_key);
}
};
std::vector<Name> names;
std::vector<NameMatcher> headers;
ExtraKeys extra_keys;
std::map<std::string /*key*/, std::string /*value*/> constant_keys;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<GrpcKeyBuilder>()
.Field("names", &GrpcKeyBuilder::names)
.OptionalField("headers", &GrpcKeyBuilder::headers)
.OptionalField("extraKeys", &GrpcKeyBuilder::extra_keys)
.OptionalField("constantKeys", &GrpcKeyBuilder::constant_keys)
.Finish();
return loader;
}
// Parse extraKeys.
const Json::Object* extra_keys = nullptr;
ParseJsonObjectField(json.object_value(), "extraKeys", &extra_keys,
&error_list, /*required=*/false);
if (extra_keys != nullptr) {
std::vector<grpc_error_handle> extra_keys_errors;
if (ParseJsonObjectField(*extra_keys, "host", &key_builder.host_key,
&extra_keys_errors, /*required=*/false) &&
key_builder.host_key.empty()) {
extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:host error:must be non-empty"));
void JsonPostLoad(const Json&, const JsonArgs&, ErrorList* errors) {
// The names field must be non-empty.
{
ScopedField field(errors, ".names");
if (!errors->FieldHasErrors() && names.empty()) {
errors->AddError("must be non-empty");
}
}
if (!key_builder.host_key.empty()) {
duplicate_key_check_func(key_builder.host_key);
// Make sure no key in constantKeys is empty.
if (constant_keys.find("") != constant_keys.end()) {
ScopedField field(errors, ".constantKeys[\"\"]");
errors->AddError("key must be non-empty");
}
if (ParseJsonObjectField(*extra_keys, "service", &key_builder.service_key,
&extra_keys_errors, /*required=*/false) &&
key_builder.service_key.empty()) {
extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:service error:must be non-empty"));
// Check for duplicate keys.
std::set<absl::string_view> keys_seen;
auto duplicate_key_check_func = [&keys_seen, errors](
const std::string& key,
const std::string& field_name) {
if (key.empty()) return; // Already generated an error about this.
ScopedField field(errors, field_name);
auto it = keys_seen.find(key);
if (it != keys_seen.end()) {
errors->AddError(absl::StrCat("duplicate key \"", key, "\""));
} else {
keys_seen.insert(key);
}
};
for (size_t i = 0; i < headers.size(); ++i) {
NameMatcher& header = headers[i];
duplicate_key_check_func(header.key,
absl::StrCat(".headers[", i, "].key"));
}
if (!key_builder.service_key.empty()) {
duplicate_key_check_func(key_builder.service_key);
for (const auto& p : constant_keys) {
duplicate_key_check_func(
p.first, absl::StrCat(".constantKeys[\"", p.first, "\"]"));
}
if (ParseJsonObjectField(*extra_keys, "method", &key_builder.method_key,
&extra_keys_errors, /*required=*/false) &&
key_builder.method_key.empty()) {
extra_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:method error:must be non-empty"));
if (extra_keys.host_key.has_value()) {
duplicate_key_check_func(*extra_keys.host_key, ".extraKeys.host");
}
if (!key_builder.method_key.empty()) {
duplicate_key_check_func(key_builder.method_key);
if (extra_keys.service_key.has_value()) {
duplicate_key_check_func(*extra_keys.service_key, ".extraKeys.service");
}
if (!extra_keys_errors.empty()) {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:extraKeys", &extra_keys_errors));
if (extra_keys.method_key.has_value()) {
duplicate_key_check_func(*extra_keys.method_key, ".extraKeys.method");
}
}
// Parse constantKeys.
const Json::Object* constant_keys = nullptr;
ParseJsonObjectField(json.object_value(), "constantKeys", &constant_keys,
&error_list, /*required=*/false);
if (constant_keys != nullptr) {
std::vector<grpc_error_handle> constant_keys_errors;
for (const auto& p : *constant_keys) {
const std::string& key = p.first;
const Json& value = p.second;
if (key.empty()) {
constant_keys_errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"error:keys must be non-empty"));
};
const JsonLoaderInterface* RlsLbConfig::RouteLookupConfig::JsonLoader(
const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<RouteLookupConfig>()
// Note: Some fields require manual processing and are handled in
// JsonPostLoad() instead.
.Field("lookupService", &RouteLookupConfig::lookup_service)
.OptionalField("lookupServiceTimeout",
&RouteLookupConfig::lookup_service_timeout)
.OptionalField("maxAge", &RouteLookupConfig::max_age)
.OptionalField("staleAge", &RouteLookupConfig::stale_age)
.Field("cacheSizeBytes", &RouteLookupConfig::cache_size_bytes)
.OptionalField("defaultTarget", &RouteLookupConfig::default_target)
.Finish();
return loader;
}
void RlsLbConfig::RouteLookupConfig::JsonPostLoad(const Json& json,
const JsonArgs& args,
ErrorList* errors) {
// Parse grpcKeybuilders.
auto grpc_keybuilders = LoadJsonObjectField<std::vector<GrpcKeyBuilder>>(
json.object_value(), args, "grpcKeybuilders", errors);
if (grpc_keybuilders.has_value()) {
ScopedField field(errors, ".grpcKeybuilders");
for (size_t i = 0; i < grpc_keybuilders->size(); ++i) {
ScopedField field(errors, absl::StrCat("[", i, "]"));
auto& grpc_keybuilder = (*grpc_keybuilders)[i];
// Construct KeyBuilder.
RlsLbConfig::KeyBuilder key_builder;
for (const auto& header : grpc_keybuilder.headers) {
key_builder.header_keys.emplace(header.key, header.names);
}
if (grpc_keybuilder.extra_keys.host_key.has_value()) {
key_builder.host_key = std::move(*grpc_keybuilder.extra_keys.host_key);
}
if (grpc_keybuilder.extra_keys.service_key.has_value()) {
key_builder.service_key =
std::move(*grpc_keybuilder.extra_keys.service_key);
}
if (grpc_keybuilder.extra_keys.method_key.has_value()) {
key_builder.method_key =
std::move(*grpc_keybuilder.extra_keys.method_key);
}
key_builder.constant_keys = std::move(grpc_keybuilder.constant_keys);
// Add entries to map.
for (const auto& name : grpc_keybuilder.names) {
std::string path = absl::StrCat("/", name.service, "/", name.method);
bool inserted = key_builder_map.emplace(path, key_builder).second;
if (!inserted) {
errors->AddError(absl::StrCat("duplicate entry for \"", path, "\""));
}
}
duplicate_key_check_func(key);
ExtractJsonString(value, key, &key_builder.constant_keys[key],
&constant_keys_errors);
}
if (!constant_keys_errors.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
"field:constantKeys", &constant_keys_errors));
}
// Validate lookupService.
{
ScopedField field(errors, ".lookupService");
if (!errors->FieldHasErrors() &&
!CoreConfiguration::Get().resolver_registry().IsValidTarget(
lookup_service)) {
errors->AddError("must be valid gRPC target URI");
}
}
// Insert key_builder into key_builder_map.
for (const std::string& name : names) {
bool inserted = key_builder_map->emplace(name, key_builder).second;
if (!inserted) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("field:names error:duplicate entry for ", name)));
// Clamp maxAge to the max allowed value.
if (max_age > kMaxMaxAge) max_age = kMaxMaxAge;
// If staleAge is set, then maxAge must also be set.
if (json.object_value().find("staleAge") != json.object_value().end() &&
json.object_value().find("maxAge") == json.object_value().end()) {
ScopedField field(errors, ".maxAge");
errors->AddError("must be set if staleAge is set");
}
// Ignore staleAge if greater than or equal to maxAge.
if (stale_age >= max_age) stale_age = max_age;
// Validate cacheSizeBytes.
{
ScopedField field(errors, ".cacheSizeBytes");
if (!errors->FieldHasErrors() && cache_size_bytes <= 0) {
errors->AddError("must be greater than 0");
}
}
return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
absl::StrCat("index:", idx), &error_list);
}
RlsLbConfig::KeyBuilderMap ParseGrpcKeybuilders(
const Json::Array& key_builder_list, grpc_error_handle* error) {
RlsLbConfig::KeyBuilderMap key_builder_map;
if (key_builder_list.empty()) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:grpcKeybuilders error:list is empty");
return key_builder_map;
// Clamp cacheSizeBytes to the max allowed value.
if (cache_size_bytes > kMaxCacheSizeBytes) {
cache_size_bytes = kMaxCacheSizeBytes;
}
std::vector<grpc_error_handle> error_list;
size_t idx = 0;
for (const Json& key_builder : key_builder_list) {
grpc_error_handle child_error =
ParseGrpcKeybuilder(idx++, key_builder, &key_builder_map);
if (!GRPC_ERROR_IS_NONE(child_error)) error_list.push_back(child_error);
// Validate defaultTarget.
{
ScopedField field(errors, ".defaultTarget");
if (!errors->FieldHasErrors() &&
json.object_value().find("defaultTarget") !=
json.object_value().end() &&
default_target.empty()) {
errors->AddError("must be non-empty if set");
}
}
*error = GRPC_ERROR_CREATE_FROM_VECTOR("field:grpcKeybuilders", &error_list);
return key_builder_map;
}
RlsLbConfig::RouteLookupConfig ParseRouteLookupConfig(
const Json::Object& json, grpc_error_handle* error) {
std::vector<grpc_error_handle> error_list;
RlsLbConfig::RouteLookupConfig route_lookup_config;
// Parse grpcKeybuilders.
const Json::Array* keybuilder_list = nullptr;
ParseJsonObjectField(json, "grpcKeybuilders", &keybuilder_list, &error_list);
if (keybuilder_list != nullptr) {
const JsonLoaderInterface* RlsLbConfig::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<RlsLbConfig>()
// Note: Some fields require manual processing and are handled in
// JsonPostLoad() instead.
.Field("routeLookupConfig", &RlsLbConfig::route_lookup_config_)
.Field("childPolicyConfigTargetFieldName",
&RlsLbConfig::child_policy_config_target_field_name_)
.Finish();
return loader;
}
void RlsLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
ErrorList* errors) {
// Parse routeLookupChannelServiceConfig.
auto it = json.object_value().find("routeLookupChannelServiceConfig");
if (it != json.object_value().end()) {
ScopedField field(errors, ".routeLookupChannelServiceConfig");
grpc_error_handle child_error = GRPC_ERROR_NONE;
route_lookup_config.key_builder_map =
ParseGrpcKeybuilders(*keybuilder_list, &child_error);
if (!GRPC_ERROR_IS_NONE(child_error)) error_list.push_back(child_error);
}
// Parse lookupService.
if (ParseJsonObjectField(json, "lookupService",
&route_lookup_config.lookup_service, &error_list)) {
if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
route_lookup_config.lookup_service)) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:lookupService error:must be valid gRPC target URI"));
rls_channel_service_config_ = it->second.Dump();
auto service_config = MakeRefCounted<ServiceConfigImpl>(
ChannelArgs(), rls_channel_service_config_, it->second, &child_error);
if (!GRPC_ERROR_IS_NONE(child_error)) {
errors->AddError(grpc_error_std_string(child_error));
GRPC_ERROR_UNREF(child_error);
}
}
// Parse lookupServiceTimeout.
route_lookup_config.lookup_service_timeout = kDefaultLookupServiceTimeout;
ParseJsonObjectFieldAsDuration(json, "lookupServiceTimeout",
&route_lookup_config.lookup_service_timeout,
&error_list, /*required=*/false);
// Parse maxAge.
route_lookup_config.max_age = kMaxMaxAge;
bool max_age_set = ParseJsonObjectFieldAsDuration(
json, "maxAge", &route_lookup_config.max_age, &error_list,
/*required=*/false);
// Clamp maxAge to the max allowed value.
if (route_lookup_config.max_age > kMaxMaxAge) {
route_lookup_config.max_age = kMaxMaxAge;
}
// Parse staleAge.
route_lookup_config.stale_age = kMaxMaxAge;
bool stale_age_set = ParseJsonObjectFieldAsDuration(
json, "staleAge", &route_lookup_config.stale_age, &error_list,
/*required=*/false);
// If staleAge is set, then maxAge must also be set.
if (stale_age_set && !max_age_set) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:maxAge error:must be set if staleAge is set"));
}
// Ignore staleAge if greater than or equal to maxAge.
if (route_lookup_config.stale_age >= route_lookup_config.max_age) {
route_lookup_config.stale_age = route_lookup_config.max_age;
}
// Parse cacheSizeBytes.
ParseJsonObjectField(json, "cacheSizeBytes",
&route_lookup_config.cache_size_bytes, &error_list);
if (route_lookup_config.cache_size_bytes <= 0) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:cacheSizeBytes error:must be greater than 0"));
}
// Clamp cacheSizeBytes to the max allowed value.
if (route_lookup_config.cache_size_bytes > kMaxCacheSizeBytes) {
route_lookup_config.cache_size_bytes = kMaxCacheSizeBytes;
}
// Parse defaultTarget.
if (ParseJsonObjectField(json, "defaultTarget",
&route_lookup_config.default_target, &error_list,
/*required=*/false)) {
if (route_lookup_config.default_target.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:defaultTarget error:must be non-empty if set"));
// Validate childPolicyConfigTargetFieldName.
{
ScopedField field(errors, ".childPolicyConfigTargetFieldName");
if (!errors->FieldHasErrors() &&
child_policy_config_target_field_name_.empty()) {
errors->AddError("must be non-empty");
}
}
*error =
GRPC_ERROR_CREATE_FROM_VECTOR("field:routeLookupConfig", &error_list);
return route_lookup_config;
}
grpc_error_handle ValidateChildPolicyList(
const Json& child_policy_list,
const std::string& child_policy_config_target_field_name,
const std::string& default_target, Json* child_policy_config,
RefCountedPtr<LoadBalancingPolicy::Config>*
default_child_policy_parsed_config) {
// Add target to each entry in the config proto.
*child_policy_config = child_policy_list;
std::string target =
default_target.empty() ? kFakeTargetFieldValue : default_target;
grpc_error_handle error = InsertOrUpdateChildPolicyField(
child_policy_config_target_field_name, target, child_policy_config);
if (!GRPC_ERROR_IS_NONE(error)) return error;
// Parse the config.
auto parsed_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
*child_policy_config);
if (!parsed_config.ok()) {
return absl_status_to_grpc_error(parsed_config.status());
}
// Find the chosen config and return it in JSON form.
// We remove all non-selected configs, and in the selected config, we leave
// the target field in place, set to the default value. This slightly
// optimizes what we need to do later when we update a child policy for a
// given target.
for (Json& config : *(child_policy_config->mutable_array())) {
if (config.object_value().begin()->first == (*parsed_config)->name()) {
Json save_config = std::move(config);
child_policy_config->mutable_array()->clear();
child_policy_config->mutable_array()->push_back(std::move(save_config));
break;
// Parse childPolicy.
{
ScopedField field(errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors->AddError("field not present");
} else {
// Add target to all child policy configs in the list.
child_policy_config_ = it->second;
std::string target = route_lookup_config_.default_target.empty()
? kFakeTargetFieldValue
: route_lookup_config_.default_target;
if (InsertOrUpdateChildPolicyField(child_policy_config_target_field_name_,
target, &child_policy_config_,
errors)) {
// Parse the config.
auto parsed_config =
CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(child_policy_config_);
if (!parsed_config.ok()) {
errors->AddError(parsed_config.status().message());
} else {
// Find the chosen config and return it in JSON form.
// We remove all non-selected configs, and in the selected config,
// we leave the target field in place, set to the default value.
// This slightly optimizes what we need to do later when we update
// a child policy for a given target.
for (Json& config : *(child_policy_config_.mutable_array())) {
if (config.object_value().begin()->first ==
(*parsed_config)->name()) {
Json save_config = std::move(config);
child_policy_config_.mutable_array()->clear();
child_policy_config_.mutable_array()->push_back(
std::move(save_config));
break;
}
}
// If default target is set, set the default child config.
if (!route_lookup_config_.default_target.empty()) {
default_child_policy_parsed_config_ = std::move(*parsed_config);
}
}
}
}
}
// If default target is set, return the parsed config.
if (!default_target.empty()) {
*default_child_policy_parsed_config = std::move(*parsed_config);
}
return GRPC_ERROR_NONE;
}
class RlsLbFactory : public LoadBalancingPolicyFactory {
@ -2495,83 +2491,9 @@ class RlsLbFactory : public LoadBalancingPolicyFactory {
}
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& config) const override {
std::vector<grpc_error_handle> error_list;
// Parse routeLookupConfig.
RlsLbConfig::RouteLookupConfig route_lookup_config;
const Json::Object* route_lookup_config_json = nullptr;
if (ParseJsonObjectField(config.object_value(), "routeLookupConfig",
&route_lookup_config_json, &error_list)) {
grpc_error_handle child_error = GRPC_ERROR_NONE;
route_lookup_config =
ParseRouteLookupConfig(*route_lookup_config_json, &child_error);
if (!GRPC_ERROR_IS_NONE(child_error)) error_list.push_back(child_error);
}
// Parse routeLookupChannelServiceConfig.
std::string rls_channel_service_config;
const Json::Object* rls_channel_service_config_json_obj = nullptr;
if (ParseJsonObjectField(config.object_value(),
"routeLookupChannelServiceConfig",
&rls_channel_service_config_json_obj, &error_list,
/*required=*/false)) {
grpc_error_handle child_error = GRPC_ERROR_NONE;
Json rls_channel_service_config_json(
*rls_channel_service_config_json_obj);
rls_channel_service_config = rls_channel_service_config_json.Dump();
auto service_config = MakeRefCounted<ServiceConfigImpl>(
ChannelArgs(), rls_channel_service_config,
std::move(rls_channel_service_config_json), &child_error);
if (!GRPC_ERROR_IS_NONE(child_error)) {
error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"field:routeLookupChannelServiceConfig", &child_error, 1));
GRPC_ERROR_UNREF(child_error);
}
}
// Parse childPolicyConfigTargetFieldName.
std::string child_policy_config_target_field_name;
if (ParseJsonObjectField(
config.object_value(), "childPolicyConfigTargetFieldName",
&child_policy_config_target_field_name, &error_list)) {
if (child_policy_config_target_field_name.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:childPolicyConfigTargetFieldName error:must be non-empty"));
}
}
// Parse childPolicy.
Json child_policy_config;
RefCountedPtr<LoadBalancingPolicy::Config>
default_child_policy_parsed_config;
auto it = config.object_value().find("childPolicy");
if (it == config.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:childPolicy error:does not exist."));
} else if (it->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:childPolicy error:type should be ARRAY"));
} else {
grpc_error_handle child_error = ValidateChildPolicyList(
it->second, child_policy_config_target_field_name,
route_lookup_config.default_target, &child_policy_config,
&default_child_policy_parsed_config);
if (!GRPC_ERROR_IS_NONE(child_error)) {
error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"field:childPolicy", &child_error, 1));
GRPC_ERROR_UNREF(child_error);
}
}
// Return result.
if (!error_list.empty()) {
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_VECTOR(
"errors parsing RLS LB policy config", &error_list);
std::string error_string = grpc_error_std_string(error);
GRPC_ERROR_UNREF(error);
return absl::InvalidArgumentError(error_string);
}
return MakeRefCounted<RlsLbConfig>(
std::move(route_lookup_config), std::move(rls_channel_service_config),
std::move(child_policy_config),
std::move(child_policy_config_target_field_name),
std::move(default_child_policy_parsed_config));
ParseLoadBalancingConfig(const Json& json) const override {
return LoadRefCountedFromJson<RlsLbConfig>(
json, JsonArgs(), "errors validing RLS LB policy config");
}
};
@ -2582,6 +2504,4 @@ void RegisterRlsLbPolicy(CoreConfiguration::Builder* builder) {
absl::make_unique<RlsLbFactory>());
}
void RlsLbPluginShutdown() {}
} // namespace grpc_core

@ -44,7 +44,6 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -54,6 +53,8 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
@ -84,17 +85,27 @@ class WeightedTargetLbConfig : public LoadBalancingPolicy::Config {
struct ChildConfig {
uint32_t weight;
RefCountedPtr<LoadBalancingPolicy::Config> config;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&, ErrorList* errors);
};
using TargetMap = std::map<std::string, ChildConfig>;
explicit WeightedTargetLbConfig(TargetMap target_map)
: target_map_(std::move(target_map)) {}
WeightedTargetLbConfig() = default;
WeightedTargetLbConfig(const WeightedTargetLbConfig&) = delete;
WeightedTargetLbConfig& operator=(const WeightedTargetLbConfig&) = delete;
WeightedTargetLbConfig(WeightedTargetLbConfig&& other) = delete;
WeightedTargetLbConfig& operator=(WeightedTargetLbConfig&& other) = delete;
absl::string_view name() const override { return kWeightedTarget; }
const TargetMap& target_map() const { return target_map_; }
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
private:
TargetMap target_map_;
};
@ -691,6 +702,44 @@ void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent(
// factory
//
const JsonLoaderInterface* WeightedTargetLbConfig::ChildConfig::JsonLoader(
const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<ChildConfig>()
// Note: The config field requires custom parsing, so it's
// handled in JsonPostLoad() instead.
.Field("weight", &ChildConfig::weight)
.Finish();
return loader;
}
void WeightedTargetLbConfig::ChildConfig::JsonPostLoad(const Json& json,
const JsonArgs&,
ErrorList* errors) {
ScopedField field(errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors->AddError("field not present");
return;
}
auto lb_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
it->second);
if (!lb_config.ok()) {
errors->AddError(lb_config.status().message());
return;
}
config = std::move(*lb_config);
}
const JsonLoaderInterface* WeightedTargetLbConfig::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<WeightedTargetLbConfig>()
.Field("targets", &WeightedTargetLbConfig::target_map_)
.Finish();
return loader;
}
class WeightedTargetLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
@ -710,76 +759,8 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory {
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
}
std::vector<std::string> errors;
// Weight map.
WeightedTargetLbConfig::TargetMap target_map;
auto it = json.object_value().find("targets");
if (it == json.object_value().end()) {
errors.emplace_back("field:targets error:required field not present");
} else if (it->second.type() != Json::Type::OBJECT) {
errors.emplace_back("field:targets error:type should be object");
} else {
for (const auto& p : it->second.object_value()) {
auto config = ParseChildConfig(p.second);
if (!config.ok()) {
errors.emplace_back(config.status().message());
} else {
target_map[p.first] = std::move(*config);
}
}
}
if (!errors.empty()) {
return absl::InvalidArgumentError(
absl::StrCat("weighted_target_experimental LB policy config: [",
absl::StrJoin(errors, "; "), "]"));
}
return MakeRefCounted<WeightedTargetLbConfig>(std::move(target_map));
}
private:
static absl::StatusOr<WeightedTargetLbConfig::ChildConfig> ParseChildConfig(
const Json& json) {
if (json.type() != Json::Type::OBJECT) {
return absl::InvalidArgumentError("value should be of type object");
}
WeightedTargetLbConfig::ChildConfig child_config;
std::vector<std::string> errors;
// Weight.
auto it = json.object_value().find("weight");
if (it == json.object_value().end()) {
errors.emplace_back("required field \"weight\" not specified");
} else if (it->second.type() != Json::Type::NUMBER) {
errors.emplace_back("field:weight error:must be of type number");
} else {
int weight = gpr_parse_nonnegative_int(it->second.string_value().c_str());
if (weight == -1) {
errors.emplace_back("field:weight error:unparseable value");
} else if (weight == 0) {
errors.emplace_back(
"field:weight error:value must be greater than zero");
} else {
child_config.weight = weight;
}
}
// Child policy.
it = json.object_value().find("childPolicy");
if (it != json.object_value().end()) {
auto config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!config.ok()) {
errors.emplace_back(
absl::StrCat("field:childPolicy: ", config.status().message()));
} else {
child_config.config = std::move(*config);
}
}
// Return result.
if (!errors.empty()) {
return absl::InvalidArgumentError(absl::StrCat(
"errors parsing target config: [", absl::StrJoin(errors, "; "), "]"));
}
return child_config;
return LoadRefCountedFromJson<WeightedTargetLbConfig>(
json, JsonArgs(), "errors validating weighted_target LB policy config");
}
};

@ -28,7 +28,6 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
@ -58,6 +57,8 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
@ -83,10 +84,24 @@ constexpr int kMaxAggregateClusterRecursionDepth = 16;
// Config for this LB policy.
class CdsLbConfig : public LoadBalancingPolicy::Config {
public:
explicit CdsLbConfig(std::string cluster) : cluster_(std::move(cluster)) {}
CdsLbConfig() = default;
CdsLbConfig(const CdsLbConfig&) = delete;
CdsLbConfig& operator=(const CdsLbConfig&) = delete;
CdsLbConfig(CdsLbConfig&& other) = delete;
CdsLbConfig& operator=(CdsLbConfig&& other) = delete;
const std::string& cluster() const { return cluster_; }
absl::string_view name() const override { return kCds; }
static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
static const auto* loader = JsonObjectLoader<CdsLbConfig>()
.Field("cluster", &CdsLbConfig::cluster_)
.Finish();
return loader;
}
private:
std::string cluster_;
};
@ -742,23 +757,8 @@ class CdsLbFactory : public LoadBalancingPolicyFactory {
"field:loadBalancingPolicy error:cds policy requires configuration. "
"Please use loadBalancingConfig field of service config instead.");
}
std::vector<std::string> errors;
// cluster name.
std::string cluster;
auto it = json.object_value().find("cluster");
if (it == json.object_value().end()) {
errors.emplace_back("required field 'cluster' not present");
} else if (it->second.type() != Json::Type::STRING) {
errors.emplace_back("field:cluster error:type should be string");
} else {
cluster = it->second.string_value();
}
if (!errors.empty()) {
return absl::InvalidArgumentError(
absl::StrCat("errors parsing CDS LB policy config: [",
absl::StrJoin(errors, "; "), "]"));
}
return MakeRefCounted<CdsLbConfig>(std::move(cluster));
return LoadRefCountedFromJson<CdsLbConfig>(
json, JsonArgs(), "errors validating cds LB policy config");
}
};

@ -19,7 +19,6 @@
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <atomic>
#include <map>
#include <memory>
@ -32,7 +31,6 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
@ -52,7 +50,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -60,6 +57,7 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
@ -146,18 +144,13 @@ constexpr absl::string_view kXdsClusterImpl = "xds_cluster_impl_experimental";
// Config for xDS Cluster Impl LB policy.
class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
public:
XdsClusterImplLbConfig(
RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
std::string cluster_name, std::string eds_service_name,
absl::optional<GrpcXdsBootstrap::GrpcXdsServer> lrs_load_reporting_server,
uint32_t max_concurrent_requests,
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config)
: child_policy_(std::move(child_policy)),
cluster_name_(std::move(cluster_name)),
eds_service_name_(std::move(eds_service_name)),
lrs_load_reporting_server_(std::move(lrs_load_reporting_server)),
max_concurrent_requests_(max_concurrent_requests),
drop_config_(std::move(drop_config)) {}
XdsClusterImplLbConfig() = default;
XdsClusterImplLbConfig(const XdsClusterImplLbConfig&) = delete;
XdsClusterImplLbConfig& operator=(const XdsClusterImplLbConfig&) = delete;
XdsClusterImplLbConfig(XdsClusterImplLbConfig&& other) = delete;
XdsClusterImplLbConfig& operator=(XdsClusterImplLbConfig&& other) = delete;
absl::string_view name() const override { return kXdsClusterImpl; }
@ -175,6 +168,9 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
return drop_config_;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs& args, ErrorList* errors);
private:
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
std::string cluster_name_;
@ -693,6 +689,88 @@ void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity,
// factory
//
struct DropCategory {
std::string category;
uint32_t requests_per_million;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<DropCategory>()
.Field("category", &DropCategory::category)
.Field("requests_per_million", &DropCategory::requests_per_million)
.Finish();
return loader;
}
};
const JsonLoaderInterface* XdsClusterImplLbConfig::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<XdsClusterImplLbConfig>()
// Note: Some fields require custom processing, so they are
// handled in JsonPostLoad() instead.
.Field("clusterName", &XdsClusterImplLbConfig::cluster_name_)
.OptionalField("edsServiceName",
&XdsClusterImplLbConfig::eds_service_name_)
// FIXME: merge in the other PR
//.OptionalField("lrsLoadReportingServer",
// &DiscoveryMechanism::lrs_load_reporting_server)
.OptionalField("maxConcurrentRequests",
&XdsClusterImplLbConfig::max_concurrent_requests_)
.Finish();
return loader;
}
void XdsClusterImplLbConfig::JsonPostLoad(const Json& json,
const JsonArgs& args,
ErrorList* errors) {
// LRS load reporting server name.
auto it = json.object_value().find("lrsLoadReportingServer");
if (it != json.object_value().end()) {
ScopedField field(errors, ".lrsLoadReportingServer");
if (it->second.type() != Json::Type::OBJECT) {
errors->AddError("is not an object");
} else {
auto xds_server =
LoadFromJson<GrpcXdsBootstrap::GrpcXdsServer>(it->second);
if (!xds_server.ok()) {
errors->AddError(xds_server.status().ToString());
} else {
lrs_load_reporting_server_ = std::move(*xds_server);
}
}
}
// Parse "childPolicy" field.
{
ScopedField field(errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors->AddError("field not present");
} else {
auto lb_config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!lb_config.ok()) {
errors->AddError(lb_config.status().message());
} else {
child_policy_ = std::move(*lb_config);
}
}
}
// Parse "dropCategories" field.
{
auto value = LoadJsonObjectField<std::vector<DropCategory>>(
json.object_value(), args, "dropCategories", errors);
if (value.has_value()) {
drop_config_ = MakeRefCounted<XdsEndpointResource::DropConfig>();
for (size_t i = 0; i < value->size(); ++i) {
DropCategory& drop_category = (*value)[i];
drop_config_->AddCategory(std::move(drop_category.category),
drop_category.requests_per_million);
}
}
}
}
class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
@ -721,148 +799,9 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
}
std::vector<std::string> errors;
// Child policy.
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors.emplace_back("field:childPolicy error:required field missing");
} else {
auto config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!config.ok()) {
errors.emplace_back(absl::StrCat("field:childPolicy error:",
config.status().message()));
} else {
child_policy = std::move(*config);
}
}
// Cluster name.
std::string cluster_name;
it = json.object_value().find("clusterName");
if (it == json.object_value().end()) {
errors.emplace_back("field:clusterName error:required field missing");
} else if (it->second.type() != Json::Type::STRING) {
errors.emplace_back("field:clusterName error:type should be string");
} else {
cluster_name = it->second.string_value();
}
// EDS service name.
std::string eds_service_name;
it = json.object_value().find("edsServiceName");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
errors.emplace_back("field:edsServiceName error:type should be string");
} else {
eds_service_name = it->second.string_value();
}
}
// LRS load reporting server name.
absl::optional<GrpcXdsBootstrap::GrpcXdsServer> lrs_load_reporting_server;
it = json.object_value().find("lrsLoadReportingServer");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::OBJECT) {
errors.emplace_back(
"field:lrsLoadReportingServer error:type should be object");
} else {
auto xds_server =
LoadFromJson<GrpcXdsBootstrap::GrpcXdsServer>(it->second);
if (!xds_server.ok()) {
errors.emplace_back(
absl::StrCat("error parsing lrs_load_reporting_server: ",
xds_server.status().ToString()));
} else {
lrs_load_reporting_server = std::move(*xds_server);
}
}
}
// Max concurrent requests.
uint32_t max_concurrent_requests = 1024;
it = json.object_value().find("maxConcurrentRequests");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::NUMBER) {
errors.emplace_back(
"field:max_concurrent_requests error:must be of type number");
} else {
max_concurrent_requests =
gpr_parse_nonnegative_int(it->second.string_value().c_str());
}
}
// Drop config.
auto drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>();
it = json.object_value().find("dropCategories");
if (it == json.object_value().end()) {
errors.emplace_back("field:dropCategories error:required field missing");
} else {
absl::Status status = ParseDropCategories(it->second, drop_config.get());
if (!status.ok()) errors.emplace_back(status.message());
}
if (!errors.empty()) {
return absl::InvalidArgumentError(absl::StrCat(
"errors parseing xds_cluster_impl_experimental LB policy config: [",
absl::StrJoin(errors, "; "), "]"));
}
return MakeRefCounted<XdsClusterImplLbConfig>(
std::move(child_policy), std::move(cluster_name),
std::move(eds_service_name), std::move(lrs_load_reporting_server),
max_concurrent_requests, std::move(drop_config));
}
private:
static absl::Status ParseDropCategories(
const Json& json, XdsEndpointResource::DropConfig* drop_config) {
if (json.type() != Json::Type::ARRAY) {
return absl::InvalidArgumentError("dropCategories field is not an array");
}
std::vector<std::string> errors;
for (size_t i = 0; i < json.array_value().size(); ++i) {
const Json& entry = json.array_value()[i];
absl::Status status = ParseDropCategory(entry, drop_config);
if (!status.ok()) {
errors.emplace_back(
absl::StrCat("error parsing index ", i, ": ", status.message()));
}
}
if (!errors.empty()) {
return absl::InvalidArgumentError(
absl::StrCat("errors parsing dropCategories field: [",
absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
}
static absl::Status ParseDropCategory(
const Json& json, XdsEndpointResource::DropConfig* drop_config) {
if (json.type() != Json::Type::OBJECT) {
return absl::InvalidArgumentError(
"dropCategories entry is not an object");
}
std::vector<std::string> errors;
std::string category;
auto it = json.object_value().find("category");
if (it == json.object_value().end()) {
errors.emplace_back("\"category\" field not present");
} else if (it->second.type() != Json::Type::STRING) {
errors.emplace_back("\"category\" field is not a string");
} else {
category = it->second.string_value();
}
uint32_t requests_per_million = 0;
it = json.object_value().find("requests_per_million");
if (it == json.object_value().end()) {
errors.emplace_back("\"requests_per_million\" field is not present");
} else if (it->second.type() != Json::Type::NUMBER) {
errors.emplace_back("\"requests_per_million\" field is not a number");
} else {
requests_per_million =
gpr_parse_nonnegative_int(it->second.string_value().c_str());
}
if (!errors.empty()) {
return absl::InvalidArgumentError(absl::StrJoin(errors, "; "));
}
drop_config->AddCategory(std::move(category), requests_per_million);
return absl::OkStatus();
return LoadRefCountedFromJson<XdsClusterImplLbConfig>(
json, JsonArgs(),
"errors validating xds_cluster_impl LB policy config");
}
};

@ -21,7 +21,6 @@
#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
@ -54,6 +53,8 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
@ -75,18 +76,34 @@ constexpr absl::string_view kXdsClusterManager =
// Config for xds_cluster_manager LB policy.
class XdsClusterManagerLbConfig : public LoadBalancingPolicy::Config {
public:
using ClusterMap =
std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>;
struct Child {
RefCountedPtr<LoadBalancingPolicy::Config> config;
explicit XdsClusterManagerLbConfig(ClusterMap cluster_map)
: cluster_map_(std::move(cluster_map)) {}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&, ErrorList* errors);
};
XdsClusterManagerLbConfig() = default;
XdsClusterManagerLbConfig(const XdsClusterManagerLbConfig&) = delete;
XdsClusterManagerLbConfig& operator=(const XdsClusterManagerLbConfig&) =
delete;
XdsClusterManagerLbConfig(XdsClusterManagerLbConfig&& other) = delete;
XdsClusterManagerLbConfig& operator=(XdsClusterManagerLbConfig&& other) =
delete;
absl::string_view name() const override { return kXdsClusterManager; }
const ClusterMap& cluster_map() const { return cluster_map_; }
const std::map<std::string, Child>& cluster_map() const {
return cluster_map_;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&, ErrorList* errors);
private:
ClusterMap cluster_map_;
std::map<std::string, Child> cluster_map_;
};
// xds_cluster_manager LB policy.
@ -295,7 +312,7 @@ absl::Status XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
std::vector<std::string> errors;
for (const auto& p : config_->cluster_map()) {
const std::string& name = p.first;
const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second.config;
auto& child = children_[name];
if (child == nullptr) {
child = MakeOrphanable<ClusterChild>(Ref(DEBUG_LOCATION, "ClusterChild"),
@ -631,6 +648,52 @@ void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent(
// factory
//
const JsonLoaderInterface* XdsClusterManagerLbConfig::Child::JsonLoader(
const JsonArgs&) {
// Note: The "childPolicy" field requires custom processing, so
// it's handled in JsonPostLoad() instead.
static const auto* loader = JsonObjectLoader<Child>().Finish();
return loader;
}
void XdsClusterManagerLbConfig::Child::JsonPostLoad(const Json& json,
const JsonArgs&,
ErrorList* errors) {
ScopedField field(errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors->AddError("field not present");
return;
}
auto lb_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
it->second);
if (!lb_config.ok()) {
errors->AddError(lb_config.status().message());
return;
}
config = std::move(*lb_config);
}
const JsonLoaderInterface* XdsClusterManagerLbConfig::JsonLoader(
const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<XdsClusterManagerLbConfig>()
.Field("children", &XdsClusterManagerLbConfig::cluster_map_)
.Finish();
return loader;
}
void XdsClusterManagerLbConfig::JsonPostLoad(const Json&, const JsonArgs&,
ErrorList* errors) {
if (cluster_map_.empty()) {
ScopedField field(errors, ".children");
if (!errors->FieldHasErrors()) {
errors->AddError("no valid children configured");
}
}
}
class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
@ -650,69 +713,9 @@ class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory {
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
}
std::vector<std::string> errors;
XdsClusterManagerLbConfig::ClusterMap cluster_map;
std::set<std::string /*cluster_name*/> clusters_to_be_used;
auto it = json.object_value().find("children");
if (it == json.object_value().end()) {
errors.emplace_back("field:children error:required field not present");
} else if (it->second.type() != Json::Type::OBJECT) {
errors.emplace_back("field:children error:type should be object");
} else {
for (const auto& p : it->second.object_value()) {
const std::string& child_name = p.first;
if (child_name.empty()) {
errors.emplace_back("field:children error: name cannot be empty");
continue;
}
auto config = ParseChildConfig(p.second);
if (!config.ok()) {
errors.emplace_back(
absl::StrCat("field:children name:", child_name,
" error:", config.status().message()));
} else {
cluster_map[child_name] = std::move(*config);
clusters_to_be_used.insert(child_name);
}
}
}
if (cluster_map.empty()) {
errors.emplace_back("no valid children configured");
}
if (!errors.empty()) {
return absl::InvalidArgumentError(absl::StrCat(
"errors parsing xds_cluster_manager_experimental LB policy config: [",
absl::StrJoin(errors, "; "), "]"));
}
return MakeRefCounted<XdsClusterManagerLbConfig>(std::move(cluster_map));
}
private:
static absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseChildConfig(const Json& json) {
if (json.type() != Json::Type::OBJECT) {
return absl::InvalidArgumentError("value should be of type object");
}
RefCountedPtr<LoadBalancingPolicy::Config> child_config;
std::vector<std::string> errors;
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors.emplace_back("did not find childPolicy");
} else {
auto config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!config.ok()) {
errors.emplace_back(absl::StrCat("field:childPolicy error:",
config.status().message()));
} else {
child_config = std::move(*config);
}
}
if (!errors.empty()) {
return absl::InvalidArgumentError(absl::StrJoin(errors, "; "));
}
return child_config;
return LoadRefCountedFromJson<XdsClusterManagerLbConfig>(
json, JsonArgs(),
"errors validating xds_cluster_manager LB policy config");
}
};

@ -56,14 +56,13 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
@ -73,7 +72,6 @@
#include "src/core/lib/resolver/resolver_registry.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000
@ -102,6 +100,13 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
DiscoveryMechanismType type;
std::string eds_service_name;
std::string dns_hostname;
// This is type Json::Object instead of OutlierDetectionConfig, because we
// don't actually need to validate the contents of the outlier detection
// config here. In this case, the JSON is generated by the CDS policy
// instead of coming from service config, so it's not actually any better
// to catch the problem here than it is to catch it in the
// outlier_detection policy itself, so here we just act as a pass-through.
absl::optional<Json::Object> outlier_detection_lb_config;
bool operator==(const DiscoveryMechanism& other) const {
@ -113,12 +118,21 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
dns_hostname == other.dns_hostname &&
outlier_detection_lb_config == other.outlier_detection_lb_config);
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs& args,
ErrorList* errors);
};
XdsClusterResolverLbConfig(
std::vector<DiscoveryMechanism> discovery_mechanisms, Json xds_lb_policy)
: discovery_mechanisms_(std::move(discovery_mechanisms)),
xds_lb_policy_(std::move(xds_lb_policy)) {}
XdsClusterResolverLbConfig() = default;
XdsClusterResolverLbConfig(const XdsClusterResolverLbConfig&) = delete;
XdsClusterResolverLbConfig& operator=(const XdsClusterResolverLbConfig&) =
delete;
XdsClusterResolverLbConfig(XdsClusterResolverLbConfig&& other) = delete;
XdsClusterResolverLbConfig& operator=(XdsClusterResolverLbConfig&& other) =
delete;
absl::string_view name() const override { return kXdsClusterResolver; }
@ -128,9 +142,12 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
const Json& xds_lb_policy() const { return xds_lb_policy_; }
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs& args, ErrorList* errors);
private:
std::vector<DiscoveryMechanism> discovery_mechanisms_;
Json xds_lb_policy_;
Json xds_lb_policy_ = Json::Object{{"ROUND_ROBIN", Json::Object()}};
};
// Xds Cluster Resolver LB policy.
@ -1059,6 +1076,133 @@ XdsClusterResolverLb::CreateChildPolicyLocked(const ChannelArgs& args) {
// factory
//
const JsonLoaderInterface*
XdsClusterResolverLbConfig::DiscoveryMechanism::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<DiscoveryMechanism>()
// Note: Several fields requires custom processing,
// so they are handled in JsonPostLoad() instead.
.Field("clusterName", &DiscoveryMechanism::cluster_name)
// FIXME: merge in the other PR
//.OptionalField("lrsLoadReportingServer",
// &DiscoveryMechanism::lrs_load_reporting_server)
.OptionalField("max_concurrent_requests",
&DiscoveryMechanism::max_concurrent_requests)
.OptionalField("outlierDetection",
&DiscoveryMechanism::outlier_detection_lb_config,
"outlier_detection")
.Finish();
return loader;
}
void XdsClusterResolverLbConfig::DiscoveryMechanism::JsonPostLoad(
const Json& json, const JsonArgs& args, ErrorList* errors) {
// LRS load reporting server name.
auto it = json.object_value().find("lrsLoadReportingServer");
if (it != json.object_value().end()) {
ScopedField field(errors, ".lrsLoadReportingServer");
if (it->second.type() != Json::Type::OBJECT) {
errors->AddError("is not an object");
} else {
auto xds_server =
LoadFromJson<GrpcXdsBootstrap::GrpcXdsServer>(it->second);
if (!xds_server.ok()) {
errors->AddError(xds_server.status().ToString());
} else {
lrs_load_reporting_server.emplace(std::move(*xds_server));
}
}
}
// Parse "type".
{
auto type_field = LoadJsonObjectField<std::string>(json.object_value(),
args, "type", errors);
if (type_field.has_value()) {
if (*type_field == "EDS") {
type = DiscoveryMechanismType::EDS;
} else if (*type_field == "LOGICAL_DNS") {
type = DiscoveryMechanismType::LOGICAL_DNS;
} else {
ScopedField field(errors, ".type");
errors->AddError(absl::StrCat("unknown type \"", *type_field, "\""));
}
}
}
// Parse "edsServiceName" if type is EDS.
if (type == DiscoveryMechanismType::EDS) {
auto value = LoadJsonObjectField<std::string>(json.object_value(), args,
"edsServiceName", errors,
/*required=*/false);
if (value.has_value()) eds_service_name = std::move(*value);
}
// Parse "dnsHostname" if type is LOGICAL_DNS.
if (type == DiscoveryMechanismType::LOGICAL_DNS) {
auto value = LoadJsonObjectField<std::string>(json.object_value(), args,
"dnsHostname", errors);
if (value.has_value()) dns_hostname = std::move(*value);
}
}
const JsonLoaderInterface* XdsClusterResolverLbConfig::JsonLoader(
const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<XdsClusterResolverLbConfig>()
// Note: The "xdsLbPolicy" field requires custom processing,
// so it's handled in JsonPostLoad() instead.
.Field("discoveryMechanisms",
&XdsClusterResolverLbConfig::discovery_mechanisms_)
.Finish();
return loader;
}
void XdsClusterResolverLbConfig::JsonPostLoad(const Json& json,
const JsonArgs& args,
ErrorList* errors) {
// Validate discoveryMechanisms.
{
ScopedField field(errors, ".discoveryMechanisms");
if (!errors->FieldHasErrors() && discovery_mechanisms_.empty()) {
errors->AddError("must be non-empty");
}
}
// Parse "xdsLbPolicy".
{
ScopedField field(errors, ".xdsLbPolicy");
auto it = json.object_value().find("xdsLbPolicy");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::ARRAY) {
errors->AddError("is not an array");
} else {
const Json::Array& array = it->second.array_value();
for (size_t i = 0; i < array.size(); ++i) {
ScopedField field(errors, absl::StrCat("[", i, "]"));
if (array[i].type() != Json::Type::OBJECT) {
errors->AddError("is not an object");
continue;
}
const Json::Object& policy = array[i].object_value();
auto policy_it = policy.find("ROUND_ROBIN");
if (policy_it != policy.end()) {
ScopedField field(errors, "[\"ROUND_ROBIN\"]");
if (policy_it->second.type() != Json::Type::OBJECT) {
errors->AddError("is not an object");
}
break;
}
{
ScopedField field(errors, "[\"RING_HASH\"]");
policy_it = policy.find("RING_HASH");
if (policy_it != policy.end()) {
LoadFromJson<RingHashConfig>(policy_it->second, args, errors);
xds_lb_policy_ = array[i];
}
}
}
}
}
}
}
class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
@ -1087,200 +1231,19 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
"requires configuration. "
"Please use loadBalancingConfig field of service config instead.");
}
std::vector<grpc_error_handle> error_list;
std::vector<XdsClusterResolverLbConfig::DiscoveryMechanism>
discovery_mechanisms;
auto it = json.object_value().find("discoveryMechanisms");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:discoveryMechanisms error:required field missing"));
} else if (it->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:discoveryMechanisms error:type should be array"));
} else {
const Json::Array& array = it->second.array_value();
for (size_t i = 0; i < array.size(); ++i) {
XdsClusterResolverLbConfig::DiscoveryMechanism discovery_mechanism;
std::vector<grpc_error_handle> discovery_mechanism_errors =
ParseDiscoveryMechanism(array[i], &discovery_mechanism);
if (!discovery_mechanism_errors.empty()) {
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("field:discovery_mechanism element: ", i, " error"));
for (const grpc_error_handle& discovery_mechanism_error :
discovery_mechanism_errors) {
error = grpc_error_add_child(error, discovery_mechanism_error);
}
error_list.push_back(error);
}
discovery_mechanisms.emplace_back(std::move(discovery_mechanism));
class XdsJsonArgs : public JsonArgs {
public:
bool IsEnabled(absl::string_view key) const override {
if (key == "outlier_detection") return XdsOutlierDetectionEnabled();
return true;
}
}
if (discovery_mechanisms.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:discovery_mechanism error:list is missing or empty"));
}
Json xds_lb_policy = Json::Object{
{"ROUND_ROBIN", Json::Object()},
};
it = json.object_value().find("xdsLbPolicy");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:xdsLbPolicy error:type should be array"));
} else {
const Json::Array& array = it->second.array_value();
for (size_t i = 0; i < array.size(); ++i) {
if (array[i].type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:xdsLbPolicy error:element should be of type object"));
continue;
}
const Json::Object& policy = array[i].object_value();
auto policy_it = policy.find("ROUND_ROBIN");
if (policy_it != policy.end()) {
if (policy_it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:ROUND_ROBIN error:type should be object"));
}
break;
}
policy_it = policy.find("RING_HASH");
if (policy_it != policy.end()) {
xds_lb_policy = array[i];
auto config = ParseRingHashLbConfig(policy_it->second);
if (!config.ok()) {
error_list.emplace_back(
absl_status_to_grpc_error(config.status()));
}
}
}
}
}
// Construct config.
if (error_list.empty()) {
return MakeRefCounted<XdsClusterResolverLbConfig>(
std::move(discovery_mechanisms), std::move(xds_lb_policy));
} else {
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_VECTOR(
"xds_cluster_resolver_experimental LB policy config", &error_list);
absl::Status status = grpc_error_to_absl_status(error);
GRPC_ERROR_UNREF(error);
return status;
}
return LoadRefCountedFromJson<XdsClusterResolverLbConfig>(
json, XdsJsonArgs(),
"errors validating xds_cluster_resolver LB policy config");
}
private:
static std::vector<grpc_error_handle> ParseDiscoveryMechanism(
const Json& json,
XdsClusterResolverLbConfig::DiscoveryMechanism* discovery_mechanism) {
std::vector<grpc_error_handle> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
return error_list;
}
// Cluster name.
auto it = json.object_value().find("clusterName");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:clusterName error:required field missing"));
} else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:clusterName error:type should be string"));
} else {
discovery_mechanism->cluster_name = it->second.string_value();
}
// LRS load reporting server name.
it = json.object_value().find("lrsLoadReportingServer");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:lrsLoadReportingServer error:type should be object"));
} else {
auto xds_server =
LoadFromJson<GrpcXdsBootstrap::GrpcXdsServer>(it->second);
if (!xds_server.ok()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("error parsing lrs_load_reporting_server: ",
xds_server.status().ToString())));
} else {
discovery_mechanism->lrs_load_reporting_server.emplace(
std::move(*xds_server));
}
}
}
// Max concurrent requests.
discovery_mechanism->max_concurrent_requests = 1024;
it = json.object_value().find("max_concurrent_requests");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::NUMBER) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_concurrent_requests error:must be of type number"));
} else {
discovery_mechanism->max_concurrent_requests =
gpr_parse_nonnegative_int(it->second.string_value().c_str());
}
}
if (XdsOutlierDetectionEnabled()) {
it = json.object_value().find("outlierDetection");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:outlierDetection error:type should be object"));
} else {
// No need to validate the contents of the outlier detection config,
// because in this particular case, the JSON is generated by the CDS
// policy instead of coming from service config, so it's not actually
// any better to catch the problem here than it is to catch it in the
// outlier_detection policy itself, so here we just act as a
// pass-through.
discovery_mechanism->outlier_detection_lb_config =
it->second.object_value();
}
}
}
// Discovery Mechanism type
it = json.object_value().find("type");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:type error:required field missing"));
} else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:type error:type should be string"));
} else {
if (it->second.string_value() == "EDS") {
discovery_mechanism->type = XdsClusterResolverLbConfig::
DiscoveryMechanism::DiscoveryMechanismType::EDS;
it = json.object_value().find("edsServiceName");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:edsServiceName error:type should be string"));
} else {
discovery_mechanism->eds_service_name = it->second.string_value();
}
}
} else if (it->second.string_value() == "LOGICAL_DNS") {
discovery_mechanism->type = XdsClusterResolverLbConfig::
DiscoveryMechanism::DiscoveryMechanismType::LOGICAL_DNS;
it = json.object_value().find("dnsHostname");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:dnsHostname error:required field missing"));
} else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:dnsHostname error:type should be string"));
} else {
discovery_mechanism->dns_hostname = it->second.string_value();
}
} else {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:type error:invalid type"));
}
}
return error_list;
}
class XdsClusterResolverChildHandler : public ChildPolicyHandler {
public:
XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client,

@ -44,6 +44,10 @@ bool ErrorList::FieldHasErrors() const {
}
absl::Status ErrorList::status() const {
return status("errors validating JSON");
}
absl::Status ErrorList::status(absl::string_view prefix) const {
if (field_errors_.empty()) return absl::OkStatus();
std::vector<std::string> errors;
for (const auto& p : field_errors_) {
@ -55,8 +59,8 @@ absl::Status ErrorList::status() const {
absl::StrCat("field:", p.first, " error:", p.second[0]));
}
}
return absl::InvalidArgumentError(absl::StrCat(
"errors validating JSON: [", absl::StrJoin(errors, "; "), "]"));
return absl::InvalidArgumentError(
absl::StrCat(prefix, ": [", absl::StrJoin(errors, "; "), "]"));
}
namespace json_detail {
@ -191,7 +195,9 @@ void LoadOptional::LoadInto(const Json& json, const JsonArgs& args, void* dst,
ErrorList* errors) const {
if (json.type() == Json::Type::JSON_NULL) return;
void* element = Emplace(dst);
size_t starting_error_size = errors->size();
ElementLoader()->LoadInto(json, args, element, errors);
if (errors->size() > starting_error_size) Reset(dst);
}
bool LoadObject(const Json& json, const JsonArgs& args, const Element* elements,

@ -32,6 +32,7 @@
#include "absl/types/optional.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
@ -85,6 +86,7 @@ class ErrorList {
// Returns the resulting status of parsing.
absl::Status status() const;
absl::Status status(absl::string_view prefix) const;
// Return true if there are no errors.
bool ok() const { return field_errors_.empty(); }
@ -292,6 +294,7 @@ class LoadOptional : public LoaderInterface {
private:
virtual void* Emplace(void* dst) const = 0;
virtual void Reset(void* dst) const = 0;
virtual const LoaderInterface* ElementLoader() const = 0;
};
@ -420,6 +423,9 @@ class AutoLoader<absl::optional<T>> final : public LoadOptional {
void* Emplace(void* dst) const final {
return &static_cast<absl::optional<T>*>(dst)->emplace();
}
void Reset(void* dst) const final {
static_cast<absl::optional<T>*>(dst)->reset();
}
const LoaderInterface* ElementLoader() const final {
return LoaderForType<T>();
}
@ -584,12 +590,25 @@ using JsonObjectLoader = json_detail::JsonObjectLoader<T>;
using JsonLoaderInterface = json_detail::LoaderInterface;
template <typename T>
absl::StatusOr<T> LoadFromJson(const Json& json,
const JsonArgs& args = JsonArgs()) {
absl::StatusOr<T> LoadFromJson(
const Json& json, const JsonArgs& args = JsonArgs(),
absl::string_view error_prefix = "errors validating JSON") {
ErrorList error_list;
T result{};
json_detail::LoaderForType<T>()->LoadInto(json, args, &result, &error_list);
if (!error_list.ok()) return error_list.status();
if (!error_list.ok()) return error_list.status(error_prefix);
return std::move(result);
}
template <typename T>
absl::StatusOr<RefCountedPtr<T>> LoadRefCountedFromJson(
const Json& json, const JsonArgs& args = JsonArgs(),
absl::string_view error_prefix = "errors validating JSON") {
ErrorList error_list;
auto result = MakeRefCounted<T>();
json_detail::LoaderForType<T>()->LoadInto(json, args, result.get(),
&error_list);
if (!error_list.ok()) return error_list.status(error_prefix);
return std::move(result);
}

@ -23,9 +23,6 @@
#include "src/core/lib/service_config/service_config_impl.h"
#include "test/core/util/test_config.h"
// A regular expression to enter referenced or child errors.
#define CHILD_ERROR_TAG ".*children.*"
namespace grpc_core {
namespace {
@ -85,12 +82,12 @@ TEST_F(RlsConfigParsingTest, TopLevelRequiredFieldsMissing) {
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(
std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig error:does not exist.*"
"field:childPolicyConfigTargetFieldName error:does not exist.*"
"field:childPolicy error:does not exist"))
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig error:field not present]"))
<< service_config.status();
}
@ -110,13 +107,14 @@ TEST_F(RlsConfigParsingTest, TopLevelFieldsWrongTypes) {
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(
std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig error:type should be OBJECT.*"
"field:routeLookupChannelServiceConfig error:type should be OBJECT.*"
"field:childPolicyConfigTargetFieldName error:type should be STRING.*"
"field:childPolicy error:type should be ARRAY"))
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:is not an array; "
"field:childPolicyConfigTargetFieldName error:is not a string; "
"field:routeLookupChannelServiceConfig error:"
"INVALID_ARGUMENT:JSON value is not an object; "
"field:routeLookupConfig error:is not an object]"))
<< service_config.status();
}
@ -136,12 +134,12 @@ TEST_F(RlsConfigParsingTest, TopLevelFieldsInvalidValues) {
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(
std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:childPolicyConfigTargetFieldName error:must be non-empty.*"
"field:childPolicy" CHILD_ERROR_TAG
"No known policies in list: unknown"))
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:No known policies in list: unknown; "
"field:childPolicyConfigTargetFieldName error:must be non-empty; "
"field:routeLookupConfig error:field not present]"))
<< service_config.status();
}
@ -160,12 +158,13 @@ TEST_F(RlsConfigParsingTest, InvalidChildPolicyConfig) {
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:childPolicy" CHILD_ERROR_TAG
"errors parsing grpclb LB policy config: \\["
"error parsing childPolicy field: type should be array\\]"))
EXPECT_THAT(
service_config.status().message(),
::testing::HasSubstr("errors validing RLS LB policy config: ["
"field:childPolicy error:"
"errors validating grpclb LB policy config: ["
"field:childPolicy error:type should be array]; "
"field:routeLookupConfig error:field not present]"))
<< service_config.status();
}
@ -190,11 +189,14 @@ TEST_F(RlsConfigParsingTest, InvalidRlsChannelServiceConfig) {
EXPECT_THAT(
std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupChannelServiceConfig" CHILD_ERROR_TAG
"Service config parsing errors: \\["
"error parsing client channel global parameters" CHILD_ERROR_TAG
"field:loadBalancingPolicy error:Unknown lb policy"))
"errors validing RLS LB policy config: \\["
"field:routeLookupChannelServiceConfig error:"
"INVALID_ARGUMENT:Service config parsing errors: \\["
"error parsing client channel global parameters: "
"UNKNOWN:Client channel global parser"
".*children:.*"
"\\[UNKNOWN:field:loadBalancingPolicy error:Unknown lb policy.*"
"field:routeLookupConfig error:field not present\\]"))
<< service_config.status();
}
@ -215,12 +217,15 @@ TEST_F(RlsConfigParsingTest, RouteLookupConfigRequiredFieldsMissing) {
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig" CHILD_ERROR_TAG
"field:grpcKeybuilders error:does not exist.*"
"field:lookupService error:does not exist"))
EXPECT_THAT(
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig.cacheSizeBytes error:field not present; "
"field:routeLookupConfig.grpcKeybuilders error:field not present; "
"field:routeLookupConfig.lookupService error:field not present]"))
<< service_config.status();
}
@ -245,16 +250,20 @@ TEST_F(RlsConfigParsingTest, RouteLookupConfigFieldsWrongTypes) {
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig" CHILD_ERROR_TAG
"field:grpcKeybuilders error:type should be ARRAY.*"
"field:lookupService error:type should be STRING.*"
"field:maxAge error:type should be STRING.*"
"field:staleAge error:type should be STRING.*"
"field:cacheSizeBytes error:failed to parse.*"
"field:defaultTarget error:type should be STRING"))
EXPECT_THAT(
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig.cacheSizeBytes error:"
"failed to parse number; "
"field:routeLookupConfig.defaultTarget error:is not a string; "
"field:routeLookupConfig.grpcKeybuilders error:is not an array; "
"field:routeLookupConfig.lookupService error:is not a string; "
"field:routeLookupConfig.lookupServiceTimeout error:is not a string; "
"field:routeLookupConfig.maxAge error:is not a string; "
"field:routeLookupConfig.staleAge error:is not a string]"))
<< service_config.status();
}
@ -273,12 +282,17 @@ TEST_F(RlsConfigParsingTest, RouteLookupConfigFieldsInvalidValues) {
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig" CHILD_ERROR_TAG
"field:lookupService error:must be valid gRPC target URI.*"
"field:cacheSizeBytes error:must be greater than 0"))
EXPECT_THAT(
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig.cacheSizeBytes error:"
"must be greater than 0; "
"field:routeLookupConfig.grpcKeybuilders error:field not present; "
"field:routeLookupConfig.lookupService error:"
"must be valid gRPC target URI]"))
<< service_config.status();
}
@ -303,12 +317,16 @@ TEST_F(RlsConfigParsingTest, GrpcKeybuilderRequiredFieldsMissing) {
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig" CHILD_ERROR_TAG
"field:grpcKeybuilders" CHILD_ERROR_TAG
"index:0" CHILD_ERROR_TAG "field:names error:does not exist"))
EXPECT_THAT(
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig.cacheSizeBytes error:field not present; "
"field:routeLookupConfig.grpcKeybuilders[0].names error:"
"field not present; "
"field:routeLookupConfig.lookupService error:field not present]"))
<< service_config.status();
}
@ -334,15 +352,21 @@ TEST_F(RlsConfigParsingTest, GrpcKeybuilderWrongFieldTypes) {
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(
std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig" CHILD_ERROR_TAG
"field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
"field:names error:type should be ARRAY.*"
"field:headers error:type should be ARRAY.*"
"field:extraKeys error:type should be OBJECT.*"
"field:constantKeys error:type should be OBJECT"))
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig.cacheSizeBytes error:field not present; "
"field:routeLookupConfig.grpcKeybuilders[0].constantKeys error:"
"is not an object; "
"field:routeLookupConfig.grpcKeybuilders[0].extraKeys error:"
"is not an object; "
"field:routeLookupConfig.grpcKeybuilders[0].headers error:"
"is not an array; "
"field:routeLookupConfig.grpcKeybuilders[0].names error:"
"is not an array; "
"field:routeLookupConfig.lookupService error:field not present]"))
<< service_config.status();
}
@ -372,18 +396,24 @@ TEST_F(RlsConfigParsingTest, GrpcKeybuilderInvalidValues) {
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig" CHILD_ERROR_TAG
"field:grpcKeybuilders" CHILD_ERROR_TAG
"index:0" CHILD_ERROR_TAG "field:names error:list is empty.*"
"field:extraKeys" CHILD_ERROR_TAG
"field:host error:type should be STRING.*"
"field:service error:type should be STRING.*"
"field:method error:type should be STRING.*"
"field:constantKeys" CHILD_ERROR_TAG
"field:key error:type should be STRING"))
EXPECT_THAT(
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig.cacheSizeBytes error:field not present; "
"field:routeLookupConfig.grpcKeybuilders[0].constantKeys[\"key\"] "
"error:is not a string; "
"field:routeLookupConfig.grpcKeybuilders[0].extraKeys.host "
"error:is not a string; "
"field:routeLookupConfig.grpcKeybuilders[0].extraKeys.method "
"error:is not a string; "
"field:routeLookupConfig.grpcKeybuilders[0].extraKeys.service "
"error:is not a string; "
"field:routeLookupConfig.grpcKeybuilders[0].names "
"error:must be non-empty; "
"field:routeLookupConfig.lookupService error:field not present]"))
<< service_config.status();
}
@ -425,25 +455,35 @@ TEST_F(RlsConfigParsingTest, GrpcKeybuilderInvalidHeaders) {
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(
std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig" CHILD_ERROR_TAG
"field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
"field:headers index:0 error:type should be OBJECT.*"
"field:headers index:1" CHILD_ERROR_TAG
"field:key error:type should be STRING.*"
"field:names error:type should be ARRAY.*"
"field:headers index:2" CHILD_ERROR_TAG
"field:key error:does not exist.*"
"field:names error:list is empty.*"
"field:headers index:3" CHILD_ERROR_TAG
"field:key error:must be non-empty.*"
"field:names index:0 error:type should be STRING.*"
"field:names index:1 error:header name must be non-empty.*"
"field:extraKeys" CHILD_ERROR_TAG
"field:host error:must be non-empty.*"
"field:constantKeys" CHILD_ERROR_TAG "error:keys must be non-empty"))
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig.cacheSizeBytes error:field not present; "
"field:routeLookupConfig.grpcKeybuilders[0].constantKeys[\"\"] "
"error:key must be non-empty; "
"field:routeLookupConfig.grpcKeybuilders[0].extraKeys.host "
"error:must be non-empty if set; "
"field:routeLookupConfig.grpcKeybuilders[0].headers[0] "
"error:is not an object; "
"field:routeLookupConfig.grpcKeybuilders[0].headers[1].key "
"error:is not a string; "
"field:routeLookupConfig.grpcKeybuilders[0].headers[1].names "
"error:is not an array; "
"field:routeLookupConfig.grpcKeybuilders[0].headers[2].key "
"error:field not present; "
"field:routeLookupConfig.grpcKeybuilders[0].headers[2].names "
"error:must be non-empty; "
"field:routeLookupConfig.grpcKeybuilders[0].headers[3].key "
"error:must be non-empty; "
"field:routeLookupConfig.grpcKeybuilders[0].headers[3].names[0] "
"error:is not a string; "
"field:routeLookupConfig.grpcKeybuilders[0].headers[3].names[1] "
"error:must be non-empty; "
"field:routeLookupConfig.grpcKeybuilders[0].names "
"error:field not present; "
"field:routeLookupConfig.lookupService error:field not present]"))
<< service_config.status();
}
@ -472,15 +512,19 @@ TEST_F(RlsConfigParsingTest, GrpcKeybuilderNameWrongFieldTypes) {
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(
std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig" CHILD_ERROR_TAG
"field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
"field:names index:0 error:type should be OBJECT.*"
"field:names index:1" CHILD_ERROR_TAG
"field:service error:type should be STRING.*"
"field:method error:type should be STRING"))
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig.cacheSizeBytes error:field not present; "
"field:routeLookupConfig.grpcKeybuilders[0].names[0] "
"error:is not an object; "
"field:routeLookupConfig.grpcKeybuilders[0].names[1].method "
"error:is not a string; "
"field:routeLookupConfig.grpcKeybuilders[0].names[1].service "
"error:is not a string; "
"field:routeLookupConfig.lookupService error:field not present]"))
<< service_config.status();
}
@ -512,12 +556,15 @@ TEST_F(RlsConfigParsingTest, DuplicateMethodNamesInSameKeyBuilder) {
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(
std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig" CHILD_ERROR_TAG
"field:grpcKeybuilders" CHILD_ERROR_TAG "index:0" CHILD_ERROR_TAG
"field:names error:duplicate entry for /foo/bar"))
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig.cacheSizeBytes error:field not present; "
"field:routeLookupConfig.grpcKeybuilders[0] "
"error:duplicate entry for \"/foo/bar\"; "
"field:routeLookupConfig.lookupService error:field not present]"))
<< service_config.status();
}
@ -553,12 +600,15 @@ TEST_F(RlsConfigParsingTest, DuplicateMethodNamesInDifferentKeyBuilders) {
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
EXPECT_EQ(service_config.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_THAT(
std::string(service_config.status().message()),
::testing::ContainsRegex(
"errors parsing RLS LB policy config" CHILD_ERROR_TAG
"field:routeLookupConfig" CHILD_ERROR_TAG
"field:grpcKeybuilders" CHILD_ERROR_TAG "index:1" CHILD_ERROR_TAG
"field:names error:duplicate entry for /foo/bar"))
service_config.status().message(),
::testing::HasSubstr(
"errors validing RLS LB policy config: ["
"field:childPolicy error:field not present; "
"field:childPolicyConfigTargetFieldName error:field not present; "
"field:routeLookupConfig.cacheSizeBytes error:field not present; "
"field:routeLookupConfig.grpcKeybuilders[1] "
"error:duplicate entry for \"/foo/bar\"; "
"field:routeLookupConfig.lookupService error:field not present]"))
<< service_config.status();
}

@ -563,8 +563,8 @@ TEST_F(ClientChannelParserTest, InvalidGrpclbLoadBalancingConfig) {
"Service config parsing errors: \\["
"error parsing client channel global parameters:" CHILD_ERROR_TAG
"field:loadBalancingConfig error:"
"errors parsing grpclb LB policy config: \\["
"error parsing childPolicy field: type should be array\\].*"));
"errors validating grpclb LB policy config: \\["
"field:childPolicy error:type should be array\\].*"));
}
TEST_F(ClientChannelParserTest, ValidLoadBalancingPolicy) {

@ -21,6 +21,9 @@
#include "absl/strings/str_join.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
namespace grpc_core {
namespace {
@ -890,6 +893,40 @@ TEST(JsonObjectLoader, CustomValidationInPostLoadHook) {
<< test_struct.status();
}
TEST(JsonObjectLoader, LoadRefCountedFromJson) {
struct TestStruct : public RefCounted<TestStruct> {
int32_t a = 0;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<TestStruct>().Field("a", &TestStruct::a).Finish();
return loader;
}
};
// Valid.
{
absl::string_view json_str = "{\"a\":1}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
absl::StatusOr<RefCountedPtr<TestStruct>> test_struct =
LoadRefCountedFromJson<TestStruct>(*json, JsonArgs());
ASSERT_TRUE(test_struct.ok()) << test_struct.status();
EXPECT_EQ((*test_struct)->a, 1);
}
// Invalid.
{
absl::string_view json_str = "{\"a\":\"foo\"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
absl::StatusOr<RefCountedPtr<TestStruct>> test_struct =
LoadRefCountedFromJson<TestStruct>(*json, JsonArgs());
EXPECT_EQ(test_struct.status().code(), absl::StatusCode::kInvalidArgument);
EXPECT_EQ(test_struct.status().message(),
"errors validating JSON: [field:a error:failed to parse number]")
<< test_struct.status();
}
}
TEST(JsonObjectLoader, LoadFromJsonWithErrorList) {
struct TestStruct {
int32_t a = 0;

@ -301,7 +301,9 @@ TEST_P(RlsTest, XdsRoutingRlsClusterSpecifierPluginNacksRequiredMatch) {
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT(
response_state->error_message,
::testing::HasSubstr("field:requiredMatch error:must not be present"));
::testing::HasSubstr(
"field:routeLookupConfig.grpcKeybuilders[0].headers[0].requiredMatch "
"error:must not be present"));
}
TEST_P(RlsTest, XdsRoutingClusterSpecifierPluginDisabled) {

Loading…
Cancel
Save