client_channel: remove synchronous subchannel connectivity state API (#28339)

* ring_hash: don't recreate ring when individual subchannel states change

* client_channel: remove synchronous subchannel connectivity state API

* change subchannel list to automatically start watching all subchannels

* use a separate loop to start watches, so list size is logged correctly

* fix RR to re-resolve on IDLE again

* fix ring_hash to delay promoting new subchannel list

* fix pick_first to wait for all subchannels to report state

* clean up SubchannelList API

* fix unused argument error

* fix another unused argument error

* clang-format

* fix RR to not re-resolve on initial IDLE state

* also don't re-resolve in initial TF state; same for ring_hash

* clang-format

* change RR and PF to initially report CONNECTING, and add second loop to priority policy

* simplify priority logic a bit

* fix grpclb to drop ref to stats object even if the subchannel call is never started

* fix memory leak in ring_hash

* fix tsan failure in grpclb code

* iwyu

* add missing BUILD deps

* update outlier_detection policy

* fix test

* fix pick_first to not report TF prematurely due to subchannel sharing

* fix test to not depend on timing
pull/29891/head
Mark D. Roth 3 years ago committed by GitHub
parent d53986657f
commit e49c61cd2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 24
      src/core/ext/filters/client_channel/client_channel.cc
  3. 38
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  4. 1
      src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.cc
  5. 39
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  6. 176
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  7. 123
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  8. 234
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  9. 200
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  10. 146
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  11. 27
      src/core/ext/filters/client_channel/subchannel.cc
  12. 14
      src/core/ext/filters/client_channel/subchannel.h
  13. 15
      src/core/ext/filters/client_channel/subchannel_interface.h
  14. 163
      test/cpp/end2end/client_lb_end2end_test.cc
  15. 3
      test/cpp/end2end/xds/xds_core_end2end_test.cc
  16. 19
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -4129,6 +4129,7 @@ grpc_cc_library(
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
],
language = "c++",
tags = ["grpc-autodeps"],
@ -4203,6 +4204,7 @@ grpc_cc_library(
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
],
language = "c++",
tags = ["grpc-autodeps"],

@ -505,21 +505,15 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
}
grpc_connectivity_state CheckConnectivityState() override {
return subchannel_->CheckConnectivityState(health_check_service_name_);
}
void WatchConnectivityState(
grpc_connectivity_state initial_state,
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
auto& watcher_wrapper = watcher_map_[watcher.get()];
GPR_ASSERT(watcher_wrapper == nullptr);
watcher_wrapper = new WatcherWrapper(std::move(watcher),
Ref(DEBUG_LOCATION, "WatcherWrapper"),
initial_state);
Ref(DEBUG_LOCATION, "WatcherWrapper"));
subchannel_->WatchConnectivityState(
initial_state, health_check_service_name_,
health_check_service_name_,
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
watcher_wrapper));
}
@ -578,11 +572,8 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
WatcherWrapper(
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher,
RefCountedPtr<SubchannelWrapper> parent,
grpc_connectivity_state initial_state)
: watcher_(std::move(watcher)),
parent_(std::move(parent)),
last_seen_state_(initial_state) {}
RefCountedPtr<SubchannelWrapper> parent)
: watcher_(std::move(watcher)), parent_(std::move(parent)) {}
~WatcherWrapper() override {
auto* parent = parent_.release(); // ref owned by lambda
@ -619,14 +610,11 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
}
WatcherWrapper* MakeReplacement() {
auto* replacement =
new WatcherWrapper(std::move(watcher_), parent_, last_seen_state_);
auto* replacement = new WatcherWrapper(std::move(watcher_), parent_);
replacement_ = replacement;
return replacement;
}
grpc_connectivity_state last_seen_state() const { return last_seen_state_; }
private:
void ApplyUpdateInControlPlaneWorkSerializer()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*parent_->chand_->work_serializer_) {
@ -668,7 +656,6 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
// Ignore update if the parent WatcherWrapper has been replaced
// since this callback was scheduled.
if (watcher_ != nullptr) {
last_seen_state_ = state_change.state;
watcher_->OnConnectivityStateChange(state_change.state);
}
}
@ -676,7 +663,6 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_;
RefCountedPtr<SubchannelWrapper> parent_;
grpc_connectivity_state last_seen_state_;
WatcherWrapper* replacement_ = nullptr;
};

@ -382,6 +382,39 @@ class GrpcLb : public LoadBalancingPolicy {
PickResult Pick(PickArgs args) override;
private:
// A subchannel call tracker that unrefs the GrpcLbClientStats object
// in the case where the subchannel call is never actually started,
// since the client load reporting filter will not be able to do it
// in that case.
class SubchannelCallTracker : public SubchannelCallTrackerInterface {
public:
SubchannelCallTracker(
RefCountedPtr<GrpcLbClientStats> client_stats,
std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker)
: client_stats_(std::move(client_stats)),
original_call_tracker_(std::move(original_call_tracker)) {}
void Start() override {
if (original_call_tracker_ != nullptr) {
original_call_tracker_->Start();
}
// If we're actually starting the subchannel call, then the
// client load reporting filter will take ownership of the ref
// passed down to it via metadata.
client_stats_.release();
}
void Finish(FinishArgs args) override {
if (original_call_tracker_ != nullptr) {
original_call_tracker_->Finish(args);
}
}
private:
RefCountedPtr<GrpcLbClientStats> client_stats_;
std::unique_ptr<SubchannelCallTrackerInterface> original_call_tracker_;
};
// Serverlist to be used for determining drops.
RefCountedPtr<Serverlist> serverlist_;
@ -694,7 +727,10 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
// client_load_reporting filter.
GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats();
if (client_stats != nullptr) {
client_stats->Ref().release(); // Ref passed via metadata.
complete_pick->subchannel_call_tracker =
absl::make_unique<SubchannelCallTracker>(
client_stats->Ref(),
std::move(complete_pick->subchannel_call_tracker));
// The metadata value is a hack: we pretend the pointer points to
// a string and rely on the client_load_reporting filter to know
// how to interpret it.

@ -295,7 +295,6 @@ OrcaProducer::OrcaProducer(RefCountedPtr<Subchannel> subchannel)
auto connectivity_watcher = MakeRefCounted<ConnectivityWatcher>(WeakRef());
connectivity_watcher_ = connectivity_watcher.get();
subchannel_->WatchConnectivityState(
connected_subchannel_ == nullptr ? GRPC_CHANNEL_IDLE : GRPC_CHANNEL_READY,
/*health_check_service_name=*/absl::nullopt,
std::move(connectivity_watcher));
}

@ -154,10 +154,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void Uneject();
grpc_connectivity_state CheckConnectivityState() override;
void WatchConnectivityState(
grpc_connectivity_state initial_state,
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override;
void CancelConnectivityStateWatch(
@ -174,30 +171,32 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
WatcherWrapper(std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>
watcher,
grpc_connectivity_state initial_state, bool ejected)
: watcher_(std::move(watcher)),
last_seen_state_(initial_state),
ejected_(ejected) {}
bool ejected)
: watcher_(std::move(watcher)), ejected_(ejected) {}
void Eject() {
ejected_ = true;
if (last_seen_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (last_seen_state_.has_value() &&
*last_seen_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
watcher_->OnConnectivityStateChange(GRPC_CHANNEL_TRANSIENT_FAILURE);
}
}
void Uneject() {
ejected_ = false;
if (last_seen_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
watcher_->OnConnectivityStateChange(last_seen_state_);
if (last_seen_state_.has_value() &&
*last_seen_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
watcher_->OnConnectivityStateChange(*last_seen_state_);
}
}
void OnConnectivityStateChange(
grpc_connectivity_state new_state) override {
const bool send_update = !last_seen_state_.has_value() || !ejected_;
last_seen_state_ = new_state;
if (!ejected_) {
watcher_->OnConnectivityStateChange(new_state);
if (send_update) {
watcher_->OnConnectivityStateChange(
ejected_ ? GRPC_CHANNEL_TRANSIENT_FAILURE : new_state);
}
}
@ -208,7 +207,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
private:
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_;
grpc_connectivity_state last_seen_state_;
absl::optional<grpc_connectivity_state> last_seen_state_;
bool ejected_;
};
@ -418,21 +417,13 @@ void OutlierDetectionLb::SubchannelWrapper::Uneject() {
}
}
grpc_connectivity_state
OutlierDetectionLb::SubchannelWrapper::CheckConnectivityState() {
if (ejected_) return GRPC_CHANNEL_TRANSIENT_FAILURE;
return wrapped_subchannel()->CheckConnectivityState();
}
void OutlierDetectionLb::SubchannelWrapper::WatchConnectivityState(
grpc_connectivity_state initial_state,
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) {
ConnectivityStateWatcherInterface* watcher_ptr = watcher.get();
auto watcher_wrapper = absl::make_unique<WatcherWrapper>(
std::move(watcher), initial_state, ejected_);
auto watcher_wrapper =
absl::make_unique<WatcherWrapper>(std::move(watcher), ejected_);
watchers_.emplace(watcher_ptr, watcher_wrapper.get());
wrapped_subchannel()->WatchConnectivityState(initial_state,
std::move(watcher_wrapper));
wrapped_subchannel()->WatchConnectivityState(std::move(watcher_wrapper));
}
void OutlierDetectionLb::SubchannelWrapper::CancelConnectivityStateWatch(

@ -28,6 +28,7 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/types/optional.h"
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
@ -87,12 +88,11 @@ class PickFirst : public LoadBalancingPolicy {
: SubchannelData(subchannel_list, address, std::move(subchannel)) {}
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) override;
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) override;
// Processes the connectivity change to READY for an unselected subchannel.
void ProcessUnselectedReadyLocked();
void CheckConnectivityStateAndStartWatchingLocked();
};
class PickFirstSubchannelList
@ -111,6 +111,9 @@ class PickFirst : public LoadBalancingPolicy {
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
// Note that we do not start trying to connect to any subchannel here,
// since we will wait until we see the initial connectivity state for all
// subchannels before doing that.
}
~PickFirstSubchannelList() override {
@ -123,8 +126,19 @@ class PickFirst : public LoadBalancingPolicy {
in_transient_failure_ = in_transient_failure;
}
size_t attempting_index() const { return attempting_index_; }
void set_attempting_index(size_t index) { attempting_index_ = index; }
bool AllSubchannelsSeenInitialState() {
for (size_t i = 0; i < num_subchannels(); ++i) {
if (!subchannel(i)->connectivity_state().has_value()) return false;
}
return true;
}
private:
bool in_transient_failure_ = false;
size_t attempting_index_ = 0;
};
class Picker : public SubchannelPicker {
@ -205,14 +219,18 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
if (latest_update_args_.addresses.ok()) {
addresses = *latest_update_args_.addresses;
}
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
// Replace latest_pending_subchannel_list_.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
latest_pending_subchannel_list_ != nullptr) {
gpr_log(GPR_INFO,
"[PF %p] Shutting down previous pending subchannel list %p", this,
latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeOrphanable<PickFirstSubchannelList>(
this, std::move(addresses), *latest_update_args_.args);
// Empty update or no valid subchannels.
if (subchannel_list->num_subchannels() == 0) {
// Unsubscribe from all current subchannels.
subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
// Put the channel in TRANSIENT_FAILURE.
// Empty update or no valid subchannels. Put the channel in
// TRANSIENT_FAILURE.
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
absl::Status status =
latest_update_args_.addresses.ok()
? absl::UnavailableError(absl::StrCat(
@ -221,65 +239,24 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
latest_pending_subchannel_list_.reset();
return;
}
// If one of the subchannels in the new list is already in state
// READY, then select it immediately. This can happen when the
// currently selected subchannel is also present in the update. It
// can also happen if one of the subchannels in the update is already
// in the global subchannel pool because it's in use by another channel.
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
grpc_connectivity_state state = sd->CheckConnectivityStateLocked();
if (state == GRPC_CHANNEL_READY) {
subchannel_list_ = std::move(subchannel_list);
sd->StartConnectivityWatchLocked();
sd->ProcessUnselectedReadyLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
latest_pending_subchannel_list_.reset();
return;
}
// Otherwise, if this is the initial update, report CONNECTING.
else if (subchannel_list_.get() == nullptr) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
}
if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
subchannel_list_ = std::move(subchannel_list);
// If we're not in IDLE state, start trying to connect to the first
// subchannel in the new list.
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
subchannel_list_->subchannel(0)->subchannel()->RequestConnection();
} else {
// We do have a selected subchannel (which means it's READY), so keep
// using it until one of the subchannels in the new list reports READY.
if (latest_pending_subchannel_list_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p Shutting down latest pending subchannel list "
"%p, about to be replaced by newer latest %p",
this, latest_pending_subchannel_list_.get(),
subchannel_list.get());
}
// If the new update is empty or we don't yet have a selected subchannel in
// the current list, replace the current subchannel list immediately.
if (latest_pending_subchannel_list_->num_subchannels() == 0 ||
selected_ == nullptr) {
selected_ = nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
subchannel_list_ != nullptr) {
gpr_log(GPR_INFO, "[PF %p] Shutting down previous subchannel list %p",
this, subchannel_list_.get());
}
latest_pending_subchannel_list_ = std::move(subchannel_list);
// If we're not in IDLE state, start trying to connect to the first
// subchannel in the new list.
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
latest_pending_subchannel_list_->subchannel(0)
->StartConnectivityWatchLocked();
latest_pending_subchannel_list_->subchannel(0)
->subchannel()
->RequestConnection();
subchannel_list_ = std::move(latest_pending_subchannel_list_);
}
}
@ -316,26 +293,24 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
}
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) {
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
// The notification must be for a subchannel in either the current or
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p selected subchannel connectivity changed to %s", p,
ConnectivityStateName(connectivity_state));
ConnectivityStateName(new_state));
}
// We might miss a connectivity state update between calling
// CheckConnectivityStateLocked() and StartConnectivityWatchLocked().
// If the new state is READY, just ignore it; otherwise, regardless of
// what state it is, we treat it as a failure of the existing connection.
if (connectivity_state == GRPC_CHANNEL_READY) return;
// Any state change is considered to be a failure of the existing
// connection.
// If there is a pending update, switch to the pending update.
if (p->latest_pending_subchannel_list_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
@ -346,8 +321,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->subchannel_list_.get());
}
p->selected_ = nullptr;
CancelConnectivityWatchLocked(
"selected subchannel failed; switching to pending update");
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
if (p->subchannel_list_->in_transient_failure()) {
@ -386,18 +359,37 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
subchannel_list()->set_in_transient_failure(false);
ProcessUnselectedReadyLocked();
break;
// If the subchannel is READY, use it.
if (new_state == GRPC_CHANNEL_READY) {
subchannel_list()->set_in_transient_failure(false);
ProcessUnselectedReadyLocked();
return;
}
// If this is the initial connectivity state notification for this
// subchannel, check to see if it's the last one we were waiting for,
// in which case we start trying to connect to the first subchannel.
// Otherwise, do nothing, since we'll continue to wait until all of
// the subchannels report their state.
if (!old_state.has_value()) {
if (subchannel_list()->AllSubchannelsSeenInitialState()) {
subchannel_list()->subchannel(0)->subchannel()->RequestConnection();
}
return;
}
// Ignore any other updates for subchannels we're not currently trying to
// connect to.
if (Index() != subchannel_list()->attempting_index()) return;
// Otherwise, process connectivity state.
switch (new_state) {
case GRPC_CHANNEL_READY:
// Already handled this case above, so this should not happen.
GPR_UNREACHABLE_CODE(break);
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_IDLE: {
CancelConnectivityWatchLocked("connection attempt failed");
PickFirstSubchannelData* sd = this;
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
subchannel_list()->set_attempting_index(next_index);
sd = subchannel_list()->subchannel(next_index);
// If we're tried all subchannels, set state to TRANSIENT_FAILURE.
if (sd->Index() == 0) {
@ -434,7 +426,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
absl::make_unique<TransientFailurePicker>(status));
}
}
sd->CheckConnectivityStateAndStartWatchingLocked();
sd->subchannel()->RequestConnection();
break;
}
case GRPC_CHANNEL_CONNECTING: {
@ -493,24 +485,6 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
}
}
void PickFirst::PickFirstSubchannelData::
CheckConnectivityStateAndStartWatchingLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
// Check current state.
grpc_connectivity_state current_state = CheckConnectivityStateLocked();
// Start watch.
StartConnectivityWatchLocked();
// If current state is READY, select the subchannel now, since we started
// watching from this state and will not get a notification of it
// transitioning into this state.
// If the current state is not READY, attempt to connect.
if (current_state == GRPC_CHANNEL_READY) {
if (p->selected_ != this) ProcessUnselectedReadyLocked();
} else {
subchannel()->RequestConnection();
}
}
class PickFirstConfig : public LoadBalancingPolicy::Config {
public:
const char* name() const override { return kPickFirst; }

@ -135,9 +135,7 @@ class PriorityLb : public LoadBalancingPolicy {
void Orphan() override;
std::unique_ptr<SubchannelPicker> GetPicker() {
return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
}
std::unique_ptr<SubchannelPicker> GetPicker();
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
@ -267,18 +265,16 @@ class PriorityLb : public LoadBalancingPolicy {
// Iterates through the list of priorities to choose one:
// - If the child for a priority doesn't exist, creates it.
// - If a child's failover timer is pending, returns without selecting
// a priority while we wait for the child to attempt to connect. In
// this case, if report_connecting is true, reports CONNECTING state to
// the channel.
// - If the child is connected, it will be used as the current priority.
// - If a child's failover timer is pending, selects that priority
// while we wait for the child to attempt to connect.
// - If the child is connected, selects that priority.
// - Otherwise, continues on to the next child.
// Reports TRANSIENT_FAILURE to the channel if all children are not
// connected.
// Delegates to the last child if none of the children are connecting.
// Reports TRANSIENT_FAILURE if the priority list is empty.
//
// This method is idempotent; it should yield the same result every
// time as a function of the state of the children.
void ChoosePriorityLocked(bool report_connecting);
void ChoosePriorityLocked();
// Sets the specified priority as the current priority.
// Deactivates any children at lower priorities.
@ -362,12 +358,11 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
// Save current child.
if (current_priority_ != UINT32_MAX) {
const std::string& child_name = config_->priorities()[current_priority_];
current_child_from_before_update_ = children_[child_name].get();
// Unset current_priority_, since it was an index into the old
// config's priority list and may no longer be valid. It will be
// reset later by ChoosePriorityLocked(), but we unset it here in
// case updating any of our children triggers a state update.
current_priority_ = UINT32_MAX;
auto* child = children_[child_name].get();
GPR_ASSERT(child != nullptr);
if (child->connectivity_state() == GRPC_CHANNEL_READY) {
current_child_from_before_update_ = children_[child_name].get();
}
}
// Update config.
config_ = std::move(args.config);
@ -394,7 +389,7 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
}
update_in_progress_ = false;
// Try to get connected.
ChoosePriorityLocked(/*report_connecting=*/children_.empty());
ChoosePriorityLocked();
}
uint32_t PriorityLb::GetChildPriorityLocked(
@ -437,7 +432,7 @@ void PriorityLb::HandleChildConnectivityStateChangeLocked(
// properly select between CONNECTING and TRANSIENT_FAILURE as the
// new state to report to our parent.
current_child_from_before_update_ = nullptr;
ChoosePriorityLocked(/*report_connecting=*/true);
ChoosePriorityLocked();
}
return;
}
@ -451,8 +446,7 @@ void PriorityLb::HandleChildConnectivityStateChangeLocked(
}
// Unconditionally call ChoosePriorityLocked(). It should do the
// right thing based on the state of all children.
ChoosePriorityLocked(
/*report_connecting=*/child_priority == current_priority_);
ChoosePriorityLocked();
}
void PriorityLb::DeleteChild(ChildPriority* child) {
@ -463,12 +457,24 @@ void PriorityLb::DeleteChild(ChildPriority* child) {
// new state to report to our parent.
if (current_child_from_before_update_ == child) {
current_child_from_before_update_ = nullptr;
ChoosePriorityLocked(/*report_connecting=*/true);
ChoosePriorityLocked();
}
children_.erase(child->name());
}
void PriorityLb::ChoosePriorityLocked(bool report_connecting) {
void PriorityLb::ChoosePriorityLocked() {
// If priority list is empty, report TF.
if (config_->priorities().empty()) {
current_child_from_before_update_ = nullptr;
absl::Status status =
absl::UnavailableError("priority policy has empty priority list");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
return;
}
// Iterate through priorities, searching for one in READY or IDLE,
// creating new children as needed.
current_priority_ = UINT32_MAX;
for (uint32_t priority = 0; priority < config_->priorities().size();
++priority) {
@ -480,11 +486,18 @@ void PriorityLb::ChoosePriorityLocked(bool report_connecting) {
}
auto& child = children_[child_name];
if (child == nullptr) {
if (report_connecting) {
// If we're not still using an old child from before the last
// update, report CONNECTING here.
// This is probably not strictly necessary, since the child should
// immediately report CONNECTING and cause us to report that state
// anyway, but we do this just in case the child fails to report
// state before UpdateLocked() returns.
if (current_child_from_before_update_ != nullptr) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
}
current_priority_ = priority;
child = MakeOrphanable<ChildPriority>(
Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
auto child_config = config_->children().find(child_name);
@ -510,10 +523,13 @@ void PriorityLb::ChoosePriorityLocked(bool report_connecting) {
"attempting to connect, will wait",
this, priority, child_name.c_str());
}
if (report_connecting) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
current_priority_ = priority;
// If we're not still using an old child from before the last
// update, report CONNECTING here.
if (current_child_from_before_update_ != nullptr) {
channel_control_helper()->UpdateState(child->connectivity_state(),
child->connectivity_status(),
child->GetPicker());
}
return;
}
@ -526,18 +542,44 @@ void PriorityLb::ChoosePriorityLocked(bool report_connecting) {
ConnectivityStateName(child->connectivity_state()));
}
}
// If there are no more priorities to try, report TRANSIENT_FAILURE.
// If we didn't find any priority to try, pick the first one in state
// CONNECTING.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] no priority reachable, putting channel in "
"TRANSIENT_FAILURE",
"[priority_lb %p] no priority reachable, checking for CONNECTING "
"priority to delegate to",
this);
}
current_child_from_before_update_ = nullptr;
absl::Status status = absl::UnavailableError("no ready priority");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
for (uint32_t priority = 0; priority < config_->priorities().size();
++priority) {
// If the child for the priority does not exist yet, create it.
const std::string& child_name = config_->priorities()[priority];
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
priority, child_name.c_str());
}
auto& child = children_[child_name];
GPR_ASSERT(child != nullptr);
if (child->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
channel_control_helper()->UpdateState(child->connectivity_state(),
child->connectivity_status(),
child->GetPicker());
return;
}
}
// Did not find any child in CONNECTING, delegate to last child.
const std::string& child_name = config_->priorities().back();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] no priority in CONNECTING, delegating to "
"lowest priority child %s",
this, child_name.c_str());
}
auto& child = children_[child_name];
GPR_ASSERT(child != nullptr);
channel_control_helper()->UpdateState(child->connectivity_state(),
child->connectivity_status(),
child->GetPicker());
}
void PriorityLb::SetCurrentPriorityLocked(uint32_t priority) {
@ -719,6 +761,15 @@ void PriorityLb::ChildPriority::Orphan() {
Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
}
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker>
PriorityLb::ChildPriority::GetPicker() {
if (picker_wrapper_ == nullptr) {
return absl::make_unique<QueuePicker>(
priority_policy_->Ref(DEBUG_LOCATION, "QueuePicker"));
}
return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
}
void PriorityLb::ChildPriority::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
bool ignore_reresolution_requests) {

@ -171,22 +171,18 @@ class RingHash : public LoadBalancingPolicy {
: SubchannelData(subchannel_list, address, std::move(subchannel)),
address_(address) {}
grpc_connectivity_state GetConnectivityState() const {
return connectivity_state_for_picker_.load(std::memory_order_relaxed);
}
const ServerAddress& address() const { return address_; }
// Performs connectivity state updates that need to be done both when we
// first start watching and when a watcher notification is received.
void UpdateConnectivityStateLocked(
grpc_connectivity_state connectivity_state);
grpc_connectivity_state GetConnectivityState() const {
return connectivity_state_.load(std::memory_order_relaxed);
}
private:
// Performs connectivity state updates that need to be done only
// after we have started watching.
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) override;
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) override;
ServerAddress address_;
@ -195,12 +191,8 @@ class RingHash : public LoadBalancingPolicy {
// subchannel in some cases; for example, once this is set to
// TRANSIENT_FAILURE, we do not change it again until we get READY,
// so we skip any interim stops in CONNECTING.
grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
// Connectivity state seen by picker.
// Uses an atomic so that it can be accessed outside of the WorkSerializer.
std::atomic<grpc_connectivity_state> connectivity_state_for_picker_{
GRPC_CHANNEL_IDLE};
std::atomic<grpc_connectivity_state> connectivity_state_{GRPC_CHANNEL_IDLE};
};
// A list of subchannels.
@ -214,7 +206,9 @@ class RingHash : public LoadBalancingPolicy {
? "RingHashSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args) {
args),
num_idle_(num_subchannels()),
ring_(MakeRefCounted<Ring>(policy, Ref(DEBUG_LOCATION, "Ring"))) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
@ -222,13 +216,11 @@ class RingHash : public LoadBalancingPolicy {
}
~RingHashSubchannelList() override {
ring_.reset(DEBUG_LOCATION, "~RingHashSubchannelList");
RingHash* p = static_cast<RingHash*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
// Starts watching the subchannels in this list.
void StartWatchingLocked();
// Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state.
void UpdateStateCountersLocked(grpc_connectivity_state old_state,
@ -241,18 +233,30 @@ class RingHash : public LoadBalancingPolicy {
// UpdateRingHashConnectivityStateLocked().
// connection_attempt_complete is true if the subchannel just
// finished a connection attempt.
void UpdateRingHashConnectivityStateLocked(
size_t index, bool connection_attempt_complete);
// Create a new ring from this subchannel list.
RefCountedPtr<Ring> MakeRing();
void UpdateRingHashConnectivityStateLocked(size_t index,
bool connection_attempt_complete,
absl::Status status);
private:
size_t num_idle_ = 0;
bool AllSubchannelsSeenInitialState() {
for (size_t i = 0; i < num_subchannels(); ++i) {
if (!subchannel(i)->connectivity_state().has_value()) return false;
}
return true;
}
void ShutdownLocked() override {
ring_.reset(DEBUG_LOCATION, "RingHashSubchannelList::ShutdownLocked()");
SubchannelList::ShutdownLocked();
}
size_t num_idle_;
size_t num_ready_ = 0;
size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0;
RefCountedPtr<Ring> ring_;
// The index of the subchannel currently doing an internally
// triggered connection attempt, if any.
absl::optional<size_t> internally_triggered_connection_index_;
@ -334,11 +338,9 @@ class RingHash : public LoadBalancingPolicy {
// list of subchannels.
OrphanablePtr<RingHashSubchannelList> subchannel_list_;
OrphanablePtr<RingHashSubchannelList> latest_pending_subchannel_list_;
// indicating if we are shutting down.
bool shutdown_ = false;
// Current ring.
RefCountedPtr<Ring> ring_;
};
//
@ -559,31 +561,8 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
// RingHash::RingHashSubchannelList
//
void RingHash::RingHashSubchannelList::StartWatchingLocked() {
GPR_ASSERT(num_subchannels() != 0);
// Check current state of each subchannel synchronously.
for (size_t i = 0; i < num_subchannels(); ++i) {
grpc_connectivity_state state =
subchannel(i)->CheckConnectivityStateLocked();
subchannel(i)->UpdateConnectivityStateLocked(state);
}
// Start connectivity watch for each subchannel.
for (size_t i = 0; i < num_subchannels(); i++) {
if (subchannel(i)->subchannel() != nullptr) {
subchannel(i)->StartConnectivityWatchLocked();
}
}
// Send updated state to parent based on reported subchannel states.
// Pretend we're getting this update from the last subchannel, so that
// if we need to proactively start connecting, we'll start from the
// first subchannel.
UpdateRingHashConnectivityStateLocked(num_subchannels() - 1,
/*connection_attempt_complete=*/false);
}
void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
if (old_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(num_idle_ > 0);
--num_idle_;
@ -597,6 +576,7 @@ void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
GPR_ASSERT(num_transient_failure_ > 0);
--num_transient_failure_;
}
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
if (new_state == GRPC_CHANNEL_IDLE) {
++num_idle_;
} else if (new_state == GRPC_CHANNEL_READY) {
@ -609,8 +589,19 @@ void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
}
void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
size_t index, bool connection_attempt_complete) {
size_t index, bool connection_attempt_complete, absl::Status status) {
RingHash* p = static_cast<RingHash*>(policy());
// If this is latest_pending_subchannel_list_, then swap it into
// subchannel_list_ as soon as we get the initial connectivity state
// report for every subchannel in the list.
if (p->latest_pending_subchannel_list_.get() == this &&
AllSubchannelsSeenInitialState()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] replacing subchannel list %p with %p", p,
p->subchannel_list_.get(), this);
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_.get() != this) return;
// The overall aggregation rules here are:
@ -626,13 +617,11 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
//
// We set start_connection_attempt to true if we match rules 2, 3, or 6.
grpc_connectivity_state state;
absl::Status status;
bool start_connection_attempt = false;
if (num_ready_ > 0) {
state = GRPC_CHANNEL_READY;
} else if (num_transient_failure_ >= 2) {
state = GRPC_CHANNEL_TRANSIENT_FAILURE;
status = absl::UnavailableError("connections to backends failing");
start_connection_attempt = true;
} else if (num_connecting_ > 0) {
state = GRPC_CHANNEL_CONNECTING;
@ -643,15 +632,16 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
state = GRPC_CHANNEL_IDLE;
} else {
state = GRPC_CHANNEL_TRANSIENT_FAILURE;
status = absl::UnavailableError("connections to backends failing");
start_connection_attempt = true;
}
// Pass along status only in TRANSIENT_FAILURE.
if (state != GRPC_CHANNEL_TRANSIENT_FAILURE) status = absl::OkStatus();
// Generate new picker and return it to the channel.
// Note that we use our own picker regardless of connectivity state.
p->channel_control_helper()->UpdateState(
state, status,
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
p->ring_));
ring_));
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
@ -690,18 +680,15 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
}
}
RefCountedPtr<RingHash::Ring> RingHash::RingHashSubchannelList::MakeRing() {
RingHash* p = static_cast<RingHash*>(policy());
return MakeRefCounted<Ring>(p, Ref(DEBUG_LOCATION, "Ring"));
}
//
// RingHash::RingHashSubchannelData
//
void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked(
grpc_connectivity_state connectivity_state) {
void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
grpc_connectivity_state last_connectivity_state = GetConnectivityState();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(
GPR_INFO,
@ -709,53 +696,43 @@ void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked(
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(),
ConnectivityStateName(last_connectivity_state_),
ConnectivityStateName(connectivity_state));
}
// Decide what state to report for the purposes of aggregation and
// picker behavior.
// If the last recorded state was TRANSIENT_FAILURE, ignore the update
// unless the new state is READY.
if (last_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
connectivity_state != GRPC_CHANNEL_READY) {
return;
ConnectivityStateName(last_connectivity_state),
ConnectivityStateName(new_state));
}
// Update state counters used for aggregation.
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
connectivity_state);
// Update state seen by picker.
connectivity_state_for_picker_.store(connectivity_state,
std::memory_order_relaxed);
// Update last seen connectivity state.
last_connectivity_state_ = connectivity_state;
}
void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) {
RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr);
// If the new state is TRANSIENT_FAILURE, re-resolve.
// Only do this if we've started watching, not at startup time.
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution.
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If this is not the initial state notification and the new state is
// TRANSIENT_FAILURE or IDLE, re-resolve and attempt to reconnect.
// Note that we don't want to do this on the initial state
// notification, because that would result in an endless loop of
// re-resolution.
if (old_state.has_value() && (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
new_state == GRPC_CHANNEL_IDLE)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RH %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
"Requesting re-resolution",
p, subchannel());
"[RH %p] Subchannel %p reported %s; requesting re-resolution", p,
subchannel(), ConnectivityStateName(new_state));
}
p->channel_control_helper()->RequestReresolution();
}
// Update state counters.
UpdateConnectivityStateLocked(connectivity_state);
const bool connection_attempt_complete = new_state != GRPC_CHANNEL_CONNECTING;
// Decide what state to report for the purposes of aggregation and
// picker behavior.
// If the last recorded state was TRANSIENT_FAILURE, ignore the update
// unless the new state is READY.
if (last_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE &&
new_state != GRPC_CHANNEL_READY) {
new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
}
// Update state counters used for aggregation.
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state,
new_state);
// Update last seen state, also used by picker.
connectivity_state_.store(new_state, std::memory_order_relaxed);
// Update the RH policy's connectivity state, creating new picker and new
// ring.
bool connection_attempt_complete =
connectivity_state != GRPC_CHANNEL_CONNECTING;
subchannel_list()->UpdateRingHashConnectivityStateLocked(
Index(), connection_attempt_complete);
Index(), connection_attempt_complete,
absl::UnavailableError("connections to backends failing"));
}
//
@ -773,6 +750,7 @@ RingHash::~RingHash() {
gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this);
}
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
}
void RingHash::ShutdownLocked() {
@ -781,10 +759,15 @@ void RingHash::ShutdownLocked() {
}
shutdown_ = true;
subchannel_list_.reset();
ring_.reset(DEBUG_LOCATION, "RingHash");
latest_pending_subchannel_list_.reset();
}
void RingHash::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); }
void RingHash::ResetBackoffLocked() {
subchannel_list_->ResetBackoffLocked();
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ResetBackoffLocked();
}
}
void RingHash::UpdateLocked(UpdateArgs args) {
config_ = std::move(args.config);
@ -813,22 +796,41 @@ void RingHash::UpdateLocked(UpdateArgs args) {
// failure and keep using the existing list.
if (subchannel_list_ != nullptr) return;
}
subchannel_list_ = MakeOrphanable<RingHashSubchannelList>(
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) &&
latest_pending_subchannel_list_ != nullptr) {
gpr_log(GPR_INFO, "[RH %p] replacing latest pending subchannel list %p",
this, latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeOrphanable<RingHashSubchannelList>(
this, std::move(addresses), *args.args);
if (subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately transition to TRANSIENT_FAILURE.
absl::Status status =
args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
"empty address list: ", args.resolution_note))
: args.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
} else {
// Build the ring.
ring_ = subchannel_list_->MakeRing();
// Start watching the new list.
subchannel_list_->StartWatchingLocked();
// If we have no existing list or the new list is empty, immediately
// promote the new list.
// Otherwise, do nothing; the new list will be promoted when the
// initial subchannel states are reported.
if (subchannel_list_ == nullptr ||
latest_pending_subchannel_list_->num_subchannels() == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) &&
subchannel_list_ != nullptr) {
gpr_log(GPR_INFO,
"[RH %p] empty address list, replacing subchannel list %p", this,
subchannel_list_.get());
}
subchannel_list_ = std::move(latest_pending_subchannel_list_);
// If the new list is empty, report TRANSIENT_FAILURE.
if (subchannel_list_->num_subchannels() == 0) {
absl::Status status =
args.addresses.ok()
? absl::UnavailableError(
absl::StrCat("empty address list: ", args.resolution_note))
: args.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
} else {
// Otherwise, report IDLE.
subchannel_list_->UpdateRingHashConnectivityStateLocked(
/*index=*/0, /*connection_attempt_complete=*/false, absl::OkStatus());
}
}
}

@ -28,6 +28,7 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/types/optional.h"
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
@ -89,25 +90,28 @@ class RoundRobin : public LoadBalancingPolicy {
RefCountedPtr<SubchannelInterface> subchannel)
: SubchannelData(subchannel_list, address, std::move(subchannel)) {}
grpc_connectivity_state connectivity_state() const {
absl::optional<grpc_connectivity_state> connectivity_state() const {
return logical_connectivity_state_;
}
// Computes and updates the logical connectivity state of the subchannel.
// Note that the logical connectivity state may differ from the
// actual reported state in some cases (e.g., after we see
// TRANSIENT_FAILURE, we ignore any subsequent state changes until
// we see READY). Returns true if the state changed.
bool UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state);
private:
// Performs connectivity state updates that need to be done only
// after we have started watching.
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) override;
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) override;
// Updates the logical connectivity state. Returns true if the
// state has changed.
bool UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state);
grpc_connectivity_state logical_connectivity_state_ = GRPC_CHANNEL_IDLE;
// The logical connectivity state of the subchannel.
// Note that the logical connectivity state may differ from the
// actual reported state in some cases (e.g., after we see
// TRANSIENT_FAILURE, we ignore any subsequent state changes until
// we see READY).
absl::optional<grpc_connectivity_state> logical_connectivity_state_;
};
// A list of subchannels.
@ -127,6 +131,10 @@ class RoundRobin : public LoadBalancingPolicy {
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
// Start connecting to all subchannels.
for (size_t i = 0; i < num_subchannels(); i++) {
subchannel(i)->subchannel()->RequestConnection();
}
}
~RoundRobinSubchannelList() override {
@ -134,13 +142,11 @@ class RoundRobin : public LoadBalancingPolicy {
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
// Starts watching the subchannels in this list.
void StartWatchingLocked(absl::Status status_for_tf);
// Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state.
void UpdateStateCountersLocked(grpc_connectivity_state old_state,
grpc_connectivity_state new_state);
void UpdateStateCountersLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state);
// Ensures that the right subchannel list is used and then updates
// the RR policy's connectivity state based on the subchannel list's
@ -197,7 +203,8 @@ RoundRobin::Picker::Picker(RoundRobin* parent,
: parent_(parent) {
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
RoundRobinSubchannelData* sd = subchannel_list->subchannel(i);
if (sd->connectivity_state() == GRPC_CHANNEL_READY) {
if (sd->connectivity_state().value_or(GRPC_CHANNEL_IDLE) ==
GRPC_CHANNEL_READY) {
subchannels_.push_back(sd->subchannel()->Ref());
}
}
@ -285,55 +292,54 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
this, std::move(addresses), *args.args);
// Start watching the new list. If appropriate, this will cause it to be
// immediately promoted to subchannel_list_ and to generate a new picker.
latest_pending_subchannel_list_->StartWatchingLocked(
args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
"empty address list: ", args.resolution_note))
: args.addresses.status());
// If the new list is empty, immediately promote it to
// subchannel_list_ and report TRANSIENT_FAILURE.
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
subchannel_list_ != nullptr) {
gpr_log(GPR_INFO, "[RR %p] replacing previous subchannel list %p", this,
subchannel_list_.get());
}
subchannel_list_ = std::move(latest_pending_subchannel_list_);
absl::Status status =
args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
"empty address list: ", args.resolution_note))
: args.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
}
// Otherwise, if this is the initial update, immediately promote it to
// subchannel_list_ and report CONNECTING.
else if (subchannel_list_.get() == nullptr) {
subchannel_list_ = std::move(latest_pending_subchannel_list_);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
}
}
//
// RoundRobinSubchannelList
//
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked(
absl::Status status_for_tf) {
// Check current state of each subchannel synchronously, since any
// subchannel already used by some other channel may have a non-IDLE
// state.
for (size_t i = 0; i < num_subchannels(); ++i) {
grpc_connectivity_state state =
subchannel(i)->CheckConnectivityStateLocked();
if (state != GRPC_CHANNEL_IDLE) {
subchannel(i)->UpdateLogicalConnectivityStateLocked(state);
}
}
// Start connectivity watch for each subchannel.
for (size_t i = 0; i < num_subchannels(); i++) {
if (subchannel(i)->subchannel() != nullptr) {
subchannel(i)->StartConnectivityWatchLocked();
subchannel(i)->subchannel()->RequestConnection();
void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
if (old_state.has_value()) {
GPR_ASSERT(*old_state != GRPC_CHANNEL_SHUTDOWN);
if (*old_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(num_ready_ > 0);
--num_ready_;
} else if (*old_state == GRPC_CHANNEL_CONNECTING) {
GPR_ASSERT(num_connecting_ > 0);
--num_connecting_;
} else if (*old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(num_transient_failure_ > 0);
--num_transient_failure_;
}
}
// Update RR connectivity state if needed.
MaybeUpdateRoundRobinConnectivityStateLocked(status_for_tf);
}
void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
if (old_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(num_ready_ > 0);
--num_ready_;
} else if (old_state == GRPC_CHANNEL_CONNECTING) {
GPR_ASSERT(num_connecting_ > 0);
--num_connecting_;
} else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(num_transient_failure_ > 0);
--num_transient_failure_;
}
if (new_state == GRPC_CHANNEL_READY) {
++num_ready_;
} else if (new_state == GRPC_CHANNEL_CONNECTING) {
@ -348,17 +354,13 @@ void RoundRobin::RoundRobinSubchannelList::
RoundRobin* p = static_cast<RoundRobin*>(policy());
// If this is latest_pending_subchannel_list_, then swap it into
// subchannel_list_ in the following cases:
// - subchannel_list_ is null (i.e., this is the first update).
// - subchannel_list_ has no READY subchannels.
// - This list has at least one READY subchannel.
// - All of the subchannels in this list are in TRANSIENT_FAILURE, or
// the list is empty. (This may cause the channel to go from READY
// to TRANSIENT_FAILURE, but we're doing what the control plane told
// us to do.
// - All of the subchannels in this list are in TRANSIENT_FAILURE.
// (This may cause the channel to go from READY to TRANSIENT_FAILURE,
// but we're doing what the control plane told us to do.)
if (p->latest_pending_subchannel_list_.get() == this &&
(p->subchannel_list_ == nullptr || p->subchannel_list_->num_ready_ == 0 ||
num_ready_ > 0 ||
// Note: num_transient_failure_ and num_subchannels() may both be 0.
(p->subchannel_list_->num_ready_ == 0 || num_ready_ > 0 ||
num_transient_failure_ == num_subchannels())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
const std::string old_counters_string =
@ -409,6 +411,34 @@ void RoundRobin::RoundRobinSubchannelList::
// RoundRobinSubchannelData
//
void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr);
// If this is not the initial state notification and the new state is
// TRANSIENT_FAILURE or IDLE, re-resolve and attempt to reconnect.
// Note that we don't want to do this on the initial state
// notification, because that would result in an endless loop of
// re-resolution.
if (old_state.has_value() && (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
new_state == GRPC_CHANNEL_IDLE)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
"[RR %p] Subchannel %p reported %s; requesting re-resolution", p,
subchannel(), ConnectivityStateName(new_state));
}
p->channel_control_helper()->RequestReresolution();
subchannel()->RequestConnection();
}
// Update logical connectivity state.
// If it changed, update the policy state.
if (UpdateLogicalConnectivityStateLocked(new_state)) {
subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked(
absl::UnavailableError("connections to all backends failing"));
}
}
bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
@ -419,13 +449,16 @@ bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(),
ConnectivityStateName(logical_connectivity_state_),
(logical_connectivity_state_.has_value()
? ConnectivityStateName(*logical_connectivity_state_)
: "N/A"),
ConnectivityStateName(connectivity_state));
}
// Decide what state to report for aggregation purposes.
// If the last logical state was TRANSIENT_FAILURE, then ignore the
// state change unless the new state is READY.
if (logical_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
if (logical_connectivity_state_.has_value() &&
*logical_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
connectivity_state != GRPC_CHANNEL_READY) {
return false;
}
@ -442,7 +475,10 @@ bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
connectivity_state = GRPC_CHANNEL_CONNECTING;
}
// If no change, return false.
if (logical_connectivity_state_ == connectivity_state) return false;
if (logical_connectivity_state_.has_value() &&
*logical_connectivity_state_ == connectivity_state) {
return false;
}
// Otherwise, update counters and logical state.
subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_,
connectivity_state);
@ -450,34 +486,6 @@ bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
return true;
}
void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr);
// If the new state is TRANSIENT_FAILURE or IDLE, re-resolve.
// Only do this if we've started watching, not at startup time.
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution.
// Also attempt to reconnect.
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
connectivity_state == GRPC_CHANNEL_IDLE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
"[RR %p] Subchannel %p reported %s; requesting re-resolution", p,
subchannel(), ConnectivityStateName(connectivity_state));
}
p->channel_control_helper()->RequestReresolution();
subchannel()->RequestConnection();
}
// Update logical connectivity state.
// If it changed, update the policy state.
if (UpdateLogicalConnectivityStateLocked(connectivity_state)) {
subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked(
absl::UnavailableError("connections to all backends failing"));
}
}
//
// factory
//

@ -1,20 +1,18 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
@ -29,6 +27,7 @@
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/types/optional.h"
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
@ -55,7 +54,8 @@ class MySubchannelData
: public SubchannelData<MySubchannelList, MySubchannelData> {
public:
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) override {
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) override {
// ...code to handle connectivity changes...
}
};
@ -93,29 +93,14 @@ class SubchannelData {
// Returns a pointer to the subchannel.
SubchannelInterface* subchannel() const { return subchannel_.get(); }
// Synchronously checks the subchannel's connectivity state.
// Must not be called while there is a connectivity notification
// pending (i.e., between calling StartConnectivityWatchLocked() and
// calling CancelConnectivityWatchLocked()).
grpc_connectivity_state CheckConnectivityStateLocked() {
GPR_ASSERT(pending_watcher_ == nullptr);
connectivity_state_ = subchannel_->CheckConnectivityState();
// Returns the cached connectivity state, if any.
absl::optional<grpc_connectivity_state> connectivity_state() {
return connectivity_state_;
}
// Resets the connection backoff.
// TODO(roth): This method should go away when we move the backoff
// code out of the subchannel and into the LB policies.
void ResetBackoffLocked();
// Starts watching the connectivity state of the subchannel.
// ProcessConnectivityChangeLocked() will be called whenever the
// connectivity state changes.
void StartConnectivityWatchLocked();
// Cancels watching the connectivity state of the subchannel.
void CancelConnectivityWatchLocked(const char* reason);
// Cancels any pending connectivity watch and unrefs the subchannel.
void ShutdownLocked();
@ -127,13 +112,17 @@ class SubchannelData {
virtual ~SubchannelData();
// After StartConnectivityWatchLocked() is called, this method will be
// invoked whenever the subchannel's connectivity state changes.
// To stop watching, use CancelConnectivityWatchLocked().
// This method will be invoked once soon after instantiation to report
// the current connectivity state, and it will then be invoked again
// whenever the connectivity state changes.
virtual void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) = 0;
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) = 0;
private:
// For accessing StartConnectivityWatchLocked().
friend class SubchannelList<SubchannelListType, SubchannelDataType>;
// Watcher for subchannel connectivity state.
class Watcher
: public SubchannelInterface::ConnectivityStateWatcherInterface {
@ -159,6 +148,14 @@ class SubchannelData {
RefCountedPtr<SubchannelListType> subchannel_list_;
};
// Starts watching the connectivity state of the subchannel.
// ProcessConnectivityChangeLocked() will be called whenever the
// connectivity state changes.
void StartConnectivityWatchLocked();
// Cancels watching the connectivity state of the subchannel.
void CancelConnectivityWatchLocked(const char* reason);
// Unrefs the subchannel.
void UnrefSubchannelLocked(const char* reason);
@ -170,7 +167,7 @@ class SubchannelData {
SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
nullptr;
// Data updated by the watcher.
grpc_connectivity_state connectivity_state_;
absl::optional<grpc_connectivity_state> connectivity_state_;
};
// A list of subchannels.
@ -198,8 +195,6 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
const char* tracer() const { return tracer_; }
// Resets connection backoff of all subchannels.
// TODO(roth): We will probably need to rethink this as part of moving
// the backoff code out of subchannels and into LB policies.
void ResetBackoffLocked();
void Orphan() override {
@ -215,12 +210,12 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
virtual ~SubchannelList();
virtual void ShutdownLocked();
private:
// For accessing Ref() and Unref().
friend class SubchannelData<SubchannelListType, SubchannelDataType>;
void ShutdownLocked();
// Backpointer to owning policy.
LoadBalancingPolicy* policy_;
@ -247,22 +242,28 @@ template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
OnConnectivityStateChange(grpc_connectivity_state new_state) {
if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): connectivity changed: state=%s, "
"shutting_down=%d, pending_watcher=%p",
subchannel_list_->tracer(), subchannel_list_->policy(),
subchannel_list_.get(), subchannel_data_->Index(),
subchannel_list_->num_subchannels(),
subchannel_data_->subchannel_.get(),
ConnectivityStateName(new_state), subchannel_list_->shutting_down(),
subchannel_data_->pending_watcher_);
gpr_log(
GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): connectivity changed: old_state=%s, new_state=%s, "
"shutting_down=%d, pending_watcher=%p",
subchannel_list_->tracer(), subchannel_list_->policy(),
subchannel_list_.get(), subchannel_data_->Index(),
subchannel_list_->num_subchannels(),
subchannel_data_->subchannel_.get(),
(subchannel_data_->connectivity_state_.has_value()
? ConnectivityStateName(*subchannel_data_->connectivity_state_)
: "N/A"),
ConnectivityStateName(new_state), subchannel_list_->shutting_down(),
subchannel_data_->pending_watcher_);
}
if (!subchannel_list_->shutting_down() &&
subchannel_data_->pending_watcher_ != nullptr) {
absl::optional<grpc_connectivity_state> old_state =
subchannel_data_->connectivity_state_;
subchannel_data_->connectivity_state_ = new_state;
// Call the subclass's ProcessConnectivityChangeLocked() method.
subchannel_data_->ProcessConnectivityChangeLocked(new_state);
subchannel_data_->ProcessConnectivityChangeLocked(old_state, new_state);
}
}
@ -275,11 +276,7 @@ SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
const ServerAddress& /*address*/,
RefCountedPtr<SubchannelInterface> subchannel)
: subchannel_list_(subchannel_list),
subchannel_(std::move(subchannel)),
// We assume that the current state is IDLE. If not, we'll get a
// callback telling us that.
connectivity_state_(GRPC_CHANNEL_IDLE) {}
: subchannel_list_(subchannel_list), subchannel_(std::move(subchannel)) {}
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
@ -316,16 +313,15 @@ void SubchannelData<SubchannelListType,
if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): starting watch (from %s)",
" (subchannel %p): starting watch",
subchannel_list_->tracer(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_.get(), ConnectivityStateName(connectivity_state_));
subchannel_.get());
}
GPR_ASSERT(pending_watcher_ == nullptr);
pending_watcher_ =
new Watcher(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
subchannel_->WatchConnectivityState(
connectivity_state_,
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>(
pending_watcher_));
}
@ -333,15 +329,15 @@ void SubchannelData<SubchannelListType,
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::
CancelConnectivityWatchLocked(const char* reason) {
if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): canceling connectivity watch (%s)",
subchannel_list_->tracer(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_.get(), reason);
}
if (pending_watcher_ != nullptr) {
if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): canceling connectivity watch (%s)",
subchannel_list_->tracer(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_.get(), reason);
}
subchannel_->CancelConnectivityStateWatch(pending_watcher_);
pending_watcher_ = nullptr;
}
@ -349,7 +345,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown");
CancelConnectivityWatchLocked("shutdown");
UnrefSubchannelLocked("shutdown");
}
@ -395,6 +391,10 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
subchannels_.emplace_back();
subchannels_.back().Init(this, std::move(address), std::move(subchannel));
}
// Start watching subchannel connectivity state.
for (auto& sd : subchannels_) {
sd->StartConnectivityWatchLocked();
}
}
template <typename SubchannelListType, typename SubchannelDataType>
@ -431,4 +431,4 @@ void SubchannelList<SubchannelListType,
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H

@ -432,11 +432,8 @@ class Subchannel::HealthWatcherMap::HealthWatcher
grpc_connectivity_state state() const { return state_; }
void AddWatcherLocked(
grpc_connectivity_state initial_state,
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher) {
if (state_ != initial_state) {
new AsyncWatcherNotifierLocked(watcher, state_, status_);
}
new AsyncWatcherNotifierLocked(watcher, state_, status_);
watcher_list_.AddWatcherLocked(std::move(watcher));
}
@ -509,7 +506,6 @@ class Subchannel::HealthWatcherMap::HealthWatcher
void Subchannel::HealthWatcherMap::AddWatcherLocked(
WeakRefCountedPtr<Subchannel> subchannel,
grpc_connectivity_state initial_state,
const std::string& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
// If the health check service name is not already present in the map,
@ -525,7 +521,7 @@ void Subchannel::HealthWatcherMap::AddWatcherLocked(
health_watcher = it->second.get();
}
// Add the watcher to the entry.
health_watcher->AddWatcherLocked(initial_state, std::move(watcher));
health_watcher->AddWatcherLocked(std::move(watcher));
}
void Subchannel::HealthWatcherMap::RemoveWatcherLocked(
@ -748,18 +744,7 @@ channelz::SubchannelNode* Subchannel::channelz_node() {
return channelz_node_.get();
}
grpc_connectivity_state Subchannel::CheckConnectivityState(
const absl::optional<std::string>& health_check_service_name) {
MutexLock lock(&mu_);
if (health_check_service_name.has_value()) {
return health_watcher_map_.CheckConnectivityStateLocked(
this, *health_check_service_name);
}
return state_;
}
void Subchannel::WatchConnectivityState(
grpc_connectivity_state initial_state,
const absl::optional<std::string>& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
MutexLock lock(&mu_);
@ -768,14 +753,12 @@ void Subchannel::WatchConnectivityState(
grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
}
if (!health_check_service_name.has_value()) {
if (state_ != initial_state) {
new AsyncWatcherNotifierLocked(watcher, state_, status_);
}
new AsyncWatcherNotifierLocked(watcher, state_, status_);
watcher_list_.AddWatcherLocked(std::move(watcher));
} else {
health_watcher_map_.AddWatcherLocked(
WeakRef(DEBUG_LOCATION, "health_watcher"), initial_state,
*health_check_service_name, std::move(watcher));
WeakRef(DEBUG_LOCATION, "health_watcher"), *health_check_service_name,
std::move(watcher));
}
}

@ -242,24 +242,13 @@ class Subchannel : public DualRefCounted<Subchannel> {
channelz::SubchannelNode* channelz_node();
// Returns the current connectivity state of the subchannel.
// If health_check_service_name is non-null, the returned connectivity
// state will be based on the state reported by the backend for that
// service name.
grpc_connectivity_state CheckConnectivityState(
const absl::optional<std::string>& health_check_service_name)
ABSL_LOCKS_EXCLUDED(mu_);
// Starts watching the subchannel's connectivity state.
// The first callback to the watcher will be delivered when the
// subchannel's connectivity state becomes a value other than
// initial_state, which may happen immediately.
// The first callback to the watcher will be delivered ~immediately.
// Subsequent callbacks will be delivered as the subchannel's state
// changes.
// The watcher will be destroyed either when the subchannel is
// destroyed or when CancelConnectivityStateWatch() is called.
void WatchConnectivityState(
grpc_connectivity_state initial_state,
const absl::optional<std::string>& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
ABSL_LOCKS_EXCLUDED(mu_);
@ -336,7 +325,6 @@ class Subchannel : public DualRefCounted<Subchannel> {
public:
void AddWatcherLocked(
WeakRefCountedPtr<Subchannel> subchannel,
grpc_connectivity_state initial_state,
const std::string& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
void RemoveWatcherLocked(const std::string& health_check_service_name,

@ -61,13 +61,8 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
~SubchannelInterface() override = default;
// Returns the current connectivity state of the subchannel.
virtual grpc_connectivity_state CheckConnectivityState() = 0;
// Starts watching the subchannel's connectivity state.
// The first callback to the watcher will be delivered when the
// subchannel's connectivity state becomes a value other than
// initial_state, which may happen immediately.
// The first callback to the watcher will be delivered ~immediately.
// Subsequent callbacks will be delivered as the subchannel's state
// changes.
// The watcher will be destroyed either when the subchannel is
@ -76,7 +71,6 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
// valid to call this method a second time without first cancelling
// the previous watcher using CancelConnectivityStateWatch().
virtual void WatchConnectivityState(
grpc_connectivity_state initial_state,
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) = 0;
// Cancels a connectivity state watch.
@ -115,14 +109,9 @@ class DelegatingSubchannel : public SubchannelInterface {
return wrapped_subchannel_;
}
grpc_connectivity_state CheckConnectivityState() override {
return wrapped_subchannel_->CheckConnectivityState();
}
void WatchConnectivityState(
grpc_connectivity_state initial_state,
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override {
return wrapped_subchannel_->WatchConnectivityState(initial_state,
std::move(watcher));
return wrapped_subchannel_->WatchConnectivityState(std::move(watcher));
}
void CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) override {

@ -61,6 +61,7 @@
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_impl.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/cpp/client/secure_credentials.h"
#include "src/cpp/server/secure_server_credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@ -840,6 +841,168 @@ TEST_F(ClientLbEnd2endTest,
EXPECT_LT(waited.millis(), 1000 * grpc_test_slowdown_factor());
}
TEST_F(
PickFirstTest,
TriesAllSubchannelsBeforeReportingTransientFailureWithSubchannelSharing) {
// A connection attempt injector that allows us to control timing of
// connection attempts.
class ConnectionInjector : public ConnectionAttemptInjector {
private:
grpc_core::Mutex mu_; // Needs to be declared up front.
public:
class Hold {
public:
Hold(ConnectionInjector* injector, int port)
: injector_(injector), port_(port) {}
int port() const { return port_; }
void set_queued_attempt(std::unique_ptr<QueuedAttempt> queued_attempt)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ConnectionInjector::mu_) {
queued_attempt_ = std::move(queued_attempt);
cv_.Signal();
}
void Wait() {
grpc_core::MutexLock lock(&injector_->mu_);
while (queued_attempt_ == nullptr) {
cv_.Wait(&injector_->mu_);
}
}
void Resume() {
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&injector_->mu_);
attempt = std::move(queued_attempt_);
}
attempt->Resume();
}
private:
ConnectionInjector* injector_;
const int port_;
std::unique_ptr<QueuedAttempt> queued_attempt_
ABSL_GUARDED_BY(&ConnectionInjector::mu_);
grpc_core::CondVar cv_;
};
std::unique_ptr<Hold> AddHold(int port) {
grpc_core::MutexLock lock(&mu_);
auto hold = absl::make_unique<Hold>(this, port);
holds_.push_back(hold.get());
return hold;
}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
{
grpc_core::MutexLock lock(&mu_);
for (auto it = holds_.begin(); it != holds_.end(); ++it) {
Hold* hold = *it;
if (port == hold->port()) {
gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
hold->set_queued_attempt(absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline));
holds_.erase(it);
return;
}
}
}
// Anything we're not holding should proceed normally.
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
private:
std::vector<Hold*> holds_;
};
// Start connection injector.
ConnectionInjector injector;
injector.Start();
// Get 5 unused ports. Each channel will have 2 unique ports followed
// by a common port.
std::vector<int> ports1 = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die()};
std::vector<int> ports2 = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die(), ports1[2]};
// Create channel 1.
auto response_generator1 = BuildResolverResponseGenerator();
auto channel1 = BuildChannel("pick_first", response_generator1);
auto stub1 = BuildStub(channel1);
response_generator1.SetNextResolution(ports1);
// Allow the connection attempts for ports 0 and 1 to fail normally.
// Inject a hold for the connection attempt to port 2.
auto hold_channel1_port2 = injector.AddHold(ports1[2]);
// Trigger connection attempt.
gpr_log(GPR_INFO, "=== START CONNECTING CHANNEL 1 ===");
channel1->GetState(/*try_to_connect=*/true);
// Wait for connection attempt to port 2.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 1 PORT 2 TO START ===");
hold_channel1_port2->Wait();
gpr_log(GPR_INFO, "=== CHANNEL 1 PORT 2 STARTED ===");
// Now create channel 2.
auto response_generator2 = BuildResolverResponseGenerator();
auto channel2 = BuildChannel("pick_first", response_generator2);
response_generator2.SetNextResolution(ports2);
// Inject a hold for port 0.
auto hold_channel2_port0 = injector.AddHold(ports2[0]);
// Trigger connection attempt.
gpr_log(GPR_INFO, "=== START CONNECTING CHANNEL 2 ===");
channel2->GetState(/*try_to_connect=*/true);
// Wait for connection attempt to port 0.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 2 PORT 0 TO START ===");
hold_channel2_port0->Wait();
gpr_log(GPR_INFO, "=== CHANNEL 2 PORT 0 STARTED ===");
// Inject a hold for port 0, which will be retried by channel 1.
auto hold_channel1_port0 = injector.AddHold(ports1[0]);
// Now allow the connection attempt to port 2 to complete. The subchannel
// will deliver a TRANSIENT_FAILURE notification to both channels.
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 1 PORT 2 ===");
hold_channel1_port2->Resume();
// Wait for channel 1 to retry port 0, so that we know it's seen the
// connectivity state notification for port 2.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 1 PORT 0 ===");
hold_channel1_port0->Wait();
gpr_log(GPR_INFO, "=== CHANNEL 1 PORT 0 STARTED ===");
// Channel 1 should now report TRANSIENT_FAILURE.
// Channel 2 should continue to report CONNECTING.
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel1->GetState(false));
EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel2->GetState(false));
// Inject a hold for port 2, which will eventually be tried by channel 2.
auto hold_channel2_port2 = injector.AddHold(ports2[2]);
// Allow channel 2 to resume port 0. Port 0 will fail, as will port 1.
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 2 PORT 0 ===");
hold_channel2_port0->Resume();
// Wait for channel 2 to try port 2.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 2 PORT 2 ===");
hold_channel2_port2->Wait();
gpr_log(GPR_INFO, "=== CHANNEL 2 PORT 2 STARTED ===");
// Channel 2 should still be CONNECTING here.
EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel2->GetState(false));
// Add a hold for channel 2 port 0.
hold_channel2_port0 = injector.AddHold(ports2[0]);
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 2 PORT 2 ===");
hold_channel2_port2->Resume();
// Wait for channel 2 to retry port 0.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 2 PORT 0 ===");
hold_channel2_port0->Wait();
// Now channel 2 should be reporting TRANSIENT_FAILURE.
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel2->GetState(false));
// Clean up.
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 1 PORT 0 AND CHANNEL 2 PORT 0 ===");
hold_channel1_port0->Resume();
hold_channel2_port0->Resume();
}
TEST_F(PickFirstTest, Updates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;

@ -1025,7 +1025,8 @@ TEST_P(XdsFederationTest, EdsResourceNameAuthorityUnknown) {
EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
EXPECT_EQ(status.error_message(), "no ready priority");
EXPECT_EQ(status.error_message(),
"weighted_target: all children report state TRANSIENT_FAILURE");
ASSERT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel2->GetState(false));
}

@ -983,7 +983,7 @@ TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) {
// Test that when all backends are down and then up, we may pick a TF backend
// and we will then jump to ready backend.
TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
CreateAndStartBackends(2);
CreateBackends(2);
const uint32_t kConnectionTimeoutMilliseconds = 5000;
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
@ -1005,25 +1005,24 @@ TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
ShutdownBackend(0);
ShutdownBackend(1);
CheckRpcSendFailure(
DEBUG_LOCATION,
CheckRpcSendFailureOptions().set_rpc_options(rpc_options));
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
// Bring up 0, should be picked as the RPC is hashed to it.
// Bring up backend 0. The channel should become connected without
// any picks, because in TF, we are always trying to connect to at
// least one backend at all times.
StartBackend(0);
EXPECT_TRUE(channel_->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
// RPCs should go to backend 0.
WaitForBackend(DEBUG_LOCATION, 0, WaitForBackendOptions(), rpc_options);
// Bring down 0 and bring up 1.
// Bring down backend 0 and bring up backend 1.
// Note the RPC contains a header value that will always be hashed to
// backend 0. So by purposely bring down backend 0 and bring up another
// backend 0. So by purposely bringing down backend 0 and bringing up another
// backend, this will ensure Picker's first choice of backend 0 will fail
// and it will
// 1. reattempt backend 0 and
// 2. go through the remaining subchannels to find one in READY.
// Since the the entries in the ring is pretty distributed and we have
// and it will go through the remaining subchannels to find one in READY.
// Since the the entries in the ring are pretty distributed and we have
// unused ports to fill the ring, it is almost guaranteed that the Picker
// will go through some non-READY entries and skip them as per design.
ShutdownBackend(0);

Loading…
Cancel
Save