Merge pull request #18222 from markdroth/pick_first_selected_fails_connectivity_state_fix

Fix state reported by pick_first when we receive a GOAWAY with a pending subchannel list.
pull/18346/head
Mark D. Roth 6 years ago committed by GitHub
commit a89df2e275
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 61
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 6
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  3. 1
      test/cpp/end2end/BUILD
  4. 54
      test/cpp/end2end/client_lb_end2end_test.cc

@ -102,6 +102,14 @@ class PickFirst : public LoadBalancingPolicy {
PickFirst* p = static_cast<PickFirst*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
bool in_transient_failure() const { return in_transient_failure_; }
void set_in_transient_failure(bool in_transient_failure) {
in_transient_failure_ = in_transient_failure;
}
private:
bool in_transient_failure_ = false;
};
class Picker : public SubchannelPicker {
@ -368,12 +376,21 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->selected_ = nullptr;
StopConnectivityWatchLocked();
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
grpc_error* new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"selected subchannel not ready; switching to pending update", &error,
1);
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error),
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(new_error)));
// Set our state to that of the pending subchannel list.
if (p->subchannel_list_->in_transient_failure()) {
grpc_error* new_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"selected subchannel failed; switching to pending update",
&error, 1);
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error),
UniquePtr<SubchannelPicker>(
New<TransientFailurePicker>(new_error)));
} else {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
}
} else {
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected subchannel goes bad, request a re-resolution. We
@ -382,7 +399,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// to connect to the re-resolved backends until we leave IDLE state.
p->idle_ = true;
p->channel_control_helper()->RequestReresolution();
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
StopConnectivityWatchLocked();
p->channel_control_helper()->UpdateState(
@ -418,6 +434,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
subchannel_list()->set_in_transient_failure(false);
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
// Renew notification.
@ -431,17 +448,25 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
sd = subchannel_list()->subchannel(next_index);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
p->channel_control_helper()->RequestReresolution();
grpc_error* new_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"failed to connect to all addresses", &error, 1);
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error),
UniquePtr<SubchannelPicker>(
New<TransientFailurePicker>(new_error)));
// If we're tried all subchannels, set state to TRANSIENT_FAILURE.
if (sd->Index() == 0) {
// Re-resolve if this is the most recent subchannel list.
if (subchannel_list() == (p->latest_pending_subchannel_list_ != nullptr
? p->latest_pending_subchannel_list_.get()
: p->subchannel_list_.get())) {
p->channel_control_helper()->RequestReresolution();
}
subchannel_list()->set_in_transient_failure(true);
// Only report new state in case 1.
if (subchannel_list() == p->subchannel_list_.get()) {
grpc_error* new_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"failed to connect to all addresses", &error, 1);
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error),
UniquePtr<SubchannelPicker>(
New<TransientFailurePicker>(new_error)));
}
}
sd->CheckConnectivityStateAndStartWatchingLocked();
break;

@ -1136,8 +1136,10 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
}
t->goaway_error = grpc_error_set_str(
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
GRPC_ERROR_STR_RAW_BYTES, goaway_text);
/* We want to log this irrespective of whether http tracing is enabled */

@ -402,6 +402,7 @@ grpc_cc_test(
name = "client_lb_end2end_test",
srcs = ["client_lb_end2end_test.cc"],
external_deps = [
"gmock",
"gtest",
],
deps = [

@ -56,6 +56,7 @@
#include "test/core/util/test_lb_policies.h"
#include "test/cpp/end2end/test_service_impl.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
using grpc::testing::EchoRequest;
@ -221,9 +222,11 @@ class ClientLbEnd2endTest : public ::testing::Test {
response_generator_->SetFailureOnReresolution();
}
std::vector<int> GetServersPorts() {
std::vector<int> GetServersPorts(size_t start_index = 0) {
std::vector<int> ports;
for (const auto& server : servers_) ports.push_back(server->port_);
for (size_t i = start_index; i < servers_.size(); ++i) {
ports.push_back(servers_[i]->port_);
}
return ports;
}
@ -897,6 +900,53 @@ TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) {
servers_.clear();
}
TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
auto channel = BuildChannel(""); // pick_first is the default.
auto stub = BuildStub(channel);
// Create a number of servers, but only start 1 of them.
CreateServers(10);
StartServer(0);
// Initially resolve to first server and make sure it connects.
gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
SetNextResolution({servers_[0]->port_});
CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Send a resolution update with the remaining servers, none of which are
// running yet, so the update will stay pending. Note that it's important
// to have multiple servers here, or else the test will be flaky; with only
// one server, the pending subchannel list has already gone into
// TRANSIENT_FAILURE due to hitting the end of the list by the time we
// check the state.
gpr_log(GPR_INFO,
"Phase 2: Resolver update pointing to remaining "
"(not started) servers.");
SetNextResolution(GetServersPorts(1 /* start_index */));
// RPCs will continue to be sent to the first server.
CheckRpcSendOk(stub, DEBUG_LOCATION);
// Now stop the first server, so that the current subchannel list
// fails. This should cause us to immediately swap over to the
// pending list, even though it's not yet connected. The state should
// be set to CONNECTING, since that's what the pending subchannel list
// was doing when we swapped over.
gpr_log(GPR_INFO, "Phase 3: Stopping first server.");
servers_[0]->Shutdown();
WaitForChannelNotReady(channel.get());
// TODO(roth): This should always return CONNECTING, but it's flaky
// between that and TRANSIENT_FAILURE. I suspect that this problem
// will go away once we move the backoff code out of the subchannel
// and into the LB policies.
EXPECT_THAT(channel->GetState(false),
::testing::AnyOf(GRPC_CHANNEL_CONNECTING,
GRPC_CHANNEL_TRANSIENT_FAILURE));
// Now start the second server.
gpr_log(GPR_INFO, "Phase 4: Starting second server.");
StartServer(1);
// The channel should go to READY state and RPCs should go to the
// second server.
WaitForChannelReady(channel.get());
WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */);
}
TEST_F(ClientLbEnd2endTest, RoundRobin) {
// Start servers and send one RPC per server.
const int kNumServers = 3;

Loading…
Cancel
Save