From 1733c66932aa8b6165c27c8df2c3541caac7180b Mon Sep 17 00:00:00 2001
From: "Mark D. Roth" <roth@google.com>
Date: Mon, 10 Oct 2022 11:06:23 -0700
Subject: [PATCH] LB policy: add LB policy unit test framework and simple
 pick_first test (#31016)

* LB policy: add LB policy unit test framework and simple pick_first test

* Automated change: Fix sanity tests

* fix build

* simplify WorkSerializer interactions

* remove duplicate check

* add ExpectPickComplete() helper method

* remove unused parameter

* fix sanity

* iwyu

* more iwyu

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
---
 BUILD                                         |   1 -
 CMakeLists.txt                                |  36 ++
 build_autogenerated.yaml                      |  10 +
 .../filters/client_channel/client_channel.cc  |   2 -
 .../ext/filters/client_channel/subchannel.h   |   2 -
 .../lib/load_balancing/subchannel_interface.h |   7 -
 test/core/client_channel/lb_policy/BUILD      |  50 +++
 .../lb_policy/lb_policy_test_lib.h            | 399 ++++++++++++++++++
 .../lb_policy/pick_first_test.cc              | 100 +++++
 tools/run_tests/generated/tests.json          |  24 ++
 10 files changed, 619 insertions(+), 12 deletions(-)
 create mode 100644 test/core/client_channel/lb_policy/BUILD
 create mode 100644 test/core/client_channel/lb_policy/lb_policy_test_lib.h
 create mode 100644 test/core/client_channel/lb_policy/pick_first_test.cc

diff --git a/BUILD b/BUILD
index 7742c8d0669..834ce86580e 100644
--- a/BUILD
+++ b/BUILD
@@ -3906,7 +3906,6 @@ grpc_cc_library(
     hdrs = ["src/core/lib/load_balancing/subchannel_interface.h"],
     external_deps = ["absl/status"],
     deps = [
-        "channel_args",
         "gpr_platform",
         "grpc_public_hdrs",
         "iomgr_fwd",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f1a11870f54..9a0c582e245 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1076,6 +1076,7 @@ if(gRPC_BUILD_TESTS)
   add_dependencies(buildtests_cxx parser_test)
   add_dependencies(buildtests_cxx percent_encoding_test)
   add_dependencies(buildtests_cxx periodic_update_test)
+  add_dependencies(buildtests_cxx pick_first_test)
   add_dependencies(buildtests_cxx pid_controller_test)
   add_dependencies(buildtests_cxx pipe_test)
   add_dependencies(buildtests_cxx poll_test)
@@ -14359,6 +14360,41 @@ target_link_libraries(periodic_update_test
 )
 
 
+endif()
+if(gRPC_BUILD_TESTS)
+
+add_executable(pick_first_test
+  test/core/client_channel/lb_policy/pick_first_test.cc
+  third_party/googletest/googletest/src/gtest-all.cc
+  third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+target_include_directories(pick_first_test
+  PRIVATE
+    ${CMAKE_CURRENT_SOURCE_DIR}
+    ${CMAKE_CURRENT_SOURCE_DIR}/include
+    ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+    ${_gRPC_RE2_INCLUDE_DIR}
+    ${_gRPC_SSL_INCLUDE_DIR}
+    ${_gRPC_UPB_GENERATED_DIR}
+    ${_gRPC_UPB_GRPC_GENERATED_DIR}
+    ${_gRPC_UPB_INCLUDE_DIR}
+    ${_gRPC_XXHASH_INCLUDE_DIR}
+    ${_gRPC_ZLIB_INCLUDE_DIR}
+    third_party/googletest/googletest/include
+    third_party/googletest/googletest
+    third_party/googletest/googlemock/include
+    third_party/googletest/googlemock
+    ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(pick_first_test
+  ${_gRPC_PROTOBUF_LIBRARIES}
+  ${_gRPC_ALLTARGETS_LIBRARIES}
+  grpc_test_util
+)
+
+
 endif()
 if(gRPC_BUILD_TESTS)
 
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index ce90cd6496c..2b887aa77e5 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -8093,6 +8093,16 @@ targets:
   - gpr
   - upb
   uses_polling: false
+- name: pick_first_test
+  gtest: true
+  build: test
+  language: c++
+  headers:
+  - test/core/client_channel/lb_policy/lb_policy_test_lib.h
+  src:
+  - test/core/client_channel/lb_policy/pick_first_test.cc
+  deps:
+  - grpc_test_util
 - name: pid_controller_test
   gtest: true
   build: test
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index af4be93b519..31f573a0e12 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -519,8 +519,6 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
     data_watchers_.push_back(std::move(internal_watcher));
   }
 
-  ChannelArgs channel_args() override { return subchannel_->channel_args(); }
-
   void ThrottleKeepaliveTime(int new_keepalive_time) {
     subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
   }
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index f16511dfa8b..aa8bd1daa9b 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -240,8 +240,6 @@ class Subchannel : public DualRefCounted<Subchannel> {
 
   grpc_pollset_set* pollset_set() const { return pollset_set_; }
 
-  const ChannelArgs& channel_args() const { return args_; }
-
   channelz::SubchannelNode* channelz_node();
 
   // Starts watching the subchannel's connectivity state.
diff --git a/src/core/lib/load_balancing/subchannel_interface.h b/src/core/lib/load_balancing/subchannel_interface.h
index 3017fb373fe..ef50d8941d7 100644
--- a/src/core/lib/load_balancing/subchannel_interface.h
+++ b/src/core/lib/load_balancing/subchannel_interface.h
@@ -26,7 +26,6 @@
 
 #include <grpc/impl/codegen/connectivity_state.h>
 
-#include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/gprpp/ref_counted.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/iomgr/iomgr_fwd.h"
@@ -96,9 +95,6 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
   // Registers a new data watcher.
   virtual void AddDataWatcher(
       std::unique_ptr<DataWatcherInterface> watcher) = 0;
-
-  // TODO(roth): Need a better non-grpc-specific abstraction here.
-  virtual ChannelArgs channel_args() = 0;
 };
 
 // A class that delegates to another subchannel, to be used in cases
@@ -124,9 +120,6 @@ class DelegatingSubchannel : public SubchannelInterface {
     wrapped_subchannel_->RequestConnection();
   }
   void ResetBackoff() override { wrapped_subchannel_->ResetBackoff(); }
-  ChannelArgs channel_args() override {
-    return wrapped_subchannel_->channel_args();
-  }
   void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override {
     wrapped_subchannel_->AddDataWatcher(std::move(watcher));
   }
diff --git a/test/core/client_channel/lb_policy/BUILD b/test/core/client_channel/lb_policy/BUILD
new file mode 100644
index 00000000000..d733207f61d
--- /dev/null
+++ b/test/core/client_channel/lb_policy/BUILD
@@ -0,0 +1,50 @@
+# Copyright 2022 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+load(
+    "//bazel:grpc_build_system.bzl",
+    "grpc_cc_library",
+    "grpc_cc_test",
+    "grpc_package",
+)
+
+grpc_package(name = "test/core/client_channel/lb_policy")
+
+licenses(["notice"])
+
+grpc_cc_library(
+    name = "lb_policy_test_lib",
+    testonly = True,
+    hdrs = ["lb_policy_test_lib.h"],
+    external_deps = [
+        "gtest",
+    ],
+    language = "C++",
+    deps = [
+        "//:lb_policy",
+        "//:subchannel_interface",
+    ],
+)
+
+grpc_cc_test(
+    name = "pick_first_test",
+    srcs = ["pick_first_test.cc"],
+    external_deps = ["gtest"],
+    language = "C++",
+    deps = [
+        ":lb_policy_test_lib",
+        "//:grpc_lb_policy_pick_first",
+        "//test/core/util:grpc_test_util",
+    ],
+)
diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h
new file mode 100644
index 00000000000..794aa63c967
--- /dev/null
+++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h
@@ -0,0 +1,399 @@
+//
+// Copyright 2022 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_LB_POLICY_TEST_LIB_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_LB_POLICY_TEST_LIB_H
+
+#include <grpc/support/port_platform.h>
+
+#include <stddef.h>
+
+#include <deque>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/base/thread_annotations.h"
+#include "absl/memory/memory.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "absl/synchronization/notification.h"
+#include "absl/types/optional.h"
+#include "absl/types/variant.h"
+#include "gtest/gtest.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
+#include "src/core/lib/address_utils/parse_address.h"
+#include "src/core/lib/address_utils/sockaddr_utils.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/config/core_configuration.h"
+#include "src/core/lib/gprpp/debug_location.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/work_serializer.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/resolved_address.h"
+#include "src/core/lib/load_balancing/lb_policy.h"
+#include "src/core/lib/load_balancing/lb_policy_registry.h"
+#include "src/core/lib/load_balancing/subchannel_interface.h"
+#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/uri/uri_parser.h"
+
+namespace grpc_core {
+namespace testing {
+
+class LoadBalancingPolicyTest : public ::testing::Test {
+ protected:
+  // Channel-level subchannel state for a specific address and channel args.
+  // This is analogous to the real subchannel in the ClientChannel code.
+  class SubchannelState {
+   public:
+    // A fake SubchannelInterface object, to be returned to the LB
+    // policy when it calls the helper's CreateSubchannel() method.
+    // There may be multiple FakeSubchannel objects associated with a
+    // given SubchannelState object.
+    class FakeSubchannel : public SubchannelInterface {
+     public:
+      FakeSubchannel(SubchannelState* state,
+                     const grpc_resolved_address& address,
+                     const ChannelArgs& args,
+                     std::shared_ptr<WorkSerializer> work_serializer)
+          : state_(state),
+            address_(address),
+            args_(args),
+            work_serializer_(std::move(work_serializer)) {}
+
+      const grpc_resolved_address& address() const { return address_; }
+
+     private:
+      // Converts between
+      // SubchannelInterface::ConnectivityStateWatcherInterface and
+      // ConnectivityStateWatcherInterface.
+      class WatcherWrapper : public AsyncConnectivityStateWatcherInterface {
+       public:
+        WatcherWrapper(
+            std::shared_ptr<WorkSerializer> work_serializer,
+            std::unique_ptr<
+                SubchannelInterface::ConnectivityStateWatcherInterface>
+                watcher)
+            : AsyncConnectivityStateWatcherInterface(
+                  std::move(work_serializer)),
+              watcher_(std::move(watcher)) {}
+
+        void OnConnectivityStateChange(grpc_connectivity_state new_state,
+                                       const absl::Status& status) override {
+          watcher_->OnConnectivityStateChange(new_state, status);
+        }
+
+       private:
+        std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
+            watcher_;
+      };
+
+      void WatchConnectivityState(
+          std::unique_ptr<
+              SubchannelInterface::ConnectivityStateWatcherInterface>
+              watcher) override {
+        auto watcher_wrapper = MakeOrphanable<WatcherWrapper>(
+            work_serializer_, std::move(watcher));
+        watcher_map_[watcher.get()] = watcher_wrapper.get();
+        MutexLock lock(&state_->mu_);
+        state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN,
+                                          std::move(watcher_wrapper));
+      }
+
+      void CancelConnectivityStateWatch(
+          ConnectivityStateWatcherInterface* watcher) override {
+        auto it = watcher_map_.find(watcher);
+        if (it == watcher_map_.end()) return;
+        MutexLock lock(&state_->mu_);
+        state_->state_tracker_.RemoveWatcher(it->second);
+        watcher_map_.erase(it);
+      }
+
+      void RequestConnection() override {
+        MutexLock lock(&state_->mu_);
+        state_->requested_connection_ = true;
+      }
+
+      // Don't need these methods here, so they're no-ops.
+      void ResetBackoff() override {}
+      void AddDataWatcher(std::unique_ptr<DataWatcherInterface>) override {}
+
+      SubchannelState* state_;
+      grpc_resolved_address address_;
+      ChannelArgs args_;
+      std::shared_ptr<WorkSerializer> work_serializer_;
+      std::map<SubchannelInterface::ConnectivityStateWatcherInterface*,
+               WatcherWrapper*>
+          watcher_map_;
+    };
+
+    SubchannelState() : state_tracker_("LoadBalancingPolicyTest") {}
+
+    // Sets the connectivity state for this subchannel.  The updated state
+    // will be reported to all associated SubchannelInterface objects.
+    void SetConnectivityState(grpc_connectivity_state state,
+                              const absl::Status& status) {
+      MutexLock lock(&mu_);
+      state_tracker_.SetState(state, status, "set from test");
+    }
+
+    // Indicates if any of the associated SubchannelInterface objects
+    // have requested a connection attempt since the last time this
+    // method was called.
+    bool ConnectionRequested() {
+      MutexLock lock(&mu_);
+      return std::exchange(requested_connection_, false);
+    }
+
+    // To be invoked by FakeHelper.
+    RefCountedPtr<SubchannelInterface> CreateSubchannel(
+        const grpc_resolved_address& address, const ChannelArgs& args,
+        std::shared_ptr<WorkSerializer> work_serializer) {
+      return MakeRefCounted<FakeSubchannel>(this, address, args,
+                                            std::move(work_serializer));
+    }
+
+   private:
+    Mutex mu_;
+    bool requested_connection_ ABSL_GUARDED_BY(&mu_) = false;
+    ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(&mu_);
+  };
+
+  // A fake helper to be passed to the LB policy.
+  class FakeHelper : public LoadBalancingPolicy::ChannelControlHelper {
+   public:
+    // Represents a state update reported by the LB policy.
+    struct StateUpdate {
+      grpc_connectivity_state state;
+      absl::Status status;
+      std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker;
+    };
+
+    // Represents a re-resolution request from the LB policy.
+    struct ReresolutionRequested {};
+
+    // Represents an event reported by the LB policy.
+    using Event = absl::variant<StateUpdate, ReresolutionRequested>;
+
+    FakeHelper(LoadBalancingPolicyTest* test,
+               std::shared_ptr<WorkSerializer> work_serializer)
+        : test_(test), work_serializer_(std::move(work_serializer)) {}
+
+    // Returns the most recent event from the LB policy, or nullopt if
+    // there have been no events.
+    absl::optional<Event> GetEvent() {
+      MutexLock lock(&mu_);
+      if (queue_.empty()) return absl::nullopt;
+      Event event = std::move(queue_.front());
+      queue_.pop_front();
+      return std::move(event);
+    }
+
+   private:
+    RefCountedPtr<SubchannelInterface> CreateSubchannel(
+        ServerAddress address, const ChannelArgs& args) override {
+      SubchannelKey key(address.address(), args);
+      auto& subchannel_state = test_->subchannel_pool_[key];
+      return subchannel_state.CreateSubchannel(address.address(), args,
+                                               work_serializer_);
+    }
+
+    void UpdateState(grpc_connectivity_state state, const absl::Status& status,
+                     std::unique_ptr<LoadBalancingPolicy::SubchannelPicker>
+                         picker) override {
+      MutexLock lock(&mu_);
+      queue_.push_back(StateUpdate{state, status, std::move(picker)});
+    }
+
+    void RequestReresolution() override {
+      MutexLock lock(&mu_);
+      queue_.push_back(ReresolutionRequested());
+    }
+
+    absl::string_view GetAuthority() override { return "server.example.com"; }
+
+    void AddTraceEvent(TraceSeverity, absl::string_view) override {}
+
+    LoadBalancingPolicyTest* test_;
+    std::shared_ptr<WorkSerializer> work_serializer_;
+    Mutex mu_;
+    std::deque<Event> queue_ ABSL_GUARDED_BY(&mu_);
+  };
+
+  // A fake MetadataInterface implementation, for use in PickArgs.
+  class FakeMetadata : public LoadBalancingPolicy::MetadataInterface {
+   public:
+    explicit FakeMetadata(std::map<std::string, std::string> metadata)
+        : metadata_(std::move(metadata)) {}
+
+    const std::map<std::string, std::string>& metadata() const {
+      return metadata_;
+    }
+
+   private:
+    void Add(absl::string_view key, absl::string_view value) override {
+      metadata_[std::string(key)] = std::string(value);
+    }
+
+    std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
+        override {
+      return {};  // Not used.
+    }
+
+    absl::optional<absl::string_view> Lookup(
+        absl::string_view key, std::string* /*buffer*/) const override {
+      auto it = metadata_.find(std::string(key));
+      if (it == metadata_.end()) return absl::nullopt;
+      return it->second;
+    }
+
+    std::map<std::string, std::string> metadata_;
+  };
+
+  // A fake CallState implementation, for use in PickArgs.
+  class FakeCallState : public LoadBalancingPolicy::CallState {
+   public:
+    ~FakeCallState() override {
+      for (void* allocation : allocations_) {
+        gpr_free(allocation);
+      }
+    }
+
+   private:
+    void* Alloc(size_t size) override {
+      void* allocation = gpr_malloc(size);
+      allocations_.push_back(allocation);
+      return allocation;
+    }
+
+    std::vector<void*> allocations_;
+  };
+
+  LoadBalancingPolicyTest()
+      : work_serializer_(std::make_shared<WorkSerializer>()) {}
+
+  // Creates an LB policy of the specified name.
+  // Creates a new FakeHelper for the new LB policy, and sets helper_ to
+  // point to the FakeHelper.
+  OrphanablePtr<LoadBalancingPolicy> MakeLbPolicy(absl::string_view name) {
+    auto helper = absl::make_unique<FakeHelper>(this, work_serializer_);
+    helper_ = helper.get();
+    LoadBalancingPolicy::Args args = {work_serializer_, std::move(helper),
+                                      ChannelArgs()};
+    return CoreConfiguration::Get()
+        .lb_policy_registry()
+        .CreateLoadBalancingPolicy(name, std::move(args));
+  }
+
+  // Converts an address URI into a grpc_resolved_address.
+  static grpc_resolved_address MakeAddress(absl::string_view address_uri) {
+    auto uri = URI::Parse(address_uri);
+    GPR_ASSERT(uri.ok());
+    grpc_resolved_address address;
+    GPR_ASSERT(grpc_parse_uri(*uri, &address));
+    return address;
+  }
+
+  // Applies the update on the LB policy.
+  absl::Status ApplyUpdate(LoadBalancingPolicy::UpdateArgs update_args,
+                           LoadBalancingPolicy* lb_policy) {
+    absl::Status status;
+    absl::Notification notification;
+    work_serializer_->Run(
+        [&]() {
+          status = lb_policy->UpdateLocked(std::move(update_args));
+          notification.Notify();
+        },
+        DEBUG_LOCATION);
+    notification.WaitForNotification();
+    return status;
+  }
+
+  // Expects that the LB policy has reported the specified connectivity
+  // state to helper_.  Returns the picker from the state update.
+  std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> ExpectState(
+      grpc_connectivity_state state, absl::Status status = absl::OkStatus(),
+      SourceLocation location = SourceLocation()) {
+    auto event = helper_->GetEvent();
+    EXPECT_TRUE(event.has_value()) << location.file() << ":" << location.line();
+    if (!event.has_value()) return nullptr;
+    auto* update = absl::get_if<FakeHelper::StateUpdate>(&*event);
+    EXPECT_NE(update, nullptr) << location.file() << ":" << location.line();
+    if (update == nullptr) return nullptr;
+    EXPECT_EQ(update->state, state)
+        << location.file() << ":" << location.line();
+    EXPECT_EQ(update->status, status)
+        << update->status << "\n"
+        << location.file() << ":" << location.line();
+    EXPECT_NE(update->picker, nullptr)
+        << location.file() << ":" << location.line();
+    return std::move(update->picker);
+  }
+
+  // Requests a pick on picker and expects a Queue result.
+  void ExpectPickQueued(LoadBalancingPolicy::SubchannelPicker* picker,
+                        SourceLocation location = SourceLocation()) {
+    ExecCtx exec_ctx;
+    FakeMetadata metadata({});
+    FakeCallState call_state;
+    auto pick_result =
+        picker->Pick({"/service/method", &metadata, &call_state});
+    ASSERT_TRUE(absl::holds_alternative<LoadBalancingPolicy::PickResult::Queue>(
+        pick_result.result))
+        << location.file() << ":" << location.line();
+  }
+
+  // Requests a pick on picker and expects a Complete result whose
+  // subchannel has the specified address.
+  void ExpectPickComplete(LoadBalancingPolicy::SubchannelPicker* picker,
+                          absl::string_view address_uri,
+                          SourceLocation location = SourceLocation()) {
+    ExecCtx exec_ctx;
+    FakeMetadata metadata({});
+    FakeCallState call_state;
+    auto pick_result =
+        picker->Pick({"/service/method", &metadata, &call_state});
+    auto* complete = absl::get_if<LoadBalancingPolicy::PickResult::Complete>(
+        &pick_result.result);
+    ASSERT_NE(complete, nullptr) << location.file() << ":" << location.line();
+    auto* subchannel = static_cast<SubchannelState::FakeSubchannel*>(
+        complete->subchannel.get());
+    auto uri = grpc_sockaddr_to_uri(&subchannel->address());
+    ASSERT_TRUE(uri.ok()) << uri.status() << " at " << location.file() << ":"
+                          << location.line();
+    EXPECT_EQ(*uri, address_uri) << location.file() << ":" << location.line();
+  }
+
+  std::shared_ptr<WorkSerializer> work_serializer_;
+  FakeHelper* helper_ = nullptr;
+  std::map<SubchannelKey, SubchannelState> subchannel_pool_;
+};
+
+}  // namespace testing
+}  // namespace grpc_core
+
+#endif  // GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_LB_POLICY_TEST_LIB_H
diff --git a/test/core/client_channel/lb_policy/pick_first_test.cc b/test/core/client_channel/lb_policy/pick_first_test.cc
new file mode 100644
index 00000000000..681704de60d
--- /dev/null
+++ b/test/core/client_channel/lb_policy/pick_first_test.cc
@@ -0,0 +1,100 @@
+//
+// Copyright 2022 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <stddef.h>
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "gtest/gtest.h"
+
+#include <grpc/grpc.h>
+
+#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/iomgr/resolved_address.h"
+#include "src/core/lib/load_balancing/lb_policy.h"
+#include "src/core/lib/resolver/server_address.h"
+#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
+#include "test/core/util/test_config.h"
+
+namespace grpc_core {
+namespace testing {
+namespace {
+
+class PickFirstTest : public LoadBalancingPolicyTest {
+ protected:
+  PickFirstTest() : lb_policy_(MakeLbPolicy("pick_first")) {}
+
+  OrphanablePtr<LoadBalancingPolicy> lb_policy_;
+};
+
+TEST_F(PickFirstTest, Basic) {
+  constexpr absl::string_view kAddressUri = "ipv4:127.0.0.1:443";
+  const grpc_resolved_address address = MakeAddress(kAddressUri);
+  // Send an update containing one address.
+  LoadBalancingPolicy::UpdateArgs update_args;
+  update_args.addresses.emplace();
+  update_args.addresses->emplace_back(address, ChannelArgs());
+  absl::Status status = ApplyUpdate(std::move(update_args), lb_policy_.get());
+  EXPECT_TRUE(status.ok()) << status;
+  // LB policy should have reported CONNECTING state.
+  auto picker = ExpectState(GRPC_CHANNEL_CONNECTING);
+  ExpectPickQueued(picker.get());
+  // LB policy should have created a subchannel for the address with the
+  // GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
+  SubchannelKey key(address,
+                    ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
+  auto it = subchannel_pool_.find(key);
+  ASSERT_NE(it, subchannel_pool_.end());
+  auto& subchannel_state = it->second;
+  // LB policy should have requested a connection on this subchannel.
+  EXPECT_TRUE(subchannel_state.ConnectionRequested());
+  // Tell subchannel to report CONNECTING.
+  subchannel_state.SetConnectivityState(GRPC_CHANNEL_CONNECTING,
+                                        absl::OkStatus());
+  // LB policy should again report CONNECTING.
+  picker = ExpectState(GRPC_CHANNEL_CONNECTING);
+  ExpectPickQueued(picker.get());
+  // Tell subchannel to report READY.
+  subchannel_state.SetConnectivityState(GRPC_CHANNEL_READY, absl::OkStatus());
+  // LB policy should report READY.
+  picker = ExpectState(GRPC_CHANNEL_READY);
+  // Picker should return the same subchannel repeatedly.
+  for (size_t i = 0; i < 3; ++i) {
+    ExpectPickComplete(picker.get(), kAddressUri);
+  }
+}
+
+}  // namespace
+}  // namespace testing
+}  // namespace grpc_core
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  grpc::testing::TestEnvironment env(&argc, argv);
+  grpc_init();
+  int ret = RUN_ALL_TESTS();
+  grpc_shutdown();
+  return ret;
+}
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index a7d7ed30122..f5303039b63 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -5143,6 +5143,30 @@
     ],
     "uses_polling": false
   },
+  {
+    "args": [],
+    "benchmark": false,
+    "ci_platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "cpu_cost": 1.0,
+    "exclude_configs": [],
+    "exclude_iomgrs": [],
+    "flaky": false,
+    "gtest": true,
+    "language": "c++",
+    "name": "pick_first_test",
+    "platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "uses_polling": true
+  },
   {
     "args": [],
     "benchmark": false,