PR comments

pull/14170/head
David Garcia Quintas 7 years ago
parent 20331706e2
commit a68a11e4a0
  1. 25
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  2. 2
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  3. 38
      test/cpp/end2end/client_lb_end2end_test.cc

@ -329,8 +329,7 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE (and
* requests re-resolution).
* TRANSIENT_FAILURE.
* CHECK: subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
*/
@ -397,9 +396,6 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Update state counters and new overall state.
update_state_counters_locked(sd);
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
// If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
// subchannel, if any.
switch (sd->curr_connectivity_state) {
@ -482,6 +478,12 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:; // fallthrough
}
// Update state counters and new overall state.
update_state_counters_locked(sd);
// Only update connectivity based on the selected subchannel list.
if (sd->subchannel_list == p->subchannel_list) {
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
}
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
@ -564,9 +566,20 @@ static void rr_update_locked(grpc_lb_policy* policy,
// purposes if the subchannel is already in transient failure. Otherwise
// we'd be immediately notified of the IDLE-TRANSIENT_FAILURE
// discrepancy, attempt to re-resolve and end up here again.
// TODO(roth): As part of C++-ifying the subchannel_list API, design a
// better API for notifying the LB policy of subchannel states, which can
// be used both for the subchannel's initial state and for subsequent
// state changes. This will allow us to handle this more generally instead
// of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any
// pending picks across all READY subchannels rather than sending them all
// to the first one).
if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
subchannel_list->subchannels[i].pending_connectivity_state_unsafe =
subchannel_state;
subchannel_list->subchannels[i].curr_connectivity_state =
subchannel_list->subchannels[i].prev_connectivity_state =
subchannel_state;
--subchannel_list->num_idle;
++subchannel_list->num_transient_failures;
}
}
if (p->latest_pending_subchannel_list != nullptr) {

@ -101,8 +101,6 @@ struct grpc_lb_subchannel_list {
size_t num_ready;
/** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
/** how many subchannels are in state SHUTDOWN */
size_t num_shutdown;
/** how many subchannels are in state IDLE */
size_t num_idle;

@ -138,7 +138,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
}
void SetNextResolution(const std::vector<int>& ports, bool notify = true) {
void SetNextResolution(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_lb_addresses* addresses =
grpc_lb_addresses_create(ports.size(), nullptr);
@ -157,13 +157,33 @@ class ClientLbEnd2endTest : public ::testing::Test {
grpc_lb_addresses_create_channel_arg(addresses);
grpc_channel_args* fake_result =
grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1);
if (notify) {
grpc_fake_resolver_response_generator_set_response(response_generator_,
fake_result);
} else {
grpc_fake_resolver_response_generator_set_response_upon_error(
response_generator_, fake_result);
grpc_fake_resolver_response_generator_set_response(response_generator_,
fake_result);
grpc_channel_args_destroy(fake_result);
grpc_lb_addresses_destroy(addresses);
}
void SetNextResolutionUponError(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_lb_addresses* addresses =
grpc_lb_addresses_create(ports.size(), nullptr);
for (size_t i = 0; i < ports.size(); ++i) {
char* lb_uri_str;
gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]);
grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
GPR_ASSERT(lb_uri != nullptr);
grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri,
false /* is balancer */,
"" /* balancer name */, nullptr);
grpc_uri_destroy(lb_uri);
gpr_free(lb_uri_str);
}
const grpc_arg fake_addresses =
grpc_lb_addresses_create_channel_arg(addresses);
grpc_channel_args* fake_result =
grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1);
grpc_fake_resolver_response_generator_set_response_upon_error(
response_generator_, fake_result);
grpc_channel_args_destroy(fake_result);
grpc_lb_addresses_destroy(addresses);
}
@ -578,9 +598,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
ports.emplace_back(servers_[0]->port_);
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_);
gpr_log(GPR_INFO, "ABOUT TO SEND ALLLLL");
SetNextResolution(ports);
gpr_log(GPR_INFO, "SENT ALLLLLLLLLLLLLLLLLL");
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(stub, 1, DEBUG_LOCATION);
WaitForServer(stub, 2, DEBUG_LOCATION);
@ -708,7 +726,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
StartServers(kNumServers, second_ports);
// Don't notify of the update. Wait for the LB policy's re-resolution to
// "pull" the new ports.
SetNextResolution(second_ports, false);
SetNextResolutionUponError(second_ports);
gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
// Client request should eventually (but still fairly soon) succeed.

Loading…
Cancel
Save