diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 8e9ee889e1d..38036c97181 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -597,6 +597,8 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand); } + // Don't override connectivity state if we already have an LB policy. + if (chand->lb_policy != nullptr) set_connectivity_state = false; } else { grpc_core::UniquePtr lb_policy_name = get_lb_policy_name_from_resolver_result_locked(chand); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 144ac24a56a..3aa690bea41 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -103,7 +103,7 @@ void FakeResolver::NextLocked(grpc_channel_args** target_result, } void FakeResolver::RequestReresolutionLocked() { - if (reresolution_results_ != nullptr) { + if (reresolution_results_ != nullptr || return_failure_) { grpc_channel_args_destroy(next_results_); next_results_ = grpc_channel_args_copy(reresolution_results_); MaybeFinishNextLocked(); @@ -141,6 +141,7 @@ struct SetResponseClosureArg { grpc_closure set_response_closure; FakeResolverResponseGenerator* generator; grpc_channel_args* response; + bool immediate = true; }; void FakeResolverResponseGenerator::SetResponseLocked(void* arg, @@ -194,7 +195,7 @@ void FakeResolverResponseGenerator::SetFailureLocked(void* arg, SetResponseClosureArg* closure_arg = static_cast(arg); FakeResolver* resolver = closure_arg->generator->resolver_; resolver->return_failure_ = true; - resolver->MaybeFinishNextLocked(); + if (closure_arg->immediate) resolver->MaybeFinishNextLocked(); Delete(closure_arg); } @@ -209,6 +210,18 @@ void FakeResolverResponseGenerator::SetFailure() { GRPC_ERROR_NONE); } +void FakeResolverResponseGenerator::SetFailureOnReresolution() { + GPR_ASSERT(resolver_ != nullptr); + SetResponseClosureArg* closure_arg = New(); + closure_arg->generator = this; + closure_arg->immediate = false; + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked, + closure_arg, + grpc_combiner_scheduler(resolver_->combiner())), + GRPC_ERROR_NONE); +} + namespace { static void* response_generator_arg_copy(void* p) { diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index 74a3062e7f4..7f690593510 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -61,6 +61,10 @@ class FakeResolverResponseGenerator // returning a null result with no error). void SetFailure(); + // Same as SetFailure(), but instead of returning the error + // immediately, waits for the next call to RequestReresolutionLocked(). + void SetFailureOnReresolution(); + // Returns a channel arg containing \a generator. static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator); diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index d13fb23796e..312065a2dfe 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -191,6 +191,11 @@ class ClientLbEnd2endTest : public ::testing::Test { grpc_channel_args_destroy(fake_results); } + void SetFailureOnReresolution() { + grpc_core::ExecCtx exec_ctx; + response_generator_->SetFailureOnReresolution(); + } + std::vector GetServersPorts() { std::vector ports; for (const auto& server : servers_) ports.push_back(server->port_); @@ -728,6 +733,23 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) { EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName()); } +TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) { + // Start server, send RPC, and make sure channel is READY. + const int kNumServers = 1; + StartServers(kNumServers); + auto channel = BuildChannel(""); // pick_first is the default. + auto stub = BuildStub(channel); + SetNextResolution(GetServersPorts()); + CheckRpcSendOk(stub, DEBUG_LOCATION); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + // Stop server. Channel should go into state IDLE. + SetFailureOnReresolution(); + servers_[0]->Shutdown(); + EXPECT_TRUE(WaitForChannelNotReady(channel.get())); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); + servers_.clear(); +} + TEST_F(ClientLbEnd2endTest, RoundRobin) { // Start servers and send one RPC per server. const int kNumServers = 3;