|
|
|
@ -47,7 +47,6 @@ |
|
|
|
|
#include "src/core/lib/channel/metrics.h" |
|
|
|
|
#include "src/core/lib/config/core_configuration.h" |
|
|
|
|
#include "src/core/lib/debug/trace.h" |
|
|
|
|
#include "src/core/lib/experiments/experiments.h" |
|
|
|
|
#include "src/core/lib/gpr/useful.h" |
|
|
|
|
#include "src/core/lib/gprpp/crash.h" |
|
|
|
|
#include "src/core/lib/gprpp/debug_location.h" |
|
|
|
@ -198,10 +197,6 @@ class PickFirst : public LoadBalancingPolicy { |
|
|
|
|
// subchannel.
|
|
|
|
|
void ProcessUnselectedReadyLocked(); |
|
|
|
|
|
|
|
|
|
// Reacts to the current connectivity state while trying to connect.
|
|
|
|
|
// TODO(roth): Remove this when we remove the Happy Eyeballs experiment.
|
|
|
|
|
void ReactToConnectivityStateLocked(); |
|
|
|
|
|
|
|
|
|
// Backpointer to owning subchannel list. Not owned.
|
|
|
|
|
SubchannelList* subchannel_list_; |
|
|
|
|
const size_t index_; |
|
|
|
@ -274,9 +269,6 @@ class PickFirst : public LoadBalancingPolicy { |
|
|
|
|
// finished processing.
|
|
|
|
|
bool shutting_down_ = false; |
|
|
|
|
|
|
|
|
|
// TODO(roth): Remove this when we remove the Happy Eyeballs experiment.
|
|
|
|
|
bool in_transient_failure_ = false; |
|
|
|
|
|
|
|
|
|
size_t num_subchannels_seen_initial_notification_ = 0; |
|
|
|
|
|
|
|
|
|
// The index into subchannels_ to which we are currently attempting
|
|
|
|
@ -533,34 +525,30 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) { |
|
|
|
|
for (const auto& endpoint : endpoints) { |
|
|
|
|
for (const auto& address : endpoint.addresses()) { |
|
|
|
|
flattened_endpoints.emplace_back(address, endpoint.args()); |
|
|
|
|
if (IsPickFirstHappyEyeballsEnabled()) { |
|
|
|
|
absl::string_view scheme = GetAddressFamily(address); |
|
|
|
|
bool inserted = address_families.insert(scheme).second; |
|
|
|
|
if (inserted) { |
|
|
|
|
address_family_order.emplace_back(scheme, |
|
|
|
|
flattened_endpoints.size() - 1); |
|
|
|
|
} |
|
|
|
|
absl::string_view scheme = GetAddressFamily(address); |
|
|
|
|
bool inserted = address_families.insert(scheme).second; |
|
|
|
|
if (inserted) { |
|
|
|
|
address_family_order.emplace_back(scheme, |
|
|
|
|
flattened_endpoints.size() - 1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
endpoints = std::move(flattened_endpoints); |
|
|
|
|
// Interleave addresses as per RFC-8305 section 4.
|
|
|
|
|
if (IsPickFirstHappyEyeballsEnabled()) { |
|
|
|
|
EndpointAddressesList interleaved_endpoints; |
|
|
|
|
interleaved_endpoints.reserve(endpoints.size()); |
|
|
|
|
std::vector<bool> endpoints_moved(endpoints.size()); |
|
|
|
|
size_t scheme_index = 0; |
|
|
|
|
for (size_t i = 0; i < endpoints.size(); ++i) { |
|
|
|
|
EndpointAddresses* endpoint; |
|
|
|
|
do { |
|
|
|
|
auto& iterator = address_family_order[scheme_index++ % |
|
|
|
|
address_family_order.size()]; |
|
|
|
|
endpoint = iterator.Next(endpoints, &endpoints_moved); |
|
|
|
|
} while (endpoint == nullptr); |
|
|
|
|
interleaved_endpoints.emplace_back(std::move(*endpoint)); |
|
|
|
|
} |
|
|
|
|
endpoints = std::move(interleaved_endpoints); |
|
|
|
|
EndpointAddressesList interleaved_endpoints; |
|
|
|
|
interleaved_endpoints.reserve(endpoints.size()); |
|
|
|
|
std::vector<bool> endpoints_moved(endpoints.size()); |
|
|
|
|
size_t scheme_index = 0; |
|
|
|
|
for (size_t i = 0; i < endpoints.size(); ++i) { |
|
|
|
|
EndpointAddresses* endpoint; |
|
|
|
|
do { |
|
|
|
|
auto& iterator = address_family_order[scheme_index++ % |
|
|
|
|
address_family_order.size()]; |
|
|
|
|
endpoint = iterator.Next(endpoints, &endpoints_moved); |
|
|
|
|
} while (endpoint == nullptr); |
|
|
|
|
interleaved_endpoints.emplace_back(std::move(*endpoint)); |
|
|
|
|
} |
|
|
|
|
endpoints = std::move(interleaved_endpoints); |
|
|
|
|
args.addresses = |
|
|
|
|
std::make_shared<EndpointAddressesListIterator>(std::move(endpoints)); |
|
|
|
|
} |
|
|
|
@ -738,9 +726,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( |
|
|
|
|
p->UnsetSelectedSubchannel(); |
|
|
|
|
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); |
|
|
|
|
// Set our state to that of the pending subchannel list.
|
|
|
|
|
if (IsPickFirstHappyEyeballsEnabled() |
|
|
|
|
? p->subchannel_list_->IsHappyEyeballsPassComplete() |
|
|
|
|
: p->subchannel_list_->in_transient_failure_) { |
|
|
|
|
if (p->subchannel_list_->IsHappyEyeballsPassComplete()) { |
|
|
|
|
status = absl::UnavailableError(absl::StrCat( |
|
|
|
|
"selected subchannel failed; switching to pending update; " |
|
|
|
|
"last failure: ", |
|
|
|
@ -772,9 +758,6 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( |
|
|
|
|
// select in place of the current one.
|
|
|
|
|
// If the subchannel is READY, use it.
|
|
|
|
|
if (new_state == GRPC_CHANNEL_READY) { |
|
|
|
|
if (!IsPickFirstHappyEyeballsEnabled()) { |
|
|
|
|
subchannel_list_->in_transient_failure_ = false; |
|
|
|
|
} |
|
|
|
|
// We consider it a successful connection attempt only if the
|
|
|
|
|
// previous state was CONNECTING. In particular, we don't want to
|
|
|
|
|
// increment this counter if we got a new address list and found the
|
|
|
|
@ -807,10 +790,6 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( |
|
|
|
|
// see its initial notification. Start trying to connect, starting
|
|
|
|
|
// with the first subchannel.
|
|
|
|
|
if (!old_state.has_value()) { |
|
|
|
|
if (!IsPickFirstHappyEyeballsEnabled()) { |
|
|
|
|
subchannel_list_->subchannels_.front().ReactToConnectivityStateLocked(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
subchannel_list_->StartConnectingNextSubchannel(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -821,14 +800,6 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( |
|
|
|
|
kMetricConnectionAttemptsFailed, 1, |
|
|
|
|
{subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {}); |
|
|
|
|
} |
|
|
|
|
if (!IsPickFirstHappyEyeballsEnabled()) { |
|
|
|
|
// Ignore any other updates for subchannels we're not currently trying to
|
|
|
|
|
// connect to.
|
|
|
|
|
if (index_ != subchannel_list_->attempting_index_) return; |
|
|
|
|
// React to the connectivity state.
|
|
|
|
|
ReactToConnectivityStateLocked(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Otherwise, process connectivity state change.
|
|
|
|
|
switch (*connectivity_state_) { |
|
|
|
|
case GRPC_CHANNEL_TRANSIENT_FAILURE: { |
|
|
|
@ -898,99 +869,6 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void PickFirst::SubchannelList::SubchannelData:: |
|
|
|
|
ReactToConnectivityStateLocked() { |
|
|
|
|
PickFirst* p = subchannel_list_->policy_.get(); |
|
|
|
|
// Otherwise, process connectivity state.
|
|
|
|
|
switch (connectivity_state_.value()) { |
|
|
|
|
case GRPC_CHANNEL_READY: |
|
|
|
|
// Already handled this case above, so this should not happen.
|
|
|
|
|
GPR_UNREACHABLE_CODE(break); |
|
|
|
|
case GRPC_CHANNEL_TRANSIENT_FAILURE: { |
|
|
|
|
// Find the next subchannel not in state TRANSIENT_FAILURE.
|
|
|
|
|
// We skip subchannels in state TRANSIENT_FAILURE to avoid a
|
|
|
|
|
// large recursion that could overflow the stack.
|
|
|
|
|
SubchannelData* found_subchannel = nullptr; |
|
|
|
|
for (size_t next_index = index_ + 1; |
|
|
|
|
next_index < subchannel_list_->size(); ++next_index) { |
|
|
|
|
SubchannelData* sc = &subchannel_list_->subchannels_[next_index]; |
|
|
|
|
GPR_ASSERT(sc->connectivity_state_.has_value()); |
|
|
|
|
if (sc->connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) { |
|
|
|
|
subchannel_list_->attempting_index_ = next_index; |
|
|
|
|
found_subchannel = sc; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// If we found another subchannel in the list not in state
|
|
|
|
|
// TRANSIENT_FAILURE, trigger the right behavior for that subchannel.
|
|
|
|
|
if (found_subchannel != nullptr) { |
|
|
|
|
found_subchannel->ReactToConnectivityStateLocked(); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
// We didn't find another subchannel not in state TRANSIENT_FAILURE,
|
|
|
|
|
// so report TRANSIENT_FAILURE and wait for the first subchannel
|
|
|
|
|
// in the list to report IDLE before continuing.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"Pick First %p subchannel list %p failed to connect to " |
|
|
|
|
"all subchannels", |
|
|
|
|
p, subchannel_list_); |
|
|
|
|
} |
|
|
|
|
subchannel_list_->attempting_index_ = 0; |
|
|
|
|
subchannel_list_->in_transient_failure_ = true; |
|
|
|
|
// In case 2, swap to the new subchannel list. This means reporting
|
|
|
|
|
// TRANSIENT_FAILURE and dropping the existing (working) connection,
|
|
|
|
|
// but we can't ignore what the control plane has told us.
|
|
|
|
|
if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"Pick First %p promoting pending subchannel list %p to " |
|
|
|
|
"replace %p", |
|
|
|
|
p, p->latest_pending_subchannel_list_.get(), |
|
|
|
|
p->subchannel_list_.get()); |
|
|
|
|
} |
|
|
|
|
p->UnsetSelectedSubchannel(); |
|
|
|
|
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); |
|
|
|
|
} |
|
|
|
|
// If this is the current subchannel list (either because we were
|
|
|
|
|
// in case 1 or because we were in case 2 and just promoted it to
|
|
|
|
|
// be the current list), re-resolve and report new state.
|
|
|
|
|
if (subchannel_list_ == p->subchannel_list_.get()) { |
|
|
|
|
p->channel_control_helper()->RequestReresolution(); |
|
|
|
|
absl::Status status = absl::UnavailableError(absl::StrCat( |
|
|
|
|
(p->omit_status_message_prefix_ |
|
|
|
|
? "" |
|
|
|
|
: "failed to connect to all addresses; last error: "), |
|
|
|
|
connectivity_status_.ToString())); |
|
|
|
|
p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status, |
|
|
|
|
MakeRefCounted<TransientFailurePicker>(status)); |
|
|
|
|
} |
|
|
|
|
// If the first subchannel is already IDLE, trigger the next connection
|
|
|
|
|
// attempt immediately. Otherwise, we'll wait for it to report
|
|
|
|
|
// its own connectivity state change.
|
|
|
|
|
auto& subchannel0 = subchannel_list_->subchannels_.front(); |
|
|
|
|
if (subchannel0.connectivity_state_ == GRPC_CHANNEL_IDLE) { |
|
|
|
|
subchannel0.subchannel_->RequestConnection(); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case GRPC_CHANNEL_IDLE: |
|
|
|
|
subchannel_->RequestConnection(); |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHANNEL_CONNECTING: |
|
|
|
|
// Only update connectivity state in case 1, and only if we're not
|
|
|
|
|
// already in TRANSIENT_FAILURE.
|
|
|
|
|
if (subchannel_list_ == p->subchannel_list_.get() && |
|
|
|
|
p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) { |
|
|
|
|
p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(), |
|
|
|
|
MakeRefCounted<QueuePicker>(nullptr)); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHANNEL_SHUTDOWN: |
|
|
|
|
GPR_UNREACHABLE_CODE(break); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() { |
|
|
|
|
GPR_ASSERT(connectivity_state_.has_value()); |
|
|
|
|
if (connectivity_state_ == GRPC_CHANNEL_IDLE) { |
|
|
|
|