diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index f248a873ba9..aadba3fe99d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -710,7 +710,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, return; } glb_policy->rr_policy = new_rr_policy; - grpc_error *rr_state_error = NULL; const grpc_connectivity_state rr_state = grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy, @@ -736,7 +735,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, rr_connectivity->state = rr_state; /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb"); grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); @@ -801,32 +800,31 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { rr_connectivity_data *rr_connectivity = arg; glb_lb_policy *glb_policy = rr_connectivity->glb_policy; - - const bool shutting_down = glb_policy->shutting_down; - bool unref_needed = false; - GRPC_ERROR_REF(error); - - if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) { - /* RR policy shutting down. Don't renew subscription and free the arg of - * this callback. In addition we need to stash away the current policy to - * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last - * one, the policy would be destroyed, alongside the lock, which would - * result in a use-after-free */ - unref_needed = true; + if (glb_policy->shutting_down) { + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "glb_rr_connectivity_cb"); gpr_free(rr_connectivity); - } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */ - update_lb_connectivity_status_locked( - exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error)); - /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ - grpc_lb_policy_notify_on_state_change_locked( - exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, - &rr_connectivity->on_change); + return; } - if (unref_needed) { + if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) { + /* An RR policy that has transitioned into the SHUTDOWN connectivity state + * should not be considered for picks or updates: the SHUTDOWN state is a + * sink, policies can't transition back from it. .*/ + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, + "rr_connectivity_shutdown"); + glb_policy->rr_policy = NULL; GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "rr_connectivity_cb"); + "glb_rr_connectivity_cb"); + gpr_free(rr_connectivity); + return; } - GRPC_ERROR_UNREF(error); + /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */ + update_lb_connectivity_status_locked( + exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error)); + /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */ + grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, + &rr_connectivity->state, + &rr_connectivity->on_change); } static void destroy_balancer_name(grpc_exec_ctx *exec_ctx, @@ -990,7 +988,6 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, gpr_free(glb_policy); return NULL; } - GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, glb_lb_channel_on_connectivity_changed_cb, glb_policy, grpc_combiner_scheduler(args->combiner)); @@ -1047,7 +1044,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_policy->pending_picks = NULL; pending_ping *pping = glb_policy->pending_pings; glb_policy->pending_pings = NULL; - if (glb_policy->rr_policy) { + if (glb_policy->rr_policy != NULL) { GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); } // We destroy the LB channel here because diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index bc40165cfb8..a7f7e9542c8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -74,6 +74,9 @@ typedef struct round_robin_lb_policy { bool started_picking; /** are we shutting down? */ bool shutdown; + /** has the policy gotten into the GRPC_CHANNEL_SHUTDOWN? No picks can be + * service after this point, the policy will never transition out. */ + bool in_connectivity_shutdown; /** List of picks that are waiting on connectivity */ pending_pick *pending_picks; @@ -420,6 +423,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + GPR_ASSERT(!p->shutdown); + GPR_ASSERT(!p->in_connectivity_shutdown); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol); } @@ -532,6 +537,7 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); + p->in_connectivity_shutdown = true; new_state = GRPC_CHANNEL_SHUTDOWN; } else if (subchannel_list->num_transient_failures == p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index e403ee66fc8..2f3acb6ee10 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -132,6 +132,19 @@ class BackendServiceImpl : public BackendService { IncreaseResponseCount(); return status; } + + // Returns true on its first invocation, false otherwise. + bool Shutdown() { + std::unique_lock lock(mu_); + const bool prev = !shutdown_; + shutdown_ = true; + gpr_log(GPR_INFO, "Backend: shut down"); + return prev; + } + + private: + std::mutex mu_; + bool shutdown_ = false; }; grpc::string Ip4ToPackedString(const char* ip_str) { @@ -339,7 +352,7 @@ class GrpclbEnd2endTest : public ::testing::Test { void TearDown() override { for (size_t i = 0; i < backends_.size(); ++i) { - backend_servers_[i].Shutdown(); + if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown(); } for (size_t i = 0; i < balancers_.size(); ++i) { if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown(); @@ -637,6 +650,61 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +TEST_F(SingleBalancerTest, BackendsRestart) { + const size_t kNumRpcsPerAddress = 100; + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), + 0); + // Make sure that trying to connect works without a call. + channel_->GetState(true /* try_to_connect */); + // Send 100 RPCs per server. + auto statuses_and_responses = + SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); + } + // Each backend should have gotten 100 requests. + for (size_t i = 0; i < backends_.size(); ++i) { + EXPECT_EQ(kNumRpcsPerAddress, + backend_servers_[i].service_->request_count()); + } + balancers_[0]->NotifyDoneWithServerlists(); + // The balancer got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown(); + } + statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + EXPECT_FALSE(status.ok()); + } + for (size_t i = 0; i < num_backends_; ++i) { + backends_.emplace_back(new BackendServiceImpl()); + backend_servers_.emplace_back(ServerThread( + "backend", server_host_, backends_.back().get())); + } + // The following RPC will fail due to the backend ports having changed. It + // will nonetheless exercise the grpclb-roundrobin handling of the RR policy + // having gone into shutdown. + // TODO(dgq): implement the "backend restart" component as well. We need extra + // machinery to either update the LB responses "on the fly" or instruct + // backends which ports to restart on. + statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + EXPECT_FALSE(status.ok()); + } + // Check LB policy name for the channel. + EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + class UpdatesTest : public GrpclbEnd2endTest { public: UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}