|
|
|
@ -125,8 +125,7 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
|
|
|
|
|
const char* name() const override { return kGrpclb; } |
|
|
|
|
|
|
|
|
|
void UpdateLocked(const grpc_channel_args& args, |
|
|
|
|
RefCountedPtr<Config> lb_config) override; |
|
|
|
|
void UpdateLocked(UpdateArgs args) override; |
|
|
|
|
void ResetBackoffLocked() override; |
|
|
|
|
void FillChildRefsForChannelz( |
|
|
|
|
channelz::ChildRefsList* child_subchannels, |
|
|
|
@ -295,7 +294,8 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
void ShutdownLocked() override; |
|
|
|
|
|
|
|
|
|
// Helper functions used in UpdateLocked().
|
|
|
|
|
void ProcessChannelArgsLocked(const grpc_channel_args& args); |
|
|
|
|
void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses, |
|
|
|
|
const grpc_channel_args& args); |
|
|
|
|
void ParseLbConfig(Config* grpclb_config); |
|
|
|
|
static void OnBalancerChannelConnectivityChangedLocked(void* arg, |
|
|
|
|
grpc_error* error); |
|
|
|
@ -311,7 +311,8 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
// Methods for dealing with the child policy.
|
|
|
|
|
grpc_channel_args* CreateChildPolicyArgsLocked(); |
|
|
|
|
grpc_channel_args* CreateChildPolicyArgsLocked( |
|
|
|
|
bool is_backend_from_grpclb_load_balancer); |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
|
|
|
|
const char* name, const grpc_channel_args* args); |
|
|
|
|
void CreateOrUpdateChildPolicyLocked(); |
|
|
|
@ -1204,7 +1205,6 @@ grpc_channel_args* BuildBalancerChannelArgs( |
|
|
|
|
const ServerAddressList& addresses, |
|
|
|
|
FakeResolverResponseGenerator* response_generator, |
|
|
|
|
const grpc_channel_args* args) { |
|
|
|
|
ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses); |
|
|
|
|
// Channel args to remove.
|
|
|
|
|
static const char* args_to_remove[] = { |
|
|
|
|
// LB policy name, since we want to use the default (pick_first) in
|
|
|
|
@ -1217,15 +1217,6 @@ grpc_channel_args* BuildBalancerChannelArgs( |
|
|
|
|
// the LB channel than for the parent channel. The client channel
|
|
|
|
|
// factory will re-add this arg with the right value.
|
|
|
|
|
GRPC_ARG_SERVER_URI, |
|
|
|
|
// The resolved addresses, which will be generated by the name resolver
|
|
|
|
|
// used in the LB channel. Note that the LB channel will use the fake
|
|
|
|
|
// resolver, so this won't actually generate a query to DNS (or some
|
|
|
|
|
// other name service). However, the addresses returned by the fake
|
|
|
|
|
// resolver will have is_balancer=false, whereas our own addresses have
|
|
|
|
|
// is_balancer=true. We need the LB channel to return addresses with
|
|
|
|
|
// is_balancer=false so that it does not wind up recursively using the
|
|
|
|
|
// grpclb LB policy.
|
|
|
|
|
GRPC_ARG_SERVER_ADDRESS_LIST, |
|
|
|
|
// The fake resolver response generator, because we are replacing it
|
|
|
|
|
// with the one from the grpclb policy, used to propagate updates to
|
|
|
|
|
// the LB channel.
|
|
|
|
@ -1241,10 +1232,6 @@ grpc_channel_args* BuildBalancerChannelArgs( |
|
|
|
|
}; |
|
|
|
|
// Channel args to add.
|
|
|
|
|
const grpc_arg args_to_add[] = { |
|
|
|
|
// New address list.
|
|
|
|
|
// Note that we pass these in both when creating the LB channel
|
|
|
|
|
// and via the fake resolver. The latter is what actually gets used.
|
|
|
|
|
CreateServerAddressListChannelArg(&balancer_addresses), |
|
|
|
|
// The fake resolver response generator, which we use to inject
|
|
|
|
|
// address updates into the LB channel.
|
|
|
|
|
grpc_core::FakeResolverResponseGenerator::MakeChannelArg( |
|
|
|
@ -1262,7 +1249,7 @@ grpc_channel_args* BuildBalancerChannelArgs( |
|
|
|
|
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add, |
|
|
|
|
GPR_ARRAY_SIZE(args_to_add)); |
|
|
|
|
// Make any necessary modifications for security.
|
|
|
|
|
return grpc_lb_policy_grpclb_modify_lb_channel_args(new_args); |
|
|
|
|
return grpc_lb_policy_grpclb_modify_lb_channel_args(addresses, new_args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1388,11 +1375,10 @@ void GrpcLb::FillChildRefsForChannelz( |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::UpdateLocked(const grpc_channel_args& args, |
|
|
|
|
RefCountedPtr<Config> lb_config) { |
|
|
|
|
void GrpcLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
const bool is_initial_update = lb_channel_ == nullptr; |
|
|
|
|
ParseLbConfig(lb_config.get()); |
|
|
|
|
ProcessChannelArgsLocked(args); |
|
|
|
|
ParseLbConfig(args.config.get()); |
|
|
|
|
ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args); |
|
|
|
|
// Update the existing child policy.
|
|
|
|
|
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked(); |
|
|
|
|
// If this is the initial update, start the fallback-at-startup checks
|
|
|
|
@ -1442,18 +1428,10 @@ ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) { |
|
|
|
|
return backend_addresses; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { |
|
|
|
|
const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); |
|
|
|
|
if (addresses == nullptr) { |
|
|
|
|
// Ignore this update.
|
|
|
|
|
gpr_log( |
|
|
|
|
GPR_ERROR, |
|
|
|
|
"[grpclb %p] No valid LB addresses channel arg in update, ignoring.", |
|
|
|
|
this); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
void GrpcLb::ProcessAddressesAndChannelArgsLocked( |
|
|
|
|
const ServerAddressList& addresses, const grpc_channel_args& args) { |
|
|
|
|
// Update fallback address list.
|
|
|
|
|
fallback_backend_addresses_ = ExtractBackendAddresses(*addresses); |
|
|
|
|
fallback_backend_addresses_ = ExtractBackendAddresses(addresses); |
|
|
|
|
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
|
|
|
|
|
// since we use this to trigger the client_load_reporting filter.
|
|
|
|
|
static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; |
|
|
|
@ -1463,8 +1441,9 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { |
|
|
|
|
args_ = grpc_channel_args_copy_and_add_and_remove( |
|
|
|
|
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); |
|
|
|
|
// Construct args for balancer channel.
|
|
|
|
|
grpc_channel_args* lb_channel_args = |
|
|
|
|
BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args); |
|
|
|
|
ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses); |
|
|
|
|
grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs( |
|
|
|
|
balancer_addresses, response_generator_.get(), &args); |
|
|
|
|
// Create balancer channel if needed.
|
|
|
|
|
if (lb_channel_ == nullptr) { |
|
|
|
|
char* uri_str; |
|
|
|
@ -1481,8 +1460,10 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { |
|
|
|
|
} |
|
|
|
|
// Propagate updates to the LB channel (pick_first) through the fake
|
|
|
|
|
// resolver.
|
|
|
|
|
response_generator_->SetResponse(lb_channel_args); |
|
|
|
|
grpc_channel_args_destroy(lb_channel_args); |
|
|
|
|
Resolver::Result result; |
|
|
|
|
result.addresses = std::move(balancer_addresses); |
|
|
|
|
result.args = lb_channel_args; |
|
|
|
|
response_generator_->SetResponse(std::move(result)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::ParseLbConfig(Config* grpclb_config) { |
|
|
|
@ -1649,25 +1630,9 @@ void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { |
|
|
|
|
// code for interacting with the child policy
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() { |
|
|
|
|
ServerAddressList tmp_addresses; |
|
|
|
|
ServerAddressList* addresses = &tmp_addresses; |
|
|
|
|
bool is_backend_from_grpclb_load_balancer = false; |
|
|
|
|
if (fallback_mode_) { |
|
|
|
|
// Note: If fallback backend address list is empty, the child policy
|
|
|
|
|
// will go into state TRANSIENT_FAILURE.
|
|
|
|
|
addresses = &fallback_backend_addresses_; |
|
|
|
|
} else { |
|
|
|
|
tmp_addresses = serverlist_->GetServerAddressList( |
|
|
|
|
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats()); |
|
|
|
|
is_backend_from_grpclb_load_balancer = true; |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(addresses != nullptr); |
|
|
|
|
// Replace the server address list in the channel args that we pass down to
|
|
|
|
|
// the subchannel.
|
|
|
|
|
static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST}; |
|
|
|
|
grpc_arg args_to_add[3] = { |
|
|
|
|
CreateServerAddressListChannelArg(addresses), |
|
|
|
|
grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked( |
|
|
|
|
bool is_backend_from_grpclb_load_balancer) { |
|
|
|
|
grpc_arg args_to_add[2] = { |
|
|
|
|
// A channel arg indicating if the target is a backend inferred from a
|
|
|
|
|
// grpclb load balancer.
|
|
|
|
|
grpc_channel_arg_integer_create( |
|
|
|
@ -1675,15 +1640,12 @@ grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() { |
|
|
|
|
GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER), |
|
|
|
|
is_backend_from_grpclb_load_balancer), |
|
|
|
|
}; |
|
|
|
|
size_t num_args_to_add = 2; |
|
|
|
|
size_t num_args_to_add = 1; |
|
|
|
|
if (is_backend_from_grpclb_load_balancer) { |
|
|
|
|
args_to_add[2] = grpc_channel_arg_integer_create( |
|
|
|
|
args_to_add[num_args_to_add++] = grpc_channel_arg_integer_create( |
|
|
|
|
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1); |
|
|
|
|
++num_args_to_add; |
|
|
|
|
} |
|
|
|
|
return grpc_channel_args_copy_and_add_and_remove( |
|
|
|
|
args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, |
|
|
|
|
num_args_to_add); |
|
|
|
|
return grpc_channel_args_copy_and_add(args_, args_to_add, num_args_to_add); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked( |
|
|
|
@ -1717,8 +1679,25 @@ OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked( |
|
|
|
|
|
|
|
|
|
void GrpcLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
|
if (shutting_down_) return; |
|
|
|
|
grpc_channel_args* args = CreateChildPolicyArgsLocked(); |
|
|
|
|
GPR_ASSERT(args != nullptr); |
|
|
|
|
// Construct update args.
|
|
|
|
|
UpdateArgs update_args; |
|
|
|
|
bool is_backend_from_grpclb_load_balancer = false; |
|
|
|
|
if (fallback_mode_) { |
|
|
|
|
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
|
|
|
|
|
// received any serverlist from the balancer, we use the fallback backends
|
|
|
|
|
// returned by the resolver. Note that the fallback backend list may be
|
|
|
|
|
// empty, in which case the new round_robin policy will keep the requested
|
|
|
|
|
// picks pending.
|
|
|
|
|
update_args.addresses = fallback_backend_addresses_; |
|
|
|
|
} else { |
|
|
|
|
update_args.addresses = serverlist_->GetServerAddressList( |
|
|
|
|
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats()); |
|
|
|
|
is_backend_from_grpclb_load_balancer = true; |
|
|
|
|
} |
|
|
|
|
update_args.args = |
|
|
|
|
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer); |
|
|
|
|
GPR_ASSERT(update_args.args != nullptr); |
|
|
|
|
update_args.config = child_policy_config_; |
|
|
|
|
// If the child policy name changes, we need to create a new child
|
|
|
|
|
// policy. When this happens, we leave child_policy_ as-is and store
|
|
|
|
|
// the new child policy in pending_child_policy_. Once the new child
|
|
|
|
@ -1789,7 +1768,8 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
|
gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this, |
|
|
|
|
child_policy_ == nullptr ? "" : "pending ", child_policy_name); |
|
|
|
|
} |
|
|
|
|
auto new_policy = CreateChildPolicyLocked(child_policy_name, args); |
|
|
|
|
auto new_policy = |
|
|
|
|
CreateChildPolicyLocked(child_policy_name, update_args.args); |
|
|
|
|
// Swap the policy into place.
|
|
|
|
|
auto& lb_policy = |
|
|
|
|
child_policy_ == nullptr ? child_policy_ : pending_child_policy_; |
|
|
|
@ -1813,9 +1793,7 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
|
policy_to_update == pending_child_policy_.get() ? "pending " : "", |
|
|
|
|
policy_to_update); |
|
|
|
|
} |
|
|
|
|
policy_to_update->UpdateLocked(*args, child_policy_config_); |
|
|
|
|
// Clean up.
|
|
|
|
|
grpc_channel_args_destroy(args); |
|
|
|
|
policy_to_update->UpdateLocked(std::move(update_args)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|