[pick_first] changes to support dualstack design (#34218)

This rolls forward only the pick_first changes from #32692, which were
rolled back in #33718. Specifically:
- Changes PF to use its own subchannel list implementation instead of
using the subchannel_list library, since the latter will be going away
with the dualstack changes.
- As a result of no longer using the subchannel_list library, PF no
longer needs to set the `GRPC_ARG_INHIBIT_HEALTH_CHECKING` channel arg.
- Adds an option to start a health watch on the chosen subchannel, to be
used in the future when pick_first is the child of a petiole policy.
(Currently, this code is not actually called anywhere.)
pull/34223/head
Mark D. Roth 1 year ago committed by GitHub
parent f5e02f6c62
commit 6412412ae1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      src/core/BUILD
  2. 536
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  3. 16
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h
  4. 5
      test/core/client_channel/lb_policy/outlier_detection_test.cc
  5. 75
      test/core/client_channel/lb_policy/pick_first_test.cc

@ -4789,15 +4789,15 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"grpc_lb_subchannel_list",
"grpc_outlier_detection_header",
"health_check_client",
"iomgr_fwd",
"json",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"subchannel_interface",
"//:channel_arg_names",
"//:config",
"//:debug_location",
"//:gpr",
@ -4806,7 +4806,6 @@ grpc_cc_library(
"//:orphanable",
"//:ref_counted_ptr",
"//:server_address",
"//:work_serializer",
],
)

@ -16,6 +16,8 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h"
#include <inttypes.h>
#include <string.h>
@ -33,19 +35,19 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h"
#include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h"
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
@ -98,72 +100,140 @@ class PickFirst : public LoadBalancingPolicy {
private:
~PickFirst() override;
class PickFirstSubchannelList;
class PickFirstSubchannelData
: public SubchannelData<PickFirstSubchannelList,
PickFirstSubchannelData> {
class SubchannelList : public InternallyRefCounted<SubchannelList> {
public:
PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list,
const ServerAddress& address,
RefCountedPtr<SubchannelInterface> subchannel)
: SubchannelData(subchannel_list, address, std::move(subchannel)) {}
class SubchannelData {
public:
SubchannelData(SubchannelList* subchannel_list,
RefCountedPtr<SubchannelInterface> subchannel);
SubchannelInterface* subchannel() const { return subchannel_.get(); }
absl::optional<grpc_connectivity_state> connectivity_state() const {
return connectivity_state_;
}
// Returns the index into the subchannel list of this object.
size_t Index() const {
return static_cast<size_t>(this -
&subchannel_list_->subchannels_.front());
}
// Resets the connection backoff.
void ResetBackoffLocked() {
if (subchannel_ != nullptr) subchannel_->ResetBackoff();
}
// Cancels any pending connectivity watch and unrefs the subchannel.
void ShutdownLocked();
private:
// Watcher for subchannel connectivity state.
class Watcher
: public SubchannelInterface::ConnectivityStateWatcherInterface {
public:
Watcher(SubchannelData* subchannel_data,
RefCountedPtr<SubchannelList> subchannel_list)
: subchannel_data_(subchannel_data),
subchannel_list_(std::move(subchannel_list)) {}
~Watcher() override {
subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
}
void OnConnectivityStateChange(grpc_connectivity_state new_state,
absl::Status status) override {
subchannel_data_->OnConnectivityStateChange(new_state,
std::move(status));
}
grpc_pollset_set* interested_parties() override {
return subchannel_list_->policy_->interested_parties();
}
private:
SubchannelData* subchannel_data_;
RefCountedPtr<SubchannelList> subchannel_list_;
};
void ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) override;
// This method will be invoked once soon after instantiation to report
// the current connectivity state, and it will then be invoked again
// whenever the connectivity state changes.
void OnConnectivityStateChange(grpc_connectivity_state new_state,
absl::Status status);
// Processes the connectivity change to READY for an unselected
// subchannel.
void ProcessUnselectedReadyLocked();
// Reacts to the current connectivity state while trying to connect.
void ReactToConnectivityStateLocked();
// Backpointer to owning subchannel list. Not owned.
SubchannelList* subchannel_list_;
// The subchannel.
RefCountedPtr<SubchannelInterface> subchannel_;
// Will be non-null when the subchannel's state is being watched.
SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
nullptr;
// Data updated by the watcher.
absl::optional<grpc_connectivity_state> connectivity_state_;
absl::Status connectivity_status_;
};
SubchannelList(RefCountedPtr<PickFirst> policy, ServerAddressList addresses,
const ChannelArgs& args);
~SubchannelList() override;
// The number of subchannels in the list.
size_t size() const { return subchannels_.size(); }
// Resets connection backoff of all subchannels.
void ResetBackoffLocked();
void Orphan() override;
private:
// Processes the connectivity change to READY for an unselected subchannel.
void ProcessUnselectedReadyLocked();
// Returns true if all subchannels have seen their initial
// connectivity state notifications.
bool AllSubchannelsSeenInitialState();
// Backpointer to owning policy.
RefCountedPtr<PickFirst> policy_;
ChannelArgs args_;
// The list of subchannels.
std::vector<SubchannelData> subchannels_;
// Reacts to the current connectivity state while trying to connect.
void ReactToConnectivityStateLocked();
// Is this list shutting down? This may be true due to the shutdown of the
// policy itself or because a newer update has arrived while this one hadn't
// finished processing.
bool shutting_down_ = false;
bool in_transient_failure_ = false;
size_t attempting_index_ = 0;
};
class PickFirstSubchannelList
: public SubchannelList<PickFirstSubchannelList,
PickFirstSubchannelData> {
class HealthWatcher
: public SubchannelInterface::ConnectivityStateWatcherInterface {
public:
PickFirstSubchannelList(PickFirst* policy, ServerAddressList addresses,
const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)
? "PickFirstSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
// Note that we do not start trying to connect to any subchannel here,
// since we will wait until we see the initial connectivity state for all
// subchannels before doing that.
}
~PickFirstSubchannelList() override {
PickFirst* p = static_cast<PickFirst*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
explicit HealthWatcher(RefCountedPtr<PickFirst> policy)
: policy_(std::move(policy)) {}
bool in_transient_failure() const { return in_transient_failure_; }
void set_in_transient_failure(bool in_transient_failure) {
in_transient_failure_ = in_transient_failure;
~HealthWatcher() override {
policy_.reset(DEBUG_LOCATION, "HealthWatcher dtor");
}
size_t attempting_index() const { return attempting_index_; }
void set_attempting_index(size_t index) { attempting_index_ = index; }
void OnConnectivityStateChange(grpc_connectivity_state new_state,
absl::Status status) override;
private:
std::shared_ptr<WorkSerializer> work_serializer() const override {
return static_cast<PickFirst*>(policy())->work_serializer();
grpc_pollset_set* interested_parties() override {
return policy_->interested_parties();
}
bool in_transient_failure_ = false;
size_t attempting_index_ = 0;
private:
RefCountedPtr<PickFirst> policy_;
};
class Picker : public SubchannelPicker {
@ -181,19 +251,30 @@ class PickFirst : public LoadBalancingPolicy {
void ShutdownLocked() override;
void AttemptToConnectUsingLatestUpdateArgsLocked();
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<SubchannelPicker> picker);
void AttemptToConnectUsingLatestUpdateArgsLocked();
void UnsetSelectedSubchannel();
// Whether we should enable health watching.
const bool enable_health_watch_;
// Whether we should omit our status message prefix.
const bool omit_status_message_prefix_;
// Latest update args.
UpdateArgs latest_update_args_;
// All our subchannels.
RefCountedPtr<PickFirstSubchannelList> subchannel_list_;
OrphanablePtr<SubchannelList> subchannel_list_;
// Latest pending subchannel list.
RefCountedPtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
// Selected subchannel in \a subchannel_list_.
PickFirstSubchannelData* selected_ = nullptr;
OrphanablePtr<SubchannelList> latest_pending_subchannel_list_;
// Selected subchannel in subchannel_list_.
SubchannelList::SubchannelData* selected_ = nullptr;
// Health watcher for the selected subchannel.
SubchannelInterface::ConnectivityStateWatcherInterface* health_watcher_ =
nullptr;
SubchannelInterface::DataWatcherInterface* health_data_watcher_ = nullptr;
// Current connectivity state.
grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
// Are we shut down?
@ -202,7 +283,16 @@ class PickFirst : public LoadBalancingPolicy {
absl::BitGen bit_gen_;
};
PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
PickFirst::PickFirst(Args args)
: LoadBalancingPolicy(std::move(args)),
enable_health_watch_(
channel_args()
.GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
.value_or(false)),
omit_status_message_prefix_(
channel_args()
.GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)
.value_or(false)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p created.", this);
}
@ -221,6 +311,7 @@ void PickFirst::ShutdownLocked() {
gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
}
shutdown_ = true;
UnsetSelectedSubchannel();
subchannel_list_.reset();
latest_pending_subchannel_list_.reset();
}
@ -255,13 +346,11 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
"[PF %p] Shutting down previous pending subchannel list %p", this,
latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeRefCounted<PickFirstSubchannelList>(
this, std::move(addresses), latest_update_args_.args);
latest_pending_subchannel_list_->StartWatchingLocked(
latest_update_args_.args);
latest_pending_subchannel_list_ = MakeOrphanable<SubchannelList>(
Ref(), std::move(addresses), latest_update_args_.args);
// Empty update or no valid subchannels. Put the channel in
// TRANSIENT_FAILURE and request re-resolution.
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
if (latest_pending_subchannel_list_->size() == 0) {
channel_control_helper()->RequestReresolution();
absl::Status status =
latest_update_args_.addresses.ok()
@ -273,9 +362,8 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
}
// If the new update is empty or we don't yet have a selected subchannel in
// the current list, replace the current subchannel list immediately.
if (latest_pending_subchannel_list_->num_subchannels() == 0 ||
selected_ == nullptr) {
selected_ = nullptr;
if (latest_pending_subchannel_list_->size() == 0 || selected_ == nullptr) {
UnsetSelectedSubchannel();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
subchannel_list_ != nullptr) {
gpr_log(GPR_INFO, "[PF %p] Shutting down previous subchannel list %p",
@ -296,8 +384,6 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
this, args.addresses.status().ToString().c_str());
}
}
// Add GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
args.args = args.args.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
// Set return status based on the address list.
absl::Status status;
if (!args.addresses.ok()) {
@ -345,18 +431,122 @@ void PickFirst::UpdateState(grpc_connectivity_state state,
channel_control_helper()->UpdateState(state, status, std::move(picker));
}
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
// Clears the current selection. If a subchannel is selected and a health
// data watcher is registered, that watcher is cancelled on the selected
// subchannel first. Afterwards selected_, health_watcher_ and
// health_data_watcher_ are all nullptr.
void PickFirst::UnsetSelectedSubchannel() {
  const bool must_cancel_health_watch =
      selected_ != nullptr && health_data_watcher_ != nullptr;
  if (must_cancel_health_watch) {
    auto* selected_subchannel = selected_->subchannel();
    selected_subchannel->CancelDataWatcher(health_data_watcher_);
  }
  health_data_watcher_ = nullptr;
  health_watcher_ = nullptr;
  selected_ = nullptr;
}
//
// PickFirst::HealthWatcher
//
// Translates a health-watch state report into a state/picker update on the
// policy's channel control helper.
//   new_state: the state reported by the health watch.
//   status:    the status accompanying the report; forwarded to the helper
//              and the picker only for TRANSIENT_FAILURE.
// Reports are dropped when this watcher is not the policy's current
// health_watcher_.
void PickFirst::HealthWatcher::OnConnectivityStateChange(
    grpc_connectivity_state new_state, absl::Status status) {
  // Only the watcher currently registered on the policy may report.
  if (this != policy_->health_watcher_) return;
  PickFirst* const pf = policy_.get();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
    gpr_log(GPR_INFO, "[PF %p] health watch state update: %s (%s)", pf,
            ConnectivityStateName(new_state), status.ToString().c_str());
  }
  switch (new_state) {
    case GRPC_CHANNEL_READY: {
      // Healthy: hand out a picker for the selected subchannel.
      pf->channel_control_helper()->UpdateState(
          GRPC_CHANNEL_READY, absl::OkStatus(),
          MakeRefCounted<Picker>(pf->selected_->subchannel()->Ref()));
      break;
    }
    case GRPC_CHANNEL_IDLE: {
      // A disconnect may reach the health watcher before it reaches the
      // raw connectivity state watcher. Nothing to do here; the raw
      // watcher will deal with the disconnect shortly.
      break;
    }
    case GRPC_CHANNEL_CONNECTING: {
      // Queue picks while the health state is still being determined.
      pf->channel_control_helper()->UpdateState(
          new_state, absl::OkStatus(), MakeRefCounted<QueuePicker>(pf->Ref()));
      break;
    }
    case GRPC_CHANNEL_TRANSIENT_FAILURE: {
      // Unhealthy: fail picks with the reported status.
      pf->channel_control_helper()->UpdateState(
          GRPC_CHANNEL_TRANSIENT_FAILURE, status,
          MakeRefCounted<TransientFailurePicker>(status));
      break;
    }
    case GRPC_CHANNEL_SHUTDOWN:
      // The health watch is never expected to report SHUTDOWN.
      Crash("health watcher reported state SHUTDOWN");
  }
}
//
// PickFirst::SubchannelList::SubchannelData
//
// Stores the owning list and the subchannel, then starts a connectivity
// state watch on the subchannel.
//   subchannel_list: backpointer to the owning list (stored as a raw pointer).
//   subchannel:      the subchannel this entry wraps.
// The watcher takes a ref on the list; a raw pointer to the watcher is kept
// in pending_watcher_ and the watcher itself is handed to the subchannel.
PickFirst::SubchannelList::SubchannelData::SubchannelData(
    SubchannelList* subchannel_list,
    RefCountedPtr<SubchannelInterface> subchannel)
    : subchannel_list_(subchannel_list), subchannel_(std::move(subchannel)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
    PickFirst* const policy = subchannel_list_->policy_.get();
    // The list's current size is logged as this entry's index --
    // presumably because the entry has not been appended yet; TODO confirm.
    const size_t index = subchannel_list_->size();
    gpr_log(GPR_INFO,
            "[PF %p] subchannel list %p index %" PRIuPTR
            " (subchannel %p): starting watch",
            policy, subchannel_list_, index, subchannel_.get());
  }
  auto connectivity_watcher = std::make_unique<Watcher>(
      this, subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"));
  pending_watcher_ = connectivity_watcher.get();
  subchannel_->WatchConnectivityState(std::move(connectivity_watcher));
}
void PickFirst::SubchannelList::SubchannelData::ShutdownLocked() {
if (subchannel_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): cancelling watch and unreffing subchannel",
subchannel_list_->policy_.get(), subchannel_list_, Index(),
subchannel_list_->size(), subchannel_.get());
}
subchannel_->CancelConnectivityStateWatch(pending_watcher_);
pending_watcher_ = nullptr;
subchannel_.reset();
}
}
void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
grpc_connectivity_state new_state, absl::Status status) {
PickFirst* p = subchannel_list_->policy_.get();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(
GPR_INFO,
"[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): connectivity changed: old_state=%s, new_state=%s, "
"status=%s, shutting_down=%d, pending_watcher=%p, "
"p->selected_=%p, p->subchannel_list_=%p, "
"p->latest_pending_subchannel_list_=%p",
p, subchannel_list_, Index(), subchannel_list_->size(),
subchannel_.get(),
(connectivity_state_.has_value()
? ConnectivityStateName(*connectivity_state_)
: "N/A"),
ConnectivityStateName(new_state), status.ToString().c_str(),
subchannel_list_->shutting_down_, pending_watcher_, p->selected_,
p->subchannel_list_.get(), p->latest_pending_subchannel_list_.get());
}
if (subchannel_list_->shutting_down_ || pending_watcher_ == nullptr) return;
// The notification must be for a subchannel in either the current or
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get() ||
subchannel_list_ == p->latest_pending_subchannel_list_.get());
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
absl::optional<grpc_connectivity_state> old_state = connectivity_state_;
connectivity_state_ = new_state;
connectivity_status_ = status;
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get());
GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p selected subchannel connectivity changed to %s", p,
@ -380,17 +570,15 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->selected_ = nullptr;
p->UnsetSelectedSubchannel();
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
if (p->subchannel_list_->in_transient_failure()) {
if (p->subchannel_list_->in_transient_failure_) {
absl::Status status = absl::UnavailableError(absl::StrCat(
"selected subchannel failed; switching to pending update; "
"last failure: ",
p->subchannel_list_
->subchannel(p->subchannel_list_->num_subchannels())
->connectivity_status()
.ToString()));
p->subchannel_list_->subchannels_.back()
.connectivity_status_.ToString()));
p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
} else if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
@ -400,7 +588,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
return;
}
// Enter idle.
p->selected_ = nullptr;
p->UnsetSelectedSubchannel();
p->subchannel_list_.reset();
p->UpdateState(
GRPC_CHANNEL_IDLE, absl::Status(),
@ -418,32 +606,33 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// select in place of the current one.
// If the subchannel is READY, use it.
if (new_state == GRPC_CHANNEL_READY) {
subchannel_list()->set_in_transient_failure(false);
subchannel_list_->in_transient_failure_ = false;
ProcessUnselectedReadyLocked();
return;
}
// If we haven't yet seen the initial connectivity state notification
// for all subchannels, do nothing.
if (!subchannel_list()->AllSubchannelsSeenInitialState()) return;
if (!subchannel_list_->AllSubchannelsSeenInitialState()) return;
// If we're still here and this is the initial connectivity state
// notification for this subchannel, that means it was the last one to
// see its initial notification. Start trying to connect, starting
// with the first subchannel.
if (!old_state.has_value()) {
subchannel_list()->subchannel(0)->ReactToConnectivityStateLocked();
subchannel_list_->subchannels_.front().ReactToConnectivityStateLocked();
return;
}
// Ignore any other updates for subchannels we're not currently trying to
// connect to.
if (Index() != subchannel_list()->attempting_index()) return;
if (Index() != subchannel_list_->attempting_index_) return;
// React to the connectivity state.
ReactToConnectivityStateLocked();
}
void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
void PickFirst::SubchannelList::SubchannelData::
ReactToConnectivityStateLocked() {
PickFirst* p = subchannel_list_->policy_.get();
// Otherwise, process connectivity state.
switch (connectivity_state().value()) {
switch (connectivity_state_.value()) {
case GRPC_CHANNEL_READY:
// Already handled this case above, so this should not happen.
GPR_UNREACHABLE_CODE(break);
@ -451,13 +640,13 @@ void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() {
// Find the next subchannel not in state TRANSIENT_FAILURE.
// We skip subchannels in state TRANSIENT_FAILURE to avoid a
// large recursion that could overflow the stack.
PickFirstSubchannelData* found_subchannel = nullptr;
SubchannelData* found_subchannel = nullptr;
for (size_t next_index = Index() + 1;
next_index < subchannel_list()->num_subchannels(); ++next_index) {
PickFirstSubchannelData* sc = subchannel_list()->subchannel(next_index);
GPR_ASSERT(sc->connectivity_state().has_value());
if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) {
subchannel_list()->set_attempting_index(next_index);
next_index < subchannel_list_->size(); ++next_index) {
SubchannelData* sc = &subchannel_list_->subchannels_[next_index];
GPR_ASSERT(sc->connectivity_state_.has_value());
if (sc->connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
subchannel_list_->attempting_index_ = next_index;
found_subchannel = sc;
break;
}
@ -475,14 +664,14 @@ void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() {
gpr_log(GPR_INFO,
"Pick First %p subchannel list %p failed to connect to "
"all subchannels",
p, subchannel_list());
p, subchannel_list_);
}
subchannel_list()->set_attempting_index(0);
subchannel_list()->set_in_transient_failure(true);
subchannel_list_->attempting_index_ = 0;
subchannel_list_->in_transient_failure_ = true;
// In case 2, swap to the new subchannel list. This means reporting
// TRANSIENT_FAILURE and dropping the existing (working) connection,
// but we can't ignore what the control plane has told us.
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
@ -490,36 +679,38 @@ void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() {
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->selected_ = nullptr; // owned by p->subchannel_list_
p->UnsetSelectedSubchannel();
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// If this is the current subchannel list (either because we were
// in case 1 or because we were in case 2 and just promoted it to
// be the current list), re-resolve and report new state.
if (subchannel_list() == p->subchannel_list_.get()) {
if (subchannel_list_ == p->subchannel_list_.get()) {
p->channel_control_helper()->RequestReresolution();
absl::Status status = absl::UnavailableError(
absl::StrCat("failed to connect to all addresses; last error: ",
connectivity_status().ToString()));
absl::Status status = absl::UnavailableError(absl::StrCat(
(p->omit_status_message_prefix_
? ""
: "failed to connect to all addresses; last error: "),
connectivity_status_.ToString()));
p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
}
// If the first subchannel is already IDLE, trigger the next connection
// attempt immediately. Otherwise, we'll wait for it to report
// attempt immediately. Otherwise, we'll wait for it to report
// its own connectivity state change.
auto* subchannel0 = subchannel_list()->subchannel(0);
if (subchannel0->connectivity_state() == GRPC_CHANNEL_IDLE) {
subchannel0->subchannel()->RequestConnection();
auto& subchannel0 = subchannel_list_->subchannels_.front();
if (subchannel0.connectivity_state_ == GRPC_CHANNEL_IDLE) {
subchannel0.subchannel_->RequestConnection();
}
break;
}
case GRPC_CHANNEL_IDLE:
subchannel()->RequestConnection();
subchannel_->RequestConnection();
break;
case GRPC_CHANNEL_CONNECTING:
// Only update connectivity state in case 1, and only if we're not
// already in TRANSIENT_FAILURE.
if (subchannel_list() == p->subchannel_list_.get() &&
if (subchannel_list_ == p->subchannel_list_.get() &&
p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(nullptr));
@ -530,8 +721,8 @@ void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() {
}
}
void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
PickFirst* p = subchannel_list_->policy_.get();
// If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is
// for a subchannel in p->subchannel_list_ that we're trying to
@ -541,10 +732,10 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get() ||
subchannel_list_ == p->latest_pending_subchannel_list_.get());
// Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
@ -556,16 +747,113 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
}
// Cases 1 and 2.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
subchannel_.get());
}
p->selected_ = this;
p->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
MakeRefCounted<Picker>(subchannel()->Ref()));
for (size_t i = 0; i < subchannel_list()->num_subchannels(); ++i) {
// If health checking is enabled, start the health watch, but don't
// report a new picker -- we want to stay in CONNECTING while we wait
// for the health status notification.
// If health checking is NOT enabled, report READY.
if (p->enable_health_watch_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "[PF %p] starting health watch", p);
}
auto watcher = std::make_unique<HealthWatcher>(
p->Ref(DEBUG_LOCATION, "HealthWatcher"));
p->health_watcher_ = watcher.get();
auto health_data_watcher = MakeHealthCheckWatcher(
p->work_serializer(), subchannel_list_->args_, std::move(watcher));
p->health_data_watcher_ = health_data_watcher.get();
subchannel_->AddDataWatcher(std::move(health_data_watcher));
} else {
p->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
MakeRefCounted<Picker>(subchannel()->Ref()));
}
// Unref all other subchannels in the list.
for (size_t i = 0; i < subchannel_list_->size(); ++i) {
if (i != Index()) {
subchannel_list()->subchannel(i)->ShutdownLocked();
subchannel_list_->subchannels_[i].ShutdownLocked();
}
}
}
//
// PickFirst::SubchannelList
//
// Builds the subchannel list for one address update.
//   policy:    ref to the owning pick_first policy.
//   addresses: addresses to create subchannels for, in order.
//   args:      channel args; the two pick_first-internal args are removed
//              and the result (args_) is what subchannels are created with.
// One SubchannelData entry is appended per address for which the helper
// returns a subchannel; addresses with no subchannel are skipped.
PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
                                          ServerAddressList addresses,
                                          const ChannelArgs& args)
    : InternallyRefCounted<SubchannelList>(
          GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) ? "SubchannelList"
                                                            : nullptr),
      policy_(std::move(policy)),
      args_(args.Remove(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
                .Remove(
                    GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
    gpr_log(GPR_INFO,
            "[PF %p] Creating subchannel list %p for %" PRIuPTR
            " subchannels - channel args: %s",
            policy_.get(), this, addresses.size(), args_.ToString().c_str());
  }
  // Reserve capacity for every address before appending any entry.
  subchannels_.reserve(addresses.size());
  for (const ServerAddress& address : addresses) {
    RefCountedPtr<SubchannelInterface> created =
        policy_->channel_control_helper()->CreateSubchannel(address, args_);
    if (created != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
        gpr_log(GPR_INFO,
                "[PF %p] subchannel list %p index %" PRIuPTR
                ": Created subchannel %p for address %s",
                policy_.get(), this, subchannels_.size(), created.get(),
                address.ToString().c_str());
      }
      subchannels_.emplace_back(this, std::move(created));
    } else {
      // No subchannel for this address: log (when tracing) and move on.
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
        gpr_log(GPR_INFO,
                "[PF %p] could not create subchannel for address %s, ignoring",
                policy_.get(), address.ToString().c_str());
      }
    }
  }
}
// Emits a trace log line when pick_first tracing is enabled; the body does
// no other work.
PickFirst::SubchannelList::~SubchannelList() {
  if (!GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) return;
  gpr_log(GPR_INFO, "[PF %p] Destroying subchannel_list %p", policy_.get(),
          this);
}
// Shuts the list down: marks it as shutting down, shuts down every
// subchannel entry, then releases one ref on the list. Asserts that it has
// not already been called on this list.
void PickFirst::SubchannelList::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
    gpr_log(GPR_INFO, "[PF %p] Shutting down subchannel_list %p", policy_.get(),
            this);
  }
  GPR_ASSERT(!shutting_down_);
  shutting_down_ = true;
  for (auto it = subchannels_.begin(); it != subchannels_.end(); ++it) {
    it->ShutdownLocked();
  }
  // Kept as the final statement. NOTE(review): presumably this may release
  // the last ref and destroy the list -- confirm before adding code below.
  Unref();
}
// Forwards the backoff reset to every subchannel entry in the list.
void PickFirst::SubchannelList::ResetBackoffLocked() {
  for (auto it = subchannels_.begin(); it != subchannels_.end(); ++it) {
    it->ResetBackoffLocked();
  }
}
// Returns true when every entry in the list has a recorded connectivity
// state (trivially true for an empty list); returns false at the first
// entry that has none.
bool PickFirst::SubchannelList::AllSubchannelsSeenInitialState() {
  for (size_t i = 0; i < subchannels_.size(); ++i) {
    const bool seen_initial_state =
        subchannels_[i].connectivity_state().has_value();
    if (!seen_initial_state) return false;
  }
  return true;
}
//

@ -17,4 +17,20 @@
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_PICK_FIRST_PICK_FIRST_H
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_PICK_FIRST_PICK_FIRST_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/resolver/server_address.h"
// Internal channel arg to enable health checking in pick_first.
// Intended to be used by petiole policies (e.g., round_robin) that
// delegate to pick_first.
#define GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING \
GRPC_ARG_NO_SUBCHANNEL_PREFIX "pick_first_enable_health_checking"
// Internal channel arg to tell pick_first to omit the prefix it normally
// adds to error status messages. Intended to be used by petiole policies
// (e.g., round_robin) that want to add their own prefixes.
#define GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX \
GRPC_ARG_NO_SUBCHANNEL_PREFIX "pick_first_omit_status_message_prefix"
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_PICK_FIRST_PICK_FIRST_H

@ -33,12 +33,10 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
@ -257,8 +255,7 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for the first address
// (pick_first no longer sets the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg).
auto* subchannel = FindSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
// When the LB policy receives the subchannel's initial connectivity
// state notification (IDLE), it will request a connection.

@ -31,10 +31,8 @@
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/json.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -74,8 +72,7 @@ class PickFirstTest : public LoadBalancingPolicyTest {
// We will remove entries as each subchannel starts to connect.
std::map<SubchannelState*, absl::string_view> subchannels;
for (auto address : addresses) {
auto* subchannel = FindSubchannel(
address, ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel = FindSubchannel(address);
ASSERT_NE(subchannel, nullptr);
subchannels.emplace(subchannel, address);
}
@ -136,13 +133,10 @@ TEST_F(PickFirstTest, FirstAddressWorks) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
auto* subchannel = FindSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
auto* subchannel2 = FindSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel2 = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel2, nullptr);
// When the LB policy receives the first subchannel's initial connectivity
// state notification (IDLE), it will request a connection.
@ -172,13 +166,10 @@ TEST_F(PickFirstTest, FirstAddressFails) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
auto* subchannel = FindSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
auto* subchannel2 = FindSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel2 = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel2, nullptr);
// When the LB policy receives the first subchannel's initial connectivity
// state notification (IDLE), it will request a connection.
@ -216,28 +207,26 @@ TEST_F(PickFirstTest, FirstTwoAddressesInTransientFailureAtStart) {
// LB policy gets the update.
constexpr std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"};
auto* subchannel = CreateSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel = CreateSubchannel(kAddresses[0]);
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
auto* subchannel2 = CreateSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel2 = CreateSubchannel(kAddresses[1]);
subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for all addresses with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
auto* subchannel3 = FindSubchannel(
kAddresses[2], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
// LB policy should have created a subchannel for all addresses.
auto* subchannel3 = FindSubchannel(kAddresses[2]);
ASSERT_NE(subchannel3, nullptr);
// When the LB policy receives the first subchannel's initial connectivity
// state notification (TRANSIENT_FAILURE), it will move on to the second
// subchannel. The second subchannel is in state IDLE, so the LB
// policy will request a connection attempt on it.
// subchannel. The second subchannel is also in state TRANSIENT_FAILURE,
// so the LB policy will move on to the third subchannel. That
// subchannel is in state IDLE, so the LB policy will request a connection
// attempt on it.
EXPECT_TRUE(subchannel3->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports
// CONNECTING.
@ -261,13 +250,11 @@ TEST_F(PickFirstTest, AllAddressesInTransientFailureAtStart) {
// when the LB policy gets the update.
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
auto* subchannel = CreateSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel = CreateSubchannel(kAddresses[0]);
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
auto* subchannel2 = CreateSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel2 = CreateSubchannel(kAddresses[1]);
subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
@ -309,13 +296,11 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) {
// when the LB policy gets the update.
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
auto* subchannel = CreateSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel = CreateSubchannel(kAddresses[0]);
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
auto* subchannel2 = CreateSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel2 = CreateSubchannel(kAddresses[1]);
subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
@ -342,9 +327,7 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) {
lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// The LB policy should have created a subchannel for the new address.
auto* subchannel3 =
FindSubchannel(kAddresses2[1],
ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel3 = FindSubchannel(kAddresses2[1]);
ASSERT_NE(subchannel3, nullptr);
// The policy will ask it to connect.
EXPECT_TRUE(subchannel3->ConnectionRequested());
@ -368,13 +351,10 @@ TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
auto* subchannel = FindSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
auto* subchannel2 = FindSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel2 = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel2, nullptr);
// When the LB policy receives the first subchannel's initial connectivity
// state notification (IDLE), it will request a connection.
@ -432,13 +412,10 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
auto* subchannel = FindSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
auto* subchannel2 = FindSubchannel(
kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
auto* subchannel2 = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel2, nullptr);
// When the LB policy receives the first subchannel's initial connectivity
// state notification (IDLE), it will request a connection.

Loading…
Cancel
Save