LB policy API: make pickers ref-counted (#31612)

* LB policy API: make pickers ref-counted

* fix build

* clang-tidy
pull/31564/head^2
Mark D. Roth 2 years ago committed by GitHub
parent d061903dab
commit 916a325b6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      src/core/ext/filters/client_channel/client_channel.cc
  2. 4
      src/core/ext/filters/client_channel/client_channel.h
  3. 2
      src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
  4. 12
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 28
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  6. 18
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  7. 50
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  8. 4
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  9. 12
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  10. 10
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  11. 53
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  12. 8
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  13. 32
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  14. 49
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  15. 6
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  16. 6
      src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc
  17. 5
      src/core/lib/load_balancing/lb_policy.h
  18. 25
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  19. 2
      test/core/end2end/tests/retry_lb_drop.cc
  20. 24
      test/core/util/test_lb_policies.cc

@ -883,10 +883,9 @@ class ClientChannel::ClientChannelControlHelper
chand_, std::move(subchannel), std::move(health_check_service_name));
}
void UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
if (chand_->resolver_ == nullptr) return; // Shutting down.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
const char* extra = chand_->disconnect_error_.ok()
@ -1331,7 +1330,7 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) {
// Update connectivity state.
UpdateStateAndPickerLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure",
std::make_unique<LoadBalancingPolicy::TransientFailurePicker>(status));
MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(status));
}
}
@ -1511,7 +1510,7 @@ void ClientChannel::CreateResolverLocked() {
GPR_ASSERT(resolver_ != nullptr);
UpdateStateAndPickerLocked(
GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
std::make_unique<LoadBalancingPolicy::QueuePicker>(nullptr));
MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
resolver_->StartLocked();
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
@ -1540,7 +1539,7 @@ void ClientChannel::DestroyResolverAndLbPolicyLocked() {
void ClientChannel::UpdateStateAndPickerLocked(
grpc_connectivity_state state, const absl::Status& status,
const char* reason,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
// Special case for IDLE and SHUTDOWN states.
if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
saved_service_config_.reset();
@ -1717,7 +1716,7 @@ void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
disconnect_error_ = op->disconnect_with_error;
UpdateStateAndPickerLocked(
GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
std::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(
grpc_error_to_absl_status(op->disconnect_with_error)));
}
}

@ -260,7 +260,7 @@ class ClientChannel {
void UpdateStateAndPickerLocked(
grpc_connectivity_state state, const absl::Status& status,
const char* reason,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker)
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void UpdateServiceConfigInControlPlaneLocked(
@ -332,7 +332,7 @@ class ClientChannel {
// Fields used in the data plane. Guarded by data_plane_mu_.
//
mutable Mutex data_plane_mu_;
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
ABSL_GUARDED_BY(data_plane_mu_);
// Linked list of calls queued waiting for LB pick.
LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr;

@ -61,7 +61,7 @@ class ChildPolicyHandler::Helper
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override {
RefCountedPtr<SubchannelPicker> picker) override {
if (parent_->shutting_down_) return;
// If this request is from the pending child policy, ignore it until
// it reports something other than CONNECTING, at which point we swap it

@ -407,7 +407,7 @@ class GrpcLb : public LoadBalancingPolicy {
class Picker : public SubchannelPicker {
public:
Picker(RefCountedPtr<Serverlist> serverlist,
std::unique_ptr<SubchannelPicker> child_picker,
RefCountedPtr<SubchannelPicker> child_picker,
RefCountedPtr<GrpcLbClientStats> client_stats)
: serverlist_(std::move(serverlist)),
child_picker_(std::move(child_picker)),
@ -452,7 +452,7 @@ class GrpcLb : public LoadBalancingPolicy {
// Serverlist to be used for determining drops.
RefCountedPtr<Serverlist> serverlist_;
std::unique_ptr<SubchannelPicker> child_picker_;
RefCountedPtr<SubchannelPicker> child_picker_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
@ -464,7 +464,7 @@ class GrpcLb : public LoadBalancingPolicy {
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
@ -816,7 +816,7 @@ RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (parent_->shutting_down_) return;
// Record whether child policy reports READY.
parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
@ -851,8 +851,8 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
}
parent_->channel_control_helper()->UpdateState(
state, status,
std::make_unique<Picker>(std::move(serverlist), std::move(picker),
std::move(client_stats)));
MakeRefCounted<Picker>(std::move(serverlist), std::move(picker),
std::move(client_stats)));
}
void GrpcLb::Helper::RequestReresolution() {

@ -305,28 +305,17 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
std::set<SubchannelWrapper*> subchannels_;
};
// A simple wrapper for ref-counting a picker from the child policy.
class RefCountedPicker : public RefCounted<RefCountedPicker> {
public:
explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
private:
std::unique_ptr<SubchannelPicker> picker_;
};
// A picker that wraps the picker from the child to perform outlier detection.
class Picker : public SubchannelPicker {
public:
Picker(OutlierDetectionLb* outlier_detection_lb,
RefCountedPtr<RefCountedPicker> picker, bool counting_enabled);
RefCountedPtr<SubchannelPicker> picker, bool counting_enabled);
PickResult Pick(PickArgs args) override;
private:
class SubchannelCallTracker;
RefCountedPtr<RefCountedPicker> picker_;
RefCountedPtr<SubchannelPicker> picker_;
bool counting_enabled_;
};
@ -342,7 +331,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
@ -396,7 +385,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
// Latest state and picker reported by the child policy.
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
absl::Status status_;
RefCountedPtr<RefCountedPicker> picker_;
RefCountedPtr<SubchannelPicker> picker_;
std::map<std::string, RefCountedPtr<SubchannelState>> subchannel_state_map_;
OrphanablePtr<EjectionTimer> ejection_timer_;
};
@ -490,7 +479,7 @@ class OutlierDetectionLb::Picker::SubchannelCallTracker
//
OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb,
RefCountedPtr<RefCountedPicker> picker,
RefCountedPtr<SubchannelPicker> picker,
bool counting_enabled)
: picker_(std::move(picker)), counting_enabled_(counting_enabled) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
@ -682,7 +671,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
void OutlierDetectionLb::MaybeUpdatePickerLocked() {
if (picker_ != nullptr) {
auto outlier_detection_picker =
std::make_unique<Picker>(this, picker_, config_->CountingEnabled());
MakeRefCounted<Picker>(this, picker_, config_->CountingEnabled());
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] updating connectivity: state=%s "
@ -743,7 +732,7 @@ RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
void OutlierDetectionLb::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (outlier_detection_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
@ -755,8 +744,7 @@ void OutlierDetectionLb::Helper::UpdateState(
// Save the state and picker.
outlier_detection_policy_->state_ = state;
outlier_detection_policy_->status_ = status;
outlier_detection_policy_->picker_ =
MakeRefCounted<RefCountedPicker>(std::move(picker));
outlier_detection_policy_->picker_ = std::move(picker);
// Wrap the picker and return it to the channel.
outlier_detection_policy_->MaybeUpdatePickerLocked();
}

@ -239,14 +239,14 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
: latest_update_args_.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(status));
channel_control_helper()->RequestReresolution();
}
// Otherwise, if this is the initial update, report CONNECTING.
else if (subchannel_list_.get() == nullptr) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
std::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
}
// If the new update is empty or we don't yet have a selected subchannel in
// the current list, replace the current subchannel list immediately.
@ -338,12 +338,11 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
.ToString()));
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(status));
} else {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
std::make_unique<QueuePicker>(
p->Ref(DEBUG_LOCATION, "QueuePicker")));
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
}
return;
}
@ -360,7 +359,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->subchannel_list_.reset();
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, absl::Status(),
std::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
return;
}
// If we get here, there are two possible cases:
@ -434,7 +433,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
connectivity_status().ToString()));
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(status));
}
}
// If the next subchannel is in IDLE, trigger a connection attempt.
@ -460,8 +459,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
!subchannel_list()->in_transient_failure()) {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
std::make_unique<QueuePicker>(
p->Ref(DEBUG_LOCATION, "QueuePicker")));
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
}
break;
}
@ -501,7 +499,7 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
p->selected_ = this;
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
std::make_unique<Picker>(subchannel()->Ref()));
MakeRefCounted<Picker>(subchannel()->Ref()));
for (size_t i = 0; i < subchannel_list()->num_subchannels(); ++i) {
if (i != Index()) {
subchannel_list()->subchannel(i)->ShutdownLocked();

@ -149,7 +149,7 @@ class PriorityLb : public LoadBalancingPolicy {
void Orphan() override;
std::unique_ptr<SubchannelPicker> GetPicker();
RefCountedPtr<SubchannelPicker> GetPicker();
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
@ -162,28 +162,6 @@ class PriorityLb : public LoadBalancingPolicy {
bool FailoverTimerPending() const { return failover_timer_ != nullptr; }
private:
// A simple wrapper for ref-counting a picker from the child policy.
class RefCountedPicker : public RefCounted<RefCountedPicker> {
public:
explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
private:
std::unique_ptr<SubchannelPicker> picker_;
};
// A non-ref-counted wrapper for RefCountedPicker.
class RefCountedPickerWrapper : public SubchannelPicker {
public:
explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) override { return picker_->Pick(args); }
private:
RefCountedPtr<RefCountedPicker> picker_;
};
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<ChildPriority> priority)
@ -195,7 +173,7 @@ class PriorityLb : public LoadBalancingPolicy {
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
@ -244,7 +222,7 @@ class PriorityLb : public LoadBalancingPolicy {
void OnConnectivityStateUpdateLocked(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker);
RefCountedPtr<SubchannelPicker> picker);
RefCountedPtr<PriorityLb> priority_policy_;
const std::string name_;
@ -254,7 +232,7 @@ class PriorityLb : public LoadBalancingPolicy {
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
absl::Status connectivity_status_;
RefCountedPtr<RefCountedPicker> picker_wrapper_;
RefCountedPtr<SubchannelPicker> picker_;
bool seen_ready_or_idle_since_transient_failure_ = true;
@ -422,7 +400,7 @@ void PriorityLb::ChoosePriorityLocked() {
absl::UnavailableError("priority policy has empty priority list");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(status));
return;
}
// Iterate through priorities, searching for one in READY or IDLE,
@ -681,17 +659,17 @@ void PriorityLb::ChildPriority::Orphan() {
child_policy_.reset();
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_wrapper_.reset();
picker_.reset();
Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
}
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker>
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
PriorityLb::ChildPriority::GetPicker() {
if (picker_wrapper_ == nullptr) {
return std::make_unique<QueuePicker>(
if (picker_ == nullptr) {
return MakeRefCounted<QueuePicker>(
priority_policy_->Ref(DEBUG_LOCATION, "QueuePicker"));
}
return std::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
return picker_;
}
absl::Status PriorityLb::ChildPriority::UpdateLocked(
@ -760,7 +738,7 @@ void PriorityLb::ChildPriority::ResetBackoffLocked() {
void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
@ -776,9 +754,7 @@ void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
// TRANSIENT_FAILURE, but we have no new picker to report. In that case,
// just keep using the old picker, in case we wind up delegating to this
// child when all priorities are failing.
if (picker != nullptr) {
picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
}
if (picker != nullptr) picker_ = std::move(picker);
// If we transition to state CONNECTING and we've not seen
// TRANSIENT_FAILURE more recently than READY or IDLE, start failover
// timer if not already pending.
@ -833,7 +809,7 @@ PriorityLb::ChildPriority::Helper::CreateSubchannel(ServerAddress address,
void PriorityLb::ChildPriority::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (priority_->priority_policy_->shutting_down_) return;
// Notify the priority.
priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker));

@ -659,7 +659,7 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
// Note that we use our own picker regardless of connectivity state.
p->channel_control_helper()->UpdateState(
state, status,
std::make_unique<Picker>(Ref(DEBUG_LOCATION, "RingHashPicker")));
MakeRefCounted<Picker>(Ref(DEBUG_LOCATION, "RingHashPicker")));
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
@ -846,7 +846,7 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
: args.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(status));
return status;
}
// Otherwise, report IDLE.

@ -333,7 +333,7 @@ class RlsLb : public LoadBalancingPolicy {
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
@ -354,7 +354,7 @@ class RlsLb : public LoadBalancingPolicy {
grpc_connectivity_state connectivity_state_ ABSL_GUARDED_BY(&RlsLb::mu_) =
GRPC_CHANNEL_IDLE;
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
ABSL_GUARDED_BY(&RlsLb::mu_);
};
@ -730,7 +730,7 @@ RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
: nullptr),
lb_policy_(lb_policy),
target_(std::move(target)),
picker_(std::make_unique<QueuePicker>(std::move(lb_policy))) {
picker_(MakeRefCounted<QueuePicker>(std::move(lb_policy))) {
lb_policy_->child_policy_map_.emplace(target_, this);
}
@ -811,7 +811,7 @@ void RlsLb::ChildPolicyWrapper::StartUpdate() {
config.status().ToString().c_str());
}
pending_config_.reset();
picker_ = std::make_unique<TransientFailurePicker>(
picker_ = MakeRefCounted<TransientFailurePicker>(
absl::UnavailableError(config.status().message()));
child_policy_.reset();
} else {
@ -876,7 +876,7 @@ RlsLb::ChildPolicyWrapper::ChildPolicyHelper::CreateSubchannel(
void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO,
"[rlslb %p] ChildPolicyWrapper=%p [%s] ChildPolicyHelper=%p: "
@ -2147,7 +2147,7 @@ void RlsLb::UpdatePickerLocked() {
status = absl::UnavailableError("no children available");
}
channel_control_helper()->UpdateState(
state, status, std::make_unique<Picker>(Ref(DEBUG_LOCATION, "Picker")));
state, status, MakeRefCounted<Picker>(Ref(DEBUG_LOCATION, "Picker")));
}
//

@ -305,7 +305,7 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
: args.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(status));
return status;
}
// Otherwise, if this is the initial update, immediately promote it to
@ -314,7 +314,7 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
subchannel_list_ = std::move(latest_pending_subchannel_list_);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
std::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
}
return absl::OkStatus();
}
@ -386,7 +386,7 @@ void RoundRobin::RoundRobinSubchannelList::
this);
}
p->channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
std::make_unique<Picker>(p, this));
MakeRefCounted<Picker>(p, this));
} else if (num_connecting_ > 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] reporting CONNECTING with subchannel list %p",
@ -394,7 +394,7 @@ void RoundRobin::RoundRobinSubchannelList::
}
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
std::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
} else if (num_transient_failure_ == num_subchannels()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
@ -408,7 +408,7 @@ void RoundRobin::RoundRobinSubchannelList::
}
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, last_failure_,
std::make_unique<TransientFailurePicker>(last_failure_));
MakeRefCounted<TransientFailurePicker>(last_failure_));
}
}

@ -121,17 +121,6 @@ class WeightedTargetLb : public LoadBalancingPolicy {
void ResetBackoffLocked() override;
private:
// A simple wrapper for ref-counting a picker from the child policy.
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
public:
explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
private:
std::unique_ptr<SubchannelPicker> picker_;
};
// Picks a child using stateless WRR and then delegates to that
// child's picker.
class WeightedPicker : public SubchannelPicker {
@ -141,7 +130,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
// range proportional to the child's weight. The start of the range
// is the previous value in the vector and is 0 for the first element.
using PickerList =
std::vector<std::pair<uint64_t, RefCountedPtr<ChildPickerWrapper>>>;
std::vector<std::pair<uint64_t, RefCountedPtr<SubchannelPicker>>>;
explicit WeightedPicker(PickerList pickers)
: pickers_(std::move(pickers)) {}
@ -173,9 +162,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
return picker_wrapper_;
}
RefCountedPtr<SubchannelPicker> picker() const { return picker_; }
private:
class Helper : public ChannelControlHelper {
@ -189,7 +176,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
@ -220,7 +207,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
void OnConnectivityStateUpdateLocked(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker);
RefCountedPtr<SubchannelPicker> picker);
// The owning LB policy.
RefCountedPtr<WeightedTargetLb> weighted_target_policy_;
@ -231,7 +218,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> child_policy_;
RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
RefCountedPtr<SubchannelPicker> picker_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
OrphanablePtr<DelayedRemovalTimer> delayed_removal_timer_;
@ -364,7 +351,7 @@ absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
"no children in weighted_target policy: ", args.resolution_note));
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(status));
return absl::OkStatus();
}
UpdateStateLocked();
@ -409,18 +396,19 @@ void WeightedTargetLb::UpdateStateLocked() {
if (config_->target_map().find(child_name) == config_->target_map().end()) {
continue;
}
auto child_picker = child->picker();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] child=%s state=%s weight=%u picker=%p",
this, child_name.c_str(),
ConnectivityStateName(child->connectivity_state()),
child->weight(), child->picker_wrapper().get());
child->weight(), child_picker.get());
}
switch (child->connectivity_state()) {
case GRPC_CHANNEL_READY: {
GPR_ASSERT(child->weight() > 0);
ready_end += child->weight();
ready_picker_list.emplace_back(ready_end, child->picker_wrapper());
ready_picker_list.emplace_back(ready_end, std::move(child_picker));
break;
}
case GRPC_CHANNEL_CONNECTING: {
@ -434,7 +422,7 @@ void WeightedTargetLb::UpdateStateLocked() {
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
GPR_ASSERT(child->weight() > 0);
tf_end += child->weight();
tf_picker_list.emplace_back(tf_end, child->picker_wrapper());
tf_picker_list.emplace_back(tf_end, std::move(child_picker));
break;
}
default:
@ -456,19 +444,18 @@ void WeightedTargetLb::UpdateStateLocked() {
gpr_log(GPR_INFO, "[weighted_target_lb %p] connectivity changed to %s",
this, ConnectivityStateName(connectivity_state));
}
std::unique_ptr<SubchannelPicker> picker;
RefCountedPtr<SubchannelPicker> picker;
absl::Status status;
switch (connectivity_state) {
case GRPC_CHANNEL_READY:
picker = std::make_unique<WeightedPicker>(std::move(ready_picker_list));
picker = MakeRefCounted<WeightedPicker>(std::move(ready_picker_list));
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
picker =
std::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
picker = MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
break;
default:
picker = std::make_unique<WeightedPicker>(std::move(tf_picker_list));
picker = MakeRefCounted<WeightedPicker>(std::move(tf_picker_list));
}
channel_control_helper()->UpdateState(connectivity_state, status,
std::move(picker));
@ -555,7 +542,7 @@ void WeightedTargetLb::WeightedChild::Orphan() {
child_policy_.reset();
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_wrapper_.reset();
picker_.reset();
delayed_removal_timer_.reset();
Unref();
}
@ -635,16 +622,16 @@ void WeightedTargetLb::WeightedChild::ResetBackoffLocked() {
void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
// Cache the picker in the WeightedChild.
picker_wrapper_ = MakeRefCounted<ChildPickerWrapper>(std::move(picker));
picker_ = std::move(picker);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: connectivity "
"state update: state=%s (%s) picker_wrapper=%p",
"state update: state=%s (%s) picker=%p",
weighted_target_policy_.get(), this, name_.c_str(),
ConnectivityStateName(state), status.ToString().c_str(),
picker_wrapper_.get());
picker_.get());
}
// If the child reports IDLE, immediately tell it to exit idle.
if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked();
@ -688,7 +675,7 @@ WeightedTargetLb::WeightedChild::Helper::CreateSubchannel(
void WeightedTargetLb::WeightedChild::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (weighted_child_->weighted_target_policy_->shutting_down_) return;
weighted_child_->OnConnectivityStateUpdateLocked(state, status,
std::move(picker));

@ -171,7 +171,7 @@ class CdsLb : public LoadBalancingPolicy {
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
@ -239,7 +239,7 @@ RefCountedPtr<SubchannelInterface> CdsLb::Helper::CreateSubchannel(
void CdsLb::Helper::UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (parent_->shutting_down_ || parent_->child_policy_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] state updated by child: %s (%s)", this,
@ -590,7 +590,7 @@ void CdsLb::OnError(const std::string& name, absl::Status status) {
if (child_policy_ == nullptr) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(absl::UnavailableError(
MakeRefCounted<TransientFailurePicker>(absl::UnavailableError(
absl::StrCat(name, ": ", status.ToString()))));
}
}
@ -604,7 +604,7 @@ void CdsLb::OnResourceDoesNotExist(const std::string& name) {
absl::StrCat("CDS resource \"", config_->cluster(), "\" does not exist"));
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(status));
MaybeDestroyChildPolicyLocked();
}

@ -210,22 +210,11 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
};
// A simple wrapper for ref-counting a picker from the child policy.
class RefCountedPicker : public RefCounted<RefCountedPicker> {
public:
explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
private:
std::unique_ptr<SubchannelPicker> picker_;
};
// A picker that wraps the picker from the child to perform drops.
class Picker : public SubchannelPicker {
public:
Picker(XdsClusterImplLb* xds_cluster_impl_lb,
RefCountedPtr<RefCountedPicker> picker);
RefCountedPtr<SubchannelPicker> picker);
PickResult Pick(PickArgs args) override;
@ -236,7 +225,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
uint32_t max_concurrent_requests_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
RefCountedPtr<XdsClusterDropStats> drop_stats_;
RefCountedPtr<RefCountedPicker> picker_;
RefCountedPtr<SubchannelPicker> picker_;
};
class Helper : public ChannelControlHelper {
@ -251,7 +240,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
@ -294,7 +283,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
// Latest state and picker reported by the child policy.
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
absl::Status status_;
RefCountedPtr<RefCountedPicker> picker_;
RefCountedPtr<SubchannelPicker> picker_;
};
//
@ -367,7 +356,7 @@ class XdsClusterImplLb::Picker::SubchannelCallTracker
//
XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
RefCountedPtr<RefCountedPicker> picker)
RefCountedPtr<SubchannelPicker> picker)
: call_counter_(xds_cluster_impl_lb->call_counter_),
max_concurrent_requests_(
xds_cluster_impl_lb->config_->max_concurrent_requests()),
@ -532,7 +521,7 @@ void XdsClusterImplLb::MaybeUpdatePickerLocked() {
// If we're dropping all calls, report READY, regardless of what (or
// whether) the child has reported.
if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
auto drop_picker = std::make_unique<Picker>(this, picker_);
auto drop_picker = MakeRefCounted<Picker>(this, picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] updating connectivity (drop all): "
@ -545,7 +534,7 @@ void XdsClusterImplLb::MaybeUpdatePickerLocked() {
}
// Otherwise, update only if we have a child picker.
if (picker_ != nullptr) {
auto drop_picker = std::make_unique<Picker>(this, picker_);
auto drop_picker = MakeRefCounted<Picker>(this, picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] updating connectivity: state=%s "
@ -612,7 +601,7 @@ RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
ServerAddress address, const ChannelArgs& args) {
if (xds_cluster_impl_policy_->shutting_down_) return nullptr;
// If load reporting is enabled, wrap the subchannel such that it
// includes the locality stats object, which will be used by the EdsPicker.
// includes the locality stats object, which will be used by the Picker.
if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
.has_value()) {
RefCountedPtr<XdsLocalityName> locality_name;
@ -653,7 +642,7 @@ RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
void XdsClusterImplLb::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (xds_cluster_impl_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
@ -666,8 +655,7 @@ void XdsClusterImplLb::Helper::UpdateState(
// Save the state and picker.
xds_cluster_impl_policy_->state_ = state;
xds_cluster_impl_policy_->status_ = status;
xds_cluster_impl_policy_->picker_ =
MakeRefCounted<RefCountedPicker>(std::move(picker));
xds_cluster_impl_policy_->picker_ = std::move(picker);
// Wrap the picker and return it to the channel.
xds_cluster_impl_policy_->MaybeUpdatePickerLocked();
}

@ -120,28 +120,13 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
void ResetBackoffLocked() override;
private:
// A simple wrapper for ref-counting a picker from the child policy.
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
public:
ChildPickerWrapper(std::string name,
std::unique_ptr<SubchannelPicker> picker)
: name_(std::move(name)), picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
const std::string& name() const { return name_; }
private:
std::string name_;
std::unique_ptr<SubchannelPicker> picker_;
};
// Picks a child using prefix or path matching and then delegates to that
// child's picker.
class ClusterPicker : public SubchannelPicker {
public:
// Maintains a map of cluster names to pickers.
using ClusterMap = std::map<absl::string_view /*cluster_name*/,
RefCountedPtr<ChildPickerWrapper>>;
using ClusterMap =
std::map<std::string /*cluster_name*/, RefCountedPtr<SubchannelPicker>>;
// It is required that the keys of cluster_map have to live at least as long
// as the ClusterPicker instance.
@ -174,9 +159,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
return picker_wrapper_;
}
RefCountedPtr<SubchannelPicker> picker() const { return picker_; }
private:
class Helper : public ChannelControlHelper {
@ -192,7 +175,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
EventEngine* GetEventEngine() override;
@ -217,7 +200,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> child_policy_;
RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
RefCountedPtr<SubchannelPicker> picker_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
// States for delayed removal.
@ -252,7 +235,7 @@ XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
args.call_state);
auto cluster_name =
call_state->GetCallAttribute(XdsClusterAttributeTypeName());
auto it = cluster_map_.find(cluster_name);
auto it = cluster_map_.find(std::string(cluster_name));
if (it != cluster_map_.end()) {
return it->second->Pick(args);
}
@ -395,8 +378,8 @@ void XdsClusterManagerLb::UpdateStateLocked() {
ClusterPicker::ClusterMap cluster_map;
for (const auto& p : config_->cluster_map()) {
const std::string& cluster_name = p.first;
RefCountedPtr<ChildPickerWrapper>& child_picker = cluster_map[cluster_name];
child_picker = children_[cluster_name]->picker_wrapper();
RefCountedPtr<SubchannelPicker>& child_picker = cluster_map[cluster_name];
child_picker = children_[cluster_name]->picker();
if (child_picker == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO,
@ -404,13 +387,11 @@ void XdsClusterManagerLb::UpdateStateLocked() {
"picker; creating a QueuePicker.",
this, cluster_name.c_str());
}
child_picker = MakeRefCounted<ChildPickerWrapper>(
cluster_name,
std::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
child_picker =
MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
}
}
std::unique_ptr<SubchannelPicker> picker =
std::make_unique<ClusterPicker>(std::move(cluster_map));
auto picker = MakeRefCounted<ClusterPicker>(std::move(cluster_map));
absl::Status status;
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
status = absl::Status(absl::StatusCode::kUnavailable,
@ -461,7 +442,7 @@ void XdsClusterManagerLb::ClusterChild::Orphan() {
child_policy_.reset();
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_wrapper_.reset();
picker_.reset();
if (delayed_removal_timer_handle_.has_value()) {
xds_cluster_manager_policy_->channel_control_helper()
->GetEventEngine()
@ -587,7 +568,7 @@ XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel(
void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(
GPR_INFO,
@ -601,9 +582,7 @@ void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
return;
}
// Cache the picker in the ClusterChild.
xds_cluster_manager_child_->picker_wrapper_ =
MakeRefCounted<ChildPickerWrapper>(xds_cluster_manager_child_->name_,
std::move(picker));
xds_cluster_manager_child_->picker_ = std::move(picker);
// Decide what state to report for aggregation purposes.
// If the last recorded state was TRANSIENT_FAILURE and the new state
// is something other than READY, don't change the state.

@ -373,7 +373,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
RefCountedPtr<SubchannelPicker> picker) override;
// This is a no-op, because we get the addresses from the xds
// client, which is a watch-based API.
void RequestReresolution() override {}
@ -435,7 +435,7 @@ XdsClusterResolverLb::Helper::CreateSubchannel(ServerAddress address,
void XdsClusterResolverLb::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (xds_cluster_resolver_policy_->shutting_down_ ||
xds_cluster_resolver_policy_->child_policy_ == nullptr) {
return;
@ -961,7 +961,7 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
"config");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(status));
return nullptr;
}
return std::move(*config);

@ -127,7 +127,7 @@ class XdsWrrLocalityLb : public LoadBalancingPolicy {
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
@ -240,7 +240,7 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
child_config.status().ToString()));
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(status));
return status;
}
// Create child policy if needed (i.e., on first update).
@ -295,7 +295,7 @@ RefCountedPtr<SubchannelInterface> XdsWrrLocalityLb::Helper::CreateSubchannel(
void XdsWrrLocalityLb::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
RefCountedPtr<SubchannelPicker> picker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_wrr_locality_lb_trace)) {
gpr_log(
GPR_INFO,

@ -256,10 +256,9 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Currently, pickers are always accessed from within the
/// client_channel data plane mutex, so they do not have to be
/// thread-safe.
class SubchannelPicker {
class SubchannelPicker : public RefCounted<SubchannelPicker> {
public:
SubchannelPicker() = default;
virtual ~SubchannelPicker() = default;
virtual PickResult Pick(PickArgs args) = 0;
};
@ -284,7 +283,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// by the client channel.
virtual void UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker>) = 0;
RefCountedPtr<SubchannelPicker> picker) = 0;
/// Requests that the resolver re-resolve.
virtual void RequestReresolution() = 0;

@ -195,7 +195,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
struct StateUpdate {
grpc_connectivity_state state;
absl::Status status;
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
};
// Represents a re-resolution request from the LB policy.
@ -227,9 +227,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
work_serializer_);
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker>
picker) override {
void UpdateState(
grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
MutexLock lock(&mu_);
queue_.push_back(StateUpdate{state, status, std::move(picker)});
}
@ -356,9 +356,8 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// not a state update; otherwise (if continue_predicate() tells us to
// stop) returns true.
bool WaitForStateUpdate(
std::function<
bool(grpc_connectivity_state, absl::Status,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker>)>
std::function<bool(grpc_connectivity_state, absl::Status,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>)>
continue_predicate,
SourceLocation location = SourceLocation()) {
while (true) {
@ -378,14 +377,14 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// Expects that the LB policy has reported the specified connectivity
// state to helper_. Returns the picker from the state update.
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> ExpectState(
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> ExpectState(
grpc_connectivity_state expected_state,
absl::Status expected_status = absl::OkStatus(),
SourceLocation location = SourceLocation()) {
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> final_picker;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> final_picker;
WaitForStateUpdate(
[&](grpc_connectivity_state state, absl::Status status,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
EXPECT_EQ(state, expected_state)
<< "got " << ConnectivityStateName(state) << ", expected "
<< ConnectivityStateName(expected_state) << "\n"
@ -402,12 +401,12 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
// Waits for the LB policy to get connected.
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> WaitForConnected(
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> WaitForConnected(
SourceLocation location = SourceLocation()) {
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> final_picker;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> final_picker;
WaitForStateUpdate(
[&](grpc_connectivity_state state, absl::Status status,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
if (state == GRPC_CHANNEL_CONNECTING) {
EXPECT_TRUE(status.ok()) << status;
ExpectPickQueued(picker.get(), location);

@ -59,7 +59,7 @@ class DropPolicy : public LoadBalancingPolicy {
absl::Status UpdateLocked(UpdateArgs) override {
channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
std::make_unique<DropPicker>());
MakeRefCounted<DropPicker>());
return absl::OkStatus();
}

@ -113,7 +113,7 @@ class TestPickArgsLb : public ForwardingLoadBalancingPolicy {
private:
class Picker : public SubchannelPicker {
public:
Picker(std::unique_ptr<SubchannelPicker> delegate_picker,
Picker(RefCountedPtr<SubchannelPicker> delegate_picker,
TestPickArgsCallback cb)
: delegate_picker_(std::move(delegate_picker)), cb_(std::move(cb)) {}
@ -128,7 +128,7 @@ class TestPickArgsLb : public ForwardingLoadBalancingPolicy {
}
private:
std::unique_ptr<SubchannelPicker> delegate_picker_;
RefCountedPtr<SubchannelPicker> delegate_picker_;
TestPickArgsCallback cb_;
};
@ -144,9 +144,9 @@ class TestPickArgsLb : public ForwardingLoadBalancingPolicy {
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override {
RefCountedPtr<SubchannelPicker> picker) override {
parent_->channel_control_helper()->UpdateState(
state, status, std::make_unique<Picker>(std::move(picker), cb_));
state, status, MakeRefCounted<Picker>(std::move(picker), cb_));
}
void RequestReresolution() override {
@ -231,7 +231,7 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
private:
class Picker : public SubchannelPicker {
public:
Picker(std::unique_ptr<SubchannelPicker> delegate_picker,
Picker(RefCountedPtr<SubchannelPicker> delegate_picker,
InterceptRecvTrailingMetadataCallback cb)
: delegate_picker_(std::move(delegate_picker)), cb_(std::move(cb)) {}
@ -248,7 +248,7 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
}
private:
std::unique_ptr<SubchannelPicker> delegate_picker_;
RefCountedPtr<SubchannelPicker> delegate_picker_;
InterceptRecvTrailingMetadataCallback cb_;
};
@ -266,9 +266,9 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override {
RefCountedPtr<SubchannelPicker> picker) override {
parent_->channel_control_helper()->UpdateState(
state, status, std::make_unique<Picker>(std::move(picker), cb_));
state, status, MakeRefCounted<Picker>(std::move(picker), cb_));
}
void RequestReresolution() override {
@ -381,7 +381,7 @@ class AddressTestLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override {
RefCountedPtr<SubchannelPicker> picker) override {
parent_->channel_control_helper()->UpdateState(state, status,
std::move(picker));
}
@ -500,7 +500,7 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override {
RefCountedPtr<SubchannelPicker> picker) override {
parent_->channel_control_helper()->UpdateState(state, status,
std::move(picker));
}
@ -621,7 +621,7 @@ class OobBackendMetricTestLoadBalancingPolicy
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override {
RefCountedPtr<SubchannelPicker> picker) override {
parent_->channel_control_helper()->UpdateState(state, status,
std::move(picker));
}
@ -693,7 +693,7 @@ class FailPolicy : public LoadBalancingPolicy {
absl::Status UpdateLocked(UpdateArgs) override {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status_,
std::make_unique<FailPicker>(status_, pick_counter_));
MakeRefCounted<FailPicker>(status_, pick_counter_));
return absl::OkStatus();
}

Loading…
Cancel
Save