Merge pull request #18254 from grpc/revert-18078-grpclb_child_policy_configurable

Revert "Make grpclb child policy configurable"
pull/18268/head
Mark D. Roth 6 years ago committed by GitHub
commit 6dcf6d1645
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 346
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 144
      test/cpp/end2end/grpclb_end2end_test.cc

@ -39,14 +39,15 @@
/// the balancer, we update the round_robin policy with the new list of /// the balancer, we update the round_robin policy with the new list of
/// addresses. If we cannot communicate with the balancer on startup, /// addresses. If we cannot communicate with the balancer on startup,
/// however, we may enter fallback mode, in which case we will populate /// however, we may enter fallback mode, in which case we will populate
/// the child policy's addresses from the backend addresses returned by the /// the RR policy's addresses from the backend addresses returned by the
/// resolver. /// resolver.
/// ///
/// Once a child policy instance is in place (and getting updated as described), /// Once an RR policy instance is in place (and getting updated as described),
/// calls for a pick, a ping, or a cancellation will be serviced right /// calls for a pick, a ping, or a cancellation will be serviced right
/// away by forwarding them to the child policy instance. Any time there's no /// away by forwarding them to the RR instance. Any time there's no RR
/// child policy available (i.e., right after the creation of the gRPCLB /// policy available (i.e., right after the creation of the gRPCLB policy),
/// policy), pick requests are queued. /// pick and ping requests are added to a list of pending picks and pings
/// to be flushed and serviced when the RR policy instance becomes available.
/// ///
/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
/// high level design and details. /// high level design and details.
@ -278,23 +279,16 @@ class GrpcLb : public LoadBalancingPolicy {
UniquePtr<SubchannelPicker> picker) override; UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override; void RequestReresolution() override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private: private:
bool CalledByPendingChild() const;
bool CalledByCurrentChild() const;
RefCountedPtr<GrpcLb> parent_; RefCountedPtr<GrpcLb> parent_;
LoadBalancingPolicy* child_ = nullptr;
}; };
~GrpcLb(); ~GrpcLb();
void ShutdownLocked() override; void ShutdownLocked() override;
// Helper functions used in UpdateLocked(). // Helper function used in UpdateLocked().
void ProcessChannelArgsLocked(const grpc_channel_args& args); void ProcessChannelArgsLocked(const grpc_channel_args& args);
void ParseLbConfig(Config* grpclb_config);
// Methods for dealing with the balancer channel and call. // Methods for dealing with the balancer channel and call.
void StartBalancerCallLocked(); void StartBalancerCallLocked();
@ -302,11 +296,10 @@ class GrpcLb : public LoadBalancingPolicy {
void StartBalancerCallRetryTimerLocked(); void StartBalancerCallRetryTimerLocked();
static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error); static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
// Methods for dealing with the child policy. // Methods for dealing with the RR policy.
grpc_channel_args* CreateChildPolicyArgsLocked(); grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( void CreateRoundRobinPolicyLocked(Args args);
const char* name, grpc_channel_args* args); void CreateOrUpdateRoundRobinPolicyLocked();
void CreateOrUpdateChildPolicyLocked();
// Who the client is trying to communicate with. // Who the client is trying to communicate with.
const char* server_name_ = nullptr; const char* server_name_ = nullptr;
@ -352,14 +345,8 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_timer lb_fallback_timer_; grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_; grpc_closure lb_on_fallback_;
// The child policy to use for the backends. // The RR policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_; OrphanablePtr<LoadBalancingPolicy> rr_policy_;
// When switching child policies, the new policy will be stored here
// until it reports READY, at which point it will be moved to child_policy_.
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
// The child policy name and config.
UniquePtr<char> child_policy_name_;
RefCountedPtr<Config> child_policy_config_;
}; };
// //
@ -571,30 +558,14 @@ GrpcLb::Picker::PickResult GrpcLb::Picker::Pick(PickState* pick,
// GrpcLb::Helper // GrpcLb::Helper
// //
bool GrpcLb::Helper::CalledByPendingChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->pending_child_policy_.get();
}
bool GrpcLb::Helper::CalledByCurrentChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->child_policy_.get();
}
Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) { Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
if (parent_->shutting_down_ || if (parent_->shutting_down_) return nullptr;
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
return parent_->channel_control_helper()->CreateSubchannel(args); return parent_->channel_control_helper()->CreateSubchannel(args);
} }
grpc_channel* GrpcLb::Helper::CreateChannel(const char* target, grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,
const grpc_channel_args& args) { const grpc_channel_args& args) {
if (parent_->shutting_down_ || if (parent_->shutting_down_) return nullptr;
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
return parent_->channel_control_helper()->CreateChannel(target, args); return parent_->channel_control_helper()->CreateChannel(target, args);
} }
@ -605,50 +576,31 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
GRPC_ERROR_UNREF(state_error); GRPC_ERROR_UNREF(state_error);
return; return;
} }
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p helper %p] pending child policy %p reports state=%s",
parent_.get(), this, parent_->pending_child_policy_.get(),
grpc_connectivity_state_name(state));
}
if (state != GRPC_CHANNEL_READY) {
GRPC_ERROR_UNREF(state_error);
return;
}
parent_->child_policy_ = std::move(parent_->pending_child_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
GRPC_ERROR_UNREF(state_error);
return;
}
// There are three cases to consider here: // There are three cases to consider here:
// 1. We're in fallback mode. In this case, we're always going to use // 1. We're in fallback mode. In this case, we're always going to use
// the child policy's result, so we pass its picker through as-is. // RR's result, so we pass its picker through as-is.
// 2. The serverlist contains only drop entries. In this case, we // 2. The serverlist contains only drop entries. In this case, we
// want to use our own picker so that we can return the drops. // want to use our own picker so that we can return the drops.
// 3. Not in fallback mode and serverlist is not all drops (i.e., it // 3. Not in fallback mode and serverlist is not all drops (i.e., it
// may be empty or contain at least one backend address). There are // may be empty or contain at least one backend address). There are
// two sub-cases: // two sub-cases:
// a. The child policy is reporting state READY. In this case, we wrap // a. RR is reporting state READY. In this case, we wrap RR's
// the child's picker in our own, so that we can handle drops and LB // picker in our own, so that we can handle drops and LB token
// token metadata for each pick. // metadata for each pick.
// b. The child policy is reporting a state other than READY. In this // b. RR is reporting a state other than READY. In this case, we
// case, we don't want to use our own picker, because we don't want // don't want to use our own picker, because we don't want to
// to process drops for picks that yield a QUEUE result; this would // process drops for picks that yield a QUEUE result; this would
// result in dropping too many calls, since we will see the // result in dropping too many calls, since we will see the
// queued picks multiple times, and we'd consider each one a // queued picks multiple times, and we'd consider each one a
// separate call for the drop calculation. // separate call for the drop calculation.
// //
// Cases 1 and 3b: return picker from the child policy as-is. // Cases 1 and 3b: return picker from RR as-is.
if (parent_->serverlist_ == nullptr || if (parent_->serverlist_ == nullptr ||
(!parent_->serverlist_->ContainsAllDropEntries() && (!parent_->serverlist_->ContainsAllDropEntries() &&
state != GRPC_CHANNEL_READY)) { state != GRPC_CHANNEL_READY)) {
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[grpclb %p helper %p] state=%s passing child picker %p as-is", "[grpclb %p helper %p] state=%s passing RR picker %p as-is",
parent_.get(), this, grpc_connectivity_state_name(state), parent_.get(), this, grpc_connectivity_state_name(state),
picker.get()); picker.get());
} }
@ -656,9 +608,9 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
std::move(picker)); std::move(picker));
return; return;
} }
// Cases 2 and 3a: wrap picker from the child in our own picker. // Cases 2 and 3a: wrap picker from RR in our own picker.
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p", gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping RR picker %p",
parent_.get(), this, grpc_connectivity_state_name(state), parent_.get(), this, grpc_connectivity_state_name(state),
picker.get()); picker.get());
} }
@ -676,19 +628,15 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
void GrpcLb::Helper::RequestReresolution() { void GrpcLb::Helper::RequestReresolution() {
if (parent_->shutting_down_) return; if (parent_->shutting_down_) return;
// If there is a pending child policy, ignore re-resolution requests
// from the current child policy (or any outdated pending child).
if (parent_->pending_child_policy_ != nullptr && !CalledByPendingChild()) {
return;
}
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[grpclb %p] Re-resolution requested from child policy (%p).", "[grpclb %p] Re-resolution requested from the internal RR policy "
parent_.get(), child_); "(%p).",
parent_.get(), parent_->rr_policy_.get());
} }
// If we are talking to a balancer, we expect to get updated addresses // If we are talking to a balancer, we expect to get updated addresses
// from the balancer, so we can ignore the re-resolution request from // from the balancer, so we can ignore the re-resolution request from
// the child policy. Otherwise, pass the re-resolution request up to the // the RR policy. Otherwise, pass the re-resolution request up to the
// channel. // channel.
if (parent_->lb_calld_ == nullptr || if (parent_->lb_calld_ == nullptr ||
!parent_->lb_calld_->seen_initial_response()) { !parent_->lb_calld_->seen_initial_response()) {
@ -1036,7 +984,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
// instance will be destroyed either upon the next update or when the // instance will be destroyed either upon the next update or when the
// GrpcLb instance is destroyed. // GrpcLb instance is destroyed.
grpclb_policy->serverlist_ = std::move(serverlist_wrapper); grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
grpclb_policy->CreateOrUpdateChildPolicyLocked(); grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
} }
} else { } else {
// No valid initial response or serverlist found. // No valid initial response or serverlist found.
@ -1252,8 +1200,7 @@ void GrpcLb::ShutdownLocked() {
if (fallback_timer_callback_pending_) { if (fallback_timer_callback_pending_) {
grpc_timer_cancel(&lb_fallback_timer_); grpc_timer_cancel(&lb_fallback_timer_);
} }
child_policy_.reset(); rr_policy_.reset();
pending_child_policy_.reset();
// We destroy the LB channel here instead of in our destructor because // We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to // destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be // OnBalancerChannelConnectivityChangedLocked(), and we need to be
@ -1273,24 +1220,17 @@ void GrpcLb::ResetBackoffLocked() {
if (lb_channel_ != nullptr) { if (lb_channel_ != nullptr) {
grpc_channel_reset_connect_backoff(lb_channel_); grpc_channel_reset_connect_backoff(lb_channel_);
} }
if (child_policy_ != nullptr) { if (rr_policy_ != nullptr) {
child_policy_->ResetBackoffLocked(); rr_policy_->ResetBackoffLocked();
}
if (pending_child_policy_ != nullptr) {
pending_child_policy_->ResetBackoffLocked();
} }
} }
void GrpcLb::FillChildRefsForChannelz( void GrpcLb::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels, channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) { channelz::ChildRefsList* child_channels) {
// delegate to the child policy to fill the children subchannels. // delegate to the RoundRobin to fill the children subchannels.
if (child_policy_ != nullptr) { if (rr_policy_ != nullptr) {
child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
}
if (pending_child_policy_ != nullptr) {
pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
} }
gpr_atm uuid = gpr_atm_no_barrier_load(&lb_channel_uuid_); gpr_atm uuid = gpr_atm_no_barrier_load(&lb_channel_uuid_);
if (uuid != 0) { if (uuid != 0) {
@ -1298,32 +1238,6 @@ void GrpcLb::FillChildRefsForChannelz(
} }
} }
void GrpcLb::UpdateLocked(const grpc_channel_args& args,
RefCountedPtr<Config> lb_config) {
const bool is_initial_update = lb_channel_ == nullptr;
ParseLbConfig(lb_config.get());
ProcessChannelArgsLocked(args);
// Update the existing child policy.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
// If this is the initial update, start the fallback timer.
if (is_initial_update) {
if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
!fallback_timer_callback_pending_) {
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
}
StartBalancerCallLocked();
}
}
//
// helpers for UpdateLocked()
//
// Returns the backend addresses extracted from the given addresses. // Returns the backend addresses extracted from the given addresses.
UniquePtr<ServerAddressList> ExtractBackendAddresses( UniquePtr<ServerAddressList> ExtractBackendAddresses(
const ServerAddressList& addresses) { const ServerAddressList& addresses) {
@ -1385,26 +1299,25 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
grpc_channel_args_destroy(lb_channel_args); grpc_channel_args_destroy(lb_channel_args);
} }
void GrpcLb::ParseLbConfig(Config* grpclb_config) { void GrpcLb::UpdateLocked(const grpc_channel_args& args,
const grpc_json* child_policy = nullptr; RefCountedPtr<Config> lb_config) {
if (grpclb_config != nullptr) { const bool is_initial_update = lb_channel_ == nullptr;
const grpc_json* grpclb_config_json = grpclb_config->json(); ProcessChannelArgsLocked(args);
for (const grpc_json* field = grpclb_config_json; field != nullptr; // Update the existing RR policy.
field = field->next) { if (rr_policy_ != nullptr) CreateOrUpdateRoundRobinPolicyLocked();
if (field->key == nullptr) return; // If this is the initial update, start the fallback timer and the
if (strcmp(field->key, "childPolicy") == 0) { // balancer call.
if (child_policy != nullptr) return; // Duplicate. if (is_initial_update) {
child_policy = ParseLoadBalancingConfig(field); if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
} !fallback_timer_callback_pending_) {
} grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
} }
if (child_policy != nullptr) { StartBalancerCallLocked();
child_policy_name_ = UniquePtr<char>(gpr_strdup(child_policy->key));
child_policy_config_ = MakeRefCounted<Config>(
child_policy->child, grpclb_config->service_config());
} else {
child_policy_name_.reset();
child_policy_config_.reset();
} }
} }
@ -1439,7 +1352,7 @@ void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
grpclb_policy); grpclb_policy);
} }
GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr); GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
grpclb_policy->CreateOrUpdateChildPolicyLocked(); grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
} }
grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
} }
@ -1483,10 +1396,10 @@ void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
} }
// //
// code for interacting with the child policy // code for interacting with the RR policy
// //
grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() { grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
ServerAddressList tmp_addresses; ServerAddressList tmp_addresses;
ServerAddressList* addresses = &tmp_addresses; ServerAddressList* addresses = &tmp_addresses;
bool is_backend_from_grpclb_load_balancer = false; bool is_backend_from_grpclb_load_balancer = false;
@ -1495,7 +1408,7 @@ grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() {
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats()); lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
is_backend_from_grpclb_load_balancer = true; is_backend_from_grpclb_load_balancer = true;
} else { } else {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends // received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be // returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new round_robin policy will keep the requested // empty, in which case the new round_robin policy will keep the requested
@ -1522,134 +1435,49 @@ grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() {
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1); const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
++num_args_to_add; ++num_args_to_add;
} }
return grpc_channel_args_copy_and_add_and_remove( grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
num_args_to_add); num_args_to_add);
return args;
} }
OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked( void GrpcLb::CreateRoundRobinPolicyLocked(Args args) {
const char* name, grpc_channel_args* args) { GPR_ASSERT(rr_policy_ == nullptr);
Helper* helper = New<Helper>(Ref()); rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args lb_policy_args; "round_robin", std::move(args));
lb_policy_args.combiner = combiner(); if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
lb_policy_args.args = args; gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
lb_policy_args.channel_control_helper = this);
UniquePtr<ChannelControlHelper>(helper); return;
OrphanablePtr<LoadBalancingPolicy> lb_policy = }
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
name);
return nullptr;
}
helper->set_child(lb_policy.get());
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this, gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
name, lb_policy.get()); rr_policy_.get());
} }
// Add the gRPC LB's interested_parties pollset_set to that of the newly // Add the gRPC LB's interested_parties pollset_set to that of the newly
// created child policy. This will make the child policy progress upon // created RR policy. This will make the RR policy progress upon activity on
// activity on gRPC LB, which in turn is tied to the application's call. // gRPC LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
interested_parties()); interested_parties());
return lb_policy;
} }
void GrpcLb::CreateOrUpdateChildPolicyLocked() { void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
if (shutting_down_) return; if (shutting_down_) return;
grpc_channel_args* args = CreateChildPolicyArgsLocked(); grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
GPR_ASSERT(args != nullptr); GPR_ASSERT(args != nullptr);
// If the child policy name changes, we need to create a new child if (rr_policy_ == nullptr) {
// policy. When this happens, we leave child_policy_ as-is and store LoadBalancingPolicy::Args lb_policy_args;
// the new child policy in pending_child_policy_. Once the new child lb_policy_args.combiner = combiner();
// policy transitions into state READY, we swap it into child_policy_, lb_policy_args.args = args;
// replacing the original child policy. So pending_child_policy_ is lb_policy_args.channel_control_helper =
// non-null only between when we apply an update that changes the child UniquePtr<ChannelControlHelper>(New<Helper>(Ref()));
// policy name and when the new child reports state READY. CreateRoundRobinPolicyLocked(std::move(lb_policy_args));
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* child_policy_name =
child_policy_name_ == nullptr ? "round_robin" : child_policy_name_.get();
const bool create_policy =
// case 1
child_policy_ == nullptr ||
// case 2b
(pending_child_policy_ == nullptr &&
strcmp(child_policy_->name(), child_policy_name) != 0) ||
// case 3b
(pending_child_policy_ != nullptr &&
strcmp(pending_child_policy_->name(), child_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
} }
lb_policy = CreateChildPolicyLocked(child_policy_name, args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_child_policy_ != nullptr
? pending_child_policy_.get()
: child_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this, gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
policy_to_update == pending_child_policy_.get() ? "pending " : "", rr_policy_.get());
policy_to_update);
} }
policy_to_update->UpdateLocked(*args, child_policy_config_); rr_policy_->UpdateLocked(*args, nullptr);
// Clean up.
grpc_channel_args_destroy(args); grpc_channel_args_destroy(args);
} }

@ -723,150 +723,6 @@ TEST_F(SingleBalancerTest, SelectGrpclbWithMigrationServiceConfig) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
} }
TEST_F(SingleBalancerTest, UsePickFirstChildPolicy) {
SetNextResolutionAllBalancers(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
" \"childPolicy\":[\n"
" { \"pick_first\":{} }\n"
" ]\n"
" } }\n"
" ]\n"
"}");
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
const size_t kNumRpcs = num_backends_ * 2;
CheckRpcSendOk(kNumRpcs, 1000 /* timeout_ms */, true /* wait_for_ready */);
balancers_[0]->NotifyDoneWithServerlists();
// Check that all requests went to the first backend. This verifies
// that we used pick_first instead of round_robin as the child policy.
EXPECT_EQ(backend_servers_[0].service_->request_count(), kNumRpcs);
for (size_t i = 1; i < backends_.size(); ++i) {
EXPECT_EQ(backend_servers_[i].service_->request_count(), 0UL);
}
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(SingleBalancerTest, SwapChildPolicy) {
SetNextResolutionAllBalancers(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
" \"childPolicy\":[\n"
" { \"pick_first\":{} }\n"
" ]\n"
" } }\n"
" ]\n"
"}");
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
const size_t kNumRpcs = num_backends_ * 2;
CheckRpcSendOk(kNumRpcs, 1000 /* timeout_ms */, true /* wait_for_ready */);
// Check that all requests went to the first backend. This verifies
// that we used pick_first instead of round_robin as the child policy.
EXPECT_EQ(backend_servers_[0].service_->request_count(), kNumRpcs);
for (size_t i = 1; i < backends_.size(); ++i) {
EXPECT_EQ(backend_servers_[i].service_->request_count(), 0UL);
}
// Send new resolution that removes child policy from service config.
SetNextResolutionAllBalancers("{}");
WaitForAllBackends();
CheckRpcSendOk(kNumRpcs, 1000 /* timeout_ms */, true /* wait_for_ready */);
// Check that every backend saw the same number of requests. This verifies
// that we used round_robin.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(backend_servers_[i].service_->request_count(), 2UL);
}
// Done.
balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(SingleBalancerTest, UpdatesGoToMostRecentChildPolicy) {
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
int unreachable_balancer_port = grpc_pick_unused_port_or_die();
int unreachable_backend_port = grpc_pick_unused_port_or_die();
// Phase 1: Start with RR pointing to first backend.
gpr_log(GPR_INFO, "PHASE 1: Initial setup with RR with first backend");
SetNextResolution(
{
// Unreachable balancer.
{unreachable_balancer_port, true, ""},
// Fallback address: first backend.
{backend_servers_[0].port_, false, ""},
},
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
" \"childPolicy\":[\n"
" { \"round_robin\":{} }\n"
" ]\n"
" } }\n"
" ]\n"
"}");
// RPCs should go to first backend.
WaitForBackend(0);
// Phase 2: Switch to PF pointing to unreachable backend.
gpr_log(GPR_INFO, "PHASE 2: Update to use PF with unreachable backend");
SetNextResolution(
{
// Unreachable balancer.
{unreachable_balancer_port, true, ""},
// Fallback address: unreachable backend.
{unreachable_backend_port, false, ""},
},
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
" \"childPolicy\":[\n"
" { \"pick_first\":{} }\n"
" ]\n"
" } }\n"
" ]\n"
"}");
// RPCs should continue to go to the first backend, because the new
// PF child policy will never go into state READY.
WaitForBackend(0);
// Phase 3: Switch back to RR pointing to second and third backends.
// This ensures that we create a new policy rather than updating the
// pending PF policy.
gpr_log(GPR_INFO, "PHASE 3: Update to use RR again with two backends");
SetNextResolution(
{
// Unreachable balancer.
{unreachable_balancer_port, true, ""},
// Fallback address: second and third backends.
{backend_servers_[1].port_, false, ""},
{backend_servers_[2].port_, false, ""},
},
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
" \"childPolicy\":[\n"
" { \"round_robin\":{} }\n"
" ]\n"
" } }\n"
" ]\n"
"}");
// RPCs should go to the second and third backends.
WaitForBackend(1);
WaitForBackend(2);
}
TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) { TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
SetNextResolutionAllBalancers(); SetNextResolutionAllBalancers();
// Same backend listed twice. // Same backend listed twice.

Loading…
Cancel
Save