Parse xDS config

pull/17693/head
Juanli Shen 6 years ago
parent 9ed8734efb
commit 0c2fc6101d
  1. 27
      src/core/ext/filters/client_channel/lb_policy.cc
  2. 4
      src/core/ext/filters/client_channel/lb_policy.h
  3. 81
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  4. 42
      src/core/ext/filters/client_channel/resolver_result_parsing.cc

@ -20,6 +20,7 @@
#include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount( grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(
@ -27,6 +28,32 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(
namespace grpc_core { namespace grpc_core {
grpc_json* LoadBalancingPolicy::ParseLoadBalancingConfig(
const grpc_json* lb_config_array) {
if (lb_config_array == nullptr || lb_config_array->type != GRPC_JSON_ARRAY) {
return nullptr;
}
// Find the first LB policy that this client supports.
for (const grpc_json* lb_config = lb_config_array->child;
lb_config != nullptr; lb_config = lb_config->next) {
if (lb_config->type != GRPC_JSON_OBJECT) return nullptr;
grpc_json* policy = nullptr;
for (grpc_json* field = lb_config->child; field != nullptr;
field = field->next) {
if (field->key == nullptr || field->type != GRPC_JSON_OBJECT)
return nullptr;
if (policy != nullptr) return nullptr; // Violate "oneof" type.
policy = field;
}
if (policy == nullptr) return nullptr;
// If we support this policy, then select it.
if (LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(policy->key)) {
return policy;
}
}
return nullptr;
}
LoadBalancingPolicy::LoadBalancingPolicy(Args args) LoadBalancingPolicy::LoadBalancingPolicy(Args args)
: InternallyRefCounted(&grpc_trace_lb_policy_refcount), : InternallyRefCounted(&grpc_trace_lb_policy_refcount),
combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")), combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")),

@ -179,6 +179,10 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
/// Returns the JSON node of policy (with both policy name and config content)
/// given the JSON node of a LoadBalancingConfig array.
static grpc_json* ParseLoadBalancingConfig(const grpc_json* lb_config_array);
/// Sets the re-resolution closure to \a request_reresolution. /// Sets the re-resolution closure to \a request_reresolution.
void SetReresolutionClosureLocked(grpc_closure* request_reresolution) { void SetReresolutionClosureLocked(grpc_closure* request_reresolution) {
GPR_ASSERT(request_reresolution_ == nullptr); GPR_ASSERT(request_reresolution_ == nullptr);

@ -100,6 +100,7 @@
#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h" #include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/static_metadata.h"
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
@ -247,6 +248,12 @@ class XdsLb : public LoadBalancingPolicy {
// Helper function used in ctor and UpdateLocked(). // Helper function used in ctor and UpdateLocked().
void ProcessChannelArgsLocked(const grpc_channel_args& args); void ProcessChannelArgsLocked(const grpc_channel_args& args);
// Parses the xds config given the JSON node of the first child of XdsConfig.
// If parsing succeeds, updates \a balancer_name, and updates \a
// child_policy_json_dump_ and \a fallback_policy_json_dump_ if they are also
// found. Does nothing upon failure.
void ParseLbConfig(grpc_json* xds_config_json);
// Methods for dealing with the balancer channel and call. // Methods for dealing with the balancer channel and call.
void StartPickingLocked(); void StartPickingLocked();
void StartBalancerCallLocked(); void StartBalancerCallLocked();
@ -265,7 +272,7 @@ class XdsLb : public LoadBalancingPolicy {
// Methods for dealing with the child policy. // Methods for dealing with the child policy.
void CreateOrUpdateChildPolicyLocked(); void CreateOrUpdateChildPolicyLocked();
grpc_channel_args* CreateChildPolicyArgsLocked(); grpc_channel_args* CreateChildPolicyArgsLocked();
void CreateChildPolicyLocked(Args args); void CreateChildPolicyLocked(const char* name, Args args);
bool PickFromChildPolicyLocked(bool force_async, PendingPick* pp, bool PickFromChildPolicyLocked(bool force_async, PendingPick* pp,
grpc_error** error); grpc_error** error);
void UpdateConnectivityStateFromChildPolicyLocked( void UpdateConnectivityStateFromChildPolicyLocked(
@ -278,6 +285,9 @@ class XdsLb : public LoadBalancingPolicy {
// Who the client is trying to communicate with. // Who the client is trying to communicate with.
const char* server_name_ = nullptr; const char* server_name_ = nullptr;
// Name of the balancer to connect to.
UniquePtr<char> balancer_name_;
// Current channel args from the resolver. // Current channel args from the resolver.
grpc_channel_args* args_ = nullptr; grpc_channel_args* args_ = nullptr;
@ -318,6 +328,7 @@ class XdsLb : public LoadBalancingPolicy {
// Timeout in milliseconds for before using fallback backend addresses. // Timeout in milliseconds for before using fallback backend addresses.
// 0 means not using fallback. // 0 means not using fallback.
UniquePtr<char> fallback_policy_json_string_;
int lb_fallback_timeout_ms_ = 0; int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver. // The backend addresses from the resolver.
UniquePtr<ServerAddressList> fallback_backend_addresses_; UniquePtr<ServerAddressList> fallback_backend_addresses_;
@ -331,6 +342,7 @@ class XdsLb : public LoadBalancingPolicy {
// The policy to use for the backends. // The policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_; OrphanablePtr<LoadBalancingPolicy> child_policy_;
UniquePtr<char> child_policy_json_string_;
grpc_connectivity_state child_connectivity_state_; grpc_connectivity_state child_connectivity_state_;
grpc_closure on_child_connectivity_changed_; grpc_closure on_child_connectivity_changed_;
grpc_closure on_child_request_reresolution_; grpc_closure on_child_request_reresolution_;
@ -934,6 +946,8 @@ XdsLb::XdsLb(LoadBalancingPolicy::Args args)
arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer( lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
arg, {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}); arg, {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
// Parse the LB config.
ParseLbConfig(args.lb_config);
// Process channel args. // Process channel args.
ProcessChannelArgsLocked(*args.args); ProcessChannelArgsLocked(*args.args);
} }
@ -1184,8 +1198,44 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
grpc_channel_args_destroy(lb_channel_args); grpc_channel_args_destroy(lb_channel_args);
} }
// TODO(vishalpowar): Use lb_config to configure LB policy. void XdsLb::ParseLbConfig(grpc_json* xds_config_json) {
const char* balancer_name = nullptr;
grpc_json* child_policy = nullptr;
grpc_json* fallback_policy = nullptr;
for (grpc_json* field = xds_config_json; field != nullptr;
field = field->next) {
if (field->key == nullptr) return;
if (strcmp(field->key, "balancerName") == 0) {
if (balancer_name != nullptr) return; // Duplicate.
if (field->type != GRPC_JSON_STRING) return;
balancer_name = field->value;
} else if (strcmp(field->key, "childPolicy") == 0) {
if (child_policy != nullptr) return; // Duplicate.
child_policy = ParseLoadBalancingConfig(field);
} else if (strcmp(field->key, "fallbackPolicy") == 0) {
if (fallback_policy != nullptr) return; // Duplicate.
fallback_policy = ParseLoadBalancingConfig(field);
}
}
if (balancer_name == nullptr) return; // Required field.
if (child_policy != nullptr) {
child_policy_json_string_ =
UniquePtr<char>(grpc_json_dump_to_string(child_policy, 0 /* indent */));
}
if (fallback_policy != nullptr) {
fallback_policy_json_string_ = UniquePtr<char>(
grpc_json_dump_to_string(fallback_policy, 0 /* indent */));
}
balancer_name_ = UniquePtr<char>(gpr_strdup(balancer_name));
}
void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
ParseLbConfig(lb_config);
// TODO(juanlishen): Pass fallback policy config update after fallback policy
// is added.
if (balancer_name_ == nullptr) {
gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing fails.", this);
}
ProcessChannelArgsLocked(args); ProcessChannelArgsLocked(args);
// Update the existing child policy. // Update the existing child policy.
// Note: We have disabled fallback mode in the code, so this child policy must // Note: We have disabled fallback mode in the code, so this child policy must
@ -1436,10 +1486,10 @@ bool XdsLb::PickFromChildPolicyLocked(bool force_async, PendingPick* pp,
return pick_done; return pick_done;
} }
void XdsLb::CreateChildPolicyLocked(Args args) { void XdsLb::CreateChildPolicyLocked(const char* name, Args args) {
GPR_ASSERT(child_policy_ == nullptr); GPR_ASSERT(child_policy_ == nullptr);
child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
"round_robin", std::move(args)); name, std::move(args));
if (GPR_UNLIKELY(child_policy_ == nullptr)) { if (GPR_UNLIKELY(child_policy_ == nullptr)) {
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this); gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this);
return; return;
@ -1512,26 +1562,43 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return; if (shutting_down_) return;
grpc_channel_args* args = CreateChildPolicyArgsLocked(); grpc_channel_args* args = CreateChildPolicyArgsLocked();
GPR_ASSERT(args != nullptr); GPR_ASSERT(args != nullptr);
const char* child_policy_name = nullptr;
grpc_json* child_policy_config = nullptr;
grpc_json* child_policy_json =
grpc_json_parse_string(child_policy_json_string_.get());
// TODO(juanlishen): If the child policy is not configured via service config,
// use whatever algorithm is specified by the balancer.
if (child_policy_json != nullptr) {
child_policy_name = child_policy_json->key;
child_policy_config = child_policy_json->child;
} else {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] No valid child policy LB config", this);
}
child_policy_name = "round_robin";
}
// TODO(juanlishen): Switch policy according to child_policy_config->key.
if (child_policy_ != nullptr) { if (child_policy_ != nullptr) {
if (grpc_lb_xds_trace.enabled()) { if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Updating the child policy %p", this, gpr_log(GPR_INFO, "[xdslb %p] Updating the child policy %p", this,
child_policy_.get()); child_policy_.get());
} }
// TODO(vishalpowar): Pass the correct LB config. child_policy_->UpdateLocked(*args, child_policy_config);
child_policy_->UpdateLocked(*args, nullptr);
} else { } else {
LoadBalancingPolicy::Args lb_policy_args; LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner(); lb_policy_args.combiner = combiner();
lb_policy_args.client_channel_factory = client_channel_factory(); lb_policy_args.client_channel_factory = client_channel_factory();
lb_policy_args.subchannel_pool = subchannel_pool()->Ref(); lb_policy_args.subchannel_pool = subchannel_pool()->Ref();
lb_policy_args.args = args; lb_policy_args.args = args;
CreateChildPolicyLocked(std::move(lb_policy_args)); lb_policy_args.lb_config = child_policy_config;
CreateChildPolicyLocked(child_policy_name, std::move(lb_policy_args));
if (grpc_lb_xds_trace.enabled()) { if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this, gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this,
child_policy_.get()); child_policy_.get());
} }
} }
grpc_channel_args_destroy(args); grpc_channel_args_destroy(args);
grpc_json_destroy(child_policy_json);
} }
void XdsLb::OnChildPolicyRequestReresolutionLocked(void* arg, void XdsLb::OnChildPolicyRequestReresolutionLocked(void* arg,

@ -141,42 +141,14 @@ void ProcessedResolverResult::ParseServiceConfig(
void ProcessedResolverResult::ParseLbConfigFromServiceConfig( void ProcessedResolverResult::ParseLbConfigFromServiceConfig(
const grpc_json* field) { const grpc_json* field) {
if (lb_policy_config_ != nullptr) return; // Already found. if (lb_policy_config_ != nullptr) return; // Already found.
// Find the LB config global parameter. if (field->key == nullptr || strcmp(field->key, "loadBalancingConfig") != 0) {
if (field->key == nullptr || strcmp(field->key, "loadBalancingConfig") != 0 || return; // Not the LB config global parameter.
field->type != GRPC_JSON_ARRAY) {
return; // Not valid lb config array.
} }
// Find the first LB policy that this client supports. const grpc_json* policy =
for (grpc_json* lb_config = field->child; lb_config != nullptr; LoadBalancingPolicy::ParseLoadBalancingConfig(field);
lb_config = lb_config->next) { if (policy != nullptr) {
if (lb_config->type != GRPC_JSON_OBJECT) return; lb_policy_name_.reset(gpr_strdup(policy->key));
// Find the policy object. lb_policy_config_ = policy->child;
grpc_json* policy = nullptr;
for (grpc_json* field = lb_config->child; field != nullptr;
field = field->next) {
if (field->key == nullptr || strcmp(field->key, "policy") != 0 ||
field->type != GRPC_JSON_OBJECT) {
return;
}
if (policy != nullptr) return; // Duplicate.
policy = field;
}
// Find the specific policy content since the policy object is of type
// "oneof".
grpc_json* policy_content = nullptr;
for (grpc_json* field = policy->child; field != nullptr;
field = field->next) {
if (field->key == nullptr || field->type != GRPC_JSON_OBJECT) return;
if (policy_content != nullptr) return; // Violate "oneof" type.
policy_content = field;
}
// If we support this policy, then select it.
if (grpc_core::LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(
policy_content->key)) {
lb_policy_name_.reset(gpr_strdup(policy_content->key));
lb_policy_config_ = policy_content->child;
return;
}
} }
} }

Loading…
Cancel
Save