RR: Initialize subchannel connectivity state

Branch: pull/12242/head
Author: David Garcia Quintas
Parent: 183a9cf523
Commit: a2c1df4dcd

Changed files:
  1. src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c (21)
  2. test/cpp/end2end/client_lb_end2end_test.cc (37)

@@ -811,19 +811,30 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
     sc_args.args = new_args;
     grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
         exec_ctx, args->client_channel_factory, &sc_args);
+    grpc_channel_args_destroy(exec_ctx, new_args);
+    grpc_error *error;
+    // Get the connectivity state of the subchannel. Already existing ones may
+    // be in a state other than INIT.
+    const grpc_connectivity_state subchannel_connectivity_state =
+        grpc_subchannel_check_connectivity(subchannel, &error);
+    if (error != GRPC_ERROR_NONE) {
+      // The subchannel is in error (e.g. shutting down). Ignore it.
+      GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error");
+      GRPC_ERROR_UNREF(error);
+      continue;
+    }
     if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
       char *address_uri =
           grpc_sockaddr_to_uri(&addresses->addresses[i].address);
       gpr_log(
           GPR_DEBUG,
           "[RR %p] index %lu: Created subchannel %p for address uri %s into "
-          "subchannel_list %p",
+          "subchannel_list %p. Connectivity state %s",
           (void *)p, (unsigned long)subchannel_index, (void *)subchannel,
-          address_uri, (void *)subchannel_list);
+          address_uri, (void *)subchannel_list,
+          grpc_connectivity_state_name(subchannel_connectivity_state));
       gpr_free(address_uri);
     }
-    grpc_channel_args_destroy(exec_ctx, new_args);
     subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++];
     sd->subchannel_list = subchannel_list;
     sd->subchannel = subchannel;
@@ -835,7 +846,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
      * won't be referring to this value again and it'll be overwritten after
      * the first call to rr_connectivity_changed_locked */
     sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
-    sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
+    sd->curr_connectivity_state = subchannel_connectivity_state;
     sd->user_data_vtable = addresses->user_data_vtable;
     if (sd->user_data_vtable != NULL) {
       sd->user_data =
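
What the round_robin.c change does: instead of hard-coding each new subchannel's curr_connectivity_state to GRPC_CHANNEL_IDLE, rr_update_locked now queries the subchannel's actual state via grpc_subchannel_check_connectivity (pre-existing subchannels may already be past INIT), and if that query reports an error (e.g. the subchannel is shutting down) it unrefs the subchannel and skips it entirely. The following is a minimal, self-contained C sketch of that pattern; every type and name in it is a hypothetical stand-in for illustration, not gRPC's internal API.

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins for the gRPC-core types, for illustration only. */
typedef enum {
  STATE_IDLE,
  STATE_CONNECTING,
  STATE_READY,
  STATE_TRANSIENT_FAILURE,
  STATE_SHUTDOWN
} conn_state;

typedef struct {
  conn_state state;
  bool in_error; /* e.g. already shutting down */
} fake_subchannel;

typedef struct {
  fake_subchannel *sc;
  conn_state prev_state; /* sentinel until the first connectivity callback */
  conn_state curr_state;
} fake_subchannel_data;

/* Returns the subchannel's current state and reports an error (nonzero) if
 * the subchannel can no longer be used, loosely mirroring the call
 * grpc_subchannel_check_connectivity(subchannel, &error) in the diff. */
static conn_state check_connectivity(fake_subchannel *sc, int *error) {
  *error = sc->in_error ? 1 : 0;
  return sc->state;
}

/* Builds the subchannel list the way the patched rr_update_locked does:
 * query each subchannel's state up front, skip the ones already in error,
 * and seed curr_state from the query instead of assuming IDLE. */
static size_t build_list(fake_subchannel **scs, size_t n,
                         fake_subchannel_data *out) {
  size_t count = 0;
  for (size_t i = 0; i < n; ++i) {
    int error = 0;
    const conn_state state = check_connectivity(scs[i], &error);
    if (error != 0) {
      /* The subchannel is in error (e.g. shutting down). Ignore it. */
      continue;
    }
    out[count].sc = scs[i];
    out[count].prev_state = STATE_IDLE; /* placeholder, overwritten later */
    out[count].curr_state = state;      /* previously hard-coded to IDLE */
    ++count;
  }
  return count;
}

int main(void) {
  fake_subchannel a = {STATE_READY, false};
  fake_subchannel b = {STATE_SHUTDOWN, true}; /* will be skipped */
  fake_subchannel *scs[] = {&a, &b};
  fake_subchannel_data list[2];
  size_t kept = build_list(scs, 2, list);
  printf("kept %zu of 2 subchannels; first state = %d\n", kept,
         (int)list[0].curr_state);
  return 0;
}

The sketch mirrors the two behavioural points of the patch: the state query happens once, at list-construction time, and an errored entry never makes it into the list, so the policy never has to special-case it later.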

@@ -450,6 +450,43 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
   EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
 }
+TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
+  // Start servers and send one RPC per server.
+  const int kNumServers = 3;
+  StartServers(kNumServers);
+  ResetStub("round_robin");
+  std::vector<int> ports;
+  // Start with a single server.
+  ports.emplace_back(servers_[0]->port_);
+  SetNextResolution(ports);
+  WaitForServer(0);
+  // Send RPCs. They should all go to servers_[0].
+  for (size_t i = 0; i < 10; ++i) SendRpc();
+  EXPECT_EQ(10, servers_[0]->service_.request_count());
+  EXPECT_EQ(0, servers_[1]->service_.request_count());
+  EXPECT_EQ(0, servers_[2]->service_.request_count());
+  servers_[0]->service_.ResetCounters();
+  // Include all servers in the update, but shut one of them down first.
+  servers_[1]->Shutdown(false);
+  ports.clear();
+  ports.emplace_back(servers_[0]->port_);
+  ports.emplace_back(servers_[1]->port_);
+  ports.emplace_back(servers_[2]->port_);
+  SetNextResolution(ports);
+  WaitForServer(0);
+  WaitForServer(2);
+  // Send three RPCs, one per server.
+  for (size_t i = 0; i < kNumServers; ++i) SendRpc();
+  // The shut-down server shouldn't receive any of them.
+  EXPECT_EQ(0, servers_[1]->service_.request_count());
+  // Check LB policy name for the channel.
+  EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+}
 TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
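
The new RoundRobinUpdateInError test drives the scenario the C change guards against: a resolver update whose address list includes a backend that has already been shut down. Round robin only hands picks to subchannels that reach (or are seeded in) a usable state, so the dead backend should end the test with a request count of zero while the healthy ones absorb the traffic. The toy picker below illustrates that skip-unready behaviour; it is a hedged sketch with made-up names, not gRPC's actual picker.

#include <stddef.h>
#include <stdio.h>

/* Hypothetical model of round-robin picking, for illustration only. */
typedef enum { ST_IDLE, ST_CONNECTING, ST_READY, ST_FAILURE } st;

typedef struct {
  st state;
  int picks; /* how many RPCs this backend receives in the toy run */
} backend;

/* Advances round-robin and returns the index of the next READY backend,
 * skipping everything else; returns -1 when no backend is READY. */
static int pick_next(backend *b, size_t n, size_t *last) {
  for (size_t tried = 0; tried < n; ++tried) {
    *last = (*last + 1) % n;
    if (b[*last].state == ST_READY) return (int)*last;
  }
  return -1;
}

int main(void) {
  /* Index 1 plays the role of servers_[1]: shut down, never READY. */
  backend b[3] = {{ST_READY, 0}, {ST_FAILURE, 0}, {ST_READY, 0}};
  size_t last = 0;
  for (int i = 0; i < 3; ++i) {
    int idx = pick_next(b, 3, &last);
    if (idx >= 0) b[idx].picks++;
  }
  /* The shut-down backend stays at 0, matching the test's expectation. */
  printf("picks: %d %d %d\n", b[0].picks, b[1].picks, b[2].picks);
  return 0;
}

Running it distributes the three picks over the two READY backends and leaves the FAILURE one at zero, which is exactly what EXPECT_EQ(0, servers_[1]->service_.request_count()) asserts.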
