priority and ring_hash LBs: fix interactions when using ring_hash under priority (#29332)

* refactor connection delay injection from client_lb_end2end_test

* fix build

* fix build on older compilers

* clang-format

* buildifier

* a bit of code cleanup

* start failover time whenever the child reports CONNECTING, and don't cancel when deactivating

* clang-format

* rewrite test

* simplify logic in priority policy

* clang-format

* switch to using a bit to indicate child healthiness

* fix reversed comment

* more changes in priority and ring_hash.

priority:
- go back to starting failover timer upon CONNECTING, but only if seen
  READY or IDLE more recently than TRANSIENT_FAILURE

ring_hash:
- don't flap back and forth between IDLE and CONNECTING; once we go
  CONNECTING, we stay there until either TF or READY
- after the first subchannel goes TF, we proactively start another
  subchannel connecting, just like we do after a second subchannel
  reports TF, to ensure that we don't stay in CONNECTING indefinitely if
  we aren't getting any new picks
- always return ring hash's picker, regardless of connectivity state
- update the subchannel connectivity state seen by the picker upon
  subchannel list creation
- start proactive subchannel connection attempt upon subchannel list
  creation if needed

* ring_hash: fix connectivity state seen by aggregation and picker

* fix off-by-one ("obiwan") error

* swap the order of ring_hash aggregation rules 3 and 4

* restore original test

* refactor connection injector QueuedAttempt code

* add test showing that ring_hash will continue connecting without picks

* clang-format

* don't actually need seen_failure_since_ready_ anymore

* fix TSAN problem

* address code review comments
reviewable/pr29432/r1
Mark D. Roth 3 years ago committed by GitHub
parent 0d3a9121f7
commit 6273832210
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CMakeLists.txt
  2. 2
      build_autogenerated.yaml
  3. 128
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  4. 177
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  5. 4
      test/cpp/end2end/client_lb_end2end_test.cc
  6. 122
      test/cpp/end2end/connection_delay_injector.cc
  7. 112
      test/cpp/end2end/connection_delay_injector.h
  8. 1
      test/cpp/end2end/xds/BUILD
  9. 288
      test/cpp/end2end/xds/xds_end2end_test.cc

1
CMakeLists.txt generated

@ -17519,6 +17519,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/tls.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/tls.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/tls.grpc.pb.h
test/cpp/end2end/connection_delay_injector.cc
test/cpp/end2end/test_service_impl.cc
test/cpp/end2end/xds/xds_end2end_test.cc
test/cpp/end2end/xds/xds_end2end_test_lib.cc

@ -8464,6 +8464,7 @@ targets:
run: false
language: c++
headers:
- test/cpp/end2end/connection_delay_injector.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
- test/cpp/end2end/xds/xds_end2end_test_lib.h
@ -8507,6 +8508,7 @@ targets:
- src/proto/grpc/testing/xds/v3/router.proto
- src/proto/grpc/testing/xds/v3/string.proto
- src/proto/grpc/testing/xds/v3/tls.proto
- test/cpp/end2end/connection_delay_injector.cc
- test/cpp/end2end/test_service_impl.cc
- test/cpp/end2end/xds/xds_end2end_test.cc
- test/cpp/end2end/xds/xds_end2end_test_lib.cc

@ -104,7 +104,7 @@ class PriorityLb : public LoadBalancingPolicy {
bool ignore_reresolution_requests);
void ExitIdleLocked();
void ResetBackoffLocked();
void DeactivateLocked();
void MaybeDeactivateLocked();
void MaybeReactivateLocked();
void Orphan() override;
@ -217,6 +217,8 @@ class PriorityLb : public LoadBalancingPolicy {
absl::Status connectivity_status_;
RefCountedPtr<RefCountedPicker> picker_wrapper_;
bool seen_ready_or_idle_since_transient_failure_ = true;
OrphanablePtr<DeactivationTimer> deactivation_timer_;
OrphanablePtr<FailoverTimer> failover_timer_;
};
@ -225,14 +227,37 @@ class PriorityLb : public LoadBalancingPolicy {
void ShutdownLocked() override;
// Returns UINT32_MAX if child is not in current priority list.
// Returns the priority of the specified child name, or UINT32_MAX if
// the child is not in the current priority list.
uint32_t GetChildPriorityLocked(const std::string& child_name) const;
// Called when a child's connectivity state has changed.
// May propagate the update to the channel or trigger choosing a new
// priority.
void HandleChildConnectivityStateChangeLocked(ChildPriority* child);
// Deletes a child. Called when the child's deactivation timer fires.
void DeleteChild(ChildPriority* child);
void TryNextPriorityLocked(bool report_connecting);
void SelectPriorityLocked(uint32_t priority);
// Iterates through the list of priorities to choose one:
// - If the child for a priority doesn't exist, creates it.
// - If a child's failover timer is pending, returns without selecting
// a priority while we wait for the child to attempt to connect. In
// this case, if report_connecting is true, reports CONNECTING state to
// the channel.
// - If the child is connected, it will be used as the current priority.
// - Otherwise, continues on to the next child.
// Reports TRANSIENT_FAILURE to the channel if no child is
// connected.
//
// This method is idempotent; it should yield the same result every
// time as a function of the state of the children.
void ChoosePriorityLocked(bool report_connecting);
// Sets the specified priority as the current priority.
// Deactivates any children at lower priorities.
// Returns the child's picker to the channel.
void SetCurrentPriorityLocked(uint32_t priority);
const Duration child_failover_timeout_;
@ -246,6 +271,8 @@ class PriorityLb : public LoadBalancingPolicy {
bool update_in_progress_ = false;
// All children that currently exist.
// Some of these children may be in deactivated state.
std::map<std::string, OrphanablePtr<ChildPriority>> children_;
// The priority that is being used.
uint32_t current_priority_ = UINT32_MAX;
@ -312,7 +339,7 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
current_child_from_before_update_ = children_[child_name].get();
// Unset current_priority_, since it was an index into the old
// config's priority list and may no longer be valid. It will be
// reset later by TryNextPriorityLocked(), but we unset it here in
// reset later by ChoosePriorityLocked(), but we unset it here in
// case updating any of our children triggers a state update.
current_priority_ = UINT32_MAX;
}
@ -332,7 +359,7 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
auto config_it = config_->children().find(child_name);
if (config_it == config_->children().end()) {
// Existing child not found in new config. Deactivate it.
child->DeactivateLocked();
child->MaybeDeactivateLocked();
} else {
// Existing child found in new config. Update it.
child->UpdateLocked(config_it->second.config,
@ -341,7 +368,7 @@ void PriorityLb::UpdateLocked(UpdateArgs args) {
}
update_in_progress_ = false;
// Try to get connected.
TryNextPriorityLocked(/*report_connecting=*/children_.empty());
ChoosePriorityLocked(/*report_connecting=*/children_.empty());
}
uint32_t PriorityLb::GetChildPriorityLocked(
@ -380,11 +407,11 @@ void PriorityLb::HandleChildConnectivityStateChangeLocked(
} else {
// If it's no longer READY or IDLE, we should stop using it.
// We already started trying other priorities as a result of the
// update, but calling TryNextPriorityLocked() ensures that we will
// update, but calling ChoosePriorityLocked() ensures that we will
// properly select between CONNECTING and TRANSIENT_FAILURE as the
// new state to report to our parent.
current_child_from_before_update_ = nullptr;
TryNextPriorityLocked(/*report_connecting=*/true);
ChoosePriorityLocked(/*report_connecting=*/true);
}
return;
}
@ -396,52 +423,26 @@ void PriorityLb::HandleChildConnectivityStateChangeLocked(
"priority %u",
this, child_priority, child->name().c_str(), current_priority_);
}
// Ignore priorities not in the current config.
if (child_priority == UINT32_MAX) return;
// Ignore lower-than-current priorities.
if (child_priority > current_priority_) return;
// If a child reports TRANSIENT_FAILURE, start trying the next priority.
// Note that even if this is for a higher-than-current priority, we
// may still need to create some children between this priority and
// the current one (e.g., if we got an update that inserted new
// priorities ahead of the current one).
if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) {
TryNextPriorityLocked(
/*report_connecting=*/child_priority == current_priority_);
return;
}
// The update is for a higher-than-current priority (or for any
// priority if we don't have any current priority).
if (child_priority < current_priority_) {
// If the child reports READY or IDLE, switch to that priority.
// Otherwise, ignore the update.
if (child->connectivity_state() == GRPC_CHANNEL_READY ||
child->connectivity_state() == GRPC_CHANNEL_IDLE) {
SelectPriorityLocked(child_priority);
}
return;
}
// The current priority has returned a new picker, so pass it up to
// our parent.
channel_control_helper()->UpdateState(child->connectivity_state(),
child->connectivity_status(),
child->GetPicker());
// Unconditionally call ChoosePriorityLocked(). It should do the
// right thing based on the state of all children.
ChoosePriorityLocked(
/*report_connecting=*/child_priority == current_priority_);
}
void PriorityLb::DeleteChild(ChildPriority* child) {
// If this was the current child from before the most recent update,
// stop using it. We already started trying other priorities as a
// result of the update, but calling TryNextPriorityLocked() ensures that
// result of the update, but calling ChoosePriorityLocked() ensures that
// we will properly select between CONNECTING and TRANSIENT_FAILURE as the
// new state to report to our parent.
if (current_child_from_before_update_ == child) {
current_child_from_before_update_ = nullptr;
TryNextPriorityLocked(/*report_connecting=*/true);
ChoosePriorityLocked(/*report_connecting=*/true);
}
children_.erase(child->name());
}
void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
void PriorityLb::ChoosePriorityLocked(bool report_connecting) {
current_priority_ = UINT32_MAX;
for (uint32_t priority = 0; priority < config_->priorities().size();
++priority) {
@ -471,7 +472,7 @@ void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
// If the child is in state READY or IDLE, switch to it.
if (child->connectivity_state() == GRPC_CHANNEL_READY ||
child->connectivity_state() == GRPC_CHANNEL_IDLE) {
SelectPriorityLocked(priority);
SetCurrentPriorityLocked(priority);
return;
}
// Child is not READY or IDLE.
@ -491,6 +492,13 @@ void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
return;
}
// Child has been failing for a while. Move on to the next priority.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] skipping priority %u, child %s: state=%s, "
"failover timer not pending",
this, priority, child_name.c_str(),
ConnectivityStateName(child->connectivity_state()));
}
}
// If there are no more priorities to try, report TRANSIENT_FAILURE.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
@ -506,7 +514,7 @@ void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
absl::make_unique<TransientFailurePicker>(status));
}
void PriorityLb::SelectPriorityLocked(uint32_t priority) {
void PriorityLb::SetCurrentPriorityLocked(uint32_t priority) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] selected priority %u, child %s", this,
priority, config_->priorities()[priority].c_str());
@ -517,7 +525,7 @@ void PriorityLb::SelectPriorityLocked(uint32_t priority) {
for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
const std::string& child_name = config_->priorities()[p];
auto it = children_.find(child_name);
if (it != children_.end()) it->second->DeactivateLocked();
if (it != children_.end()) it->second->MaybeDeactivateLocked();
}
// Update picker.
auto& child = children_[config_->priorities()[priority]];
@ -742,9 +750,6 @@ PriorityLb::ChildPriority::CreateChildPolicyLocked(
}
void PriorityLb::ChildPriority::ExitIdleLocked() {
if (connectivity_state_ == GRPC_CHANNEL_IDLE && failover_timer_ == nullptr) {
failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
}
child_policy_->ExitIdleLocked();
}
@ -766,20 +771,31 @@ void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
connectivity_state_ = state;
connectivity_status_ = status;
picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
// If READY or IDLE or TRANSIENT_FAILURE, cancel failover timer.
if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE ||
state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If we transition to state CONNECTING and we've not seen
// TRANSIENT_FAILURE more recently than READY or IDLE, start failover
// timer if not already pending.
// In any other state, update seen_ready_or_idle_since_transient_failure_
// and cancel failover timer.
if (state == GRPC_CHANNEL_CONNECTING) {
if (seen_ready_or_idle_since_transient_failure_ &&
failover_timer_ == nullptr) {
failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
}
} else if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE) {
seen_ready_or_idle_since_transient_failure_ = true;
failover_timer_.reset();
} else if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
seen_ready_or_idle_since_transient_failure_ = false;
failover_timer_.reset();
}
// Notify the parent policy.
priority_policy_->HandleChildConnectivityStateChangeLocked(this);
}
void PriorityLb::ChildPriority::DeactivateLocked() {
// If already deactivated, don't do it again.
if (deactivation_timer_ != nullptr) return;
failover_timer_.reset();
deactivation_timer_ = MakeOrphanable<DeactivationTimer>(Ref());
void PriorityLb::ChildPriority::MaybeDeactivateLocked() {
if (deactivation_timer_ == nullptr) {
deactivation_timer_ = MakeOrphanable<DeactivationTimer>(Ref());
}
}
void PriorityLb::ChildPriority::MaybeReactivateLocked() {

@ -143,8 +143,6 @@ class RingHash : public LoadBalancingPolicy {
const ServerAddress& address() const { return address_; }
bool seen_failure_since_ready() const { return seen_failure_since_ready_; }
// Performs connectivity state updates that need to be done both when we
// first start watching and when a watcher notification is received.
void UpdateConnectivityStateLocked(
@ -157,10 +155,18 @@ class RingHash : public LoadBalancingPolicy {
grpc_connectivity_state connectivity_state) override;
ServerAddress address_;
// Last logical connectivity state seen.
// Note that this may differ from the state actually reported by the
// subchannel in some cases; for example, once this is set to
// TRANSIENT_FAILURE, we do not change it again until we get READY,
// so we skip any interim stops in CONNECTING.
grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
// Connectivity state seen by picker.
// Uses an atomic so that it can be accessed outside of the WorkSerializer.
std::atomic<grpc_connectivity_state> connectivity_state_for_picker_{
GRPC_CHANNEL_IDLE};
bool seen_failure_since_ready_ = false;
};
// A list of subchannels.
@ -193,9 +199,10 @@ class RingHash : public LoadBalancingPolicy {
// Updates the RH policy's connectivity state based on the
// subchannel list's state counters, creating new picker and new ring.
// Furthermore, return a bool indicating whether the aggregated state is
// Transient Failure.
bool UpdateRingHashConnectivityStateLocked();
// The index parameter indicates the index into the list of the subchannel
// whose status report triggered the call to
// UpdateRingHashConnectivityStateLocked().
void UpdateRingHashConnectivityStateLocked(size_t index);
// Create a new ring from this subchannel list.
RefCountedPtr<Ring> MakeRing();
@ -508,7 +515,7 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
//
void RingHash::RingHashSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
GPR_ASSERT(num_subchannels() != 0);
// Check current state of each subchannel synchronously.
for (size_t i = 0; i < num_subchannels(); ++i) {
grpc_connectivity_state state =
@ -522,7 +529,10 @@ void RingHash::RingHashSubchannelList::StartWatchingLocked() {
}
}
// Send updated state to parent based on reported subchannel states.
UpdateRingHashConnectivityStateLocked();
// Pretend we're getting this update from the last subchannel, so that
// if we need to proactively start connecting, we'll start from the
// first subchannel.
UpdateRingHashConnectivityStateLocked(num_subchannels() - 1);
}
void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
@ -552,47 +562,72 @@ void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
}
}
// Sets the RH policy's connectivity state and generates a new picker based
// on the current subchannel list, or requests a re-attempt by returning true.
bool RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked() {
void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
size_t index) {
RingHash* p = static_cast<RingHash*>(policy());
// Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_.get() != this) return false;
if (p->subchannel_list_.get() != this) return;
// The overall aggregation rules here are:
// 1. If there is at least one subchannel in READY state, report READY.
// 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report
// TRANSIENT_FAILURE.
// TRANSIENT_FAILURE.
// 3. If there is at least one subchannel in CONNECTING state, report
// CONNECTING.
// 4. If there is at least one subchannel in IDLE state, report IDLE.
// 5. Otherwise, report TRANSIENT_FAILURE.
// CONNECTING.
// 4. If there is one subchannel in TRANSIENT_FAILURE state and there is
// more than one subchannel, report CONNECTING.
// 5. If there is at least one subchannel in IDLE state, report IDLE.
// 6. Otherwise, report TRANSIENT_FAILURE.
//
// We set start_connection_attempt to true if we match rules 2, 3, or 6.
grpc_connectivity_state state;
absl::Status status;
bool start_connection_attempt = false;
if (num_ready_ > 0) {
/* READY */
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
p->ring_));
return false;
}
if (num_connecting_ > 0 && num_transient_failure_ < 2) {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
return false;
}
if (num_idle_ > 0 && num_transient_failure_ < 2) {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
p->ring_));
return false;
state = GRPC_CHANNEL_READY;
} else if (num_transient_failure_ >= 2) {
state = GRPC_CHANNEL_TRANSIENT_FAILURE;
status = absl::UnavailableError("connections to backends failing");
start_connection_attempt = true;
} else if (num_connecting_ > 0) {
state = GRPC_CHANNEL_CONNECTING;
} else if (num_transient_failure_ == 1 && num_subchannels() > 1) {
state = GRPC_CHANNEL_CONNECTING;
start_connection_attempt = true;
} else if (num_idle_ > 0) {
state = GRPC_CHANNEL_IDLE;
} else {
state = GRPC_CHANNEL_TRANSIENT_FAILURE;
status = absl::UnavailableError("connections to backends failing");
start_connection_attempt = true;
}
absl::Status status =
absl::UnavailableError("connections to backend failing or idle");
// Generate new picker and return it to the channel.
// Note that we use our own picker regardless of connectivity state.
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
return true;
state, status,
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
p->ring_));
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
// reconnect to subchannels unless it is getting pick requests,
// it will need special handling to ensure that it will eventually
// recover from TRANSIENT_FAILURE state once the problem is resolved.
// Specifically, it will make sure that it is attempting to connect to
// at least one subchannel at any given time. After a given subchannel
// fails a connection attempt, it will move on to the next subchannel
// in the ring. It will keep doing this until one of the subchannels
// successfully connects, at which point it will report READY and stop
// proactively trying to connect. The policy will remain in
// TRANSIENT_FAILURE until at least one subchannel becomes connected,
// even if subchannels are in state CONNECTING during that time.
//
// Note that we do the same thing when the policy is in state
// CONNECTING, just to ensure that we don't remain in CONNECTING state
// indefinitely if there are no new picks coming in.
if (start_connection_attempt) {
size_t next_index = (index + 1) % num_subchannels();
subchannel(next_index)->subchannel()->AttemptToConnect();
}
}
RefCountedPtr<RingHash::Ring> RingHash::RingHashSubchannelList::MakeRing() {
@ -617,25 +652,34 @@ void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked(
ConnectivityStateName(last_connectivity_state_),
ConnectivityStateName(connectivity_state));
}
// Decide what state to report for aggregation purposes.
// Decide what state to report for the purposes of aggregation and
// picker behavior.
// If we haven't seen a failure since the last time we were in state
// READY, then we report the state change as-is. However, once we do see
// a failure, we report TRANSIENT_FAILURE and do not report any subsequent
// state changes until we go back into state READY.
if (!seen_failure_since_ready_) {
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
seen_failure_since_ready_ = true;
}
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
connectivity_state);
} else {
if (connectivity_state == GRPC_CHANNEL_READY) {
seen_failure_since_ready_ = false;
subchannel_list()->UpdateStateCountersLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state);
if (last_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If not transitioning to READY, ignore the update, since we want
// to continue to consider ourselves in TRANSIENT_FAILURE.
if (connectivity_state != GRPC_CHANNEL_READY) return;
} else if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If we go from READY to TF, treat it as IDLE.
// This transition can be caused by a "normal" connection failure, such
// as the server closing the connection due to a max-age setting. In
// this case, we want to have RPCs that hash to this subchannel wait for
// the reconnection attempt rather than assuming that the subchannel is
// bad and moving on to a subsequent subchannel in the ring.
if (last_connectivity_state_ == GRPC_CHANNEL_READY) {
connectivity_state = GRPC_CHANNEL_IDLE;
}
}
// Record last seen connectivity state.
// Update state counters used for aggregation.
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
connectivity_state);
// Update state seen by picker.
connectivity_state_for_picker_.store(connectivity_state,
std::memory_order_relaxed);
// Update last seen connectivity state.
last_connectivity_state_ = connectivity_state;
}
@ -643,15 +687,11 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) {
RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr);
// Update connectivity state used by picker.
connectivity_state_for_picker_.store(connectivity_state,
std::memory_order_relaxed);
// If the new state is TRANSIENT_FAILURE, re-resolve.
// Only do this if we've started watching, not at startup time.
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution.
// Also attempt to reconnect.
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
@ -665,28 +705,7 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
UpdateConnectivityStateLocked(connectivity_state);
// Update the RH policy's connectivity state, creating new picker and new
// ring.
bool transient_failure =
subchannel_list()->UpdateRingHashConnectivityStateLocked();
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
// reconnect to subchannels unless it is getting pick requests,
// it will need special handling to ensure that it will eventually
// recover from TRANSIENT_FAILURE state once the problem is resolved.
// Specifically, it will make sure that it is attempting to connect to
// at least one subchannel at any given time. After a given subchannel
// fails a connection attempt, it will move on to the next subchannel
// in the ring. It will keep doing this until one of the subchannels
// successfully connects, at which point it will report READY and stop
// proactively trying to connect. The policy will remain in
// TRANSIENT_FAILURE until at least one subchannel becomes connected,
// even if subchannels are in state CONNECTING during that time.
if (transient_failure &&
connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
size_t next_index = (Index() + 1) % subchannel_list()->num_subchannels();
RingHashSubchannelData* next_sd = subchannel_list()->subchannel(next_index);
next_sd->subchannel()->AttemptToConnect();
}
subchannel_list()->UpdateRingHashConnectivityStateLocked(Index());
}
//

@ -605,8 +605,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
response_generator.SetNextResolution(ports);
// Make connection delay a 10% longer than it's willing to in order to make
// sure we are hitting the codepath that waits for the min reconnect backoff.
ConnectionDelayInjector delay_injector;
auto injected_delay = delay_injector.SetDelay(
ConnectionDelayInjector delay_injector(
grpc_core::Duration::Milliseconds(kMinReconnectBackOffMs * 1.10));
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
channel->WaitForConnected(
@ -2128,6 +2127,7 @@ TEST_F(ClientLbAddressTest, Basic) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc::testing::ConnectionAttemptInjector::Init();
const auto result = RUN_ALL_TESTS();
return result;
}

@ -20,96 +20,98 @@
#include "absl/memory/memory.h"
#include "absl/utility/utility.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/timer.h"
// defined in tcp_client.cc
extern grpc_tcp_client_vtable* grpc_tcp_client_impl;
namespace grpc {
namespace testing {
//
// ConnectionAttemptInjector
//
namespace {
grpc_tcp_client_vtable* g_original_vtable = nullptr;
std::atomic<ConnectionAttemptInjector*> g_injector{nullptr};
void TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
ConnectionAttemptInjector* injector = g_injector.load();
if (injector == nullptr) {
g_original_vtable->connect(closure, ep, interested_parties, channel_args,
addr, deadline);
return;
}
injector->HandleConnection(closure, ep, interested_parties, channel_args,
addr, deadline);
}
std::atomic<grpc_core::Duration> g_delay;
grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay};
} // namespace
void ConnectionAttemptInjector::Init() {
g_original_vtable = grpc_tcp_client_impl;
grpc_tcp_client_impl = &kDelayedConnectVTable;
}
ConnectionAttemptInjector::ConnectionAttemptInjector() {
GPR_ASSERT(g_injector.exchange(this) == nullptr);
}
ConnectionAttemptInjector::~ConnectionAttemptInjector() {
g_injector.store(nullptr);
}
void ConnectionAttemptInjector::AttemptConnection(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
g_original_vtable->connect(closure, ep, interested_parties, channel_args,
addr, deadline);
}
class InjectedDelay {
//
// ConnectionDelayInjector
//
class ConnectionDelayInjector::InjectedDelay {
public:
InjectedDelay(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
InjectedDelay(grpc_core::Duration duration, grpc_closure* closure,
grpc_endpoint** ep, grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline)
: closure_(closure),
endpoint_(ep),
interested_parties_(interested_parties),
channel_args_(grpc_channel_args_copy(channel_args)),
deadline_(deadline) {
memcpy(&address_, addr, sizeof(grpc_resolved_address));
: attempt_(closure, ep, interested_parties, channel_args, addr,
deadline) {
GRPC_CLOSURE_INIT(&timer_callback_, TimerCallback, this, nullptr);
grpc_core::Duration duration = g_delay.load();
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
duration = std::min(duration, deadline_ - now);
duration = std::min(duration, deadline - now);
grpc_timer_init(&timer_, now + duration, &timer_callback_);
}
~InjectedDelay() { grpc_channel_args_destroy(channel_args_); }
private:
static void TimerCallback(void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<InjectedDelay*>(arg);
g_original_vtable->connect(self->closure_, self->endpoint_,
self->interested_parties_, self->channel_args_,
&self->address_, self->deadline_);
self->attempt_.Resume();
delete self;
}
QueuedAttempt attempt_;
grpc_timer timer_;
grpc_closure timer_callback_;
// Original args.
grpc_closure* closure_;
grpc_endpoint** endpoint_;
grpc_pollset_set* interested_parties_;
const grpc_channel_args* channel_args_;
grpc_resolved_address address_;
grpc_core::Timestamp deadline_;
};
void TcpConnectWithDelay(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
new InjectedDelay(closure, ep, interested_parties, channel_args, addr,
deadline);
}
grpc_tcp_client_vtable kDelayedConnectVTable = {TcpConnectWithDelay};
} // namespace
ConnectionDelayInjector::InjectedDelay::~InjectedDelay() {
g_delay.store(grpc_core::Duration());
}
ConnectionDelayInjector::ConnectionDelayInjector() {
g_original_vtable =
absl::exchange(grpc_tcp_client_impl, &kDelayedConnectVTable);
}
ConnectionDelayInjector::~ConnectionDelayInjector() {
grpc_tcp_client_impl = g_original_vtable;
}
std::unique_ptr<ConnectionDelayInjector::InjectedDelay>
ConnectionDelayInjector::SetDelay(grpc_core::Duration duration) {
GPR_ASSERT(g_delay.exchange(duration) == grpc_core::Duration());
return absl::make_unique<ConnectionDelayInjector::InjectedDelay>();
void ConnectionDelayInjector::HandleConnection(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
new InjectedDelay(duration_, closure, ep, interested_parties, channel_args,
addr, deadline);
}
} // namespace testing

@ -17,7 +17,10 @@
#include <memory>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/timer.h"
namespace grpc {
namespace testing {
@ -25,31 +28,104 @@ namespace testing {
// Allows injecting connection-establishment delays into C-core.
// Typical usage:
//
// // At grpc_init() time.
// ConnectionAttemptInjector::Init();
//
// // When an injection is desired.
// ConnectionDelayInjector delay_injector(grpc_core::Duration::Seconds(10));
//
// The injection is global, so there must be only one ConnectionAttemptInjector
// object at any one time.
class ConnectionAttemptInjector {
 public:
  // Global initializer.  Replaces the iomgr TCP client vtable.
  // Must be called exactly once before any TCP connections are established.
  static void Init();

  ConnectionAttemptInjector();
  virtual ~ConnectionAttemptInjector();

  // Invoked for every TCP connection attempt.
  // Implementations must eventually either invoke the closure
  // themselves or delegate to the iomgr implementation by calling
  // AttemptConnection().  QueuedAttempt may be used to queue an attempt
  // for asynchronous processing.
  virtual void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
                                grpc_pollset_set* interested_parties,
                                const grpc_channel_args* channel_args,
                                const grpc_resolved_address* addr,
                                grpc_core::Timestamp deadline) = 0;

 protected:
  // Represents a queued attempt.
  // The caller must invoke either Resume() or Fail() before destroying.
  class QueuedAttempt {
   public:
    QueuedAttempt(grpc_closure* closure, grpc_endpoint** ep,
                  grpc_pollset_set* interested_parties,
                  const grpc_channel_args* channel_args,
                  const grpc_resolved_address* addr,
                  grpc_core::Timestamp deadline)
        : closure_(closure),
          endpoint_(ep),
          interested_parties_(interested_parties),
          channel_args_(grpc_channel_args_copy(channel_args)),
          deadline_(deadline) {
      // grpc_resolved_address is plain data, so a raw copy suffices.
      memcpy(&address_, addr, sizeof(address_));
    }

    ~QueuedAttempt() {
      // Catches callers that forgot to invoke Resume() or Fail().
      GPR_ASSERT(closure_ == nullptr);
      grpc_channel_args_destroy(channel_args_);
    }

    // Caller must invoke this from a thread with an ExecCtx.
    void Resume() {
      GPR_ASSERT(closure_ != nullptr);
      AttemptConnection(closure_, endpoint_, interested_parties_,
                        channel_args_, &address_, deadline_);
      closure_ = nullptr;
    }

    // Caller must invoke this from a thread with an ExecCtx.
    void Fail(grpc_error_handle error) {
      GPR_ASSERT(closure_ != nullptr);
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_, error);
      closure_ = nullptr;
    }

   private:
    grpc_closure* closure_;
    grpc_endpoint** endpoint_;
    grpc_pollset_set* interested_parties_;
    const grpc_channel_args* channel_args_;
    grpc_resolved_address address_;
    grpc_core::Timestamp deadline_;
  };

  // Delegates the attempt to the underlying iomgr implementation.
  static void AttemptConnection(grpc_closure* closure, grpc_endpoint** ep,
                                grpc_pollset_set* interested_parties,
                                const grpc_channel_args* channel_args,
                                const grpc_resolved_address* addr,
                                grpc_core::Timestamp deadline);
};
// A concrete implementation that injects a fixed delay.
class ConnectionDelayInjector : public ConnectionAttemptInjector {
 public:
  explicit ConnectionDelayInjector(grpc_core::Duration duration)
      : duration_(duration) {}

  // Delays each intercepted connection attempt by duration_ before
  // handing it back to iomgr (see InjectedDelay in the .cc file).
  void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
                        grpc_pollset_set* interested_parties,
                        const grpc_channel_args* channel_args,
                        const grpc_resolved_address* addr,
                        grpc_core::Timestamp deadline) override;

 private:
  // Implementation detail defined in the .cc file.
  class InjectedDelay;
  // NOTE(review): looks like a remnant of the older scoped-delay API;
  // confirm whether this declaration is still needed.
  std::unique_ptr<InjectedDelay> SetDelay(grpc_core::Duration duration);

  grpc_core::Duration duration_;
};
} // namespace testing

@ -130,6 +130,7 @@ grpc_cc_test(
"//src/proto/grpc/testing/xds/v3:router_proto",
"//src/proto/grpc/testing/xds/v3:tls_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:connection_delay_injector",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
"//test/cpp/util:tls_test_utils",

@ -67,6 +67,7 @@
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
@ -103,6 +104,7 @@
#include "src/proto/grpc/testing/xds/v3/tls.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/connection_delay_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
#include "test/cpp/util/test_config.h"
#include "test/cpp/util/tls_test_utils.h"
@ -4843,6 +4845,209 @@ TEST_P(CdsTest, AggregateClusterFallBackFromRingHashAtStartup) {
EXPECT_TRUE(found);
}
// Verifies that an aggregate cluster using the RING_HASH LB policy can
// fall back at startup from an EDS cluster whose endpoints are all
// unreachable to a LOGICAL_DNS cluster that resolves to a working
// backend.
TEST_P(CdsTest, AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) {
  ScopedExperimentalEnvVar env_var(
      "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
  CreateAndStartBackends(1);
  const char* kEdsClusterName = "eds_cluster";
  const char* kLogicalDNSClusterName = "logical_dns_cluster";
  // Populate EDS resource.  Both localities contain only unreachable
  // endpoints, so the EDS cluster can never become ready.
  EdsResourceArgs args({
      {"locality0",
       {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
       kDefaultLocalityWeight,
       0},
      {"locality1",
       {MakeNonExistantEndpoint(), MakeNonExistantEndpoint()},
       kDefaultLocalityWeight,
       1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Populate new CDS resources.
  Cluster eds_cluster = default_cluster_;
  eds_cluster.set_name(kEdsClusterName);
  balancer_->ads_service()->SetCdsResource(eds_cluster);
  // Populate LOGICAL_DNS cluster.
  auto logical_dns_cluster = default_cluster_;
  logical_dns_cluster.set_name(kLogicalDNSClusterName);
  logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
  auto* address = logical_dns_cluster.mutable_load_assignment()
                      ->add_endpoints()
                      ->add_lb_endpoints()
                      ->mutable_endpoint()
                      ->mutable_address()
                      ->mutable_socket_address();
  address->set_address(kServerName);
  address->set_port_value(443);
  balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
  // Create Aggregate Cluster listing the EDS cluster ahead of the
  // LOGICAL_DNS cluster.
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
  custom_cluster->set_name("envoy.clusters.aggregate");
  ClusterConfig cluster_config;
  cluster_config.add_clusters(kEdsClusterName);
  cluster_config.add_clusters(kLogicalDNSClusterName);
  custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Set up route with channel id hashing.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Set Logical DNS result to point at the one working backend.
  {
    grpc_core::ExecCtx exec_ctx;
    grpc_core::Resolver::Result result;
    result.addresses = CreateAddressListFromPortList(GetBackendPorts());
    logical_dns_cluster_resolver_response_generator_->SetResponse(
        std::move(result));
  }
  // Inject connection delay to make this act more realistically.
  ConnectionDelayInjector delay_injector(
      grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor());
  // Send RPC.  Need the timeout to be long enough to account for the
  // subchannel connection delays.
  CheckRpcSendOk(1, RpcOptions().set_timeout_ms(3500));
}
// This test covers a bug found in the following scenario:
// 1. P0 reports TRANSIENT_FAILURE, so we start connecting to P1.
// 2. While P1 is still in CONNECTING, P0 goes back to READY, so we
// switch back to P0, deactivating P1.
// 3. P0 then goes back to TRANSIENT_FAILURE, and we reactivate P1.
// The bug caused us to fail to choose P1 even though it is in state
// CONNECTING (because the failover timer was not running), so we
// incorrectly failed the RPCs.
TEST_P(CdsTest, AggregateClusterFallBackWithConnectivityChurn) {
  ScopedExperimentalEnvVar env_var(
      "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
  CreateAndStartBackends(2);
  const char* kClusterName1 = "cluster1";
  const char* kClusterName2 = "cluster2";
  const char* kEdsServiceName2 = "eds_service_name2";
  // Populate EDS resources: backend 0 serves P0, backend 1 serves P1.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  args = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(args, kEdsServiceName2));
  // Populate new CDS resources.
  Cluster cluster1 = default_cluster_;
  cluster1.set_name(kClusterName1);
  balancer_->ads_service()->SetCdsResource(cluster1);
  Cluster cluster2 = default_cluster_;
  cluster2.set_name(kClusterName2);
  cluster2.mutable_eds_cluster_config()->set_service_name(kEdsServiceName2);
  balancer_->ads_service()->SetCdsResource(cluster2);
  // Create Aggregate Cluster with cluster1 at priority 0 and cluster2
  // at priority 1.
  auto cluster = default_cluster_;
  CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
  custom_cluster->set_name("envoy.clusters.aggregate");
  ClusterConfig cluster_config;
  cluster_config.add_clusters(kClusterName1);
  cluster_config.add_clusters(kClusterName2);
  custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
  balancer_->ads_service()->SetCdsResource(cluster);
  // This class injects itself into all TCP connection attempts made
  // against iomgr.  It intercepts the attempts for the P0 and P1
  // backends and allows them to proceed as desired to simulate the case
  // being tested.
  class ConnectionInjector : public ConnectionAttemptInjector {
   public:
    ConnectionInjector(int p0_port, int p1_port)
        : p0_port_(p0_port), p1_port_(p1_port) {}

    void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
                          grpc_pollset_set* interested_parties,
                          const grpc_channel_args* channel_args,
                          const grpc_resolved_address* addr,
                          grpc_core::Timestamp deadline) override {
      {
        grpc_core::MutexLock lock(&mu_);
        const int port = grpc_sockaddr_get_port(addr);
        gpr_log(GPR_INFO, "==> HandleConnection(): state_=%d, port=%d", state_,
                port);
        switch (state_) {
          case kInit:
            // Make P0 report TF, which should trigger us to try to connect to
            // P1.
            if (port == p0_port_) {
              gpr_log(GPR_INFO, "*** INJECTING FAILURE FOR P0 ENDPOINT");
              grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
                                      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                          "injected connection failure"));
              state_ = kP0Failed;
              return;
            }
            break;
          case kP0Failed:
            // Hold connection attempt to P1 so that it stays in CONNECTING.
            if (port == p1_port_) {
              gpr_log(GPR_INFO,
                      "*** DELAYING CONNECTION ATTEMPT FOR P1 ENDPOINT");
              queued_p1_attempt_ = absl::make_unique<QueuedAttempt>(
                  closure, ep, interested_parties, channel_args, addr,
                  deadline);
              state_ = kDone;
              return;
            }
            break;
          case kDone:
            // P0 should attempt reconnection.  Log it to make the test
            // easier to debug, but allow it to complete, so that the
            // priority policy deactivates P1.
            if (port == p0_port_) {
              gpr_log(GPR_INFO,
                      "*** INTERCEPTING CONNECTION ATTEMPT FOR P0 ENDPOINT");
            }
            break;
        }
      }
      // Any attempt not explicitly handled above proceeds normally.
      AttemptConnection(closure, ep, interested_parties, channel_args, addr,
                        deadline);
    }

    // Invoked by the test when the RPC to the P0 backend has succeeded
    // and it's ready to allow the P1 connection attempt to proceed.
    void CompletePriority1Connection() {
      grpc_core::ExecCtx exec_ctx;
      std::unique_ptr<QueuedAttempt> attempt;
      {
        grpc_core::MutexLock lock(&mu_);
        GPR_ASSERT(state_ == kDone);
        attempt = std::move(queued_p1_attempt_);
      }
      // Resume the queued attempt after releasing the lock.
      attempt->Resume();
    }

   private:
    const int p0_port_;
    const int p1_port_;

    grpc_core::Mutex mu_;
    enum {
      kInit,
      kP0Failed,
      kDone,
    } state_ ABSL_GUARDED_BY(mu_) = kInit;
    std::unique_ptr<QueuedAttempt> queued_p1_attempt_ ABSL_GUARDED_BY(mu_);
  };
  ConnectionInjector connection_attempt_injector(backends_[0]->port(),
                                                 backends_[1]->port());
  // Wait for P0 backend.
  // Increase timeout to account for subchannel connection delays.
  WaitForBackend(0, WaitForBackendOptions(), RpcOptions().set_timeout_ms(2000));
  // Bring down the P0 backend.
  ShutdownBackend(0);
  // Allow the connection attempt to the P1 backend to resume.
  connection_attempt_injector.CompletePriority1Connection();
  // Wait for P1 backend to start getting traffic.
  WaitForBackend(1);
}
TEST_P(CdsTest, AggregateClusterEdsToLogicalDns) {
ScopedExperimentalEnvVar env_var(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
@ -5751,6 +5956,88 @@ TEST_P(CdsTest, RingHashIdleToReady) {
EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
}
// Test that the channel will transition to READY once it starts
// connecting even if there are no RPCs being sent to the picker.
TEST_P(CdsTest, RingHashContinuesConnectingWithoutPicks) {
  // Create EDS resource.  The first endpoint does not exist (so its
  // connection attempt will fail); the second is a real backend.
  CreateAndStartBackends(1);
  auto non_existant_endpoint = MakeNonExistantEndpoint();
  EdsResourceArgs args(
      {{"locality0", {non_existant_endpoint, CreateEndpoint(0)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Change CDS resource to use RING_HASH.
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Add hash policy to RDS resource so picks hash on request metadata.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // A connection injector that signals the test after seeing the
  // connection attempt for the non-existent endpoint, so the test can
  // cancel the RPC at that point.
  class ConnectionInjector : public ConnectionAttemptInjector {
   public:
    explicit ConnectionInjector(int port) : port_(port) {}

    void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
                          grpc_pollset_set* interested_parties,
                          const grpc_channel_args* channel_args,
                          const grpc_resolved_address* addr,
                          grpc_core::Timestamp deadline) override {
      {
        grpc_core::MutexLock lock(&mu_);
        const int port = grpc_sockaddr_get_port(addr);
        gpr_log(GPR_INFO, "==> HandleConnection(): seen_port_=%d, port=%d",
                seen_port_, port);
        // Initial attempt should be for port0_, which should fail.
        // Cancel the RPC at this point, so that it's no longer
        // queued when the LB policy updates the picker.
        if (!seen_port_ && port == port_) {
          gpr_log(GPR_INFO, "*** SEEN P0 CONNECTION ATTEMPT");
          seen_port_ = true;
          cond_.Signal();
        }
      }
      // Always let the attempt proceed (it will fail on its own for the
      // non-existent endpoint).
      AttemptConnection(closure, ep, interested_parties, channel_args, addr,
                        deadline);
    }

    // Blocks the test thread until the watched port has been attempted.
    void WaitForP0ConnectionAttempt() {
      grpc_core::MutexLock lock(&mu_);
      while (!seen_port_) {
        cond_.Wait(&mu_);
      }
    }

   private:
    const int port_;

    grpc_core::Mutex mu_;
    grpc_core::CondVar cond_;
    bool seen_port_ ABSL_GUARDED_BY(mu_) = false;
  };
  ConnectionInjector connection_injector(non_existant_endpoint.port);
  // A long-running RPC, just used to send the RPC in another thread.
  LongRunningRpc rpc;
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"address_hash",
       CreateMetadataValueThatHashesToBackendPort(non_existant_endpoint.port)}};
  rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
                                std::move(metadata)));
  // Wait for the RPC to trigger the P0 connection attempt, then cancel it.
  connection_injector.WaitForP0ConnectionAttempt();
  rpc.CancelRpc();
  // Wait for channel to become connected without any pending RPC.
  EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5)));
  // RPC should have been cancelled.
  EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
  // Make sure the backend did not get any requests.
  EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}
// Test that when the first pick is down leading to a transient failure, we
// will move on to the next ring hash entry.
TEST_P(CdsTest, RingHashTransientFailureCheckNextOne) {
@ -11225,6 +11512,7 @@ int main(int argc, char** argv) {
absl::make_unique<grpc::testing::FakeCertificateProviderFactory>(
"fake2", &grpc::testing::g_fake2_cert_data_map));
grpc_init();
grpc::testing::ConnectionAttemptInjector::Init();
grpc_core::XdsHttpFilterRegistry::RegisterFilter(
absl::make_unique<grpc::testing::NoOpHttpFilter>(
"grpc.testing.client_only_http_filter",

Loading…
Cancel
Save