diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 236133eeca9..5d6c7926a8b 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -32,6 +32,7 @@ #include "absl/base/thread_annotations.h" #include "absl/status/status.h" #include "absl/status/statusor.h" +#include "absl/strings/str_format.h" #include "absl/strings/string_view.h" #include "absl/synchronization/notification.h" #include "absl/types/optional.h" @@ -50,6 +51,7 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" @@ -80,15 +82,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { class FakeSubchannel : public SubchannelInterface { public: FakeSubchannel(SubchannelState* state, - const grpc_resolved_address& address, - const ChannelArgs& args, std::shared_ptr work_serializer) - : state_(state), - address_(address), - args_(args), - work_serializer_(std::move(work_serializer)) {} + : state_(state), work_serializer_(std::move(work_serializer)) {} - const grpc_resolved_address& address() const { return address_; } + SubchannelState* state() const { return state_; } private: // Converts between @@ -146,20 +143,29 @@ class LoadBalancingPolicyTest : public ::testing::Test { void AddDataWatcher(std::unique_ptr) override {} SubchannelState* state_; - grpc_resolved_address address_; - ChannelArgs args_; std::shared_ptr work_serializer_; std::map watcher_map_; }; - SubchannelState() : state_tracker_("LoadBalancingPolicyTest") {} + explicit SubchannelState(absl::string_view address) + : address_(address), state_tracker_("LoadBalancingPolicyTest") {} + + const std::string& address() const { return address_; } // Sets the connectivity state for this subchannel. The updated state // will be reported to all associated SubchannelInterface objects. void SetConnectivityState(grpc_connectivity_state state, - const absl::Status& status) { + const absl::Status& status = absl::OkStatus()) { + if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + EXPECT_FALSE(status.ok()) + << "bug in test: TRANSIENT_FAILURE must have non-OK status"; + } else { + EXPECT_TRUE(status.ok()) + << "bug in test: " << ConnectivityStateName(state) + << " must have OK status: " << status; + } MutexLock lock(&mu_); state_tracker_.SetState(state, status, "set from test"); } @@ -174,13 +180,12 @@ class LoadBalancingPolicyTest : public ::testing::Test { // To be invoked by FakeHelper. RefCountedPtr CreateSubchannel( - const grpc_resolved_address& address, const ChannelArgs& args, std::shared_ptr work_serializer) { - return MakeRefCounted(this, address, args, - std::move(work_serializer)); + return MakeRefCounted(this, std::move(work_serializer)); } private: + const std::string address_; Mutex mu_; ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(&mu_); Mutex requested_connection_mu_; @@ -196,35 +201,100 @@ class LoadBalancingPolicyTest : public ::testing::Test { grpc_connectivity_state state; absl::Status status; RefCountedPtr picker; + + std::string ToString() const { + return absl::StrFormat("UPDATE{state=%s, status=%s, picker=%p}", + ConnectivityStateName(state), status.ToString(), + picker.get()); + } }; // Represents a re-resolution request from the LB policy. - struct ReresolutionRequested {}; - - // Represents an event reported by the LB policy. - using Event = absl::variant; + struct ReresolutionRequested { + std::string ToString() const { return "RERESOLUTION"; } + }; FakeHelper(LoadBalancingPolicyTest* test, std::shared_ptr work_serializer) : test_(test), work_serializer_(std::move(work_serializer)) {} - // Returns the most recent event from the LB policy, or nullopt if - // there have been no events. - absl::optional GetEvent() { + // Called at test tear-down time to ensure that we have not left any + // unexpected events in the queue. + void ExpectQueueEmpty() { MutexLock lock(&mu_); + EXPECT_TRUE(queue_.empty()); + for (const Event& event : queue_) { + gpr_log(GPR_ERROR, "UNEXPECTED EVENT LEFT IN QUEUE: %s", + EventString(event).c_str()); + } + } + + // Returns the next event in the queue if it is a state update. + // If the queue is empty or the next event is not a state update, + // fails the test and returns nullopt without removing anything from + // the queue. + absl::optional GetNextStateUpdate( + SourceLocation location = SourceLocation()) { + MutexLock lock(&mu_); + EXPECT_FALSE(queue_.empty()) << location.file() << ":" << location.line(); if (queue_.empty()) return absl::nullopt; - Event event = std::move(queue_.front()); + Event& event = queue_.front(); + auto* update = absl::get_if(&event); + EXPECT_NE(update, nullptr) + << "unexpected event " << EventString(event) << " at " + << location.file() << ":" << location.line(); + if (update == nullptr) return absl::nullopt; + StateUpdate result = std::move(*update); + queue_.pop_front(); + return std::move(result); + } + + // Returns the next event in the queue if it is a re-resolution. + // If the queue is empty or the next event is not a re-resolution, + // fails the test and returns nullopt without removing anything + // from the queue. + absl::optional GetNextReresolution( + SourceLocation location = SourceLocation()) { + MutexLock lock(&mu_); + EXPECT_FALSE(queue_.empty()) << location.file() << ":" << location.line(); + if (queue_.empty()) return absl::nullopt; + Event& event = queue_.front(); + auto* reresolution = absl::get_if(&event); + EXPECT_NE(reresolution, nullptr) + << "unexpected event " << EventString(event) << " at " + << location.file() << ":" << location.line(); + if (reresolution == nullptr) return absl::nullopt; + ReresolutionRequested result = *reresolution; queue_.pop_front(); - return std::move(event); + return result; } private: + // Represents an event reported by the LB policy. + using Event = absl::variant; + + // Returns a human-readable representation of an event. + static std::string EventString(const Event& event) { + return Match( + event, [](const StateUpdate& update) { return update.ToString(); }, + [](const ReresolutionRequested& reresolution) { + return reresolution.ToString(); + }); + } + RefCountedPtr CreateSubchannel( ServerAddress address, const ChannelArgs& args) override { SubchannelKey key(address.address(), args); - auto& subchannel_state = test_->subchannel_pool_[key]; - return subchannel_state.CreateSubchannel(address.address(), args, - work_serializer_); + auto it = test_->subchannel_pool_.find(key); + if (it == test_->subchannel_pool_.end()) { + auto address_uri = grpc_sockaddr_to_uri(&address.address()); + GPR_ASSERT(address_uri.ok()); + it = test_->subchannel_pool_ + .emplace(std::piecewise_construct, std::forward_as_tuple(key), + std::forward_as_tuple(std::move(*address_uri))) + .first; + } + return it->second.CreateSubchannel(work_serializer_); } void UpdateState( @@ -305,6 +375,16 @@ class LoadBalancingPolicyTest : public ::testing::Test { LoadBalancingPolicyTest() : work_serializer_(std::make_shared()) {} + ~LoadBalancingPolicyTest() override { + // Note: Can't safely trigger this from inside the FakeHelper dtor, + // because if there is a picker in the queue that is holding a ref + // to the LB policy, that will prevent the LB policy from being + // destroyed, and therefore the FakeHelper will not be destroyed. + // (This will cause an ASAN failure, but it will not display the + // queued events, so the failure will be harder to diagnose.) + helper_->ExpectQueueEmpty(); + } + // Creates an LB policy of the specified name. // Creates a new FakeHelper for the new LB policy, and sets helper_ to // point to the FakeHelper. @@ -336,6 +416,19 @@ class LoadBalancingPolicyTest : public ::testing::Test { return address; } + // Constructs an update containing a list of addresses. + LoadBalancingPolicy::UpdateArgs BuildUpdate( + absl::Span addresses, + RefCountedPtr config = nullptr) { + LoadBalancingPolicy::UpdateArgs update; + update.addresses.emplace(); + for (const absl::string_view& address : addresses) { + update.addresses->emplace_back(MakeAddress(address), ChannelArgs()); + } + update.config = std::move(config); + return update; + } + // Applies the update on the LB policy. absl::Status ApplyUpdate(LoadBalancingPolicy::UpdateArgs update_args, LoadBalancingPolicy* lb_policy) { @@ -356,22 +449,12 @@ class LoadBalancingPolicyTest : public ::testing::Test { // not a state update; otherwise (if continue_predicate() tells us to // stop) returns true. bool WaitForStateUpdate( - std::function)> - continue_predicate, + std::function continue_predicate, SourceLocation location = SourceLocation()) { while (true) { - auto event = helper_->GetEvent(); - EXPECT_TRUE(event.has_value()) - << location.file() << ":" << location.line(); - if (!event.has_value()) return false; - auto* update = absl::get_if(&*event); - EXPECT_NE(update, nullptr) << location.file() << ":" << location.line(); - if (update == nullptr) return false; - if (!continue_predicate(update->state, std::move(update->status), - std::move(update->picker))) { - return true; - } + auto update = helper_->GetNextStateUpdate(location); + if (!update.has_value()) return false; + if (!continue_predicate(std::move(*update))) return true; } } @@ -381,76 +464,184 @@ class LoadBalancingPolicyTest : public ::testing::Test { grpc_connectivity_state expected_state, absl::Status expected_status = absl::OkStatus(), SourceLocation location = SourceLocation()) { - RefCountedPtr final_picker; - WaitForStateUpdate( - [&](grpc_connectivity_state state, absl::Status status, - RefCountedPtr picker) { - EXPECT_EQ(state, expected_state) - << "got " << ConnectivityStateName(state) << ", expected " - << ConnectivityStateName(expected_state) << "\n" - << "at " << location.file() << ":" << location.line(); - EXPECT_EQ(status, expected_status) - << status << "\n" - << location.file() << ":" << location.line(); - EXPECT_NE(picker, nullptr) - << location.file() << ":" << location.line(); - final_picker = std::move(picker); - return false; - }); - return final_picker; + auto update = helper_->GetNextStateUpdate(location); + if (!update.has_value()) return nullptr; + EXPECT_EQ(update->state, expected_state) + << "got " << ConnectivityStateName(update->state) << ", expected " + << ConnectivityStateName(expected_state) << "\n" + << "at " << location.file() << ":" << location.line(); + EXPECT_EQ(update->status, expected_status) + << update->status << "\n" + << location.file() << ":" << location.line(); + EXPECT_NE(update->picker, nullptr) + << location.file() << ":" << location.line(); + return std::move(update->picker); } - // Waits for the LB policy to get connected. + // Waits for the LB policy to get connected, then returns the final + // picker. There can be any number of CONNECTING updates, each of + // which must return a picker that queues picks, followed by one + // update for state READY, whose picker is returned. RefCountedPtr WaitForConnected( SourceLocation location = SourceLocation()) { RefCountedPtr final_picker; - WaitForStateUpdate( - [&](grpc_connectivity_state state, absl::Status status, - RefCountedPtr picker) { - if (state == GRPC_CHANNEL_CONNECTING) { - EXPECT_TRUE(status.ok()) << status; - ExpectPickQueued(picker.get(), location); - return true; - } - EXPECT_EQ(state, GRPC_CHANNEL_READY) << ConnectivityStateName(state); - final_picker = std::move(picker); - return false; - }); + WaitForStateUpdate([&](FakeHelper::StateUpdate update) { + if (update.state == GRPC_CHANNEL_CONNECTING) { + EXPECT_TRUE(update.status.ok()) + << update.status << " at " << location.file() << ":" + << location.line(); + ExpectPickQueued(update.picker.get(), location); + return true; // Keep going. + } + EXPECT_EQ(update.state, GRPC_CHANNEL_READY) + << ConnectivityStateName(update.state) << " at " << location.file() + << ":" << location.line(); + final_picker = std::move(update.picker); + return false; // Stop. + }); return final_picker; } - // Requests a pick on picker and expects a Queue result. - void ExpectPickQueued(LoadBalancingPolicy::SubchannelPicker* picker, - SourceLocation location = SourceLocation()) { + // Waits for the LB policy to fail a connection attempt. There can be + // any number of CONNECTING updates, each of which must return a picker + // that queues picks, followed by one update for state TRANSIENT_FAILURE, + // whose status is passed to check_status() and whose picker must fail + // picks with a status that is passed to check_status(). + // Returns true if the reported states match expectations. + bool WaitForConnectionFailed( + std::function check_status, + SourceLocation location = SourceLocation()) { + bool retval = false; + WaitForStateUpdate([&](FakeHelper::StateUpdate update) { + if (update.state == GRPC_CHANNEL_CONNECTING) { + EXPECT_TRUE(update.status.ok()) + << update.status << " at " << location.file() << ":" + << location.line(); + ExpectPickQueued(update.picker.get(), location); + return true; // Keep going. + } + EXPECT_EQ(update.state, GRPC_CHANNEL_TRANSIENT_FAILURE) + << ConnectivityStateName(update.state) << " at " << location.file() + << ":" << location.line(); + check_status(update.status); + ExpectPickFail(update.picker.get(), check_status, location); + retval = update.state == GRPC_CHANNEL_TRANSIENT_FAILURE; + return false; // Stop. + }); + return retval; + } + + // Expects a state update for the specified state and status, and then + // expects the resulting picker to queue picks. + void ExpectStateAndQueuingPicker( + grpc_connectivity_state expected_state, + absl::Status expected_status = absl::OkStatus(), + SourceLocation location = SourceLocation()) { + auto picker = ExpectState(expected_state, expected_status, location); + ExpectPickQueued(picker.get(), location); + } + + // Convenient frontend to ExpectStateAndQueuingPicker() for CONNECTING. + void ExpectConnectingUpdate(SourceLocation location = SourceLocation()) { + ExpectStateAndQueuingPicker(GRPC_CHANNEL_CONNECTING, absl::OkStatus(), + location); + } + + // Does a pick and returns the result. + LoadBalancingPolicy::PickResult DoPick( + LoadBalancingPolicy::SubchannelPicker* picker) { ExecCtx exec_ctx; FakeMetadata metadata({}); FakeCallState call_state; - auto pick_result = - picker->Pick({"/service/method", &metadata, &call_state}); + return picker->Pick({"/service/method", &metadata, &call_state}); + } + + // Requests a pick on picker and expects a Queue result. + void ExpectPickQueued(LoadBalancingPolicy::SubchannelPicker* picker, + SourceLocation location = SourceLocation()) { + auto pick_result = DoPick(picker); ASSERT_TRUE(absl::holds_alternative( pick_result.result)) - << location.file() << ":" << location.line(); + << PickResultString(pick_result) << " at " << location.file() << ":" + << location.line(); } - // Requests a pick on picker and expects a Complete result whose - // subchannel has the specified address. - void ExpectPickComplete(LoadBalancingPolicy::SubchannelPicker* picker, - absl::string_view address_uri, - SourceLocation location = SourceLocation()) { - ExecCtx exec_ctx; - FakeMetadata metadata({}); - FakeCallState call_state; - auto pick_result = - picker->Pick({"/service/method", &metadata, &call_state}); + // Requests a pick on picker and expects a Complete result. + // The address of the resulting subchannel is returned, or nullopt if + // the result was something other than Complete. + absl::optional ExpectPickComplete( + LoadBalancingPolicy::SubchannelPicker* picker, + SourceLocation location = SourceLocation()) { + auto pick_result = DoPick(picker); auto* complete = absl::get_if( &pick_result.result); - ASSERT_NE(complete, nullptr) << location.file() << ":" << location.line(); + EXPECT_NE(complete, nullptr) << PickResultString(pick_result) << " at " + << location.file() << ":" << location.line(); + if (complete == nullptr) return absl::nullopt; auto* subchannel = static_cast( complete->subchannel.get()); - auto uri = grpc_sockaddr_to_uri(&subchannel->address()); - ASSERT_TRUE(uri.ok()) << uri.status() << " at " << location.file() << ":" - << location.line(); - EXPECT_EQ(*uri, address_uri) << location.file() << ":" << location.line(); + return subchannel->state()->address(); + } + + // Requests a picker on picker and expects a Fail result. + // The failing status is passed to check_status. + void ExpectPickFail(LoadBalancingPolicy::SubchannelPicker* picker, + std::function check_status, + SourceLocation location = SourceLocation()) { + auto pick_result = DoPick(picker); + auto* fail = absl::get_if( + &pick_result.result); + ASSERT_NE(fail, nullptr) << PickResultString(pick_result) << " at " + << location.file() << ":" << location.line(); + check_status(fail->status); + } + + // Returns a human-readable string for a pick result. + static std::string PickResultString( + const LoadBalancingPolicy::PickResult& result) { + return Match( + result.result, + [](const LoadBalancingPolicy::PickResult::Complete& complete) { + auto* subchannel = static_cast( + complete.subchannel.get()); + return absl::StrFormat( + "COMPLETE{subchannel=%s, subchannel_call_tracker=%p}", + subchannel->state()->address(), + complete.subchannel_call_tracker.get()); + }, + [](const LoadBalancingPolicy::PickResult::Queue&) -> std::string { + return "QUEUE{}"; + }, + [](const LoadBalancingPolicy::PickResult::Fail& fail) -> std::string { + return absl::StrFormat("FAIL{%s}", fail.status.ToString()); + }, + [](const LoadBalancingPolicy::PickResult::Drop& drop) -> std::string { + return absl::StrFormat("FAIL{%s}", drop.status.ToString()); + }); + } + + // Returns the entry in the subchannel pool, or null if not present. + SubchannelState* FindSubchannel(absl::string_view address, + const ChannelArgs& args = ChannelArgs()) { + SubchannelKey key(MakeAddress(address), args); + auto it = subchannel_pool_.find(key); + if (it == subchannel_pool_.end()) return nullptr; + return &it->second; + } + + // Creates and returns an entry in the subchannel pool. + // This can be used in cases where we want to test that a subchannel + // already exists when the LB policy creates it (e.g., due to it being + // created by another channel and shared via the global subchannel + // pool, or by being created by another LB policy in this channel). + SubchannelState* CreateSubchannel(absl::string_view address, + const ChannelArgs& args = ChannelArgs()) { + SubchannelKey key(MakeAddress(address), args); + auto it = subchannel_pool_ + .emplace(std::piecewise_construct, std::forward_as_tuple(key), + std::forward_as_tuple(address)) + .first; + return &it->second; } std::shared_ptr work_serializer_; diff --git a/test/core/client_channel/lb_policy/outlier_detection_test.cc b/test/core/client_channel/lb_policy/outlier_detection_test.cc index 2d49106cf8c..3a368f6c4a8 100644 --- a/test/core/client_channel/lb_policy/outlier_detection_test.cc +++ b/test/core/client_channel/lb_policy/outlier_detection_test.cc @@ -139,38 +139,29 @@ class OutlierDetectionTest : public LoadBalancingPolicyTest { TEST_F(OutlierDetectionTest, Basic) { constexpr absl::string_view kAddressUri = "ipv4:127.0.0.1:443"; - const grpc_resolved_address address = MakeAddress(kAddressUri); // Send an update containing one address. - LoadBalancingPolicy::UpdateArgs update_args; - update_args.config = ConfigBuilder().Build(); - update_args.addresses.emplace(); - update_args.addresses->emplace_back(address, ChannelArgs()); - absl::Status status = ApplyUpdate(std::move(update_args), lb_policy_.get()); + absl::Status status = ApplyUpdate( + BuildUpdate({kAddressUri}, ConfigBuilder().Build()), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // LB policy should have reported CONNECTING state. - auto picker = ExpectState(GRPC_CHANNEL_CONNECTING); - ExpectPickQueued(picker.get()); + ExpectConnectingUpdate(); // LB policy should have created a subchannel for the address. - SubchannelKey key(address, ChannelArgs()); - auto it = subchannel_pool_.find(key); - ASSERT_NE(it, subchannel_pool_.end()); - auto& subchannel_state = it->second; - // LB policy should have requested a connection on this subchannel. - EXPECT_TRUE(subchannel_state.ConnectionRequested()); - // Tell subchannel to report CONNECTING. - subchannel_state.SetConnectivityState(GRPC_CHANNEL_CONNECTING, - absl::OkStatus()); - // LB policy should again report CONNECTING. - picker = ExpectState(GRPC_CHANNEL_CONNECTING); - ExpectPickQueued(picker.get()); - // Tell subchannel to report READY. - subchannel_state.SetConnectivityState(GRPC_CHANNEL_READY, absl::OkStatus()); - // LB policy should eventually report READY. - picker = WaitForConnected(); + auto* subchannel = FindSubchannel(kAddressUri); + ASSERT_NE(subchannel, nullptr); + // When the LB policy receives the subchannel's initial connectivity + // state notification (IDLE), it will request a connection. + EXPECT_TRUE(subchannel->ConnectionRequested()); + // This causes the subchannel to start to connect, so it reports CONNECTING. + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // When the subchannel becomes connected, it reports READY. + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + // The LB policy will report CONNECTING some number of times (doesn't + // matter how many) and then report READY. + auto picker = WaitForConnected(); ASSERT_NE(picker, nullptr); // Picker should return the same subchannel repeatedly. for (size_t i = 0; i < 3; ++i) { - ExpectPickComplete(picker.get(), kAddressUri); + EXPECT_EQ(ExpectPickComplete(picker.get()), kAddressUri); } } diff --git a/test/core/client_channel/lb_policy/pick_first_test.cc b/test/core/client_channel/lb_policy/pick_first_test.cc index 681704de60d..4578648726c 100644 --- a/test/core/client_channel/lb_policy/pick_first_test.cc +++ b/test/core/client_channel/lb_policy/pick_first_test.cc @@ -51,38 +51,31 @@ class PickFirstTest : public LoadBalancingPolicyTest { TEST_F(PickFirstTest, Basic) { constexpr absl::string_view kAddressUri = "ipv4:127.0.0.1:443"; - const grpc_resolved_address address = MakeAddress(kAddressUri); // Send an update containing one address. - LoadBalancingPolicy::UpdateArgs update_args; - update_args.addresses.emplace(); - update_args.addresses->emplace_back(address, ChannelArgs()); - absl::Status status = ApplyUpdate(std::move(update_args), lb_policy_.get()); + absl::Status status = + ApplyUpdate(BuildUpdate({kAddressUri}), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // LB policy should have reported CONNECTING state. - auto picker = ExpectState(GRPC_CHANNEL_CONNECTING); - ExpectPickQueued(picker.get()); + ExpectConnectingUpdate(); // LB policy should have created a subchannel for the address with the // GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. - SubchannelKey key(address, - ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); - auto it = subchannel_pool_.find(key); - ASSERT_NE(it, subchannel_pool_.end()); - auto& subchannel_state = it->second; - // LB policy should have requested a connection on this subchannel. - EXPECT_TRUE(subchannel_state.ConnectionRequested()); - // Tell subchannel to report CONNECTING. - subchannel_state.SetConnectivityState(GRPC_CHANNEL_CONNECTING, - absl::OkStatus()); - // LB policy should again report CONNECTING. - picker = ExpectState(GRPC_CHANNEL_CONNECTING); - ExpectPickQueued(picker.get()); - // Tell subchannel to report READY. - subchannel_state.SetConnectivityState(GRPC_CHANNEL_READY, absl::OkStatus()); - // LB policy should report READY. - picker = ExpectState(GRPC_CHANNEL_READY); + auto* subchannel = FindSubchannel( + kAddressUri, ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + ASSERT_NE(subchannel, nullptr); + // When the LB policy receives the subchannel's initial connectivity + // state notification (IDLE), it will request a connection. + EXPECT_TRUE(subchannel->ConnectionRequested()); + // This causes the subchannel to start to connect, so it reports CONNECTING. + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // When the subchannel becomes connected, it reports READY. + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + // The LB policy will report CONNECTING some number of times (doesn't + // matter how many) and then report READY. + auto picker = WaitForConnected(); + ASSERT_NE(picker, nullptr); // Picker should return the same subchannel repeatedly. for (size_t i = 0; i < 3; ++i) { - ExpectPickComplete(picker.get(), kAddressUri); + EXPECT_EQ(ExpectPickComplete(picker.get()), kAddressUri); } }