diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index c5d1ff22a9d..e21b1789172 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -39,15 +39,14 @@
 /// the balancer, we update the round_robin policy with the new list of
 /// addresses. If we cannot communicate with the balancer on startup,
 /// however, we may enter fallback mode, in which case we will populate
-/// the RR policy's addresses from the backend addresses returned by the
+/// the child policy's addresses from the backend addresses returned by the
 /// resolver.
 ///
-/// Once an RR policy instance is in place (and getting updated as described),
+/// Once a child policy instance is in place (and getting updated as described),
 /// calls for a pick, a ping, or a cancellation will be serviced right
-/// away by forwarding them to the RR instance. Any time there's no RR
-/// policy available (i.e., right after the creation of the gRPCLB policy),
-/// pick and ping requests are added to a list of pending picks and pings
-/// to be flushed and serviced when the RR policy instance becomes available.
+/// away by forwarding them to the child policy instance. Any time there's no
+/// child policy available (i.e., right after the creation of the gRPCLB
+/// policy), pick requests are queued.
 ///
 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
 /// high level design and details.
@@ -279,16 +278,23 @@ class GrpcLb : public LoadBalancingPolicy {
                      UniquePtr<SubchannelPicker> picker) override;
     void RequestReresolution() override;
 
+    void set_child(LoadBalancingPolicy* child) { child_ = child; }
+
    private:
+    bool CalledByPendingChild() const;
+    bool CalledByCurrentChild() const;
+
     RefCountedPtr<GrpcLb> parent_;
+    LoadBalancingPolicy* child_ = nullptr;
   };
 
   ~GrpcLb();
 
   void ShutdownLocked() override;
 
-  // Helper function used in UpdateLocked().
+  // Helper functions used in UpdateLocked().
   void ProcessChannelArgsLocked(const grpc_channel_args& args);
+  void ParseLbConfig(Config* grpclb_config);
 
   // Methods for dealing with the balancer channel and call.
   void StartBalancerCallLocked();
@@ -296,10 +302,11 @@ class GrpcLb : public LoadBalancingPolicy {
   void StartBalancerCallRetryTimerLocked();
   static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
 
-  // Methods for dealing with the RR policy.
-  grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
-  void CreateRoundRobinPolicyLocked(Args args);
-  void CreateOrUpdateRoundRobinPolicyLocked();
+  // Methods for dealing with the child policy.
+  grpc_channel_args* CreateChildPolicyArgsLocked();
+  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
+      const char* name, grpc_channel_args* args);
+  void CreateOrUpdateChildPolicyLocked();
 
   // Who the client is trying to communicate with.
   const char* server_name_ = nullptr;
@@ -345,8 +352,17 @@ class GrpcLb : public LoadBalancingPolicy {
   grpc_timer lb_fallback_timer_;
   grpc_closure lb_on_fallback_;
 
-  // The RR policy to use for the backends.
-  OrphanablePtr<LoadBalancingPolicy> rr_policy_;
+  // Lock held when modifying the value of child_policy_ or
+  // pending_child_policy_.
+  gpr_mu child_policy_mu_;
+  // The child policy to use for the backends.
+  OrphanablePtr<LoadBalancingPolicy> child_policy_;
+  // When switching child policies, the new policy will be stored here
+  // until it reports READY, at which point it will be moved to child_policy_.
+  OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
+  // The child policy name and config.
+  UniquePtr<char> child_policy_name_;
+  RefCountedPtr<Config> child_policy_config_;
 };
 
 //
@@ -558,14 +574,30 @@ GrpcLb::Picker::PickResult GrpcLb::Picker::Pick(PickState* pick,
 // GrpcLb::Helper
 //
 
+bool GrpcLb::Helper::CalledByPendingChild() const {
+  GPR_ASSERT(child_ != nullptr);
+  return child_ == parent_->pending_child_policy_.get();
+}
+
+bool GrpcLb::Helper::CalledByCurrentChild() const {
+  GPR_ASSERT(child_ != nullptr);
+  return child_ == parent_->child_policy_.get();
+}
+
 Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
-  if (parent_->shutting_down_) return nullptr;
+  if (parent_->shutting_down_ ||
+      (!CalledByPendingChild() && !CalledByCurrentChild())) {
+    return nullptr;
+  }
   return parent_->channel_control_helper()->CreateSubchannel(args);
 }
 
 grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,
                                             const grpc_channel_args& args) {
-  if (parent_->shutting_down_) return nullptr;
+  if (parent_->shutting_down_ ||
+      (!CalledByPendingChild() && !CalledByCurrentChild())) {
+    return nullptr;
+  }
   return parent_->channel_control_helper()->CreateChannel(target, args);
 }
 
@@ -576,31 +608,51 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
     GRPC_ERROR_UNREF(state_error);
     return;
   }
+  // If this request is from the pending child policy, ignore it until
+  // it reports READY, at which point we swap it into place.
+  if (CalledByPendingChild()) {
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO,
+              "[grpclb %p helper %p] pending child policy %p reports state=%s",
+              parent_.get(), this, parent_->pending_child_policy_.get(),
+              grpc_connectivity_state_name(state));
+    }
+    if (state != GRPC_CHANNEL_READY) {
+      GRPC_ERROR_UNREF(state_error);
+      return;
+    }
+    MutexLock lock(&parent_->child_policy_mu_);
+    parent_->child_policy_ = std::move(parent_->pending_child_policy_);
+  } else if (!CalledByCurrentChild()) {
+    // This request is from an outdated child, so ignore it.
+    GRPC_ERROR_UNREF(state_error);
+    return;
+  }
   // There are three cases to consider here:
   // 1. We're in fallback mode. In this case, we're always going to use
-  //    RR's result, so we pass its picker through as-is.
+  //    the child policy's result, so we pass its picker through as-is.
   // 2. The serverlist contains only drop entries. In this case, we
   //    want to use our own picker so that we can return the drops.
   // 3. Not in fallback mode and serverlist is not all drops (i.e., it
   //    may be empty or contain at least one backend address). There are
   //    two sub-cases:
-  //    a. RR is reporting state READY. In this case, we wrap RR's
-  //       picker in our own, so that we can handle drops and LB token
-  //       metadata for each pick.
-  //    b. RR is reporting a state other than READY. In this case, we
-  //       don't want to use our own picker, because we don't want to
-  //       process drops for picks that yield a QUEUE result; this would
+  //    a. The child policy is reporting state READY. In this case, we wrap
+  //       the child's picker in our own, so that we can handle drops and LB
+  //       token metadata for each pick.
+  //    b. The child policy is reporting a state other than READY. In this
+  //       case, we don't want to use our own picker, because we don't want
+  //       to process drops for picks that yield a QUEUE result; this would
   //       result in dropping too many calls, since we will see the
   //       queued picks multiple times, and we'd consider each one a
   //       separate call for the drop calculation.
   //
-  // Cases 1 and 3b: return picker from RR as-is.
+  // Cases 1 and 3b: return picker from the child policy as-is.
if (parent_->serverlist_ == nullptr || (!parent_->serverlist_->ContainsAllDropEntries() && state != GRPC_CHANNEL_READY)) { if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p helper %p] state=%s passing RR picker %p as-is", + "[grpclb %p helper %p] state=%s passing child picker %p as-is", parent_.get(), this, grpc_connectivity_state_name(state), picker.get()); } @@ -608,9 +660,9 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, std::move(picker)); return; } - // Cases 2 and 3a: wrap picker from RR in our own picker. + // Cases 2 and 3a: wrap picker from the child in our own picker. if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping RR picker %p", + gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p", parent_.get(), this, grpc_connectivity_state_name(state), picker.get()); } @@ -628,15 +680,19 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, void GrpcLb::Helper::RequestReresolution() { if (parent_->shutting_down_) return; + // If there is a pending child policy, ignore re-resolution requests + // from the current child policy (or any outdated pending child). + if (parent_->pending_child_policy_ != nullptr && !CalledByPendingChild()) { + return; + } if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Re-resolution requested from the internal RR policy " - "(%p).", - parent_.get(), parent_->rr_policy_.get()); + "[grpclb %p] Re-resolution requested from child policy (%p).", + parent_.get(), child_); } // If we are talking to a balancer, we expect to get updated addresses // from the balancer, so we can ignore the re-resolution request from - // the RR policy. Otherwise, pass the re-resolution request up to the + // the child policy. Otherwise, pass the re-resolution request up to the // channel. 
if (parent_->lb_calld_ == nullptr || !parent_->lb_calld_->seen_initial_response()) { @@ -984,7 +1040,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( // instance will be destroyed either upon the next update or when the // GrpcLb instance is destroyed. grpclb_policy->serverlist_ = std::move(serverlist_wrapper); - grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); + grpclb_policy->CreateOrUpdateChildPolicyLocked(); } } else { // No valid initial response or serverlist found. @@ -1164,6 +1220,7 @@ GrpcLb::GrpcLb(Args args) .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { + gpr_mu_init(&child_policy_mu_); // Record server name. const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(arg); @@ -1189,6 +1246,7 @@ GrpcLb::GrpcLb(Args args) GrpcLb::~GrpcLb() { gpr_free((void*)server_name_); grpc_channel_args_destroy(args_); + gpr_mu_destroy(&child_policy_mu_); } void GrpcLb::ShutdownLocked() { @@ -1200,7 +1258,11 @@ void GrpcLb::ShutdownLocked() { if (fallback_timer_callback_pending_) { grpc_timer_cancel(&lb_fallback_timer_); } - rr_policy_.reset(); + { + MutexLock lock(&child_policy_mu_); + child_policy_.reset(); + pending_child_policy_.reset(); + } // We destroy the LB channel here instead of in our destructor because // destroying the channel triggers a last callback to // OnBalancerChannelConnectivityChangedLocked(), and we need to be @@ -1220,17 +1282,30 @@ void GrpcLb::ResetBackoffLocked() { if (lb_channel_ != nullptr) { grpc_channel_reset_connect_backoff(lb_channel_); } - if (rr_policy_ != nullptr) { - rr_policy_->ResetBackoffLocked(); + if (child_policy_ != nullptr) { + child_policy_->ResetBackoffLocked(); + } + if (pending_child_policy_ != nullptr) { + pending_child_policy_->ResetBackoffLocked(); } } void GrpcLb::FillChildRefsForChannelz( channelz::ChildRefsList* child_subchannels, 
channelz::ChildRefsList* child_channels) { - // delegate to the RoundRobin to fill the children subchannels. - if (rr_policy_ != nullptr) { - rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); + { + // Delegate to the child policy to fill the children subchannels. + // This must be done holding child_policy_mu_, since this method + // does not run in the combiner. + MutexLock lock(&child_policy_mu_); + if (child_policy_ != nullptr) { + child_policy_->FillChildRefsForChannelz(child_subchannels, + child_channels); + } + if (pending_child_policy_ != nullptr) { + pending_child_policy_->FillChildRefsForChannelz(child_subchannels, + child_channels); + } } gpr_atm uuid = gpr_atm_no_barrier_load(&lb_channel_uuid_); if (uuid != 0) { @@ -1238,6 +1313,32 @@ void GrpcLb::FillChildRefsForChannelz( } } +void GrpcLb::UpdateLocked(const grpc_channel_args& args, + RefCountedPtr lb_config) { + const bool is_initial_update = lb_channel_ == nullptr; + ParseLbConfig(lb_config.get()); + ProcessChannelArgsLocked(args); + // Update the existing child policy. + if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked(); + // If this is the initial update, start the fallback timer. + if (is_initial_update) { + if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && + !fallback_timer_callback_pending_) { + grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; + Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback + GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this, + grpc_combiner_scheduler(combiner())); + fallback_timer_callback_pending_ = true; + grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); + } + StartBalancerCallLocked(); + } +} + +// +// helpers for UpdateLocked() +// + // Returns the backend addresses extracted from the given addresses. 
UniquePtr ExtractBackendAddresses( const ServerAddressList& addresses) { @@ -1299,25 +1400,26 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { grpc_channel_args_destroy(lb_channel_args); } -void GrpcLb::UpdateLocked(const grpc_channel_args& args, - RefCountedPtr lb_config) { - const bool is_initial_update = lb_channel_ == nullptr; - ProcessChannelArgsLocked(args); - // Update the existing RR policy. - if (rr_policy_ != nullptr) CreateOrUpdateRoundRobinPolicyLocked(); - // If this is the initial update, start the fallback timer and the - // balancer call. - if (is_initial_update) { - if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && - !fallback_timer_callback_pending_) { - grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; - Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback - GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this, - grpc_combiner_scheduler(combiner())); - fallback_timer_callback_pending_ = true; - grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); +void GrpcLb::ParseLbConfig(Config* grpclb_config) { + const grpc_json* child_policy = nullptr; + if (grpclb_config != nullptr) { + const grpc_json* grpclb_config_json = grpclb_config->json(); + for (const grpc_json* field = grpclb_config_json; field != nullptr; + field = field->next) { + if (field->key == nullptr) return; + if (strcmp(field->key, "childPolicy") == 0) { + if (child_policy != nullptr) return; // Duplicate. 
+        child_policy = ParseLoadBalancingConfig(field);
+      }
     }
-    StartBalancerCallLocked();
+  }
+  if (child_policy != nullptr) {
+    child_policy_name_ = UniquePtr<char>(gpr_strdup(child_policy->key));
+    child_policy_config_ = MakeRefCounted<Config>(
+        child_policy->child, grpclb_config->service_config());
+  } else {
+    child_policy_name_.reset();
+    child_policy_config_.reset();
   }
 }
 
@@ -1352,7 +1454,7 @@ void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
               grpclb_policy);
     }
     GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
-    grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
+    grpclb_policy->CreateOrUpdateChildPolicyLocked();
   }
   grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
 }
@@ -1396,10 +1498,10 @@ void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
 }
 
 //
-// code for interacting with the RR policy
+// code for interacting with the child policy
 //
 
-grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
+grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() {
   ServerAddressList tmp_addresses;
   ServerAddressList* addresses = &tmp_addresses;
   bool is_backend_from_grpclb_load_balancer = false;
@@ -1408,7 +1510,7 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
         lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
     is_backend_from_grpclb_load_balancer = true;
   } else {
-    // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
+    // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
     // received any serverlist from the balancer, we use the fallback backends
     // returned by the resolver. Note that the fallback backend list may be
     // empty, in which case the new round_robin policy will keep the requested
@@ -1435,49 +1537,139 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
         const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
     ++num_args_to_add;
   }
-  grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
+  return grpc_channel_args_copy_and_add_and_remove(
       args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
       num_args_to_add);
-  return args;
 }
 
-void GrpcLb::CreateRoundRobinPolicyLocked(Args args) {
-  GPR_ASSERT(rr_policy_ == nullptr);
-  rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
-      "round_robin", std::move(args));
-  if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
-    gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
-            this);
-    return;
+OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
+    const char* name, grpc_channel_args* args) {
+  Helper* helper = New<Helper>(Ref());
+  LoadBalancingPolicy::Args lb_policy_args;
+  lb_policy_args.combiner = combiner();
+  lb_policy_args.args = args;
+  lb_policy_args.channel_control_helper =
+      UniquePtr<ChannelControlHelper>(helper);
+  OrphanablePtr<LoadBalancingPolicy> lb_policy =
+      LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+          name, std::move(lb_policy_args));
+  if (GPR_UNLIKELY(lb_policy == nullptr)) {
+    gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
+            name);
+    return nullptr;
   }
+  helper->set_child(lb_policy.get());
   if (grpc_lb_glb_trace.enabled()) {
-    gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
-            rr_policy_.get());
+    gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
+            name, lb_policy.get());
   }
   // Add the gRPC LB's interested_parties pollset_set to that of the newly
-  // created RR policy. This will make the RR policy progress upon activity on
-  // gRPC LB, which in turn is tied to the application's call.
-  grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
+  // created child policy. This will make the child policy progress upon
+  // activity on gRPC LB, which in turn is tied to the application's call.
+  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                    interested_parties());
+  return lb_policy;
 }
 
-void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
+void GrpcLb::CreateOrUpdateChildPolicyLocked() {
   if (shutting_down_) return;
-  grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
+  grpc_channel_args* args = CreateChildPolicyArgsLocked();
   GPR_ASSERT(args != nullptr);
-  if (rr_policy_ == nullptr) {
-    LoadBalancingPolicy::Args lb_policy_args;
-    lb_policy_args.combiner = combiner();
-    lb_policy_args.args = args;
-    lb_policy_args.channel_control_helper =
-        UniquePtr<ChannelControlHelper>(New<Helper>(Ref()));
-    CreateRoundRobinPolicyLocked(std::move(lb_policy_args));
+  // If the child policy name changes, we need to create a new child
+  // policy. When this happens, we leave child_policy_ as-is and store
+  // the new child policy in pending_child_policy_. Once the new child
+  // policy transitions into state READY, we swap it into child_policy_,
+  // replacing the original child policy. So pending_child_policy_ is
+  // non-null only between when we apply an update that changes the child
+  // policy name and when the new child reports state READY.
+  //
+  // Updates can arrive at any point during this transition. We always
+  // apply updates relative to the most recently created child policy,
+  // even if the most recent one is still in pending_child_policy_. This
+  // is true both when applying the updates to an existing child policy
+  // and when determining whether we need to create a new policy.
+  //
+  // As a result of this, there are several cases to consider here:
+  //
+  // 1. We have no existing child policy (i.e., we have started up but
+  //    have not yet received a serverlist from the balancer or gone
+  //    into fallback mode; in this case, both child_policy_ and
+  //    pending_child_policy_ are null). In this case, we create a
+  //    new child policy and store it in child_policy_.
+  //
+  // 2. We have an existing child policy and have no pending child policy
+  //    from a previous update (i.e., either there has not been a
+  //    previous update that changed the policy name, or we have already
+  //    finished swapping in the new policy; in this case, child_policy_
+  //    is non-null but pending_child_policy_ is null). In this case:
+  //    a. If child_policy_->name() equals child_policy_name, then we
+  //       update the existing child policy.
+  //    b. If child_policy_->name() does not equal child_policy_name,
+  //       we create a new policy. The policy will be stored in
+  //       pending_child_policy_ and will later be swapped into
+  //       child_policy_ by the helper when the new child transitions
+  //       into state READY.
+  //
+  // 3. We have an existing child policy and have a pending child policy
+  //    from a previous update (i.e., a previous update set
+  //    pending_child_policy_ as per case 2b above and that policy has
+  //    not yet transitioned into state READY and been swapped into
+  //    child_policy_; in this case, both child_policy_ and
+  //    pending_child_policy_ are non-null). In this case:
+  //    a. If pending_child_policy_->name() equals child_policy_name,
+  //       then we update the existing pending child policy.
+  //    b. If pending_child_policy->name() does not equal
+  //       child_policy_name, then we create a new policy. The new
+  //       policy is stored in pending_child_policy_ (replacing the one
+  //       that was there before, which will be immediately shut down)
+  //       and will later be swapped into child_policy_ by the helper
+  //       when the new child transitions into state READY.
+  const char* child_policy_name =
+      child_policy_name_ == nullptr ? "round_robin" : child_policy_name_.get();
+  const bool create_policy =
+      // case 1
+      child_policy_ == nullptr ||
+      // case 2b
+      (pending_child_policy_ == nullptr &&
+       strcmp(child_policy_->name(), child_policy_name) != 0) ||
+      // case 3b
+      (pending_child_policy_ != nullptr &&
+       strcmp(pending_child_policy_->name(), child_policy_name) != 0);
+  LoadBalancingPolicy* policy_to_update = nullptr;
+  if (create_policy) {
+    // Cases 1, 2b, and 3b: create a new child policy.
+    // If child_policy_ is null, we set it (case 1), else we set
+    // pending_child_policy_ (cases 2b and 3b).
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
+              child_policy_ == nullptr ? "" : "pending ", child_policy_name);
+    }
+    auto new_policy = CreateChildPolicyLocked(child_policy_name, args);
+    // Swap the policy into place.
+    auto& lb_policy =
+        child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
+    {
+      MutexLock lock(&child_policy_mu_);
+      lb_policy = std::move(new_policy);
+    }
+    policy_to_update = lb_policy.get();
+  } else {
+    // Cases 2a and 3a: update an existing policy.
+    // If we have a pending child policy, send the update to the pending
+    // policy (case 3a), else send it to the current policy (case 2a).
+    policy_to_update = pending_child_policy_ != nullptr
+                           ? pending_child_policy_.get()
+                           : child_policy_.get();
   }
+  GPR_ASSERT(policy_to_update != nullptr);
+  // Update the policy.
   if (grpc_lb_glb_trace.enabled()) {
-    gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
-            rr_policy_.get());
+    gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
+            policy_to_update == pending_child_policy_.get() ? "pending " : "",
+            policy_to_update);
   }
-  rr_policy_->UpdateLocked(*args, nullptr);
+  policy_to_update->UpdateLocked(*args, child_policy_config_);
+  // Clean up.
grpc_channel_args_destroy(args); } diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 2288b88b517..31353ba1304 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -723,6 +723,150 @@ TEST_F(SingleBalancerTest, SelectGrpclbWithMigrationServiceConfig) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +TEST_F(SingleBalancerTest, UsePickFirstChildPolicy) { + SetNextResolutionAllBalancers( + "{\n" + " \"loadBalancingConfig\":[\n" + " { \"grpclb\":{\n" + " \"childPolicy\":[\n" + " { \"pick_first\":{} }\n" + " ]\n" + " } }\n" + " ]\n" + "}"); + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), + 0); + const size_t kNumRpcs = num_backends_ * 2; + CheckRpcSendOk(kNumRpcs, 1000 /* timeout_ms */, true /* wait_for_ready */); + balancers_[0]->NotifyDoneWithServerlists(); + // Check that all requests went to the first backend. This verifies + // that we used pick_first instead of round_robin as the child policy. + EXPECT_EQ(backend_servers_[0].service_->request_count(), kNumRpcs); + for (size_t i = 1; i < backends_.size(); ++i) { + EXPECT_EQ(backend_servers_[i].service_->request_count(), 0UL); + } + // The balancer got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + // Check LB policy name for the channel. 
+  EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(SingleBalancerTest, SwapChildPolicy) {
+  SetNextResolutionAllBalancers(
+      "{\n"
+      " \"loadBalancingConfig\":[\n"
+      " { \"grpclb\":{\n"
+      " \"childPolicy\":[\n"
+      " { \"pick_first\":{} }\n"
+      " ]\n"
+      " } }\n"
+      " ]\n"
+      "}");
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+      0);
+  const size_t kNumRpcs = num_backends_ * 2;
+  CheckRpcSendOk(kNumRpcs, 1000 /* timeout_ms */, true /* wait_for_ready */);
+  // Check that all requests went to the first backend. This verifies
+  // that we used pick_first instead of round_robin as the child policy.
+  EXPECT_EQ(backend_servers_[0].service_->request_count(), kNumRpcs);
+  for (size_t i = 1; i < backends_.size(); ++i) {
+    EXPECT_EQ(backend_servers_[i].service_->request_count(), 0UL);
+  }
+  // Send new resolution that removes child policy from service config.
+  SetNextResolutionAllBalancers("{}");
+  WaitForAllBackends();
+  CheckRpcSendOk(kNumRpcs, 1000 /* timeout_ms */, true /* wait_for_ready */);
+  // Check that every backend saw the same number of requests. This verifies
+  // that we used round_robin.
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    EXPECT_EQ(backend_servers_[i].service_->request_count(), 2UL);
+  }
+  // Done.
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  // Check LB policy name for the channel.
+  EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(SingleBalancerTest, UpdatesGoToMostRecentChildPolicy) {
+  const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
+  ResetStub(kFallbackTimeoutMs);
+  int unreachable_balancer_port = grpc_pick_unused_port_or_die();
+  int unreachable_backend_port = grpc_pick_unused_port_or_die();
+  // Phase 1: Start with RR pointing to first backend.
+  gpr_log(GPR_INFO, "PHASE 1: Initial setup with RR with first backend");
+  SetNextResolution(
+      {
+          // Unreachable balancer.
+          {unreachable_balancer_port, true, ""},
+          // Fallback address: first backend.
+          {backend_servers_[0].port_, false, ""},
+      },
+      "{\n"
+      " \"loadBalancingConfig\":[\n"
+      " { \"grpclb\":{\n"
+      " \"childPolicy\":[\n"
+      " { \"round_robin\":{} }\n"
+      " ]\n"
+      " } }\n"
+      " ]\n"
+      "}");
+  // RPCs should go to first backend.
+  WaitForBackend(0);
+  // Phase 2: Switch to PF pointing to unreachable backend.
+  gpr_log(GPR_INFO, "PHASE 2: Update to use PF with unreachable backend");
+  SetNextResolution(
+      {
+          // Unreachable balancer.
+          {unreachable_balancer_port, true, ""},
+          // Fallback address: unreachable backend.
+          {unreachable_backend_port, false, ""},
+      },
+      "{\n"
+      " \"loadBalancingConfig\":[\n"
+      " { \"grpclb\":{\n"
+      " \"childPolicy\":[\n"
+      " { \"pick_first\":{} }\n"
+      " ]\n"
+      " } }\n"
+      " ]\n"
+      "}");
+  // RPCs should continue to go to the first backend, because the new
+  // PF child policy will never go into state READY.
+  WaitForBackend(0);
+  // Phase 3: Switch back to RR pointing to second and third backends.
+  // This ensures that we create a new policy rather than updating the
+  // pending PF policy.
+  gpr_log(GPR_INFO, "PHASE 3: Update to use RR again with two backends");
+  SetNextResolution(
+      {
+          // Unreachable balancer.
+          {unreachable_balancer_port, true, ""},
+          // Fallback address: second and third backends.
+          {backend_servers_[1].port_, false, ""},
+          {backend_servers_[2].port_, false, ""},
+      },
+      "{\n"
+      " \"loadBalancingConfig\":[\n"
+      " { \"grpclb\":{\n"
+      " \"childPolicy\":[\n"
+      " { \"round_robin\":{} }\n"
+      " ]\n"
+      " } }\n"
+      " ]\n"
+      "}");
+  // RPCs should go to the second and third backends.
+  WaitForBackend(1);
+  WaitForBackend(2);
+}
+
 TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
   SetNextResolutionAllBalancers();
   // Same backend listed twice.