Merge pull request #11604 from dgquintas/fix_rr_state_master

Fix RR policy connectivity state upon subchannels shutdown
pull/10358/merge
David G. Quintas 8 years ago committed by GitHub
commit 9864662df1
  1. 45
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
  2. 38
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
  3. 18
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
  4. 60
      test/cpp/end2end/client_lb_end2end_test.cc

@ -95,6 +95,9 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(p->subchannels);
gpr_free(p->new_subchannels);
gpr_free(p);
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p);
}
}
static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
@ -268,11 +271,20 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) {
if (p->num_subchannels > 0) {
GPR_ASSERT(p->selected == NULL);
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p",
(void *)p, (void *)p->subchannels[p->checking_subchannel]);
}
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
p->updating_subchannels = true;
} else if (p->selected != NULL) {
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG,
"Pick First %p unsubscribing from selected subchannel %p",
(void *)p, (void *)p->selected);
}
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
p->updating_selected = true;
@ -451,12 +463,25 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel *selected_subchannel;
pending_pick *pp;
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(
GPR_DEBUG,
"Pick First %p connectivity changed. Updating selected: %d; Updating "
"subchannels: %d; Checking %lu index (%lu total); State: %d; ",
(void *)p, p->updating_selected, p->updating_subchannels,
(unsigned long)p->checking_subchannel,
(unsigned long)p->num_subchannels, p->checking_connectivity);
}
bool restart = false;
if (p->updating_selected && error == GRPC_ERROR_CANCELLED) {
if (p->updating_selected && error != GRPC_ERROR_NONE) {
/* Captured the unsubscription for p->selected */
GPR_ASSERT(p->selected != NULL);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
"pf_update_connectivity");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p",
(void *)p, (void *)p->selected);
}
p->updating_selected = false;
if (p->num_new_subchannels == 0) {
p->selected = NULL;
@ -464,12 +489,16 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
restart = true;
}
if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) {
if (p->updating_subchannels && error != GRPC_ERROR_NONE) {
/* Captured the unsubscription for the checking subchannel */
GPR_ASSERT(p->selected == NULL);
for (size_t i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
"pf_update_connectivity");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p,
(void *)p->subchannels[i]);
}
}
gpr_free(p->subchannels);
p->subchannels = NULL;
@ -481,14 +510,12 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (restart) {
p->selected = NULL;
p->selected_key = NULL;
GPR_ASSERT(p->new_subchannels != NULL);
GPR_ASSERT(p->num_new_subchannels > 0);
p->num_subchannels = p->num_new_subchannels;
p->subchannels = p->new_subchannels;
p->num_new_subchannels = 0;
p->new_subchannels = NULL;
if (p->started_picking) {
/* If we were picking, continue to do so over the new subchannels,
* starting from the 0th index. */
@ -542,7 +569,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
"picked_first");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected);
gpr_log(GPR_INFO,
"Pick First %p selected subchannel %p (connected %p)",
(void *)p, (void *)selected_subchannel, (void *)p->selected);
}
p->selected_key = grpc_subchannel_get_key(selected_subchannel);
/* drop the pick list: we are connected now */
@ -568,7 +597,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
if (p->checking_subchannel == 0) {
/* only trigger transient failure when we've tried all alternatives */
/* only trigger transient failure when we've tried all alternatives
*/
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
@ -652,6 +682,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->client_channel_factory != NULL);
pick_first_lb_policy *p = gpr_zalloc(sizeof(*p));
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p created.", (void *)p);
}
pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,

@ -126,6 +126,8 @@ struct rr_subchannel_list {
size_t num_ready;
/** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
/** how many subchannels are in state SHUTDOWN */
size_t num_shutdown;
/** how many subchannels are in state IDLE */
size_t num_idle;
@ -425,6 +427,9 @@ static void update_state_counters_locked(subchannel_data *sd) {
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(subchannel_list->num_transient_failures > 0);
--subchannel_list->num_transient_failures;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
GPR_ASSERT(subchannel_list->num_shutdown > 0);
--subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
@ -433,6 +438,8 @@ static void update_state_counters_locked(subchannel_data *sd) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++subchannel_list->num_transient_failures;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
++subchannel_list->num_shutdown;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++subchannel_list->num_idle;
}
@ -455,7 +462,8 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
* CHECK: p->subchannel_list->num_subchannels = 0.
* CHECK: p->subchannel_list->num_shutdown ==
* p->subchannel_list->num_subchannels.
*
* 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
@ -464,37 +472,39 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* 5) RULE: ALL subchannels are IDLE => policy is IDLE.
* CHECK: p->num_idle == p->subchannel_list->num_subchannels.
*/
grpc_connectivity_state new_state = sd->curr_connectivity_state;
rr_subchannel_list *subchannel_list = sd->subchannel_list;
round_robin_lb_policy *p = subchannel_list->policy;
if (subchannel_list->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
return GRPC_CHANNEL_READY;
new_state = GRPC_CHANNEL_READY;
} else if (sd->curr_connectivity_state ==
GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting");
return GRPC_CHANNEL_CONNECTING;
} else if (p->subchannel_list->num_subchannels == 0) { /* 3) SHUTDOWN */
new_state = GRPC_CHANNEL_CONNECTING;
} else if (p->subchannel_list->num_shutdown ==
p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
return GRPC_CHANNEL_SHUTDOWN;
new_state = GRPC_CHANNEL_SHUTDOWN;
} else if (subchannel_list->num_transient_failures ==
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
return GRPC_CHANNEL_TRANSIENT_FAILURE;
new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
} else if (subchannel_list->num_idle ==
p->subchannel_list->num_subchannels) { /* 5) IDLE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
return GRPC_CHANNEL_IDLE;
new_state = GRPC_CHANNEL_IDLE;
}
/* no change */
return sd->curr_connectivity_state;
GRPC_ERROR_UNREF(error);
return new_state;
}
static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
@ -571,13 +581,15 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list);
GPR_ASSERT(!sd->subchannel_list->shutting_down);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
const unsigned long num_subchannels =
p->subchannel_list != NULL
? (unsigned long)p->subchannel_list->num_subchannels
: 0;
gpr_log(GPR_DEBUG,
"[RR %p] phasing out subchannel list %p (size %lu) in favor "
"of %p (size %lu)",
(void *)p, (void *)p->subchannel_list,
(unsigned long)p->subchannel_list->num_subchannels,
(void *)sd->subchannel_list,
(unsigned long)sd->subchannel_list->num_subchannels);
(void *)p, (void *)p->subchannel_list, num_subchannels,
(void *)sd->subchannel_list, num_subchannels);
}
if (p->subchannel_list != NULL) {
// dispose of the current subchannel_list

@ -56,6 +56,10 @@ typedef struct {
// grpc_resolver_next_locked()'s closure.
grpc_channel_args* next_results;
// Results to use for the pretended re-resolution in
// fake_resolver_channel_saw_error_locked().
grpc_channel_args* results_upon_error;
// pending next completion, or NULL
grpc_closure* next_completion;
// target result address for next completion
@ -65,6 +69,7 @@ typedef struct {
static void fake_resolver_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) {
fake_resolver* r = (fake_resolver*)gr;
grpc_channel_args_destroy(exec_ctx, r->next_results);
grpc_channel_args_destroy(exec_ctx, r->results_upon_error);
grpc_channel_args_destroy(exec_ctx, r->channel_args);
gpr_free(r);
}
@ -87,15 +92,22 @@ static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx,
*r->target_result =
grpc_channel_args_union(r->next_results, r->channel_args);
grpc_channel_args_destroy(exec_ctx, r->next_results);
r->next_results = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
r->next_results = NULL;
}
}
static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx,
grpc_resolver* resolver) {
fake_resolver* r = (fake_resolver*)resolver;
gpr_log(
GPR_INFO,
"FOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO");
if (r->next_results == NULL && r->results_upon_error != NULL) {
// Pretend we re-resolved.
r->next_results = grpc_channel_args_copy(r->results_upon_error);
}
fake_resolver_maybe_finish_next_locked(exec_ctx, r);
}
@ -151,6 +163,10 @@ static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg,
grpc_channel_args_destroy(exec_ctx, r->next_results);
}
r->next_results = generator->next_response;
if (r->results_upon_error != NULL) {
grpc_channel_args_destroy(exec_ctx, r->results_upon_error);
}
r->results_upon_error = grpc_channel_args_copy(generator->next_response);
fake_resolver_maybe_finish_next_locked(exec_ctx, r);
}

@ -97,9 +97,12 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
}
void StartServers(int num_servers) {
for (int i = 0; i < num_servers; ++i) {
servers_.emplace_back(new ServerData(server_host_));
void StartServers(size_t num_servers,
std::vector<int> ports = std::vector<int>()) {
for (size_t i = 0; i < num_servers; ++i) {
int port = 0;
if (ports.size() == num_servers) port = ports[i];
servers_.emplace_back(new ServerData(server_host_, port));
}
}
@ -146,14 +149,18 @@ class ClientLbEnd2endTest : public ::testing::Test {
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
void SendRpc() {
void SendRpc(bool expect_ok = true) {
EchoRequest request;
EchoResponse response;
request.set_message("Live long and prosper.");
ClientContext context;
Status status = stub_->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.message(), request.message());
if (expect_ok) {
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.message(), request.message());
} else {
EXPECT_FALSE(status.ok());
}
}
struct ServerData {
@ -162,8 +169,8 @@ class ClientLbEnd2endTest : public ::testing::Test {
MyTestServiceImpl service_;
std::unique_ptr<std::thread> thread_;
explicit ServerData(const grpc::string& server_host) {
port_ = grpc_pick_unused_port_or_die();
explicit ServerData(const grpc::string& server_host, int port = 0) {
port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
gpr_log(GPR_INFO, "starting server on port %d", port_);
std::mutex mu;
std::condition_variable cond;
@ -187,9 +194,9 @@ class ClientLbEnd2endTest : public ::testing::Test {
cond->notify_one();
}
void Shutdown() {
void Shutdown(bool join = true) {
server_->Shutdown();
thread_->join();
if (join) thread_->join();
}
};
@ -456,6 +463,39 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobinReconnect) {
// Start servers and send one RPC per server.
const int kNumServers = 1;
std::vector<int> ports;
ports.push_back(grpc_pick_unused_port_or_die());
StartServers(kNumServers, ports);
ResetStub("round_robin");
SetNextResolution(ports);
// Send one RPC per backend and make sure they are used in order.
// Note: This relies on the fact that the subchannels are reported in
// state READY in the order in which the addresses are specified,
// which is only true because the backends are all local.
for (size_t i = 0; i < servers_.size(); ++i) {
SendRpc();
EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
}
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
// Kill all servers
for (size_t i = 0; i < servers_.size(); ++i) {
servers_[i]->Shutdown(false);
}
// Client request should fail.
SendRpc(false);
// Bring servers back up on the same port (we aren't recreating the channel).
StartServers(kNumServers, ports);
// Client request should succeed.
SendRpc();
}
} // namespace
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save