[client channel] assume LB policies start in CONNECTING state (#33009)

Currently, we are not very consistent in what we assume the initial
state of an LB policy will be and whether or not we assume that it will
immediately report a new picker when it gets its initial address update;
different parts of our code make different assumptions. This PR
establishes the convention that LB policies will be assumed to start in
state CONNECTING and will *not* be assumed to report a new picker
immediately upon getting their initial address update, and we now assume
that convention everywhere consistently.

This is a preparatory step for changing policies like round_robin to
delegate to pick_first, which I'm working on in #32692. As part of that
change, we need pick_first to not report a connectivity state until it
actually sees the connectivity state of the underlying subchannels, so
that round_robin knows when to swap over to a new child list without
reintroducing the problem fixed in #31939.
pull/33059/head
Mark D. Roth 2 years ago committed by GitHub
parent dc95133140
commit 17315823c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 87
      src/core/ext/filters/client_channel/client_channel.cc
  2. 4
      src/core/ext/filters/client_channel/client_channel.h
  3. 6
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  4. 8
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  5. 5
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  6. 5
      src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
  7. 4
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  8. 5
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  9. 2
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
  10. 13
      src/core/lib/load_balancing/lb_policy.h
  11. 4
      test/core/client_channel/lb_policy/pick_first_test.cc

@@ -1347,11 +1347,8 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) {
// Otherwise, we go into TRANSIENT_FAILURE.
if (lb_policy_ == nullptr) {
// Update connectivity state.
// TODO(roth): We should be updating the connectivity state here but
// not the picker.
UpdateStateAndPickerLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure",
MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(status));
UpdateStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
"resolver failure");
{
MutexLock lock(&resolution_mu_);
// Update resolver transient failure.
@@ -1395,6 +1392,14 @@ absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
// Creates a new LB policy.
OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
const ChannelArgs& args) {
// The LB policy will start in state CONNECTING but will not
// necessarily send us an update synchronously, so set state to
// CONNECTING (in case the resolver had previously failed and put the
// channel into TRANSIENT_FAILURE) and make sure we have a queueing picker.
UpdateStateAndPickerLocked(
GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
// Now create the LB policy.
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer_;
lb_policy_args.channel_control_helper =
@@ -1495,13 +1500,8 @@ void ClientChannel::CreateResolverLocked() {
// Since the validity of the args was checked when the channel was created,
// CreateResolver() must return a non-null result.
GPR_ASSERT(resolver_ != nullptr);
// TODO(roth): We should be updating the connectivity state here but
// not the picker. But we need to make sure that we are initializing
// the picker to a queueing picker somewhere, in case the LB policy
// does not immediately return a new picker.
UpdateStateAndPickerLocked(
GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
UpdateStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status(),
"started resolving");
resolver_->StartLocked();
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
@@ -1515,24 +1515,7 @@ void ClientChannel::DestroyResolverAndLbPolicyLocked() {
resolver_.get());
}
resolver_.reset();
if (lb_policy_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
interested_parties_);
lb_policy_.reset();
}
}
}
void ClientChannel::UpdateStateAndPickerLocked(
grpc_connectivity_state state, const absl::Status& status,
const char* reason,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
// Special case for IDLE and SHUTDOWN states.
if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
// Clear resolution state.
saved_service_config_.reset();
saved_config_selector_.reset();
// Acquire resolution lock to update config selector and associated state.
@@ -1548,8 +1531,22 @@ void ClientChannel::UpdateStateAndPickerLocked(
config_selector_to_unref = std::move(config_selector_);
dynamic_filters_to_unref = std::move(dynamic_filters_);
}
// Clear LB policy if set.
if (lb_policy_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
interested_parties_);
lb_policy_.reset();
}
}
// Update connectivity state.
}
void ClientChannel::UpdateStateLocked(grpc_connectivity_state state,
const absl::Status& status,
const char* reason) {
state_tracker_.SetState(state, status, reason);
if (channelz_node_ != nullptr) {
channelz_node_->SetConnectivityState(state);
@@ -1559,19 +1556,24 @@ void ClientChannel::UpdateStateAndPickerLocked(
channelz::ChannelNode::GetChannelConnectivityStateChangeString(
state)));
}
}
void ClientChannel::UpdateStateAndPickerLocked(
grpc_connectivity_state state, const absl::Status& status,
const char* reason,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
UpdateStateLocked(state, status, reason);
// Grab the LB lock to update the picker and trigger reprocessing of the
// queued picks.
// Old picker will be unreffed after releasing the lock.
{
MutexLock lock(&lb_mu_);
picker_.swap(picker);
// Reprocess queued picks.
for (LoadBalancedCall* call : lb_queued_calls_) {
call->RemoveCallFromLbQueuedCallsLocked();
call->RetryPickLocked();
}
lb_queued_calls_.clear();
MutexLock lock(&lb_mu_);
picker_.swap(picker);
// Reprocess queued picks.
for (LoadBalancedCall* call : lb_queued_calls_) {
call->RemoveCallFromLbQueuedCallsLocked();
call->RetryPickLocked();
}
lb_queued_calls_.clear();
}
namespace {
@@ -1685,10 +1687,13 @@ void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
StatusIntProperty::ChannelConnectivityState,
&value) &&
static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
if (disconnect_error_.ok()) {
if (disconnect_error_.ok()) { // Ignore if we're shutting down.
// Enter IDLE state.
UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
"channel entering IDLE", nullptr);
// TODO(roth): Do we need to check for any queued picks here, in
// case there's a race condition in the client_idle filter?
// And maybe also check for calls in the resolver queue?
}
} else {
// Disconnect.

@@ -247,6 +247,10 @@ class ClientChannel {
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const ChannelArgs& args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void UpdateStateLocked(grpc_connectivity_state state,
const absl::Status& status, const char* reason)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void UpdateStateAndPickerLocked(
grpc_connectivity_state state, const absl::Status& status,
const char* reason,

@@ -240,12 +240,6 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
MakeRefCounted<TransientFailurePicker>(status));
channel_control_helper()->RequestReresolution();
}
// Otherwise, if this is the initial update, report CONNECTING.
else if (subchannel_list_.get() == nullptr) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
}
// If the new update is empty or we don't yet have a selected subchannel in
// the current list, replace the current subchannel list immediately.
if (latest_pending_subchannel_list_->num_subchannels() == 0 ||

@@ -357,7 +357,7 @@ class RlsLb : public LoadBalancingPolicy {
RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;
grpc_connectivity_state connectivity_state_ ABSL_GUARDED_BY(&RlsLb::mu_) =
GRPC_CHANNEL_IDLE;
GRPC_CHANNEL_CONNECTING;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
ABSL_GUARDED_BY(&RlsLb::mu_);
};
@@ -732,9 +732,9 @@ RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
: DualRefCounted<ChildPolicyWrapper>(
GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "ChildPolicyWrapper"
: nullptr),
lb_policy_(lb_policy),
lb_policy_(std::move(lb_policy)),
target_(std::move(target)),
picker_(MakeRefCounted<QueuePicker>(std::move(lb_policy))) {
picker_(MakeRefCounted<QueuePicker>(nullptr)) {
lb_policy_->child_policy_map_.emplace(target_, this);
}
@@ -895,6 +895,8 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
{
MutexLock lock(&wrapper_->lb_policy_->mu_);
if (wrapper_->is_shutdown_) return;
// TODO(roth): It looks like this ignores subsequent TF updates that
// might change the status used to fail picks, which seems wrong.
if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
state != GRPC_CHANNEL_READY) {
return;

@@ -318,12 +318,9 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
return status;
}
// Otherwise, if this is the initial update, immediately promote it to
// subchannel_list_ and report CONNECTING.
// subchannel_list_.
if (subchannel_list_.get() == nullptr) {
subchannel_list_ = std::move(latest_pending_subchannel_list_);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
}
return absl::OkStatus();
}

@@ -729,12 +729,9 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
return status;
}
// Otherwise, if this is the initial update, immediately promote it to
// subchannel_list_ and report CONNECTING.
// subchannel_list_.
if (subchannel_list_.get() == nullptr) {
subchannel_list_ = std::move(latest_pending_subchannel_list_);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
}
return absl::OkStatus();
}

@@ -519,7 +519,9 @@ void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked() {
WeightedTargetLb::WeightedChild::WeightedChild(
RefCountedPtr<WeightedTargetLb> weighted_target_policy,
const std::string& name)
: weighted_target_policy_(std::move(weighted_target_policy)), name_(name) {
: weighted_target_policy_(std::move(weighted_target_policy)),
name_(name),
picker_(MakeRefCounted<QueuePicker>(nullptr)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s",
weighted_target_policy_.get(), this, name_.c_str());

@@ -200,7 +200,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> child_policy_;
RefCountedPtr<SubchannelPicker> picker_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
// States for delayed removal.
absl::optional<EventEngine::TaskHandle> delayed_removal_timer_handle_;
@@ -409,7 +409,8 @@ XdsClusterManagerLb::ClusterChild::ClusterChild(
RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
const std::string& name)
: xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
name_(name) {
name_(name),
picker_(MakeRefCounted<QueuePicker>(nullptr)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_manager_lb %p] created ClusterChild %p for %s",

@@ -319,7 +319,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// Latest state and picker reported by the child policy.
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
absl::Status status_;
RefCountedPtr<SubchannelPicker> picker_;
Mutex subchannel_map_mu_;

@@ -366,6 +366,19 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// whether the LB policy accepted the update; if non-OK, informs
/// polling-based resolvers that they should go into backoff delay and
/// eventually reattempt the resolution.
///
/// The first time that UpdateLocked() is called, the LB policy will
/// generally not be able to determine the appropriate connectivity
/// state by the time UpdateLocked() returns (e.g., it will need to
/// wait for connectivity state notifications from each subchannel,
/// which will be delivered asynchronously). In this case, the LB
/// policy should not call the helper's UpdateState() method until it
/// does have a clear picture of the connectivity state (e.g., it
/// should wait for all subchannels to report connectivity state
/// before calling the helper's UpdateState() method), although it is
/// expected to do so within some short period of time. The parent of
/// the LB policy will assume that the policy's initial state is
/// CONNECTING and that picks should be queued.
virtual absl::Status UpdateLocked(UpdateArgs) = 0; // NOLINT
/// Tries to enter a READY connectivity state.

@@ -46,8 +46,6 @@ TEST_F(PickFirstTest, Basic) {
absl::Status status =
ApplyUpdate(BuildUpdate({kAddressUri}), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// LB policy should have created a subchannel for the address with the
// GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
auto* subchannel = FindSubchannel(
@@ -58,6 +56,8 @@ TEST_F(PickFirstTest, Basic) {
EXPECT_TRUE(subchannel->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// When the subchannel becomes connected, it reports READY.
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report CONNECTING some number of times (doesn't

Loading…
Cancel
Save