Make SubchannelList internally ref counted.

reviewable/pr14886/r6
Mark D. Roth 7 years ago
parent 88832144f4
commit 757cd41055
  1. 44
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 30
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  3. 43
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h

@ -98,9 +98,9 @@ class PickFirst : public LoadBalancingPolicy {
void DestroyUnselectedSubchannelsLocked();
// All our subchannels.
RefCountedPtr<PickFirstSubchannelList> subchannel_list_;
OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
// Latest pending subchannel list.
RefCountedPtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
// Selected subchannel in \a subchannel_list_.
PickFirstSubchannelData* selected_ = nullptr;
// Have we started picking?
@ -160,14 +160,8 @@ void PickFirst::ShutdownLocked() {
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
if (subchannel_list_ != nullptr) {
subchannel_list_->ShutdownLocked("pf_shutdown");
subchannel_list_.reset();
}
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ShutdownLocked("pf_shutdown");
latest_pending_subchannel_list_.reset();
}
subchannel_list_.reset();
latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
@ -300,7 +294,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
"Pick First %p received update with %" PRIuPTR " addresses", this,
addresses->num_addresses);
}
auto subchannel_list = MakeRefCounted<PickFirstSubchannelList>(
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, addresses, combiner(),
client_channel_factory(), args);
if (subchannel_list->num_subchannels() == 0) {
@ -310,9 +304,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
if (subchannel_list_ != nullptr) {
subchannel_list_->ShutdownLocked("sl_shutdown_empty_update");
}
subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
return;
@ -320,9 +311,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
if (subchannel_list_ != nullptr) {
subchannel_list_->ShutdownLocked("pf_update_before_selected");
}
subchannel_list_ = std::move(subchannel_list);
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
@ -347,20 +335,13 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
sd->SetConnectedSubchannelFromLocked(selected_);
}
selected_ = sd;
if (subchannel_list_ != nullptr) {
subchannel_list_->ShutdownLocked("pf_update_includes_selected");
}
subchannel_list_ = std::move(subchannel_list);
DestroyUnselectedSubchannelsLocked();
sd->StartOrRenewConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ShutdownLocked(
"pf_update_includes_selected+outdated");
latest_pending_subchannel_list_.reset();
}
latest_pending_subchannel_list_.reset();
return;
}
}
@ -376,7 +357,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
this, latest_pending_subchannel_list_.get(),
subchannel_list.get());
}
latest_pending_subchannel_list_->ShutdownLocked("sl_outdated_dont_smash");
}
latest_pending_subchannel_list_ = std::move(subchannel_list);
// If we've started picking, start trying to connect to the first
@ -404,8 +384,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
}
// The notification must be for a subchannel in either the current or
// latest pending subchannel lists.
GPR_ASSERT(p->subchannel_list_ == subchannel_list() ||
p->latest_pending_subchannel_list_ == subchannel_list());
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
// If the new state is anything other than READY and there is a
@ -414,7 +394,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->latest_pending_subchannel_list_ != nullptr) {
p->selected_ = nullptr;
StopConnectivityWatchLocked();
subchannel_list()->ShutdownLocked("selected_not_ready+switch_to_update");
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
@ -460,9 +439,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_.
if (p->latest_pending_subchannel_list_ == subchannel_list()) {
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
GPR_ASSERT(p->subchannel_list_ != nullptr);
p->subchannel_list_->ShutdownLocked("finish_update");
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
@ -502,7 +480,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
} while (sd->subchannel() == nullptr);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && p->subchannel_list_ == subchannel_list()) {
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
@ -513,7 +491,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
if (p->subchannel_list_ == subchannel_list()) {
if (subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error),
"connecting_changed");

@ -174,13 +174,13 @@ class RoundRobin : public LoadBalancingPolicy {
void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
/** list of subchannels */
RefCountedPtr<RoundRobinSubchannelList> subchannel_list_;
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
/** Latest version of the subchannel list.
* Subchannel connectivity callbacks will only promote updated subchannel
* lists if they equal \a latest_pending_subchannel_list. In other words,
* racing callbacks that reference outdated subchannel lists won't perform any
* update. */
RefCountedPtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
/** have we started picking? */
bool started_picking_ = false;
/** are we shutting down? */
@ -303,14 +303,8 @@ void RoundRobin::ShutdownLocked() {
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown");
if (subchannel_list_ != nullptr) {
subchannel_list_->ShutdownLocked("rr_shutdown");
subchannel_list_.reset();
}
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ShutdownLocked("rr_shutdown");
latest_pending_subchannel_list_.reset();
}
subchannel_list_.reset();
latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
@ -487,7 +481,7 @@ void RoundRobin::RoundRobinSubchannelList::
MaybeUpdateRoundRobinConnectivityStateLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
// Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_ != this) return;
if (p->subchannel_list_.get() != this) return;
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
@ -523,12 +517,12 @@ void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
if (num_ready_ > 0) {
if (p->subchannel_list_ != this) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
// This list must be p->latest_pending_subchannel_list_, because
// any previous update would have been shut down already and
// therefore weeded out in ProcessConnectivityChangeLocked().
GPR_ASSERT(p->latest_pending_subchannel_list_ == this);
GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
GPR_ASSERT(!shutting_down());
if (grpc_lb_round_robin_trace.enabled()) {
const size_t old_num_subchannels =
@ -541,10 +535,6 @@ void RoundRobin::RoundRobinSubchannelList::
p, p->subchannel_list_.get(), old_num_subchannels, this,
num_subchannels());
}
if (p->subchannel_list_ != nullptr) {
// Dispose of the current subchannel_list.
p->subchannel_list_->ShutdownLocked("sl_phase_out_shutdown");
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
p->last_ready_subchannel_index_ = -1;
}
@ -652,9 +642,8 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
"[RR %p] Shutting down previous pending subchannel list %p", this,
latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_->ShutdownLocked("sl_outdated");
}
latest_pending_subchannel_list_ = MakeRefCounted<RoundRobinSubchannelList>(
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
this, &grpc_lb_round_robin_trace, addresses, combiner(),
client_channel_factory(), args);
// If we haven't started picking yet or the new list is empty,
@ -667,9 +656,6 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty");
}
if (subchannel_list_ != nullptr) {
subchannel_list_->ShutdownLocked("sl_shutdown_replace_on_update");
}
subchannel_list_ = std::move(latest_pending_subchannel_list_);
last_ready_subchannel_index_ = -1;
} else {

@ -31,6 +31,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
@ -118,6 +119,7 @@ class SubchannelData {
pending_connectivity_state_unsafe_ =
grpc_subchannel_check_connectivity(subchannel(), &error);
UpdateConnectedSubchannelLocked();
// FIXME: move the rest of this into RR
if (pending_connectivity_state_unsafe_ != curr_connectivity_state_) {
curr_connectivity_state_ = pending_connectivity_state_unsafe_;
ProcessConnectivityChangeLocked(error);
@ -148,7 +150,7 @@ class SubchannelData {
void CancelConnectivityWatchLocked(const char* reason);
// Cancels any pending connectivity watch and unrefs the subchannel.
void ShutdownLocked(const char* reason);
void ShutdownLocked();
GRPC_ABSTRACT_BASE_CLASS
@ -199,11 +201,9 @@ class SubchannelData {
};
// A list of subchannels.
// FIXME: make this InternallyRefCounted, and have Orphan() do
// ShutdownLocked()?
// (also, maybe we don't need to take a ref to the LB policy anymore?)
template <typename SubchannelListType, typename SubchannelDataType>
class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
class SubchannelList
: public InternallyRefCountedWithTracing<SubchannelListType> {
public:
typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
@ -213,9 +213,6 @@ class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
// The data for the subchannel at a particular index.
SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
// Marks the subchannel_list as discarded. Unsubscribes all its subchannels.
void ShutdownLocked(const char* reason);
// Returns true if the subchannel list is shutting down.
bool shutting_down() const { return shutting_down_; }
@ -223,6 +220,13 @@ class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
// Note: Caller must ensure that this is invoked inside of the combiner.
void Orphan() override {
ShutdownLocked();
InternallyRefCountedWithTracing<SubchannelListType>::Unref(DEBUG_LOCATION,
"shutdown");
}
GRPC_ABSTRACT_BASE_CLASS
protected:
@ -238,6 +242,11 @@ class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
template <typename T, typename... Args>
friend T* New(Args&&... args);
// For accessing Ref() and Unref().
friend class SubchannelData<SubchannelListType, SubchannelDataType>;
void ShutdownLocked();
// Backpointer to owning policy.
LoadBalancingPolicy* policy_;
@ -430,15 +439,14 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked(
const char* reason) {
void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
// If there's a pending notification for this subchannel, cancel it;
// the callback is responsible for unreffing the subchannel.
// Otherwise, unref the subchannel directly.
if (connectivity_notification_pending_) {
CancelConnectivityWatchLocked(reason);
CancelConnectivityWatchLocked("shutdown");
} else if (subchannel_ != nullptr) {
UnrefSubchannelLocked(reason);
UnrefSubchannelLocked("shutdown");
}
}
@ -452,7 +460,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
: RefCountedWithTracing<SubchannelListType>(tracer),
: InternallyRefCountedWithTracing<SubchannelListType>(tracer),
policy_(policy),
tracer_(tracer) {
if (tracer_->enabled()) {
@ -518,17 +526,16 @@ SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked(
const char* reason) {
void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
if (tracer_->enabled()) {
gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
tracer_->name(), policy_, this, reason);
gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p",
tracer_->name(), policy_, this);
}
GPR_ASSERT(!shutting_down_);
shutting_down_ = true;
for (size_t i = 0; i < subchannels_.size(); i++) {
SubchannelDataType* sd = &subchannels_[i];
sd->ShutdownLocked(reason);
sd->ShutdownLocked();
}
}

Loading…
Cancel
Save