diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index a7f7e9542c8..a63bdd933d5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -811,19 +811,30 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, sc_args.args = new_args; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); + grpc_channel_args_destroy(exec_ctx, new_args); + grpc_error *error; + // Get the connectivity state of the subchannel. Already existing ones may + // be in a state other than INIT. + const grpc_connectivity_state subchannel_connectivity_state = + grpc_subchannel_check_connectivity(subchannel, &error); + if (error != GRPC_ERROR_NONE) { + // The subchannel is in error (e.g. shutting down). Ignore it. + GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error"); + GRPC_ERROR_UNREF(error); + continue; + } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); gpr_log( GPR_DEBUG, "[RR %p] index %lu: Created subchannel %p for address uri %s into " - "subchannel_list %p", + "subchannel_list %p. Connectivity state %s", (void *)p, (unsigned long)subchannel_index, (void *)subchannel, - address_uri, (void *)subchannel_list); + address_uri, (void *)subchannel_list, + grpc_connectivity_state_name(subchannel_connectivity_state)); gpr_free(address_uri); } - grpc_channel_args_destroy(exec_ctx, new_args); - subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++]; sd->subchannel_list = subchannel_list; sd->subchannel = subchannel; @@ -835,7 +846,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, * won't be referring to this value again and it'll be overwritten after * the first call to rr_connectivity_changed_locked */ sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; + sd->curr_connectivity_state = subchannel_connectivity_state; sd->user_data_vtable = addresses->user_data_vtable; if (sd->user_data_vtable != NULL) { sd->user_data = diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index e1160ecdc6d..cb4f992bfa7 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -450,6 +450,43 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); } +TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { + // Start servers and send one RPC per server. + const int kNumServers = 3; + StartServers(kNumServers); + ResetStub("round_robin"); + std::vector ports; + + // Start with a single server. + ports.emplace_back(servers_[0]->port_); + SetNextResolution(ports); + WaitForServer(0); + // Send RPCs. They should all go servers_[0] + for (size_t i = 0; i < 10; ++i) SendRpc(); + EXPECT_EQ(10, servers_[0]->service_.request_count()); + EXPECT_EQ(0, servers_[1]->service_.request_count()); + EXPECT_EQ(0, servers_[2]->service_.request_count()); + servers_[0]->service_.ResetCounters(); + + // All servers, but one is shutdown. + servers_[1]->Shutdown(false); + ports.clear(); + ports.emplace_back(servers_[0]->port_); + ports.emplace_back(servers_[1]->port_); + ports.emplace_back(servers_[2]->port_); + SetNextResolution(ports); + WaitForServer(0); + WaitForServer(2); + + // Send three RPCs, one per server. + for (size_t i = 0; i < kNumServers; ++i) SendRpc(); + // The server in shutdown shouldn't receive any. + EXPECT_EQ(0, servers_[1]->service_.request_count()); + + // Check LB policy name for the channel. + EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); +} + TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3;