diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 0ac0f41d4ef..15d953cd92f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -102,6 +102,14 @@ class PickFirst : public LoadBalancingPolicy { PickFirst* p = static_cast(policy()); p->Unref(DEBUG_LOCATION, "subchannel_list"); } + + bool in_transient_failure() const { return in_transient_failure_; } + void set_in_transient_failure(bool in_transient_failure) { + in_transient_failure_ = in_transient_failure; + } + + private: + bool in_transient_failure_ = false; }; class Picker : public SubchannelPicker { @@ -368,12 +376,21 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( p->selected_ = nullptr; StopConnectivityWatchLocked(); p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); - grpc_error* new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "selected subchannel not ready; switching to pending update", &error, - 1); - p->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error), - UniquePtr(New(new_error))); + // Set our state to that of the pending subchannel list. + if (p->subchannel_list_->in_transient_failure()) { + grpc_error* new_error = + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "selected subchannel failed; switching to pending update", + &error, 1); + p->channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error), + UniquePtr( + New(new_error))); + } else { + p->channel_control_helper()->UpdateState( + GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, + UniquePtr(New(p->Ref()))); + } } else { if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected subchannel goes bad, request a re-resolution. We @@ -382,7 +399,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // to connect to the re-resolved backends until we leave IDLE state. p->idle_ = true; p->channel_control_helper()->RequestReresolution(); - // In transient failure. Rely on re-resolution to recover. p->selected_ = nullptr; StopConnectivityWatchLocked(); p->channel_control_helper()->UpdateState( @@ -418,6 +434,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // for a subchannel in p->latest_pending_subchannel_list_. The // goal here is to find a subchannel from the update that we can // select in place of the current one. + subchannel_list()->set_in_transient_failure(false); switch (connectivity_state) { case GRPC_CHANNEL_READY: { // Renew notification. @@ -431,17 +448,25 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( size_t next_index = (sd->Index() + 1) % subchannel_list()->num_subchannels(); sd = subchannel_list()->subchannel(next_index); - // Case 1: Only set state to TRANSIENT_FAILURE if we've tried - // all subchannels. - if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) { - p->channel_control_helper()->RequestReresolution(); - grpc_error* new_error = - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "failed to connect to all addresses", &error, 1); - p->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error), - UniquePtr( - New(new_error))); + // If we're tried all subchannels, set state to TRANSIENT_FAILURE. + if (sd->Index() == 0) { + // Re-resolve if this is the most recent subchannel list. + if (subchannel_list() == (p->latest_pending_subchannel_list_ != nullptr + ? p->latest_pending_subchannel_list_.get() + : p->subchannel_list_.get())) { + p->channel_control_helper()->RequestReresolution(); + } + subchannel_list()->set_in_transient_failure(true); + // Only report new state in case 1. + if (subchannel_list() == p->subchannel_list_.get()) { + grpc_error* new_error = + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "failed to connect to all addresses", &error, 1); + p->channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error), + UniquePtr( + New(new_error))); + } } sd->CheckConnectivityStateAndStartWatchingLocked(); break; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 888c1757be1..829bee6bedd 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1136,8 +1136,10 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, } t->goaway_error = grpc_error_set_str( grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"), - GRPC_ERROR_INT_HTTP2_ERROR, static_cast(goaway_error)), + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"), + GRPC_ERROR_INT_HTTP2_ERROR, static_cast(goaway_error)), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), GRPC_ERROR_STR_RAW_BYTES, goaway_text); /* We want to log this irrespective of whether http tracing is enabled */ diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 12258a64e00..68e0ec3cef1 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -402,6 +402,7 @@ grpc_cc_test( name = "client_lb_end2end_test", srcs = ["client_lb_end2end_test.cc"], external_deps = [ + "gmock", "gtest", ], deps = [ diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 996ba0edbbe..3cd06e9e28c 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -56,6 +56,7 @@ #include "test/core/util/test_lb_policies.h" #include "test/cpp/end2end/test_service_impl.h" +#include #include using grpc::testing::EchoRequest; @@ -221,9 +222,11 @@ class ClientLbEnd2endTest : public ::testing::Test { response_generator_->SetFailureOnReresolution(); } - std::vector GetServersPorts() { + std::vector GetServersPorts(size_t start_index = 0) { std::vector ports; - for (const auto& server : servers_) ports.push_back(server->port_); + for (size_t i = start_index; i < servers_.size(); ++i) { + ports.push_back(servers_[i]->port_); + } return ports; } @@ -897,6 +900,53 @@ TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) { servers_.clear(); } +TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) { + auto channel = BuildChannel(""); // pick_first is the default. + auto stub = BuildStub(channel); + // Create a number of servers, but only start 1 of them. + CreateServers(10); + StartServer(0); + // Initially resolve to first server and make sure it connects. + gpr_log(GPR_INFO, "Phase 1: Connect to first server."); + SetNextResolution({servers_[0]->port_}); + CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + // Send a resolution update with the remaining servers, none of which are + // running yet, so the update will stay pending. Note that it's important + // to have multiple servers here, or else the test will be flaky; with only + // one server, the pending subchannel list has already gone into + // TRANSIENT_FAILURE due to hitting the end of the list by the time we + // check the state. + gpr_log(GPR_INFO, + "Phase 2: Resolver update pointing to remaining " + "(not started) servers."); + SetNextResolution(GetServersPorts(1 /* start_index */)); + // RPCs will continue to be sent to the first server. + CheckRpcSendOk(stub, DEBUG_LOCATION); + // Now stop the first server, so that the current subchannel list + // fails. This should cause us to immediately swap over to the + // pending list, even though it's not yet connected. The state should + // be set to CONNECTING, since that's what the pending subchannel list + // was doing when we swapped over. + gpr_log(GPR_INFO, "Phase 3: Stopping first server."); + servers_[0]->Shutdown(); + WaitForChannelNotReady(channel.get()); + // TODO(roth): This should always return CONNECTING, but it's flaky + // between that and TRANSIENT_FAILURE. I suspect that this problem + // will go away once we move the backoff code out of the subchannel + // and into the LB policies. + EXPECT_THAT(channel->GetState(false), + ::testing::AnyOf(GRPC_CHANNEL_CONNECTING, + GRPC_CHANNEL_TRANSIENT_FAILURE)); + // Now start the second server. + gpr_log(GPR_INFO, "Phase 4: Starting second server."); + StartServer(1); + // The channel should go to READY state and RPCs should go to the + // second server. + WaitForChannelReady(channel.get()); + WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */); +} + TEST_F(ClientLbEnd2endTest, RoundRobin) { // Start servers and send one RPC per server. const int kNumServers = 3;