|
|
|
@ -122,7 +122,7 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
const char* name() const override { return kXds; } |
|
|
|
|
|
|
|
|
|
void UpdateLocked(const grpc_channel_args& args, |
|
|
|
|
grpc_json* lb_config) override; |
|
|
|
|
RefCountedPtr<Config> lb_config) override; |
|
|
|
|
void ResetBackoffLocked() override; |
|
|
|
|
void FillChildRefsForChannelz( |
|
|
|
|
channelz::ChildRefsList* child_subchannels, |
|
|
|
@ -242,9 +242,9 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
|
|
|
|
|
// Parses the xds config given the JSON node of the first child of XdsConfig.
|
|
|
|
|
// If parsing succeeds, updates \a balancer_name, and updates \a
|
|
|
|
|
// child_policy_json_dump_ and \a fallback_policy_json_dump_ if they are also
|
|
|
|
|
// child_policy_config_ and \a fallback_policy_config_ if they are also
|
|
|
|
|
// found. Does nothing upon failure.
|
|
|
|
|
void ParseLbConfig(grpc_json* xds_config_json); |
|
|
|
|
void ParseLbConfig(Config* xds_config); |
|
|
|
|
|
|
|
|
|
// Methods for dealing with the balancer channel and call.
|
|
|
|
|
void StartBalancerCallLocked(); |
|
|
|
@ -303,7 +303,8 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
|
|
|
|
|
// Timeout in milliseconds for before using fallback backend addresses.
|
|
|
|
|
// 0 means not using fallback.
|
|
|
|
|
UniquePtr<char> fallback_policy_json_string_; |
|
|
|
|
UniquePtr<char> fallback_policy_name_; |
|
|
|
|
RefCountedPtr<Config> fallback_policy_config_; |
|
|
|
|
int lb_fallback_timeout_ms_ = 0; |
|
|
|
|
// The backend addresses from the resolver.
|
|
|
|
|
UniquePtr<ServerAddressList> fallback_backend_addresses_; |
|
|
|
@ -313,8 +314,9 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
grpc_closure lb_on_fallback_; |
|
|
|
|
|
|
|
|
|
// The policy to use for the backends.
|
|
|
|
|
UniquePtr<char> child_policy_name_; |
|
|
|
|
RefCountedPtr<Config> child_policy_config_; |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
|
|
|
|
UniquePtr<char> child_policy_json_string_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -952,8 +954,7 @@ grpc_channel_args* BuildBalancerChannelArgs( |
|
|
|
|
// ctor and dtor
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
// TODO(vishalpowar): Use lb_config in args to configure LB policy.
|
|
|
|
|
XdsLb::XdsLb(LoadBalancingPolicy::Args args) |
|
|
|
|
XdsLb::XdsLb(Args args) |
|
|
|
|
: LoadBalancingPolicy(std::move(args)), |
|
|
|
|
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), |
|
|
|
|
lb_call_backoff_( |
|
|
|
@ -1087,11 +1088,12 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { |
|
|
|
|
grpc_channel_args_destroy(lb_channel_args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::ParseLbConfig(grpc_json* xds_config_json) { |
|
|
|
|
void XdsLb::ParseLbConfig(Config* xds_config) { |
|
|
|
|
const grpc_json* xds_config_json = xds_config->json(); |
|
|
|
|
const char* balancer_name = nullptr; |
|
|
|
|
grpc_json* child_policy = nullptr; |
|
|
|
|
grpc_json* fallback_policy = nullptr; |
|
|
|
|
for (grpc_json* field = xds_config_json; field != nullptr; |
|
|
|
|
for (const grpc_json* field = xds_config_json; field != nullptr; |
|
|
|
|
field = field->next) { |
|
|
|
|
if (field->key == nullptr) return; |
|
|
|
|
if (strcmp(field->key, "balancerName") == 0) { |
|
|
|
@ -1108,19 +1110,22 @@ void XdsLb::ParseLbConfig(grpc_json* xds_config_json) { |
|
|
|
|
} |
|
|
|
|
if (balancer_name == nullptr) return; // Required field.
|
|
|
|
|
if (child_policy != nullptr) { |
|
|
|
|
child_policy_json_string_ = |
|
|
|
|
UniquePtr<char>(grpc_json_dump_to_string(child_policy, 0 /* indent */)); |
|
|
|
|
child_policy_name_ = UniquePtr<char>(gpr_strdup(child_policy->key)); |
|
|
|
|
child_policy_config_ = MakeRefCounted<Config>(child_policy->child, |
|
|
|
|
xds_config->service_config()); |
|
|
|
|
} |
|
|
|
|
if (fallback_policy != nullptr) { |
|
|
|
|
fallback_policy_json_string_ = UniquePtr<char>( |
|
|
|
|
grpc_json_dump_to_string(fallback_policy, 0 /* indent */)); |
|
|
|
|
fallback_policy_name_ = UniquePtr<char>(gpr_strdup(fallback_policy->key)); |
|
|
|
|
fallback_policy_config_ = MakeRefCounted<Config>( |
|
|
|
|
fallback_policy->child, xds_config->service_config()); |
|
|
|
|
} |
|
|
|
|
balancer_name_ = UniquePtr<char>(gpr_strdup(balancer_name)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { |
|
|
|
|
void XdsLb::UpdateLocked(const grpc_channel_args& args, |
|
|
|
|
RefCountedPtr<Config> lb_config) { |
|
|
|
|
const bool is_initial_update = lb_channel_ == nullptr; |
|
|
|
|
ParseLbConfig(lb_config); |
|
|
|
|
ParseLbConfig(lb_config.get()); |
|
|
|
|
// TODO(juanlishen): Pass fallback policy config update after fallback policy
|
|
|
|
|
// is added.
|
|
|
|
|
if (balancer_name_ == nullptr) { |
|
|
|
@ -1285,30 +1290,13 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg, |
|
|
|
|
// code for interacting with the child policy
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
void XdsLb::CreateChildPolicyLocked(const char* name, Args args) { |
|
|
|
|
GPR_ASSERT(child_policy_ == nullptr); |
|
|
|
|
child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( |
|
|
|
|
name, std::move(args)); |
|
|
|
|
if (GPR_UNLIKELY(child_policy_ == nullptr)) { |
|
|
|
|
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Add the xDS's interested_parties pollset_set to that of the newly created
|
|
|
|
|
// child policy. This will make the child policy progress upon activity on
|
|
|
|
|
// xDS LB, which in turn is tied to the application's call.
|
|
|
|
|
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), |
|
|
|
|
interested_parties()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { |
|
|
|
|
bool is_backend_from_grpclb_load_balancer = false; |
|
|
|
|
// This should never be invoked if we do not have serverlist_, as fallback
|
|
|
|
|
// mode is disabled for xDS plugin.
|
|
|
|
|
GPR_ASSERT(serverlist_ != nullptr); |
|
|
|
|
GPR_ASSERT(serverlist_->num_servers > 0); |
|
|
|
|
UniquePtr<ServerAddressList> addresses = ProcessServerlist(serverlist_); |
|
|
|
|
GPR_ASSERT(addresses != nullptr); |
|
|
|
|
is_backend_from_grpclb_load_balancer = true; |
|
|
|
|
// Replace the server address list in the channel args that we pass down to
|
|
|
|
|
// the subchannel.
|
|
|
|
|
static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST}; |
|
|
|
@ -1318,7 +1306,7 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { |
|
|
|
|
// grpclb load balancer.
|
|
|
|
|
grpc_channel_arg_integer_create( |
|
|
|
|
const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER), |
|
|
|
|
is_backend_from_grpclb_load_balancer), |
|
|
|
|
1), |
|
|
|
|
}; |
|
|
|
|
grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( |
|
|
|
|
args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, |
|
|
|
@ -1326,25 +1314,27 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { |
|
|
|
|
return args; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::CreateChildPolicyLocked(const char* name, Args args) { |
|
|
|
|
GPR_ASSERT(child_policy_ == nullptr); |
|
|
|
|
child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( |
|
|
|
|
name, std::move(args)); |
|
|
|
|
if (GPR_UNLIKELY(child_policy_ == nullptr)) { |
|
|
|
|
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Add the xDS's interested_parties pollset_set to that of the newly created
|
|
|
|
|
// child policy. This will make the child policy progress upon activity on
|
|
|
|
|
// xDS LB, which in turn is tied to the application's call.
|
|
|
|
|
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), |
|
|
|
|
interested_parties()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
|
if (shutting_down_) return; |
|
|
|
|
grpc_channel_args* args = CreateChildPolicyArgsLocked(); |
|
|
|
|
GPR_ASSERT(args != nullptr); |
|
|
|
|
const char* child_policy_name = nullptr; |
|
|
|
|
grpc_json* child_policy_config = nullptr; |
|
|
|
|
grpc_json* child_policy_json = |
|
|
|
|
grpc_json_parse_string(child_policy_json_string_.get()); |
|
|
|
|
// TODO(juanlishen): If the child policy is not configured via service config,
|
|
|
|
|
// use whatever algorithm is specified by the balancer.
|
|
|
|
|
if (child_policy_json != nullptr) { |
|
|
|
|
child_policy_name = child_policy_json->key; |
|
|
|
|
child_policy_config = child_policy_json->child; |
|
|
|
|
} else { |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] No valid child policy LB config", this); |
|
|
|
|
} |
|
|
|
|
child_policy_name = "round_robin"; |
|
|
|
|
} |
|
|
|
|
// TODO(juanlishen): Switch policy according to child_policy_config->key.
|
|
|
|
|
if (child_policy_ == nullptr) { |
|
|
|
|
LoadBalancingPolicy::Args lb_policy_args; |
|
|
|
@ -1352,7 +1342,10 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
|
lb_policy_args.args = args; |
|
|
|
|
lb_policy_args.channel_control_helper = |
|
|
|
|
UniquePtr<ChannelControlHelper>(New<Helper>(Ref())); |
|
|
|
|
CreateChildPolicyLocked(child_policy_name, std::move(lb_policy_args)); |
|
|
|
|
CreateChildPolicyLocked(child_policy_name_ == nullptr |
|
|
|
|
? "round_robin" |
|
|
|
|
: child_policy_name_.get(), |
|
|
|
|
std::move(lb_policy_args)); |
|
|
|
|
if (grpc_lb_xds_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this, |
|
|
|
|
child_policy_.get()); |
|
|
|
@ -1362,9 +1355,8 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Updating child policy %p", this, |
|
|
|
|
child_policy_.get()); |
|
|
|
|
} |
|
|
|
|
child_policy_->UpdateLocked(*args, child_policy_config); |
|
|
|
|
child_policy_->UpdateLocked(*args, child_policy_config_); |
|
|
|
|
grpc_channel_args_destroy(args); |
|
|
|
|
grpc_json_destroy(child_policy_json); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|