[pick_first] fix sticky-TF and handling of subchannels in TRANSIENT_FAILURE (#33753)

Fix sticky-TF behavior such that once we enter TRANSIENT_FAILURE, we do
not leave that state if we get a new address list.

Also, fix handling of subchannels in state TRANSIENT_FAILURE.
Previously, if a subchannel was already in state TRANSIENT_FAILURE when
we wanted to start a connection attempt on it (e.g., because the
subchannel already existed from a different channel, or because it
already existed in the previous subchannel list), we would wait for it
to report IDLE before attempting to connect. This PR changes pick_first
to instead immediately skip the subchannel and move on to the next one.
Now, the only time we wait for a subchannel in TRANSIENT_FAILURE is when
we wrap back around to the first subchannel in the list.
pull/34074/head
Mark D. Roth 1 year ago committed by GitHub
parent 2a036ce1de
commit 64a318acd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 204
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 7
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  3. 215
      test/core/client_channel/lb_policy/pick_first_test.cc
  4. 28
      test/core/end2end/tests/connectivity.cc
  5. 2
      test/core/end2end/tests/simple_delayed_request.cc
  6. 62
      test/cpp/end2end/client_lb_end2end_test.cc

@ -134,8 +134,12 @@ class PickFirst : public LoadBalancingPolicy {
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) override;
private:
// Processes the connectivity change to READY for an unselected subchannel.
void ProcessUnselectedReadyLocked();
// Reacts to the current connectivity state while trying to connect.
void ReactToConnectivityStateLocked();
};
class PickFirstSubchannelList
@ -198,6 +202,9 @@ class PickFirst : public LoadBalancingPolicy {
void AttemptToConnectUsingLatestUpdateArgsLocked();
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<SubchannelPicker> picker);
// Latest update args.
UpdateArgs latest_update_args_;
// All our subchannels.
@ -206,8 +213,8 @@ class PickFirst : public LoadBalancingPolicy {
RefCountedPtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
// Selected subchannel in \a subchannel_list_.
PickFirstSubchannelData* selected_ = nullptr;
// Are we in IDLE state?
bool idle_ = false;
// Current connectivity state.
grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
// Are we shut down?
bool shutdown_ = false;
// Random bit generator used for shuffling addresses if configured
@ -239,11 +246,10 @@ void PickFirst::ShutdownLocked() {
void PickFirst::ExitIdleLocked() {
if (shutdown_) return;
if (idle_) {
if (state_ == GRPC_CHANNEL_IDLE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p exiting idle", this);
}
idle_ = false;
AttemptToConnectUsingLatestUpdateArgsLocked();
}
}
@ -275,15 +281,14 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
// Empty update or no valid subchannels. Put the channel in
// TRANSIENT_FAILURE and request re-resolution.
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
channel_control_helper()->RequestReresolution();
absl::Status status =
latest_update_args_.addresses.ok()
? absl::UnavailableError(absl::StrCat(
"empty address list: ", latest_update_args_.resolution_note))
: latest_update_args_.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
channel_control_helper()->RequestReresolution();
UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
}
// If the new update is empty or we don't yet have a selected subchannel in
// the current list, replace the current subchannel list immediately.
@ -346,12 +351,19 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
latest_update_args_ = std::move(args);
// If we are not in idle, start connection attempt immediately.
// Otherwise, we defer the attempt into ExitIdleLocked().
if (!idle_) {
if (state_ != GRPC_CHANNEL_IDLE) {
AttemptToConnectUsingLatestUpdateArgsLocked();
}
return status;
}
// Records `state` as the policy's current connectivity state and then
// forwards the state, status, and picker to the channel control helper.
// Policy code calls this wrapper rather than the helper directly so that
// state_ always reflects the most recently reported state.
void PickFirst::UpdateState(grpc_connectivity_state state,
const absl::Status& status,
RefCountedPtr<SubchannelPicker> picker) {
// Cache the state before reporting it; state_ is consulted elsewhere in
// this policy (e.g. to tell whether we are IDLE or already in
// TRANSIENT_FAILURE).
state_ = state;
channel_control_helper()->UpdateState(state, status, std::move(picker));
}
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
@ -371,6 +383,13 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
}
// Any state change is considered to be a failure of the existing
// connection.
// TODO(roth): We could check the connectivity states of all the
// subchannels here, just in case one of them happens to be READY,
// and we could switch to that rather than going IDLE.
// Request a re-resolution.
// TODO(qianchengz): We may want to request re-resolution in
// ExitIdleLocked().
p->channel_control_helper()->RequestReresolution();
// If there is a pending update, switch to the pending update.
if (p->latest_pending_subchannel_list_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
@ -391,28 +410,18 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
->subchannel(p->subchannel_list_->num_subchannels())
->connectivity_status()
.ToString()));
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
} else {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
} else if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(nullptr));
}
return;
}
// If the selected subchannel goes bad, request a re-resolution.
// TODO(qianchengz): We may want to request re-resolution in
// ExitIdleLocked().
p->channel_control_helper()->RequestReresolution();
// TODO(roth): We could check the connectivity states of all the
// subchannels here, just in case one of them happens to be READY,
// and we could switch to that rather than going IDLE.
// Enter idle.
p->idle_ = true;
p->selected_ = nullptr;
p->subchannel_list_.reset();
p->channel_control_helper()->UpdateState(
p->UpdateState(
GRPC_CHANNEL_IDLE, absl::Status(),
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
return;
@ -432,92 +441,108 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
ProcessUnselectedReadyLocked();
return;
}
// If this is the initial connectivity state notification for this
// subchannel, check to see if it's the last one we were waiting for,
// in which case we start trying to connect to the first subchannel.
// Otherwise, do nothing, since we'll continue to wait until all of
// the subchannels report their state.
// If we haven't yet seen the initial connectivity state notification
// for all subchannels, do nothing.
if (!subchannel_list()->AllSubchannelsSeenInitialState()) return;
// If we're still here and this is the initial connectivity state
// notification for this subchannel, that means it was the last one to
// see its initial notification. Start trying to connect, starting
// with the first subchannel.
if (!old_state.has_value()) {
if (subchannel_list()->AllSubchannelsSeenInitialState()) {
subchannel_list()->subchannel(0)->subchannel()->RequestConnection();
}
return;
subchannel_list()->subchannel(0)->ReactToConnectivityStateLocked();
}
// Ignore any other updates for subchannels we're not currently trying to
// connect to.
if (Index() != subchannel_list()->attempting_index()) return;
// React to the connectivity state.
ReactToConnectivityStateLocked();
}
void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
// Otherwise, process connectivity state.
switch (new_state) {
switch (connectivity_state().value()) {
case GRPC_CHANNEL_READY:
// Already handled this case above, so this should not happen.
GPR_UNREACHABLE_CODE(break);
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
size_t next_index = (Index() + 1) % subchannel_list()->num_subchannels();
subchannel_list()->set_attempting_index(next_index);
PickFirstSubchannelData* sd = subchannel_list()->subchannel(next_index);
// If we've tried all subchannels, set state to TRANSIENT_FAILURE.
if (sd->Index() == 0) {
// Find the next subchannel not in state TRANSIENT_FAILURE.
// We skip subchannels in state TRANSIENT_FAILURE to avoid a
// large recursion that could overflow the stack.
PickFirstSubchannelData* found_subchannel = nullptr;
for (size_t next_index = Index() + 1;
next_index < subchannel_list()->num_subchannels(); ++next_index) {
PickFirstSubchannelData* sc = subchannel_list()->subchannel(next_index);
GPR_ASSERT(sc->connectivity_state().has_value());
if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) {
subchannel_list()->set_attempting_index(next_index);
found_subchannel = sc;
break;
}
}
// If we found another subchannel in the list not in state
// TRANSIENT_FAILURE, trigger the right behavior for that subchannel.
if (found_subchannel != nullptr) {
found_subchannel->ReactToConnectivityStateLocked();
break;
}
// We didn't find another subchannel not in state TRANSIENT_FAILURE,
// so report TRANSIENT_FAILURE and wait for the first subchannel
// in the list to report IDLE before continuing.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p subchannel list %p failed to connect to "
"all subchannels",
p, subchannel_list());
}
subchannel_list()->set_attempting_index(0);
subchannel_list()->set_in_transient_failure(true);
// In case 2, swap to the new subchannel list. This means reporting
// TRANSIENT_FAILURE and dropping the existing (working) connection,
// but we can't ignore what the control plane has told us.
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p subchannel list %p failed to connect to "
"all subchannels",
p, subchannel_list());
}
subchannel_list()->set_in_transient_failure(true);
// In case 2, swap to the new subchannel list. This means reporting
// TRANSIENT_FAILURE and dropping the existing (working) connection,
// but we can't ignore what the control plane has told us.
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
"replace %p",
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->selected_ = nullptr; // owned by p->subchannel_list_
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// If this is the current subchannel list (either because we were
// in case 1 or because we were in case 2 and just promoted it to
// be the current list), re-resolve and report new state.
if (subchannel_list() == p->subchannel_list_.get()) {
p->channel_control_helper()->RequestReresolution();
absl::Status status = absl::UnavailableError(
absl::StrCat("failed to connect to all addresses; last error: ",
connectivity_status().ToString()));
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
"Pick First %p promoting pending subchannel list %p to "
"replace %p",
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->selected_ = nullptr; // owned by p->subchannel_list_
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// If this is the current subchannel list (either because we were
// in case 1 or because we were in case 2 and just promoted it to
// be the current list), re-resolve and report new state.
if (subchannel_list() == p->subchannel_list_.get()) {
p->channel_control_helper()->RequestReresolution();
absl::Status status = absl::UnavailableError(
absl::StrCat("failed to connect to all addresses; last error: ",
connectivity_status().ToString()));
p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
}
// If the next subchannel is in IDLE, trigger a connection attempt.
// If it's in READY, we can't get here, because we would already
// have selected the subchannel above.
// If it's already in CONNECTING, we don't need to do this.
// If it's in TRANSIENT_FAILURE, then we will trigger the
// connection attempt later when it reports IDLE.
auto sd_state = sd->connectivity_state();
if (sd_state.has_value() && *sd_state == GRPC_CHANNEL_IDLE) {
sd->subchannel()->RequestConnection();
// If the first subchannel is already IDLE, trigger the next connection
// attempt immediately. Otherwise, we'll wait for it to report
// its own connectivity state change.
auto* subchannel0 = subchannel_list()->subchannel(0);
if (subchannel0->connectivity_state() == GRPC_CHANNEL_IDLE) {
subchannel0->subchannel()->RequestConnection();
}
break;
}
case GRPC_CHANNEL_IDLE: {
case GRPC_CHANNEL_IDLE:
subchannel()->RequestConnection();
break;
}
case GRPC_CHANNEL_CONNECTING: {
case GRPC_CHANNEL_CONNECTING:
// Only update connectivity state in case 1, and only if we're not
// already in TRANSIENT_FAILURE.
if (subchannel_list() == p->subchannel_list_.get() &&
!subchannel_list()->in_transient_failure()) {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(nullptr));
}
break;
}
case GRPC_CHANNEL_SHUTDOWN:
GPR_UNREACHABLE_CODE(break);
}
@ -552,9 +577,8 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
}
p->selected_ = this;
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
MakeRefCounted<Picker>(subchannel()->Ref()));
p->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
MakeRefCounted<Picker>(subchannel()->Ref()));
for (size_t i = 0; i < subchannel_list()->num_subchannels(); ++i) {
if (i != Index()) {
subchannel_list()->subchannel(i)->ShutdownLocked();

@ -241,6 +241,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// will be reported to all associated SubchannelInterface objects.
void SetConnectivityState(grpc_connectivity_state state,
const absl::Status& status = absl::OkStatus(),
bool validate_state_transition = true,
SourceLocation location = SourceLocation()) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
EXPECT_FALSE(status.ok())
@ -251,8 +252,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
<< " must have OK status: " << status;
}
MutexLock lock(&mu_);
AssertValidConnectivityStateTransition(state_tracker_.state(), state,
location);
if (validate_state_transition) {
AssertValidConnectivityStateTransition(state_tracker_.state(), state,
location);
}
state_tracker_.SetState(state, status, "set from test");
}

@ -211,6 +211,221 @@ TEST_F(PickFirstTest, FirstAddressFails) {
}
}
// Verifies that subchannels already in TRANSIENT_FAILURE when the update
// arrives are skipped: with the first two of three addresses in
// TRANSIENT_FAILURE, the policy requests a connection on the third
// subchannel and ends up READY on it.
TEST_F(PickFirstTest, FirstTwoAddressesInTransientFailureAtStart) {
// Send an update containing three addresses.
// The first two addresses are already in state TRANSIENT_FAILURE when the
// LB policy gets the update.
constexpr std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"};
// Pre-create the first two subchannels and force them into
// TRANSIENT_FAILURE, passing validate_state_transition=false to skip the
// state-transition validity check for this direct jump.
auto* subchannel = CreateSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
auto* subchannel2 = CreateSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for all addresses with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
auto* subchannel3 = FindSubchannel(
kAddresses[2], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
ASSERT_NE(subchannel3, nullptr);
// Once the LB policy has seen the initial connectivity state
// notifications, it will skip the first two subchannels (both already in
// TRANSIENT_FAILURE) and move on to the third subchannel. The third
// subchannel is in state IDLE, so the LB policy will request a
// connection attempt on it.
EXPECT_TRUE(subchannel3->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports
// CONNECTING.
subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// The connection attempt succeeds.
subchannel3->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report CONNECTING some number of times (doesn't
// matter how many) and then report READY.
auto picker = WaitForConnected();
ASSERT_NE(picker, nullptr);
// Picker should return the same subchannel repeatedly.
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[2]);
}
}
// Verifies that when every subchannel is already in TRANSIENT_FAILURE at
// the time of the update, the policy requests re-resolution and reports
// TRANSIENT_FAILURE without requesting any connection, and only starts
// connecting once the first subchannel reports IDLE.
TEST_F(PickFirstTest, AllAddressesInTransientFailureAtStart) {
// Send an update containing two addresses, both in TRANSIENT_FAILURE
// when the LB policy gets the update.
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
// Pre-create both subchannels and force them into TRANSIENT_FAILURE,
// passing validate_state_transition=false to skip the state-transition
// validity check for this direct jump.
auto* subchannel = CreateSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
auto* subchannel2 = CreateSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// The LB policy should request re-resolution.
ExpectReresolutionRequest();
// The LB policy should report TRANSIENT_FAILURE.
WaitForConnectionFailed([&](const absl::Status& status) {
EXPECT_EQ(status, absl::UnavailableError(
"failed to connect to all addresses; "
"last error: UNAVAILABLE: failed to connect"));
});
// No connections should have been requested.
EXPECT_FALSE(subchannel->ConnectionRequested());
EXPECT_FALSE(subchannel2->ConnectionRequested());
// Now have the first subchannel report IDLE.
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
// The policy will ask it to connect.
EXPECT_TRUE(subchannel->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports
// CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// The connection attempt succeeds.
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report CONNECTING some number of times (doesn't
// matter how many) and then report READY.
auto picker = WaitForConnected();
ASSERT_NE(picker, nullptr);
// Picker should return the same subchannel repeatedly.
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
}
}
// Verifies the sticky TRANSIENT_FAILURE behavior: after the policy has
// reported TRANSIENT_FAILURE, a new address list makes it connect to the
// new address, and the next state the test expects is READY.
TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) {
// Send an update containing two addresses, both in TRANSIENT_FAILURE
// when the LB policy gets the update.
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
// Pre-create both subchannels and force them into TRANSIENT_FAILURE,
// passing validate_state_transition=false to skip the state-transition
// validity check for this direct jump.
auto* subchannel = CreateSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
auto* subchannel2 = CreateSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// The LB policy should request re-resolution.
ExpectReresolutionRequest();
// The LB policy should report TRANSIENT_FAILURE.
WaitForConnectionFailed([&](const absl::Status& status) {
EXPECT_EQ(status, absl::UnavailableError(
"failed to connect to all addresses; "
"last error: UNAVAILABLE: failed to connect"));
});
// No connections should have been requested.
EXPECT_FALSE(subchannel->ConnectionRequested());
EXPECT_FALSE(subchannel2->ConnectionRequested());
// Now send an address list update. This contains the first address
// from the previous update plus a new address, whose subchannel will
// be in state IDLE.
constexpr std::array<absl::string_view, 2> kAddresses2 = {
kAddresses[0], "ipv4:127.0.0.1:445"};
status = ApplyUpdate(BuildUpdate(kAddresses2, MakePickFirstConfig(false)),
lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// The LB policy should have created a subchannel for the new address.
auto* subchannel3 =
FindSubchannel(kAddresses2[1],
ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
ASSERT_NE(subchannel3, nullptr);
// The policy will ask it to connect.
EXPECT_TRUE(subchannel3->ConnectionRequested());
// This causes it to start to connect, so it reports CONNECTING.
subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// The connection attempt succeeds.
subchannel3->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report READY.
// NOTE(review): no CONNECTING update is expected in between because the
// policy is meant to stay in TRANSIENT_FAILURE until it becomes READY;
// presumably ExpectState() checks the very next reported state -- confirm
// against the test library.
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Picker should return the same subchannel repeatedly.
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses2[1]);
}
}
// Verifies the wrap-around case where the first subchannel has already
// returned to IDLE by the time the last subchannel fails: the policy
// requests re-resolution, reports TRANSIENT_FAILURE, and then requests a
// new connection on the first subchannel.
TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) {
// Send an update containing two addresses.
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
auto* subchannel = FindSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
ASSERT_NE(subchannel, nullptr);
auto* subchannel2 = FindSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
ASSERT_NE(subchannel2, nullptr);
// When the LB policy receives the first subchannel's initial connectivity
// state notification (IDLE), it will request a connection.
EXPECT_TRUE(subchannel->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports
// CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// The second subchannel should not be connecting.
EXPECT_FALSE(subchannel2->ConnectionRequested());
// The first subchannel's connection attempt fails.
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
// The LB policy will start a connection attempt on the second subchannel.
EXPECT_TRUE(subchannel2->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports
// CONNECTING.
subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// Before the second subchannel's attempt completes, the first
// subchannel reports IDLE.
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
// Now the connection attempt on the second subchannel fails.
subchannel2->SetConnectivityState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
// The LB policy should request re-resolution.
ExpectReresolutionRequest();
// The LB policy will report TRANSIENT_FAILURE.
WaitForConnectionFailed([&](const absl::Status& status) {
EXPECT_EQ(status, absl::UnavailableError(
"failed to connect to all addresses; "
"last error: UNAVAILABLE: failed to connect"));
});
// It will then start connecting to the first address again.
EXPECT_TRUE(subchannel->ConnectionRequested());
// This time, the connection attempt succeeds.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report READY.
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Picker should return the same subchannel repeatedly.
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
}
}
TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) {
// Send an update containing two addresses.
constexpr std::array<absl::string_view, 2> kAddresses = {

@ -47,6 +47,8 @@ CORE_END2END_TEST(RetryHttp2Test, ConnectivityWatch) {
// start watching for a change
WatchConnectivityState(GRPC_CHANNEL_IDLE, Duration::Seconds(10), 2);
// and now the watch should trigger
// (we might miss the notification for CONNECTING, so we might see
// TRANSIENT_FAILURE instead)
Expect(2, true);
Step();
grpc_connectivity_state state = CheckConnectivityState(false);
@ -57,32 +59,24 @@ CORE_END2END_TEST(RetryHttp2Test, ConnectivityWatch) {
Expect(3, true);
Step();
state = CheckConnectivityState(false);
EXPECT_THAT(state, ::testing::AnyOf(GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_CHANNEL_CONNECTING));
EXPECT_EQ(state, GRPC_CHANNEL_TRANSIENT_FAILURE);
// now let's bring up a server to connect to
InitServer(ChannelArgs());
// we'll go through some set of transitions (some might be missed), until
// READY is reached
while (state != GRPC_CHANNEL_READY) {
WatchConnectivityState(state, Duration::Seconds(10), 4);
Expect(4, true);
Step(Duration::Seconds(20));
state = CheckConnectivityState(false);
EXPECT_THAT(state,
::testing::AnyOf(GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_CHANNEL_CONNECTING, GRPC_CHANNEL_READY));
}
// when the channel gets connected, it will report READY
WatchConnectivityState(state, Duration::Seconds(10), 4);
Expect(4, true);
Step(Duration::Seconds(20));
state = CheckConnectivityState(false);
EXPECT_EQ(state, GRPC_CHANNEL_READY);
// bring down the server again
// we should go immediately to TRANSIENT_FAILURE
// we should go immediately to IDLE
WatchConnectivityState(GRPC_CHANNEL_READY, Duration::Seconds(10), 5);
ShutdownServerAndNotify(1000);
Expect(5, true);
Expect(1000, true);
Step();
state = CheckConnectivityState(false);
EXPECT_THAT(state,
::testing::AnyOf(GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_CHANNEL_CONNECTING, GRPC_CHANNEL_IDLE));
EXPECT_EQ(state, GRPC_CHANNEL_IDLE);
}
} // namespace

@ -36,7 +36,7 @@ CORE_END2END_TEST(Http2SingleHopTest, SimpleDelayedRequestShort) {
.Set(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 1000)
.Set(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 5000));
gpr_log(GPR_ERROR, "Create client side call");
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(30)).Create();
auto c = NewClientCall("/foo").Timeout(Duration::Minutes(1)).Create();
IncomingMetadata server_initial_metadata;
IncomingStatusOnClient server_status;
gpr_log(GPR_ERROR, "Start initial batch");

@ -1022,30 +1022,22 @@ TEST_F(
// Channel 2 should continue to report CONNECTING.
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel1->GetState(false));
EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel2->GetState(false));
// Inject a hold for port 2, which will eventually be tried by channel 2.
auto hold_channel2_port2 = injector.AddHold(ports2[2]);
// Allow channel 2 to resume port 0. Port 0 will fail, as will port 1.
// When it gets to port 2, it will see it already in state
// TRANSIENT_FAILURE due to being shared with channel 1, so it won't
// trigger another connection attempt.
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 2 PORT 0 ===");
hold_channel2_port0->Resume();
// Wait for channel 2 to try port 2.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 2 PORT 2 ===");
hold_channel2_port2->Wait();
gpr_log(GPR_INFO, "=== CHANNEL 2 PORT 2 STARTED ===");
// Channel 2 should still be CONNECTING here.
EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel2->GetState(false));
// Add a hold for channel 2 port 0.
hold_channel2_port0 = injector.AddHold(ports2[0]);
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 2 PORT 2 ===");
hold_channel2_port2->Resume();
// Wait for channel 2 to retry port 0.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 2 PORT 0 ===");
hold_channel2_port0->Wait();
// Now channel 2 should be reporting TRANSIENT_FAILURE.
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel2->GetState(false));
// Channel 2 should soon report TRANSIENT_FAILURE.
EXPECT_TRUE(
WaitForChannelState(channel2.get(), [](grpc_connectivity_state state) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) return true;
EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING);
return false;
}));
// Clean up.
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 1 PORT 0 AND CHANNEL 2 PORT 0 ===");
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 1 PORT 0 ===");
hold_channel1_port0->Resume();
hold_channel2_port0->Resume();
}
TEST_F(PickFirstTest, Updates) {
@ -1055,44 +1047,24 @@ TEST_F(PickFirstTest, Updates) {
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("pick_first", response_generator);
auto stub = BuildStub(channel);
std::vector<int> ports;
// Perform one RPC against the first server.
ports.emplace_back(servers_[0]->port_);
response_generator.SetNextResolution(ports);
response_generator.SetNextResolution(GetServersPorts(0, 1));
gpr_log(GPR_INFO, "****** SET [0] *******");
CheckRpcSendOk(DEBUG_LOCATION, stub);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
// An empty update will result in the channel going into TRANSIENT_FAILURE.
ports.clear();
response_generator.SetNextResolution(ports);
response_generator.SetNextResolution({});
gpr_log(GPR_INFO, "****** SET none *******");
grpc_connectivity_state channel_state;
do {
channel_state = channel->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
servers_[0]->service_.ResetCounters();
WaitForChannelNotReady(channel.get());
// Next update introduces servers_[1], making the channel recover.
ports.clear();
ports.emplace_back(servers_[1]->port_);
response_generator.SetNextResolution(ports);
response_generator.SetNextResolution(GetServersPorts(1, 2));
gpr_log(GPR_INFO, "****** SET [1] *******");
WaitForChannelReady(channel.get());
WaitForServer(DEBUG_LOCATION, stub, 1);
EXPECT_EQ(servers_[0]->service_.request_count(), 0);
// And again for servers_[2]
ports.clear();
ports.emplace_back(servers_[2]->port_);
response_generator.SetNextResolution(ports);
response_generator.SetNextResolution(GetServersPorts(2, 3));
gpr_log(GPR_INFO, "****** SET [2] *******");
WaitForServer(DEBUG_LOCATION, stub, 2);
EXPECT_EQ(servers_[0]->service_.request_count(), 0);
EXPECT_EQ(servers_[1]->service_.request_count(), 0);
// Check LB policy name for the channel.
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}

Loading…
Cancel
Save