Fix bug in handling of RR connectivity transition to SHUTDOWN

pull/11948/head
David Garcia Quintas 7 years ago
parent 1d27c66d8e
commit fc950fbeb5
  1. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c (49 lines changed)
  2. src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c (6 lines changed)
  3. test/cpp/end2end/grpclb_end2end_test.cc (70 lines changed)

src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c

@@ -710,7 +710,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
     return;
   }
   glb_policy->rr_policy = new_rr_policy;
-
   grpc_error *rr_state_error = NULL;
   const grpc_connectivity_state rr_state =
       grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
@@ -736,7 +735,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
   rr_connectivity->state = rr_state;
 
   /* Subscribe to changes to the connectivity of the new RR */
-  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched");
+  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
   grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
                                                &rr_connectivity->state,
                                                &rr_connectivity->on_change);
@@ -801,32 +800,31 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
                                                void *arg, grpc_error *error) {
   rr_connectivity_data *rr_connectivity = arg;
   glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
-
-  const bool shutting_down = glb_policy->shutting_down;
-  bool unref_needed = false;
-  GRPC_ERROR_REF(error);
-
-  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) {
-    /* RR policy shutting down. Don't renew subscription and free the arg of
-     * this callback. In addition we need to stash away the current policy to
-     * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last
-     * one, the policy would be destroyed, alongside the lock, which would
-     * result in a use-after-free */
-    unref_needed = true;
+  if (glb_policy->shutting_down) {
+    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+                              "glb_rr_connectivity_cb");
     gpr_free(rr_connectivity);
-  } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */
-    update_lb_connectivity_status_locked(
-        exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
-    /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
-    grpc_lb_policy_notify_on_state_change_locked(
-        exec_ctx, glb_policy->rr_policy, &rr_connectivity->state,
-        &rr_connectivity->on_change);
+    return;
   }
-  if (unref_needed) {
+  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
+    /* An RR policy that has transitioned into the SHUTDOWN connectivity state
+     * should not be considered for picks or updates: the SHUTDOWN state is a
+     * sink, policies can't transition back from it. */
+    GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy,
+                         "rr_connectivity_shutdown");
+    glb_policy->rr_policy = NULL;
     GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
-                              "rr_connectivity_cb");
+                              "glb_rr_connectivity_cb");
+    gpr_free(rr_connectivity);
+    return;
   }
-  GRPC_ERROR_UNREF(error);
+  /* RR state != SHUTDOWN && !glb_policy->shutting_down: business as usual */
+  update_lb_connectivity_status_locked(
+      exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
+  /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
+  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+                                               &rr_connectivity->state,
+                                               &rr_connectivity->on_change);
 }
 
 static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
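The comment added in this hunk spells out the rationale: once an RR policy reports GRPC_CHANNEL_SHUTDOWN it can never leave that state, so grpclb must drop it rather than resubscribe. The restructured callback therefore distinguishes three mutually exclusive cases: grpclb itself shutting down, the RR child reaching SHUTDOWN, and the normal resubscribe path. A minimal, self-contained sketch of that control flow, using hypothetical simplified types in place of the real grpc_connectivity_state and policy structs, might look like:

/* Illustrative sketch only (not part of the patch): the three mutually
 * exclusive cases handled by the reworked glb_rr_connectivity_changed_locked,
 * with hypothetical simplified enums standing in for the real gRPC types. */
#include <stdbool.h>
#include <stdio.h>

typedef enum {
  STATE_IDLE,
  STATE_CONNECTING,
  STATE_READY,
  STATE_TRANSIENT_FAILURE,
  STATE_SHUTDOWN /* terminal: a policy never leaves this state */
} conn_state;

typedef enum {
  ACTION_DROP_SUBSCRIPTION, /* parent LB policy is shutting down: clean up */
  ACTION_DISCARD_RR_POLICY, /* RR hit SHUTDOWN: unref it and forget it */
  ACTION_RESUBSCRIBE        /* normal case: propagate state and re-arm */
} callback_action;

/* Mirrors the control flow of the patched callback. */
static callback_action on_rr_state_change(bool glb_shutting_down,
                                          conn_state rr_state) {
  if (glb_shutting_down) return ACTION_DROP_SUBSCRIPTION;
  if (rr_state == STATE_SHUTDOWN) return ACTION_DISCARD_RR_POLICY;
  return ACTION_RESUBSCRIBE;
}

int main(void) {
  /* An RR policy reporting SHUTDOWN must never be kept around for picks. */
  printf("%d\n", on_rr_state_change(false, STATE_SHUTDOWN) ==
                     ACTION_DISCARD_RR_POLICY);
  printf("%d\n", on_rr_state_change(true, STATE_READY) ==
                     ACTION_DROP_SUBSCRIPTION);
  printf("%d\n", on_rr_state_change(false, STATE_READY) == ACTION_RESUBSCRIBE);
  return 0;
}

The point of separating the first two cases in the real code is that only the SHUTDOWN case releases the strong ref on glb_policy->rr_policy and clears the pointer, so later picks and updates can no longer reach a dead child policy.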
@@ -990,7 +988,6 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
     gpr_free(glb_policy);
     return NULL;
   }
-
   GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
                     glb_lb_channel_on_connectivity_changed_cb, glb_policy,
                     grpc_combiner_scheduler(args->combiner));
@@ -1047,7 +1044,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   glb_policy->pending_picks = NULL;
   pending_ping *pping = glb_policy->pending_pings;
   glb_policy->pending_pings = NULL;
-  if (glb_policy->rr_policy) {
+  if (glb_policy->rr_policy != NULL) {
     GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
   }
   // We destroy the LB channel here because

src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c

@@ -74,6 +74,9 @@ typedef struct round_robin_lb_policy {
   bool started_picking;
   /** are we shutting down? */
   bool shutdown;
+  /** Has the policy entered the GRPC_CHANNEL_SHUTDOWN state? No picks can be
+   * serviced after this point; the policy never transitions out. */
+  bool in_connectivity_shutdown;
   /** List of picks that are waiting on connectivity */
   pending_pick *pending_picks;
 
@@ -420,6 +423,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                           grpc_call_context_element *context, void **user_data,
                           grpc_closure *on_complete) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+  GPR_ASSERT(!p->shutdown);
+  GPR_ASSERT(!p->in_connectivity_shutdown);
   if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
     gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol);
   }
@@ -532,6 +537,7 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                                 GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
                                 "rr_shutdown");
+    p->in_connectivity_shutdown = true;
     new_state = GRPC_CHANNEL_SHUTDOWN;
   } else if (subchannel_list->num_transient_failures ==
              p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
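The new in_connectivity_shutdown field acts as a one-way latch: it is set exactly once, in update_lb_connectivity_status_locked() when the state tracker enters GRPC_CHANNEL_SHUTDOWN, and rr_pick_locked() asserts it is still unset before attempting a pick. A small illustrative sketch of that latch pattern, using hypothetical stand-in names rather than the real round_robin_lb_policy types, might look like:

/* Illustrative sketch only: a one-way "connectivity shutdown" latch guarding
 * a pick path. The struct and functions are hypothetical stand-ins for
 * round_robin_lb_policy / rr_pick_locked, not the real gRPC API. */
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct {
  bool shutdown;                 /* policy asked to shut down */
  bool in_connectivity_shutdown; /* connectivity tracker hit SHUTDOWN */
} rr_policy_sketch;

/* Called when the aggregated connectivity state becomes SHUTDOWN. */
static void enter_connectivity_shutdown(rr_policy_sketch *p) {
  p->in_connectivity_shutdown = true; /* latch: never reset afterwards */
}

/* A pick must never be attempted on a policy that can no longer serve it. */
static bool try_pick(rr_policy_sketch *p) {
  assert(!p->shutdown);
  assert(!p->in_connectivity_shutdown);
  return true; /* ... actual subchannel selection would go here ... */
}

int main(void) {
  rr_policy_sketch p = {false, false};
  printf("pick ok: %d\n", try_pick(&p));
  enter_connectivity_shutdown(&p);
  /* try_pick(&p) would now abort via the assertion, as intended. */
  return 0;
}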

test/cpp/end2end/grpclb_end2end_test.cc

@@ -132,6 +132,19 @@ class BackendServiceImpl : public BackendService {
     IncreaseResponseCount();
     return status;
   }
+
+  // Returns true on its first invocation, false otherwise.
+  bool Shutdown() {
+    std::unique_lock<std::mutex> lock(mu_);
+    const bool prev = !shutdown_;
+    shutdown_ = true;
+    gpr_log(GPR_INFO, "Backend: shut down");
+    return prev;
+  }
+
+ private:
+  std::mutex mu_;
+  bool shutdown_ = false;
 };
 
 grpc::string Ip4ToPackedString(const char* ip_str) {
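BackendServiceImpl::Shutdown() is written to be idempotent: only the first call returns true, so TearDown() (and the BackendsRestart test added below) can pair it with the actual server shutdown and never stop the same server twice. The same first-call-wins guard, sketched here in plain C with pthreads and hypothetical names, might look like:

/* Illustrative sketch only: the "first call wins" shutdown guard used above,
 * expressed with pthreads instead of std::mutex. Names are hypothetical. */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct {
  pthread_mutex_t mu;
  bool shutdown;
} backend_sketch;

/* Returns true only on the first invocation, so callers can pair it with the
 * actual server shutdown and never shut the same server down twice. */
static bool backend_shutdown(backend_sketch *b) {
  pthread_mutex_lock(&b->mu);
  const bool first = !b->shutdown;
  b->shutdown = true;
  pthread_mutex_unlock(&b->mu);
  return first;
}

int main(void) {
  backend_sketch b;
  pthread_mutex_init(&b.mu, NULL);
  b.shutdown = false;
  printf("%d\n", backend_shutdown(&b)); /* 1: first call */
  printf("%d\n", backend_shutdown(&b)); /* 0: subsequent calls are no-ops */
  pthread_mutex_destroy(&b.mu);
  return 0;
}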
@@ -339,7 +352,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
   void TearDown() override {
     for (size_t i = 0; i < backends_.size(); ++i) {
-      backend_servers_[i].Shutdown();
+      if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
     }
     for (size_t i = 0; i < balancers_.size(); ++i) {
       if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown();
@@ -637,6 +650,61 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }
 
+TEST_F(SingleBalancerTest, BackendsRestart) {
+  const size_t kNumRpcsPerAddress = 100;
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+      0);
+  // Make sure that trying to connect works without a call.
+  channel_->GetState(true /* try_to_connect */);
+  // Send 100 RPCs per server.
+  auto statuses_and_responses =
+      SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    const EchoResponse& response = status_and_response.second;
+    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                             << " message=" << status.error_message();
+    EXPECT_EQ(response.message(), kMessage_);
+  }
+  // Each backend should have gotten 100 requests.
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    EXPECT_EQ(kNumRpcsPerAddress,
+              backend_servers_[i].service_->request_count());
+  }
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
+  }
+  statuses_and_responses = SendRpc(kMessage_, 1);
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    EXPECT_FALSE(status.ok());
+  }
+  for (size_t i = 0; i < num_backends_; ++i) {
+    backends_.emplace_back(new BackendServiceImpl());
+    backend_servers_.emplace_back(ServerThread<BackendService>(
+        "backend", server_host_, backends_.back().get()));
+  }
+  // The following RPC will fail due to the backend ports having changed. It
+  // will nonetheless exercise the grpclb-roundrobin handling of the RR policy
+  // having gone into shutdown.
+  // TODO(dgq): implement the "backend restart" component as well. We need
+  // extra machinery to either update the LB responses "on the fly" or instruct
+  // backends which ports to restart on.
+  statuses_and_responses = SendRpc(kMessage_, 1);
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    EXPECT_FALSE(status.ok());
+  }
+  // Check LB policy name for the channel.
+  EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
 class UpdatesTest : public GrpclbEnd2endTest {
  public:
   UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
