|
|
|
@ -32,6 +32,7 @@ |
|
|
|
|
#include "absl/status/status.h" |
|
|
|
|
#include "absl/status/statusor.h" |
|
|
|
|
#include "absl/strings/str_cat.h" |
|
|
|
|
#include "absl/strings/str_join.h" |
|
|
|
|
#include "absl/strings/string_view.h" |
|
|
|
|
#include "absl/types/optional.h" |
|
|
|
|
#include "absl/types/variant.h" |
|
|
|
@ -137,7 +138,7 @@ CircuitBreakerCallCounterMap::CallCounter::~CallCounter() { |
|
|
|
|
// LB policy
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
constexpr char kXdsClusterImpl[] = "xds_cluster_impl_experimental"; |
|
|
|
|
constexpr absl::string_view kXdsClusterImpl = "xds_cluster_impl_experimental"; |
|
|
|
|
|
|
|
|
|
// Config for xDS Cluster Impl LB policy.
|
|
|
|
|
class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config { |
|
|
|
@ -155,7 +156,7 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config { |
|
|
|
|
max_concurrent_requests_(max_concurrent_requests), |
|
|
|
|
drop_config_(std::move(drop_config)) {} |
|
|
|
|
|
|
|
|
|
const char* name() const override { return kXdsClusterImpl; } |
|
|
|
|
absl::string_view name() const override { return kXdsClusterImpl; } |
|
|
|
|
|
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const { |
|
|
|
|
return child_policy_; |
|
|
|
@ -185,7 +186,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy { |
|
|
|
|
public: |
|
|
|
|
XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client, Args args); |
|
|
|
|
|
|
|
|
|
const char* name() const override { return kXdsClusterImpl; } |
|
|
|
|
absl::string_view name() const override { return kXdsClusterImpl; } |
|
|
|
|
|
|
|
|
|
void UpdateLocked(UpdateArgs args) override; |
|
|
|
|
void ExitIdleLocked() override; |
|
|
|
@ -703,48 +704,41 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
std::move(args)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const char* name() const override { return kXdsClusterImpl; } |
|
|
|
|
absl::string_view name() const override { return kXdsClusterImpl; } |
|
|
|
|
|
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
|
|
|
|
const Json& json, grpc_error_handle* error) const override { |
|
|
|
|
GPR_DEBUG_ASSERT(error != nullptr && GRPC_ERROR_IS_NONE(*error)); |
|
|
|
|
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>> |
|
|
|
|
ParseLoadBalancingConfig(const Json& json) const override { |
|
|
|
|
if (json.type() == Json::Type::JSON_NULL) { |
|
|
|
|
// This policy was configured in the deprecated loadBalancingPolicy
|
|
|
|
|
// field or in the client API.
|
|
|
|
|
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
return absl::InvalidArgumentError( |
|
|
|
|
"field:loadBalancingPolicy error:xds_cluster_impl policy requires " |
|
|
|
|
"configuration. Please use loadBalancingConfig field of service " |
|
|
|
|
"config instead."); |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
std::vector<grpc_error_handle> error_list; |
|
|
|
|
std::vector<std::string> errors; |
|
|
|
|
// Child policy.
|
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> child_policy; |
|
|
|
|
auto it = json.object_value().find("childPolicy"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:childPolicy error:required field missing")); |
|
|
|
|
errors.emplace_back("field:childPolicy error:required field missing"); |
|
|
|
|
} else { |
|
|
|
|
grpc_error_handle parse_error = GRPC_ERROR_NONE; |
|
|
|
|
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( |
|
|
|
|
it->second, &parse_error); |
|
|
|
|
if (child_policy == nullptr) { |
|
|
|
|
GPR_DEBUG_ASSERT(!GRPC_ERROR_IS_NONE(parse_error)); |
|
|
|
|
std::vector<grpc_error_handle> child_errors; |
|
|
|
|
child_errors.push_back(parse_error); |
|
|
|
|
error_list.push_back( |
|
|
|
|
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); |
|
|
|
|
auto config = |
|
|
|
|
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(it->second); |
|
|
|
|
if (!config.ok()) { |
|
|
|
|
errors.emplace_back(absl::StrCat("field:childPolicy error:", |
|
|
|
|
config.status().message())); |
|
|
|
|
} else { |
|
|
|
|
child_policy = std::move(*config); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Cluster name.
|
|
|
|
|
std::string cluster_name; |
|
|
|
|
it = json.object_value().find("clusterName"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:clusterName error:required field missing")); |
|
|
|
|
errors.emplace_back("field:clusterName error:required field missing"); |
|
|
|
|
} else if (it->second.type() != Json::Type::STRING) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:clusterName error:type should be string")); |
|
|
|
|
errors.emplace_back("field:clusterName error:type should be string"); |
|
|
|
|
} else { |
|
|
|
|
cluster_name = it->second.string_value(); |
|
|
|
|
} |
|
|
|
@ -753,8 +747,7 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
it = json.object_value().find("edsServiceName"); |
|
|
|
|
if (it != json.object_value().end()) { |
|
|
|
|
if (it->second.type() != Json::Type::STRING) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:edsServiceName error:type should be string")); |
|
|
|
|
errors.emplace_back("field:edsServiceName error:type should be string"); |
|
|
|
|
} else { |
|
|
|
|
eds_service_name = it->second.string_value(); |
|
|
|
|
} |
|
|
|
@ -764,16 +757,17 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
it = json.object_value().find("lrsLoadReportingServer"); |
|
|
|
|
if (it != json.object_value().end()) { |
|
|
|
|
if (it->second.type() != Json::Type::OBJECT) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:lrsLoadReportingServer error:type should be object")); |
|
|
|
|
errors.emplace_back( |
|
|
|
|
"field:lrsLoadReportingServer error:type should be object"); |
|
|
|
|
} else { |
|
|
|
|
grpc_error_handle parser_error; |
|
|
|
|
lrs_load_reporting_server = XdsBootstrap::XdsServer::Parse( |
|
|
|
|
it->second.object_value(), &parser_error); |
|
|
|
|
if (!GRPC_ERROR_IS_NONE(parser_error)) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING( |
|
|
|
|
absl::StrCat("errors parsing lrs_load_reporting_server"))); |
|
|
|
|
error_list.push_back(parser_error); |
|
|
|
|
errors.emplace_back( |
|
|
|
|
absl::StrCat("error parsing lrs_load_reporting_server: ", |
|
|
|
|
grpc_error_std_string(parser_error))); |
|
|
|
|
GRPC_ERROR_UNREF(parser_error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -782,8 +776,8 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
it = json.object_value().find("maxConcurrentRequests"); |
|
|
|
|
if (it != json.object_value().end()) { |
|
|
|
|
if (it->second.type() != Json::Type::NUMBER) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:max_concurrent_requests error:must be of type number")); |
|
|
|
|
errors.emplace_back( |
|
|
|
|
"field:max_concurrent_requests error:must be of type number"); |
|
|
|
|
} else { |
|
|
|
|
max_concurrent_requests = |
|
|
|
|
gpr_parse_nonnegative_int(it->second.string_value().c_str()); |
|
|
|
@ -793,20 +787,15 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
auto drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>(); |
|
|
|
|
it = json.object_value().find("dropCategories"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"field:dropCategories error:required field missing")); |
|
|
|
|
errors.emplace_back("field:dropCategories error:required field missing"); |
|
|
|
|
} else { |
|
|
|
|
std::vector<grpc_error_handle> child_errors = |
|
|
|
|
ParseDropCategories(it->second, drop_config.get()); |
|
|
|
|
if (!child_errors.empty()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR( |
|
|
|
|
"field:dropCategories", &child_errors)); |
|
|
|
|
} |
|
|
|
|
absl::Status status = ParseDropCategories(it->second, drop_config.get()); |
|
|
|
|
if (!status.ok()) errors.emplace_back(status.message()); |
|
|
|
|
} |
|
|
|
|
if (!error_list.empty()) { |
|
|
|
|
*error = GRPC_ERROR_CREATE_FROM_VECTOR( |
|
|
|
|
"xds_cluster_impl_experimental LB policy config", &error_list); |
|
|
|
|
return nullptr; |
|
|
|
|
if (!errors.empty()) { |
|
|
|
|
return absl::InvalidArgumentError(absl::StrCat( |
|
|
|
|
"errors parseing xds_cluster_impl_experimental LB policy config: [", |
|
|
|
|
absl::StrJoin(errors, "; "), "]")); |
|
|
|
|
} |
|
|
|
|
return MakeRefCounted<XdsClusterImplLbConfig>( |
|
|
|
|
std::move(child_policy), std::move(cluster_name), |
|
|
|
@ -815,65 +804,59 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
static std::vector<grpc_error_handle> ParseDropCategories( |
|
|
|
|
static absl::Status ParseDropCategories( |
|
|
|
|
const Json& json, XdsEndpointResource::DropConfig* drop_config) { |
|
|
|
|
std::vector<grpc_error_handle> error_list; |
|
|
|
|
if (json.type() != Json::Type::ARRAY) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"dropCategories field is not an array")); |
|
|
|
|
return error_list; |
|
|
|
|
return absl::InvalidArgumentError("dropCategories field is not an array"); |
|
|
|
|
} |
|
|
|
|
std::vector<std::string> errors; |
|
|
|
|
for (size_t i = 0; i < json.array_value().size(); ++i) { |
|
|
|
|
const Json& entry = json.array_value()[i]; |
|
|
|
|
std::vector<grpc_error_handle> child_errors = |
|
|
|
|
ParseDropCategory(entry, drop_config); |
|
|
|
|
if (!child_errors.empty()) { |
|
|
|
|
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING( |
|
|
|
|
absl::StrCat("errors parsing index ", i)); |
|
|
|
|
for (size_t i = 0; i < child_errors.size(); ++i) { |
|
|
|
|
error = grpc_error_add_child(error, child_errors[i]); |
|
|
|
|
} |
|
|
|
|
error_list.push_back(error); |
|
|
|
|
absl::Status status = ParseDropCategory(entry, drop_config); |
|
|
|
|
if (!status.ok()) { |
|
|
|
|
errors.emplace_back( |
|
|
|
|
absl::StrCat("error parsing index ", i, ": ", status.message())); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return error_list; |
|
|
|
|
if (!errors.empty()) { |
|
|
|
|
return absl::InvalidArgumentError( |
|
|
|
|
absl::StrCat("errors parsing dropCategories field: [", |
|
|
|
|
absl::StrJoin(errors, "; "), "]")); |
|
|
|
|
} |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static std::vector<grpc_error_handle> ParseDropCategory( |
|
|
|
|
static absl::Status ParseDropCategory( |
|
|
|
|
const Json& json, XdsEndpointResource::DropConfig* drop_config) { |
|
|
|
|
std::vector<grpc_error_handle> error_list; |
|
|
|
|
if (json.type() != Json::Type::OBJECT) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"dropCategories entry is not an object")); |
|
|
|
|
return error_list; |
|
|
|
|
return absl::InvalidArgumentError( |
|
|
|
|
"dropCategories entry is not an object"); |
|
|
|
|
} |
|
|
|
|
std::vector<std::string> errors; |
|
|
|
|
std::string category; |
|
|
|
|
auto it = json.object_value().find("category"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"\"category\" field not present")); |
|
|
|
|
errors.emplace_back("\"category\" field not present"); |
|
|
|
|
} else if (it->second.type() != Json::Type::STRING) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"\"category\" field is not a string")); |
|
|
|
|
errors.emplace_back("\"category\" field is not a string"); |
|
|
|
|
} else { |
|
|
|
|
category = it->second.string_value(); |
|
|
|
|
} |
|
|
|
|
uint32_t requests_per_million = 0; |
|
|
|
|
it = json.object_value().find("requests_per_million"); |
|
|
|
|
if (it == json.object_value().end()) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"\"requests_per_million\" field is not present")); |
|
|
|
|
errors.emplace_back("\"requests_per_million\" field is not present"); |
|
|
|
|
} else if (it->second.type() != Json::Type::NUMBER) { |
|
|
|
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"\"requests_per_million\" field is not a number")); |
|
|
|
|
errors.emplace_back("\"requests_per_million\" field is not a number"); |
|
|
|
|
} else { |
|
|
|
|
requests_per_million = |
|
|
|
|
gpr_parse_nonnegative_int(it->second.string_value().c_str()); |
|
|
|
|
} |
|
|
|
|
if (error_list.empty()) { |
|
|
|
|
drop_config->AddCategory(std::move(category), requests_per_million); |
|
|
|
|
if (!errors.empty()) { |
|
|
|
|
return absl::InvalidArgumentError(absl::StrJoin(errors, "; ")); |
|
|
|
|
} |
|
|
|
|
return error_list; |
|
|
|
|
drop_config->AddCategory(std::move(category), requests_per_million); |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|