Fix re-resolution in pick_first

pull/16076/head
Juanli Shen 7 years ago
parent b929339276
commit b0e41f6c7d
  1. 16
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 14
      src/core/ext/filters/client_channel/subchannel.cc
  3. 40
      test/cpp/end2end/client_lb_end2end_test.cc

@ -451,6 +451,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
if (grpc_lb_pick_first_trace.enabled()) {
@ -480,14 +481,12 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
"update"),
"selected_not_ready+switch_to_update");
} else {
// TODO(juanlishen): we re-resolve when the selected subchannel goes to
// TRANSIENT_FAILURE because we used to shut down in this case before
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
// If the selected subchannel goes bad, request a re-resolution. We also
// set the channel state to IDLE and reset started_picking_. The reason
// is that if the new state is TRANSIENT_FAILURE due to a GOAWAY
// reception we don't want to connect to the re-resolved backends until
// we leave the IDLE state.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"selected_changed+reresolve");
@ -568,9 +567,10 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
GRPC_ERROR_REF(error), "exhausted_subchannels");
}
sd->StartConnectivityWatchLocked();
break;

@ -402,8 +402,6 @@ static void continue_connect_locked(grpc_subchannel* c) {
c->next_attempt_deadline = c->backoff->NextAttemptTime();
args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
&c->on_connected);
}
@ -459,27 +457,24 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
/* Don't try to connect if we're already disconnected */
return;
}
if (c->connecting) {
/* Already connecting: don't restart */
return;
}
if (c->connected_subchannel != nullptr) {
/* Already connected: don't restart */
return;
}
if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
/* Nobody is interested in connecting: so don't just yet */
return;
}
c->connecting = true;
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
if (!c->backoff_begun) {
c->backoff_begun = true;
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "connecting");
continue_connect_locked(c);
} else {
GPR_ASSERT(!c->have_alarm);
@ -494,6 +489,11 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
// During backoff, we prefer the connectivity state of CONNECTING instead of
// TRANSIENT_FAILURE in order to prevent triggering re-resolution
// continuously in pick_first.
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "backoff");
}
}

@ -279,9 +279,14 @@ class ClientLbEnd2endTest : public ::testing::Test {
void WaitForServer(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
size_t server_idx, const grpc_core::DebugLocation& location) {
size_t server_idx, const grpc_core::DebugLocation& location,
bool ignore_failure = false) {
do {
CheckRpcSendOk(stub, location);
if (ignore_failure) {
SendRpc(stub);
} else {
CheckRpcSendOk(stub, location);
}
} while (servers_[server_idx]->service_.request_count() == 0);
ResetCounters();
}
@ -507,6 +512,37 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
// Prepare the ports for up servers and down servers.
const int kNumServers = 3;
const int kNumAliveServers = 1;
StartServers(kNumAliveServers);
std::vector<int> alive_ports, dead_ports;
for (size_t i = 0; i < kNumServers; ++i) {
if (i < kNumAliveServers) {
alive_ports.emplace_back(servers_[i]->port_);
} else {
dead_ports.emplace_back(grpc_pick_unused_port_or_die());
}
}
auto channel = BuildChannel("pick_first");
auto stub = BuildStub(channel);
// The initial resolution only contains dead ports. There won't be any
// selected subchannel. Re-resolution will return the same result.
SetNextResolution(dead_ports);
gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub);
// Set a re-resolution result that contains reachable ports, so that the
// pick_first LB policy can recover soon.
SetNextResolutionUponError(alive_ports);
gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */);
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
// Check LB policy name for the channel.
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobin) {
// Start servers and send one RPC per server.
const int kNumServers = 3;

Loading…
Cancel
Save