subchannel: report IDLE upon existing connection failure and after backoff interval (#29428)

* subchannel: report IDLE upon existing connection failure and after backoff interval

* rename AttemptToConnect() to RequestConnection()

* clang-format

* fix unused parameter warning

* fix subchannel to handle either TF or SHUTDOWN from transport

* fix handling of ConnectedSubchannel failure

* pass status up in IDLE state to communicate keepalive info

* update comment

* split pick_first and round_robin tests into their own test suites

* improve log message

* add test

* clang-format

* appease clang-tidy

* fix test to do a poor man's graceful shutdown to avoid spurious RPC failures

* simplify round_robin logic and fix test flakes

* fix grpclb bug
pull/29515/head
Mark D. Roth 3 years ago committed by GitHub
parent 41cbd23998
commit 1cd6e69347
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/ext/filters/client_channel/client_channel.cc
  2. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  3. 70
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  4. 27
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  5. 313
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  6. 243
      src/core/ext/filters/client_channel/subchannel.cc
  7. 50
      src/core/ext/filters/client_channel/subchannel.h
  8. 10
      src/core/ext/filters/client_channel/subchannel_interface.h
  9. 9
      src/core/lib/surface/server.cc
  10. 2
      src/core/lib/surface/server.h
  11. 439
      test/cpp/end2end/client_lb_end2end_test.cc
  12. 3
      test/cpp/end2end/connection_delay_injector.cc
  13. 3
      test/cpp/end2end/connection_delay_injector.h

@ -525,7 +525,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
return subchannel_->connected_subchannel(); return subchannel_->connected_subchannel();
} }
void AttemptToConnect() override { subchannel_->AttemptToConnect(); } void RequestConnection() override { subchannel_->RequestConnection(); }
void ResetBackoff() override { subchannel_->ResetBackoff(); } void ResetBackoff() override { subchannel_->ResetBackoff(); }

@ -1239,6 +1239,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// If the fallback-at-startup checks are pending, go into fallback mode // If the fallback-at-startup checks are pending, go into fallback mode
// immediately. This short-circuits the timeout for the fallback-at-startup // immediately. This short-circuits the timeout for the fallback-at-startup
// case. // case.
grpclb_policy()->lb_calld_.reset();
if (grpclb_policy()->fallback_at_startup_checks_pending_) { if (grpclb_policy()->fallback_at_startup_checks_pending_) {
GPR_ASSERT(!seen_serverlist_); GPR_ASSERT(!seen_serverlist_);
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
@ -1254,7 +1255,6 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// This handles the fallback-after-startup case. // This handles the fallback-after-startup case.
grpclb_policy()->MaybeEnterFallbackModeAfterStartup(); grpclb_policy()->MaybeEnterFallbackModeAfterStartup();
} }
grpclb_policy()->lb_calld_.reset();
GPR_ASSERT(!grpclb_policy()->shutting_down_); GPR_ASSERT(!grpclb_policy()->shutting_down_);
grpclb_policy()->channel_control_helper()->RequestReresolution(); grpclb_policy()->channel_control_helper()->RequestReresolution();
if (seen_initial_response_) { if (seen_initial_response_) {

@ -201,6 +201,10 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status)); absl::make_unique<TransientFailurePicker>(status));
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
latest_pending_subchannel_list_.reset();
return; return;
} }
// If one of the subchannels in the new list is already in state // If one of the subchannels in the new list is already in state
@ -232,7 +236,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
// here, since we've already checked the initial connectivity // here, since we've already checked the initial connectivity
// state of all subchannels above. // state of all subchannels above.
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
subchannel_list_->subchannel(0)->subchannel()->AttemptToConnect(); subchannel_list_->subchannel(0)->subchannel()->RequestConnection();
} else { } else {
// We do have a selected subchannel (which means it's READY), so keep // We do have a selected subchannel (which means it's READY), so keep
// using it until one of the subchannels in the new list reports READY. // using it until one of the subchannels in the new list reports READY.
@ -255,7 +259,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
->StartConnectivityWatchLocked(); ->StartConnectivityWatchLocked();
latest_pending_subchannel_list_->subchannel(0) latest_pending_subchannel_list_->subchannel(0)
->subchannel() ->subchannel()
->AttemptToConnect(); ->RequestConnection();
} }
} }
@ -307,10 +311,13 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
"Pick First %p selected subchannel connectivity changed to %s", p, "Pick First %p selected subchannel connectivity changed to %s", p,
ConnectivityStateName(connectivity_state)); ConnectivityStateName(connectivity_state));
} }
// If the new state is anything other than READY and there is a // We might miss a connectivity state update between calling
// pending update, switch to the pending update. // CheckConnectivityStateLocked() and StartConnectivityWatchLocked().
if (connectivity_state != GRPC_CHANNEL_READY && // If the new state is READY, just ignore it; otherwise, regardless of
p->latest_pending_subchannel_list_ != nullptr) { // what state it is, we treat it as a failure of the existing connection.
if (connectivity_state == GRPC_CHANNEL_READY) return;
// If there is a pending update, switch to the pending update.
if (p->latest_pending_subchannel_list_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to " "Pick First %p promoting pending subchannel list %p to "
@ -335,38 +342,19 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
absl::make_unique<QueuePicker>( absl::make_unique<QueuePicker>(
p->Ref(DEBUG_LOCATION, "QueuePicker"))); p->Ref(DEBUG_LOCATION, "QueuePicker")));
} }
} else { return;
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected subchannel goes bad, request a re-resolution. We
// also set the channel state to IDLE. The reason is that if the new
// state is TRANSIENT_FAILURE due to a GOAWAY reception we don't want
// to connect to the re-resolved backends until we leave IDLE state.
// TODO(qianchengz): We may want to request re-resolution in
// ExitIdleLocked().
p->idle_ = true;
p->channel_control_helper()->RequestReresolution();
p->selected_ = nullptr;
p->subchannel_list_.reset();
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, absl::Status(),
absl::make_unique<QueuePicker>(
p->Ref(DEBUG_LOCATION, "QueuePicker")));
} else {
// This is unlikely but can happen when a subchannel has been asked
// to reconnect by a different channel and this channel has dropped
// some connectivity state notifications.
if (connectivity_state == GRPC_CHANNEL_READY) {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
absl::make_unique<Picker>(subchannel()->Ref()));
} else { // CONNECTING
p->channel_control_helper()->UpdateState(
connectivity_state, absl::Status(),
absl::make_unique<QueuePicker>(
p->Ref(DEBUG_LOCATION, "QueuePicker")));
}
}
} }
// If the selected subchannel goes bad, request a re-resolution.
// TODO(qianchengz): We may want to request re-resolution in
// ExitIdleLocked().
p->channel_control_helper()->RequestReresolution();
// Enter idle.
p->idle_ = true;
p->selected_ = nullptr;
p->subchannel_list_.reset();
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, absl::Status(),
absl::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
return; return;
} }
// If we get here, there are two possible cases: // If we get here, there are two possible cases:
@ -384,7 +372,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
ProcessUnselectedReadyLocked(); ProcessUnselectedReadyLocked();
break; break;
} }
case GRPC_CHANNEL_TRANSIENT_FAILURE: { case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_IDLE: {
CancelConnectivityWatchLocked("connection attempt failed"); CancelConnectivityWatchLocked("connection attempt failed");
PickFirstSubchannelData* sd = this; PickFirstSubchannelData* sd = this;
size_t next_index = size_t next_index =
@ -428,8 +417,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
sd->CheckConnectivityStateAndStartWatchingLocked(); sd->CheckConnectivityStateAndStartWatchingLocked();
break; break;
} }
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING: {
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1, and only if we're not // Only update connectivity state in case 1, and only if we're not
// already in TRANSIENT_FAILURE. // already in TRANSIENT_FAILURE.
if (subchannel_list() == p->subchannel_list_.get() && if (subchannel_list() == p->subchannel_list_.get() &&
@ -499,7 +487,7 @@ void PickFirst::PickFirstSubchannelData::
if (current_state == GRPC_CHANNEL_READY) { if (current_state == GRPC_CHANNEL_READY) {
if (p->selected_ != this) ProcessUnselectedReadyLocked(); if (p->selected_ != this) ProcessUnselectedReadyLocked();
} else { } else {
subchannel()->AttemptToConnect(); subchannel()->RequestConnection();
} }
} }

@ -273,7 +273,7 @@ class RingHash : public LoadBalancingPolicy {
[self]() { [self]() {
if (!self->ring_hash_lb_->shutdown_) { if (!self->ring_hash_lb_->shutdown_) {
for (auto& subchannel : self->subchannels_) { for (auto& subchannel : self->subchannels_) {
subchannel->AttemptToConnect(); subchannel->RequestConnection();
} }
} }
delete self; delete self;
@ -648,7 +648,7 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
num_subchannels()); num_subchannels());
} }
internally_triggered_connection_index_ = next_index; internally_triggered_connection_index_ = next_index;
subchannel(next_index)->subchannel()->AttemptToConnect(); subchannel(next_index)->subchannel()->RequestConnection();
} }
} }
@ -676,24 +676,11 @@ void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked(
} }
// Decide what state to report for the purposes of aggregation and // Decide what state to report for the purposes of aggregation and
// picker behavior. // picker behavior.
// If we haven't seen a failure since the last time we were in state // If the last recorded state was TRANSIENT_FAILURE, ignore the update
// READY, then we report the state change as-is. However, once we do see // unless the new state is READY.
// a failure, we report TRANSIENT_FAILURE and do not report any subsequent if (last_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
// state changes until we go back into state READY. connectivity_state != GRPC_CHANNEL_READY) {
if (last_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) { return;
// If not transitioning to READY, ignore the update, since we want
// to continue to consider ourselves in TRANSIENT_FAILURE.
if (connectivity_state != GRPC_CHANNEL_READY) return;
} else if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If we go from READY to TF, treat it as IDLE.
// This transition can be caused by a "normal" connection failure, such
// as the server closing the connection due to a max-age setting. In
// this case, we want to have RPCs that hash to this subchannel wait for
// the reconnection attempt rather than assuming that the subchannel is
// bad and moving on to a subsequent subchannel in the ring.
if (last_connectivity_state_ == GRPC_CHANNEL_READY) {
connectivity_state = GRPC_CHANNEL_IDLE;
}
} }
// Update state counters used for aggregation. // Update state counters used for aggregation.
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,

@ -75,14 +75,15 @@ class RoundRobin : public LoadBalancingPolicy {
: SubchannelData(subchannel_list, address, std::move(subchannel)) {} : SubchannelData(subchannel_list, address, std::move(subchannel)) {}
grpc_connectivity_state connectivity_state() const { grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_; return logical_connectivity_state_;
} }
bool seen_failure_since_ready() const { return seen_failure_since_ready_; } // Computes and updates the logical connectivity state of the subchannel.
// Note that the logical connectivity state may differ from the
// Performs connectivity state updates that need to be done both when we // actual reported state in some cases (e.g., after we see
// first start watching and when a watcher notification is received. // TRANSIENT_FAILURE, we ignore any subsequent state changes until
void UpdateConnectivityStateLocked( // we see READY). Returns true if the state changed.
bool UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state); grpc_connectivity_state connectivity_state);
private: private:
@ -91,8 +92,7 @@ class RoundRobin : public LoadBalancingPolicy {
void ProcessConnectivityChangeLocked( void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) override; grpc_connectivity_state connectivity_state) override;
grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; grpc_connectivity_state logical_connectivity_state_ = GRPC_CHANNEL_IDLE;
bool seen_failure_since_ready_ = false;
}; };
// A list of subchannels. // A list of subchannels.
@ -117,23 +117,27 @@ class RoundRobin : public LoadBalancingPolicy {
} }
// Starts watching the subchannels in this list. // Starts watching the subchannels in this list.
void StartWatchingLocked(); void StartWatchingLocked(absl::Status status_for_tf);
// Updates the counters of subchannels in each state when a // Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state. // subchannel transitions from old_state to new_state.
void UpdateStateCountersLocked(grpc_connectivity_state old_state, void UpdateStateCountersLocked(grpc_connectivity_state old_state,
grpc_connectivity_state new_state); grpc_connectivity_state new_state);
// If this subchannel list is the RR policy's current subchannel // Ensures that the right subchannel list is used and then updates
// list, updates the RR policy's connectivity state based on the // the RR policy's connectivity state based on the subchannel list's
// subchannel list's state counters. // state counters.
void MaybeUpdateRoundRobinConnectivityStateLocked(); void MaybeUpdateRoundRobinConnectivityStateLocked(
absl::Status status_for_tf);
// Updates the RR policy's overall state based on the counters of
// subchannels in each state.
void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
private: private:
std::string CountersString() const {
return absl::StrCat("num_subchannels=", num_subchannels(),
" num_ready=", num_ready_,
" num_connecting=", num_connecting_,
" num_transient_failure=", num_transient_failure_);
}
size_t num_ready_ = 0; size_t num_ready_ = 0;
size_t num_connecting_ = 0; size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0; size_t num_transient_failure_ = 0;
@ -238,8 +242,45 @@ void RoundRobin::ResetBackoffLocked() {
} }
} }
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { void RoundRobin::UpdateLocked(UpdateArgs args) {
if (num_subchannels() == 0) return; ServerAddressList addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
}
addresses = std::move(*args.addresses);
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
args.addresses.status().ToString().c_str());
}
// If we already have a subchannel list, then ignore the resolver
// failure and keep using the existing list.
if (subchannel_list_ != nullptr) return;
}
// Create new subchannel list, replacing the previous pending list, if any.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
latest_pending_subchannel_list_ != nullptr) {
gpr_log(GPR_INFO, "[RR %p] replacing previous pending subchannel list %p",
this, latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
this, &grpc_lb_round_robin_trace, std::move(addresses), *args.args);
// Start watching the new list. If appropriate, this will cause it to be
// immediately promoted to subchannel_list_ and to generate a new picker.
latest_pending_subchannel_list_->StartWatchingLocked(
args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
"empty address list: ", args.resolution_note))
: args.addresses.status());
}
//
// RoundRobinSubchannelList
//
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked(
absl::Status status_for_tf) {
// Check current state of each subchannel synchronously, since any // Check current state of each subchannel synchronously, since any
// subchannel already used by some other channel may have a non-IDLE // subchannel already used by some other channel may have a non-IDLE
// state. // state.
@ -247,18 +288,18 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
grpc_connectivity_state state = grpc_connectivity_state state =
subchannel(i)->CheckConnectivityStateLocked(); subchannel(i)->CheckConnectivityStateLocked();
if (state != GRPC_CHANNEL_IDLE) { if (state != GRPC_CHANNEL_IDLE) {
subchannel(i)->UpdateConnectivityStateLocked(state); subchannel(i)->UpdateLogicalConnectivityStateLocked(state);
} }
} }
// Start connectivity watch for each subchannel. // Start connectivity watch for each subchannel.
for (size_t i = 0; i < num_subchannels(); i++) { for (size_t i = 0; i < num_subchannels(); i++) {
if (subchannel(i)->subchannel() != nullptr) { if (subchannel(i)->subchannel() != nullptr) {
subchannel(i)->StartConnectivityWatchLocked(); subchannel(i)->StartConnectivityWatchLocked();
subchannel(i)->subchannel()->AttemptToConnect(); subchannel(i)->subchannel()->RequestConnection();
} }
} }
// Now set the LB policy's state based on the subchannels' states. // Update RR connectivity state if needed.
UpdateRoundRobinStateFromSubchannelStateCountsLocked(); MaybeUpdateRoundRobinConnectivityStateLocked(status_for_tf);
} }
void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
@ -284,80 +325,73 @@ void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
} }
} }
// Sets the RR policy's connectivity state and generates a new picker based
// on the current subchannel list.
void RoundRobin::RoundRobinSubchannelList:: void RoundRobin::RoundRobinSubchannelList::
MaybeUpdateRoundRobinConnectivityStateLocked() { MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf) {
RoundRobin* p = static_cast<RoundRobin*>(policy()); RoundRobin* p = static_cast<RoundRobin*>(policy());
// If this is latest_pending_subchannel_list_, then swap it into
// subchannel_list_ in the following cases:
// - subchannel_list_ is null (i.e., this is the first update).
// - subchannel_list_ has no READY subchannels.
// - This list has at least one READY subchannel.
// - All of the subchannels in this list are in TRANSIENT_FAILURE, or
// the list is empty. (This may cause the channel to go from READY
// to TRANSIENT_FAILURE, but we're doing what the control plane told
// us to do.)
if (p->latest_pending_subchannel_list_.get() == this &&
(p->subchannel_list_ == nullptr || p->subchannel_list_->num_ready_ == 0 ||
num_ready_ > 0 ||
// Note: num_transient_failure_ and num_subchannels() may both be 0.
num_transient_failure_ == num_subchannels())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
const std::string old_counters_string =
p->subchannel_list_ != nullptr ? p->subchannel_list_->CountersString()
: "";
gpr_log(
GPR_INFO,
"[RR %p] swapping out subchannel list %p (%s) in favor of %p (%s)", p,
p->subchannel_list_.get(), old_counters_string.c_str(), this,
CountersString().c_str());
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Only set connectivity state if this is the current subchannel list. // Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_.get() != this) return; if (p->subchannel_list_.get() != this) return;
// In priority order. The first rule to match terminates the search (ie, if we // First matching rule wins:
// are on rule n, all previous rules were unfulfilled). // 1) ANY subchannel is READY => policy is READY.
// // 2) ANY subchannel is CONNECTING => policy is CONNECTING.
// 1) RULE: ANY subchannel is READY => policy is READY. // 3) ALL subchannels are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE.
// CHECK: subchannel_list->num_ready > 0.
//
// 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
// CHECK: sd->curr_connectivity_state == CONNECTING.
//
// 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
// TRANSIENT_FAILURE.
// CHECK: subchannel_list->num_transient_failures ==
// subchannel_list->num_subchannels.
if (num_ready_ > 0) { if (num_ready_ > 0) {
// 1) READY if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] reporting READY with subchannel list %p", p,
this);
}
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(), absl::make_unique<Picker>(p, this)); GRPC_CHANNEL_READY, absl::Status(), absl::make_unique<Picker>(p, this));
} else if (num_connecting_ > 0) { } else if (num_connecting_ > 0) {
// 2) CONNECTING if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] reporting CONNECTING with subchannel list %p",
p, this);
}
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(), GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker"))); absl::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
} else if (num_transient_failure_ == num_subchannels()) { } else if (num_transient_failure_ == num_subchannels()) {
// 3) TRANSIENT_FAILURE if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
absl::Status status = gpr_log(GPR_INFO,
absl::UnavailableError("connections to all backends failing"); "[RR %p] reporting TRANSIENT_FAILURE with subchannel list %p: %s",
p, this, status_for_tf.ToString().c_str());
}
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status, GRPC_CHANNEL_TRANSIENT_FAILURE, status_for_tf,
absl::make_unique<TransientFailurePicker>(status)); absl::make_unique<TransientFailurePicker>(status_for_tf));
} }
} }
void RoundRobin::RoundRobinSubchannelList:: //
UpdateRoundRobinStateFromSubchannelStateCountsLocked() { // RoundRobinSubchannelData
RoundRobin* p = static_cast<RoundRobin*>(policy()); //
// If we have at least one READY subchannel, then swap to the new list.
// Also, if all of the subchannels are in TRANSIENT_FAILURE, then we know
// we've tried all of them and failed, so we go ahead and swap over
// anyway; this may cause the channel to go from READY to TRANSIENT_FAILURE,
// but we are doing what the control plane told us to do.
if (num_ready_ > 0 || num_transient_failure_ == num_subchannels()) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
// This list must be p->latest_pending_subchannel_list_, because
// any previous update would have been shut down already and
// therefore we would not be receiving a notification for them.
GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
GPR_ASSERT(!shutting_down());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
const size_t old_num_subchannels =
p->subchannel_list_ != nullptr
? p->subchannel_list_->num_subchannels()
: 0;
gpr_log(GPR_INFO,
"[RR %p] phasing out subchannel list %p (size %" PRIuPTR
") in favor of %p (size %" PRIuPTR ")",
p, p->subchannel_list_.get(), old_num_subchannels, this,
num_subchannels());
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
}
// Update the RR policy's connectivity state if needed.
MaybeUpdateRoundRobinConnectivityStateLocked();
}
void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( bool RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state) { grpc_connectivity_state connectivity_state) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
@ -367,115 +401,74 @@ void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(), p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(), subchannel_list()->num_subchannels(),
ConnectivityStateName(last_connectivity_state_), ConnectivityStateName(logical_connectivity_state_),
ConnectivityStateName(connectivity_state)); ConnectivityStateName(connectivity_state));
} }
// Decide what state to report for aggregation purposes. // Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state // If the last logical state was TRANSIENT_FAILURE, then ignore the
// READY, then we report the state change as-is. However, once we do see // state change unless the new state is READY.
// a failure, we report TRANSIENT_FAILURE and do not report any subsequent if (logical_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
// state changes until we go back into state READY. connectivity_state != GRPC_CHANNEL_READY) {
if (!seen_failure_since_ready_) { return false;
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { }
seen_failure_since_ready_ = true; // If the new state is IDLE, treat it as CONNECTING, since it will
} // immediately transition into CONNECTING anyway.
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, if (connectivity_state == GRPC_CHANNEL_IDLE) {
connectivity_state); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
} else { gpr_log(GPR_INFO,
if (connectivity_state == GRPC_CHANNEL_READY) { "[RR %p] subchannel %p, subchannel_list %p (index %" PRIuPTR
seen_failure_since_ready_ = false; " of %" PRIuPTR "): treating IDLE as CONNECTING",
subchannel_list()->UpdateStateCountersLocked( p, subchannel(), subchannel_list(), Index(),
GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state); subchannel_list()->num_subchannels());
} }
connectivity_state = GRPC_CHANNEL_CONNECTING;
} }
// Record last seen connectivity state. // If no change, return false.
last_connectivity_state_ = connectivity_state; if (logical_connectivity_state_ == connectivity_state) return false;
// Otherwise, update counters and logical state.
subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_,
connectivity_state);
logical_connectivity_state_ = connectivity_state;
return true;
} }
void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) { grpc_connectivity_state connectivity_state) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr); GPR_ASSERT(subchannel() != nullptr);
// If the new state is TRANSIENT_FAILURE, re-resolve. // If the new state is TRANSIENT_FAILURE or IDLE, re-resolve.
// Only do this if we've started watching, not at startup time. // Only do this if we've started watching, not at startup time.
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant // when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution. // loop of re-resolution.
// Also attempt to reconnect. // Also attempt to reconnect.
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
connectivity_state == GRPC_CHANNEL_IDLE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " "[RR %p] Subchannel %p reported %s; requesting re-resolution", p,
"Requesting re-resolution", subchannel(), ConnectivityStateName(connectivity_state));
p, subchannel());
} }
p->channel_control_helper()->RequestReresolution(); p->channel_control_helper()->RequestReresolution();
subchannel()->AttemptToConnect(); subchannel()->RequestConnection();
} }
// Update state counters. // Update logical connectivity state.
UpdateConnectivityStateLocked(connectivity_state); // If it changed, update the policy state.
// Update overall state and renew notification. if (UpdateLogicalConnectivityStateLocked(connectivity_state)) {
subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked(
} absl::UnavailableError("connections to all backends failing"));
// Handles a new update from the resolver.
//
// Builds a new subchannel list from the addresses in `args` and stores it
// as the pending list.  Depending on the result, the new list is either
// promoted to the current list right away (when it is empty, or when there
// is no current list yet) or left pending while it is being watched.
//
// If the resolver reported an error instead of addresses and a current
// list already exists, the update is dropped and the existing list stays
// in use.
void RoundRobin::UpdateLocked(UpdateArgs args) {
// Stays empty when the resolver reported an error.
ServerAddressList addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
}
addresses = std::move(*args.addresses);
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
args.addresses.status().ToString().c_str());
}
// If we already have a subchannel list, then ignore the resolver
// failure and keep using the existing list.
if (subchannel_list_ != nullptr) return;
}
// Replace latest_pending_subchannel_list_.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
latest_pending_subchannel_list_ != nullptr) {
gpr_log(GPR_INFO,
"[RR %p] Shutting down previous pending subchannel list %p", this,
latest_pending_subchannel_list_.get());
}
// Assigning over the member releases whatever pending list was held
// before (the trace message above reports this as a shutdown).
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
this, &grpc_lb_round_robin_trace, std::move(addresses), *args.args);
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately promote the new list to the
// current list and transition to TRANSIENT_FAILURE.
// Report the resolver's own error if there was one; otherwise build an
// UNAVAILABLE status that carries the resolution note.
absl::Status status =
args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
"empty address list: ", args.resolution_note))
: args.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
subchannel_list_ = std::move(latest_pending_subchannel_list_);
} else if (subchannel_list_ == nullptr) {
// If there is no current list, immediately promote the new list to
// the current list and start watching it.
subchannel_list_ = std::move(latest_pending_subchannel_list_);
subchannel_list_->StartWatchingLocked();
} else {
// Start watching the pending list.  It will get swapped into the
// current list when it reports READY.
latest_pending_subchannel_list_->StartWatchingLocked();
} }
} }
//
// factory
//
class RoundRobinConfig : public LoadBalancingPolicy::Config { class RoundRobinConfig : public LoadBalancingPolicy::Config {
public: public:
const char* name() const override { return kRoundRobin; } const char* name() const override { return kRoundRobin; }
}; };
//
// factory
//
class RoundRobinFactory : public LoadBalancingPolicyFactory { class RoundRobinFactory : public LoadBalancingPolicyFactory {
public: public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(

@ -308,43 +308,33 @@ class Subchannel::ConnectedSubchannelStateWatcher
const absl::Status& status) override { const absl::Status& status) override {
Subchannel* c = subchannel_.get(); Subchannel* c = subchannel_.get();
MutexLock lock(&c->mu_); MutexLock lock(&c->mu_);
switch (new_state) { // If we're either shutting down or have already seen this connection
case GRPC_CHANNEL_TRANSIENT_FAILURE: // failure (i.e., c->connected_subchannel_ is null), do nothing.
case GRPC_CHANNEL_SHUTDOWN: { //
if (!c->disconnected_ && c->connected_subchannel_ != nullptr) { // The transport reports TRANSIENT_FAILURE upon GOAWAY but SHUTDOWN
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { // upon connection close. So if the server gracefully shuts down,
gpr_log(GPR_INFO, // we will see TRANSIENT_FAILURE followed by SHUTDOWN, but if not, we
"subchannel %p %s: Connected subchannel %p has gone into " // will see only SHUTDOWN. Either way, we react to the first one we
"%s. Attempting to reconnect.", // see, ignoring anything that happens after that.
c, c->key_.ToString().c_str(), if (c->connected_subchannel_ == nullptr) return;
c->connected_subchannel_.get(), if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
ConnectivityStateName(new_state)); new_state == GRPC_CHANNEL_SHUTDOWN) {
} if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
c->connected_subchannel_.reset(); gpr_log(GPR_INFO,
if (c->channelz_node() != nullptr) { "subchannel %p %s: Connected subchannel %p reports %s: %s", c,
c->channelz_node()->SetChildSocket(nullptr); c->key_.ToString().c_str(), c->connected_subchannel_.get(),
} ConnectivityStateName(new_state), status.ToString().c_str());
// We need to construct our own status if the underlying state was
// shutdown since the accompanying status will be StatusCode::OK
// otherwise.
c->SetConnectivityStateLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE,
new_state == GRPC_CHANNEL_SHUTDOWN
? absl::Status(absl::StatusCode::kUnavailable,
"Subchannel has disconnected.")
: status);
c->backoff_begun_ = false;
c->backoff_.Reset();
}
break;
} }
default: { c->connected_subchannel_.reset();
// In principle, this should never happen. We should not get if (c->channelz_node() != nullptr) {
// a callback for READY, because that was the state we started c->channelz_node()->SetChildSocket(nullptr);
// this watch from. And a connected subchannel should never go
// from READY to CONNECTING or IDLE.
c->SetConnectivityStateLocked(new_state, status);
} }
// Even though we're reporting IDLE instead of TRANSIENT_FAILURE here,
// pass along the status from the transport, since it may have
// keepalive info attached to it that the channel needs.
// TODO(roth): Consider whether there's a cleaner way to do this.
c->SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, status);
c->backoff_.Reset();
} }
} }
@ -563,6 +553,25 @@ Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); } void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); }
//
// Subchannel::ConnectivityStateWatcherInterface
//
// Appends `state_change` to the back of this watcher's queue of pending
// connectivity state changes.  mu_ is held while the queue is modified.
void Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange(
    ConnectivityStateChange state_change) {
  MutexLock queue_guard(&mu_);
  connectivity_state_queue_.push_back(std::move(state_change));
}
// Removes the oldest entry from this watcher's queue of pending
// connectivity state changes and returns it.
//
// The queue must not be empty; this is enforced with GPR_ASSERT.  mu_ is
// held for the whole operation.
Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange
Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() {
  MutexLock lock(&mu_);
  GPR_ASSERT(!connectivity_state_queue_.empty());
  // Move the front element out instead of copying it: the queued object is
  // destroyed by pop_front() on the next line, so the copy is pure waste.
  ConnectivityStateChange state_change =
      std::move(connectivity_state_queue_.front());
  connectivity_state_queue_.pop_front();
  return state_change;
}
// //
// Subchannel // Subchannel
// //
@ -622,21 +631,6 @@ BackOff::Options ParseArgsForBackoffValues(const grpc_channel_args* args,
} // namespace } // namespace
// Appends `state_change` to the back of this watcher's queue of pending
// connectivity state changes.  mu_ is held while the queue is modified.
void Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange(
    ConnectivityStateChange state_change) {
  MutexLock queue_guard(&mu_);
  connectivity_state_queue_.push_back(std::move(state_change));
}
// Removes the oldest entry from this watcher's queue of pending
// connectivity state changes and returns it.
//
// The queue must not be empty; this is enforced with GPR_ASSERT.  mu_ is
// held for the whole operation.
Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange
Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() {
  MutexLock lock(&mu_);
  GPR_ASSERT(!connectivity_state_queue_.empty());
  // Move the front element out instead of copying it: the queued object is
  // destroyed by pop_front() on the next line, so the copy is pure waste.
  ConnectivityStateChange state_change =
      std::move(connectivity_state_queue_.front());
  connectivity_state_queue_.pop_front();
  return state_change;
}
Subchannel::Subchannel(SubchannelKey key, Subchannel::Subchannel(SubchannelKey key,
OrphanablePtr<SubchannelConnector> connector, OrphanablePtr<SubchannelConnector> connector,
const grpc_channel_args* args) const grpc_channel_args* args)
@ -650,6 +644,7 @@ Subchannel::Subchannel(SubchannelKey key,
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(); GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this, GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, nullptr);
// Check proxy mapper to determine address to connect to and channel // Check proxy mapper to determine address to connect to and channel
// args to use. // args to use.
address_for_connect_ = key_.address(); address_for_connect_ = key_.address();
@ -789,20 +784,20 @@ void Subchannel::CancelConnectivityStateWatch(
} }
} }
void Subchannel::AttemptToConnect() { void Subchannel::RequestConnection() {
MutexLock lock(&mu_); MutexLock lock(&mu_);
MaybeStartConnectingLocked(); if (state_ == GRPC_CHANNEL_IDLE) {
StartConnectingLocked();
} else if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
connection_requested_ = true;
}
} }
void Subchannel::ResetBackoff() { void Subchannel::ResetBackoff() {
MutexLock lock(&mu_); MutexLock lock(&mu_);
backoff_.Reset(); backoff_.Reset();
if (have_retry_alarm_) { if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
retry_immediately_ = true; grpc_timer_cancel(&retry_timer_);
grpc_timer_cancel(&retry_alarm_);
} else {
backoff_begun_ = false;
MaybeStartConnectingLocked();
} }
} }
@ -814,8 +809,8 @@ void Subchannel::Orphan() {
subchannel_pool_.reset(); subchannel_pool_.reset();
} }
MutexLock lock(&mu_); MutexLock lock(&mu_);
GPR_ASSERT(!disconnected_); GPR_ASSERT(!shutdown_);
disconnected_ = true; shutdown_ = true;
connector_.reset(); connector_.reset();
connected_subchannel_.reset(); connected_subchannel_.reset();
health_watcher_map_.ShutdownLocked(); health_watcher_map_.ShutdownLocked();
@ -885,77 +880,44 @@ void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
health_watcher_map_.NotifyLocked(state, status); health_watcher_map_.NotifyLocked(state, status);
} }
void Subchannel::MaybeStartConnectingLocked() { void Subchannel::OnRetryTimer(void* arg, grpc_error_handle /*error*/) {
if (disconnected_) { WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
// Don't try to connect if we're already disconnected. {
return; MutexLock lock(&c->mu_);
} c->OnRetryTimerLocked();
if (connecting_) {
// Already connecting: don't restart.
return;
}
if (connected_subchannel_ != nullptr) {
// Already connected: don't restart.
return;
}
connecting_ = true;
WeakRef(DEBUG_LOCATION, "connecting")
.release(); // ref held by pending connect
if (!backoff_begun_) {
backoff_begun_ = true;
ContinueConnectingLocked();
} else {
GPR_ASSERT(!have_retry_alarm_);
have_retry_alarm_ = true;
const Duration time_til_next =
next_attempt_deadline_ - ExecCtx::Get()->Now();
if (time_til_next <= Duration::Zero()) {
gpr_log(GPR_INFO, "subchannel %p %s: Retry immediately", this,
key_.ToString().c_str());
} else {
gpr_log(GPR_INFO, "subchannel %p %s: Retry in %" PRId64 " milliseconds",
this, key_.ToString().c_str(), time_til_next.millis());
}
GRPC_CLOSURE_INIT(&on_retry_alarm_, OnRetryAlarm, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&retry_alarm_, next_attempt_deadline_, &on_retry_alarm_);
} }
c.reset(DEBUG_LOCATION, "RetryTimer");
} }
void Subchannel::OnRetryAlarm(void* arg, grpc_error_handle error) { void Subchannel::OnRetryTimerLocked() {
WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg)); if (shutdown_) return;
MutexLock lock(&c->mu_); if (connection_requested_) {
c->have_retry_alarm_ = false;
if (c->disconnected_) {
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
&error, 1);
} else if (c->retry_immediately_) {
c->retry_immediately_ = false;
error = GRPC_ERROR_NONE;
} else {
(void)GRPC_ERROR_REF(error);
}
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"subchannel %p %s: failed to connect to channel, retrying", c.get(), "subchannel %p %s: connection attempt requested while backoff "
c->key_.ToString().c_str()); "timer was pending, retrying now",
c->ContinueConnectingLocked(); this, key_.ToString().c_str());
// Still connecting, keep ref around. Note that this stolen ref won't connection_requested_ = false;
// be dropped without first acquiring c->mu_. StartConnectingLocked();
c.release(); } else {
gpr_log(GPR_INFO, "subchannel %p %s: backoff delay elapsed, reporting IDLE",
this, key_.ToString().c_str());
SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, absl::OkStatus());
} }
GRPC_ERROR_UNREF(error);
} }
void Subchannel::ContinueConnectingLocked() { void Subchannel::StartConnectingLocked() {
// Set next attempt time.
const Timestamp min_deadline = min_connect_timeout_ + ExecCtx::Get()->Now();
next_attempt_time_ = backoff_.NextAttemptTime();
// Report CONNECTING.
SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus());
// Start connection attempt.
SubchannelConnector::Args args; SubchannelConnector::Args args;
args.address = &address_for_connect_; args.address = &address_for_connect_;
args.interested_parties = pollset_set_; args.interested_parties = pollset_set_;
const Timestamp min_deadline = min_connect_timeout_ + ExecCtx::Get()->Now(); args.deadline = std::max(next_attempt_time_, min_deadline);
next_attempt_deadline_ = backoff_.NextAttemptTime();
args.deadline = std::max(next_attempt_deadline_, min_deadline);
args.channel_args = args_; args.channel_args = args_;
SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status()); WeakRef(DEBUG_LOCATION, "Connect").release(); // Ref held by callback.
connector_->Connect(args, &connecting_result_, &on_connecting_finished_); connector_->Connect(args, &connecting_result_, &on_connecting_finished_);
} }
@ -965,19 +927,36 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) {
c->connecting_result_.channel_args; c->connecting_result_.channel_args;
{ {
MutexLock lock(&c->mu_); MutexLock lock(&c->mu_);
c->connecting_ = false; c->OnConnectingFinishedLocked(GRPC_ERROR_REF(error));
if (c->connecting_result_.transport != nullptr &&
c->PublishTransportLocked()) {
// Do nothing, transport was published.
} else if (!c->disconnected_) {
gpr_log(GPR_INFO, "subchannel %p %s: connect failed: %s", c.get(),
c->key_.ToString().c_str(), grpc_error_std_string(error).c_str());
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_to_absl_status(error));
}
} }
grpc_channel_args_destroy(delete_channel_args); grpc_channel_args_destroy(delete_channel_args);
c.reset(DEBUG_LOCATION, "connecting"); c.reset(DEBUG_LOCATION, "Connect");
}
// Processes the outcome of a connection attempt.
//
// Consumes one ref on `error`: every path through this function ends by
// unreffing it.  Does nothing else if the subchannel has been shut down.
//
// When no transport was produced, or the transport could not be
// published, the subchannel is moved to TRANSIENT_FAILURE with the status
// derived from `error`, and the retry timer is armed for
// next_attempt_time_.
void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
if (shutdown_) {
(void)GRPC_ERROR_UNREF(error);
return;
}
// If we didn't get a transport or we fail to publish it, report
// TRANSIENT_FAILURE and start the retry timer.
// Note that if the connection attempt took longer than the backoff
// time, then the timer will fire immediately, and we will quickly
// transition back to IDLE.
if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
// Only used for the log message below; the timer itself is armed with
// the absolute next_attempt_time_.
const Duration time_until_next_attempt =
next_attempt_time_ - ExecCtx::Get()->Now();
gpr_log(GPR_INFO,
"subchannel %p %s: connect failed (%s), backing off for %" PRId64
" ms",
this, key_.ToString().c_str(), grpc_error_std_string(error).c_str(),
time_until_next_attempt.millis());
SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_to_absl_status(error));
// The weak ref is released here on purpose so that it outlives this
// scope; ownership passes to the timer callback.
WeakRef(DEBUG_LOCATION, "RetryTimer").release();  // Ref held by callback.
grpc_timer_init(&retry_timer_, next_attempt_time_, &on_retry_timer_);
}
(void)GRPC_ERROR_UNREF(error);
} }
bool Subchannel::PublishTransportLocked() { bool Subchannel::PublishTransportLocked() {
@ -1001,7 +980,7 @@ bool Subchannel::PublishTransportLocked() {
RefCountedPtr<channelz::SocketNode> socket = RefCountedPtr<channelz::SocketNode> socket =
std::move(connecting_result_.socket_node); std::move(connecting_result_.socket_node);
connecting_result_.Reset(); connecting_result_.Reset();
if (disconnected_) return false; if (shutdown_) return false;
// Publish. // Publish.
connected_subchannel_.reset( connected_subchannel_.reset(
new ConnectedSubchannel(stk->release(), args_, channelz_node_)); new ConnectedSubchannel(stk->release(), args_, channelz_node_));

@ -253,7 +253,7 @@ class Subchannel : public DualRefCounted<Subchannel> {
} }
// Attempt to connect to the backend. Has no effect if already connected. // Attempt to connect to the backend. Has no effect if already connected.
void AttemptToConnect() ABSL_LOCKS_EXCLUDED(mu_); void RequestConnection() ABSL_LOCKS_EXCLUDED(mu_);
// Resets the connection backoff of the subchannel. // Resets the connection backoff of the subchannel.
void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_); void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_);
@ -344,12 +344,14 @@ class Subchannel : public DualRefCounted<Subchannel> {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Methods for connection. // Methods for connection.
void MaybeStartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); static void OnRetryTimer(void* arg, grpc_error_handle error)
static void OnRetryAlarm(void* arg, grpc_error_handle error)
ABSL_LOCKS_EXCLUDED(mu_); ABSL_LOCKS_EXCLUDED(mu_);
void ContinueConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
static void OnConnectingFinished(void* arg, grpc_error_handle error) static void OnConnectingFinished(void* arg, grpc_error_handle error)
ABSL_LOCKS_EXCLUDED(mu_); ABSL_LOCKS_EXCLUDED(mu_);
void OnConnectingFinishedLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// The subchannel pool this subchannel is in. // The subchannel pool this subchannel is in.
@ -365,6 +367,8 @@ class Subchannel : public DualRefCounted<Subchannel> {
grpc_pollset_set* pollset_set_; grpc_pollset_set* pollset_set_;
// Channelz tracking. // Channelz tracking.
RefCountedPtr<channelz::SubchannelNode> channelz_node_; RefCountedPtr<channelz::SubchannelNode> channelz_node_;
// Minimum connection timeout.
Duration min_connect_timeout_;
// Connection state. // Connection state.
OrphanablePtr<SubchannelConnector> connector_; OrphanablePtr<SubchannelConnector> connector_;
@ -374,12 +378,18 @@ class Subchannel : public DualRefCounted<Subchannel> {
// Protects the other members. // Protects the other members.
Mutex mu_; Mutex mu_;
// Active connection, or null. bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_ ABSL_GUARDED_BY(mu_);
bool connecting_ ABSL_GUARDED_BY(mu_) = false; // Records if RequestConnection() was called while in backoff.
bool disconnected_ ABSL_GUARDED_BY(mu_) = false; bool connection_requested_ ABSL_GUARDED_BY(mu_) = false;
// Connectivity state tracking. // Connectivity state tracking.
// Note that the connectivity state implies the state of the
// Subchannel object:
// - IDLE: no retry timer pending, can start a connection attempt at any time
// - CONNECTING: connection attempt in progress
// - READY: connection attempt succeeded, connected_subchannel_ created
// - TRANSIENT_FAILURE: connection attempt failed, retry timer pending
grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE; grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE;
absl::Status status_ ABSL_GUARDED_BY(mu_); absl::Status status_ ABSL_GUARDED_BY(mu_);
// The list of watchers without a health check service name. // The list of watchers without a health check service name.
@ -387,25 +397,21 @@ class Subchannel : public DualRefCounted<Subchannel> {
// The map of watchers with health check service names. // The map of watchers with health check service names.
HealthWatcherMap health_watcher_map_ ABSL_GUARDED_BY(mu_); HealthWatcherMap health_watcher_map_ ABSL_GUARDED_BY(mu_);
// Data provider map. // Active connection, or null.
std::map<const char* /*type*/, DataProducerInterface*> data_producer_map_ RefCountedPtr<ConnectedSubchannel> connected_subchannel_ ABSL_GUARDED_BY(mu_);
ABSL_GUARDED_BY(mu_);
// Minimum connect timeout - must be located before backoff_.
Duration min_connect_timeout_ ABSL_GUARDED_BY(mu_);
// Backoff state. // Backoff state.
BackOff backoff_ ABSL_GUARDED_BY(mu_); BackOff backoff_ ABSL_GUARDED_BY(mu_);
Timestamp next_attempt_deadline_ ABSL_GUARDED_BY(mu_); Timestamp next_attempt_time_ ABSL_GUARDED_BY(mu_);
bool backoff_begun_ ABSL_GUARDED_BY(mu_) = false; grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_);
grpc_closure on_retry_timer_ ABSL_GUARDED_BY(mu_);
// Retry alarm.
grpc_timer retry_alarm_ ABSL_GUARDED_BY(mu_);
grpc_closure on_retry_alarm_ ABSL_GUARDED_BY(mu_);
bool have_retry_alarm_ ABSL_GUARDED_BY(mu_) = false;
// reset_backoff() was called while alarm was pending.
bool retry_immediately_ ABSL_GUARDED_BY(mu_) = false;
// Keepalive time period (-1 for unset) // Keepalive time period (-1 for unset)
int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1; int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1;
// Data producer map.
std::map<const char* /*type*/, DataProducerInterface*> data_producer_map_
ABSL_GUARDED_BY(mu_);
}; };
} // namespace grpc_core } // namespace grpc_core

@ -84,12 +84,12 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
// If the subchannel is currently in backoff delay due to a previously // If the subchannel is currently in backoff delay due to a previously
// failed attempt, the new connection attempt will not start until the // failed attempt, the new connection attempt will not start until the
// backoff delay has elapsed. // backoff delay has elapsed.
virtual void AttemptToConnect() = 0; virtual void RequestConnection() = 0;
// Resets the subchannel's connection backoff state. If AttemptToConnect() // Resets the subchannel's connection backoff state. If RequestConnection()
// has been called since the subchannel entered TRANSIENT_FAILURE state, // has been called since the subchannel entered TRANSIENT_FAILURE state,
// starts a new connection attempt immediately; otherwise, a new connection // starts a new connection attempt immediately; otherwise, a new connection
// attempt will be started as soon as AttemptToConnect() is called. // attempt will be started as soon as RequestConnection() is called.
virtual void ResetBackoff() = 0; virtual void ResetBackoff() = 0;
// Registers a new data watcher. // Registers a new data watcher.
@ -124,7 +124,9 @@ class DelegatingSubchannel : public SubchannelInterface {
ConnectivityStateWatcherInterface* watcher) override { ConnectivityStateWatcherInterface* watcher) override {
return wrapped_subchannel_->CancelConnectivityStateWatch(watcher); return wrapped_subchannel_->CancelConnectivityStateWatch(watcher);
} }
void AttemptToConnect() override { wrapped_subchannel_->AttemptToConnect(); } void RequestConnection() override {
wrapped_subchannel_->RequestConnection();
}
void ResetBackoff() override { wrapped_subchannel_->ResetBackoff(); } void ResetBackoff() override { wrapped_subchannel_->ResetBackoff(); }
const grpc_channel_args* channel_args() override { const grpc_channel_args* channel_args() override {
return wrapped_subchannel_->channel_args(); return wrapped_subchannel_->channel_args();

@ -849,6 +849,15 @@ void Server::CancelAllCalls() {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
} }
void Server::SendGoaways() {
ChannelBroadcaster broadcaster;
{
MutexLock lock(&mu_global_);
broadcaster.FillChannelsLocked(GetChannelsLocked());
}
broadcaster.BroadcastShutdown(/*send_goaway=*/true, GRPC_ERROR_NONE);
}
void Server::Orphan() { void Server::Orphan() {
{ {
MutexLock lock(&mu_global_); MutexLock lock(&mu_global_);

@ -170,6 +170,8 @@ class Server : public InternallyRefCounted<Server>,
void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_); void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_);
void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
private: private:
struct RequestedCall; struct RequestedCall;

@ -58,6 +58,7 @@
#include "src/core/lib/security/credentials/fake/fake_credentials.h" #include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "src/core/lib/service_config/service_config.h" #include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_impl.h" #include "src/core/lib/service_config/service_config_impl.h"
#include "src/core/lib/surface/server.h"
#include "src/cpp/client/secure_credentials.h" #include "src/cpp/client/secure_credentials.h"
#include "src/cpp/server/secure_server_credentials.h" #include "src/cpp/server/secure_server_credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h"
@ -324,8 +325,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
const bool success = const bool success =
SendRpc(stub, &response, 2000, &status, wait_for_ready); SendRpc(stub, &response, 2000, &status, wait_for_ready);
ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line() ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
<< "\n" << "\nError: " << status.error_message() << " "
<< "Error: " << status.error_message() << " "
<< status.error_details(); << status.error_details();
ASSERT_EQ(response.message(), kRequestMessage) ASSERT_EQ(response.message(), kRequestMessage)
<< "From " << location.file() << ":" << location.line(); << "From " << location.file() << ":" << location.line();
@ -399,20 +399,44 @@ class ClientLbEnd2endTest : public ::testing::Test {
for (const auto& server : servers_) server->service_.ResetCounters(); for (const auto& server : servers_) server->service_.ResetCounters();
} }
void WaitForServer( bool SeenAllServers(size_t start_index, size_t stop_index) {
for (size_t i = start_index; i < stop_index; ++i) {
if (servers_[i]->service_.request_count() == 0) return false;
}
return true;
}
void WaitForServers(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub, const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
size_t server_idx, const grpc_core::DebugLocation& location, size_t start_index, size_t stop_index,
bool ignore_failure = false) { const grpc_core::DebugLocation& location, bool ignore_failure = false) {
do { auto deadline =
absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
gpr_log(GPR_INFO,
"========= WAITING FOR BACKENDS [%" PRIuPTR ", %" PRIuPTR
") ==========",
start_index, stop_index);
while (!SeenAllServers(start_index, stop_index)) {
if (ignore_failure) { if (ignore_failure) {
SendRpc(stub); SendRpc(stub);
} else { } else {
CheckRpcSendOk(stub, location, true); CheckRpcSendOk(stub, location, true);
} }
} while (servers_[server_idx]->service_.request_count() == 0); EXPECT_LE(absl::Now(), deadline)
<< " at " << location.file() << ":" << location.line();
if (absl::Now() >= deadline) break;
}
ResetCounters(); ResetCounters();
} }
// Convenience wrapper around WaitForServers() for a single server: waits
// on the one-element index range that starts at `server_index`.
// `location` and `ignore_failure` are forwarded unchanged.
void WaitForServer(
    const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
    size_t server_index, const grpc_core::DebugLocation& location,
    bool ignore_failure = false) {
  const size_t stop_index = server_index + 1;
  WaitForServers(stub, server_index, stop_index, location, ignore_failure);
}
bool WaitForChannelState( bool WaitForChannelState(
Channel* channel, Channel* channel,
const std::function<bool(grpc_connectivity_state)>& predicate, const std::function<bool(grpc_connectivity_state)>& predicate,
@ -494,7 +518,42 @@ TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
EXPECT_TRUE(WaitForChannelReady(channel.get())); EXPECT_TRUE(WaitForChannelReady(channel.get()));
} }
TEST_F(ClientLbEnd2endTest, PickFirst) { TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
// Start server.
const int kNumServers = 1;
StartServers(kNumServers);
// Set max idle time and build the channel.
ChannelArguments args;
args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000);
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("", response_generator, args);
auto stub = BuildStub(channel);
// The initial channel state should be IDLE.
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
// After sending RPC, channel state should be READY.
gpr_log(GPR_INFO, "*** SENDING RPC, CHANNEL SHOULD CONNECT ***");
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// After a period time not using the channel, the channel state should switch
// to IDLE.
gpr_log(GPR_INFO, "*** WAITING FOR CHANNEL TO GO IDLE ***");
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200));
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
// Sending a new RPC should awake the IDLE channel.
gpr_log(GPR_INFO, "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***");
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
}
//
// pick_first tests
//
using PickFirstTest = ClientLbEnd2endTest;
TEST_F(PickFirstTest, Basic) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
@ -521,7 +580,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
} }
TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) { TEST_F(PickFirstTest, ProcessPending) {
StartServers(1); // Single server StartServers(1); // Single server
auto response_generator = BuildResolverResponseGenerator(); auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel( auto channel = BuildChannel(
@ -541,7 +600,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
CheckRpcSendOk(second_stub, DEBUG_LOCATION); CheckRpcSendOk(second_stub, DEBUG_LOCATION);
} }
TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) { TEST_F(PickFirstTest, SelectsReadyAtStartup) {
ChannelArguments args; ChannelArguments args;
constexpr int kInitialBackOffMs = 5000; constexpr int kInitialBackOffMs = 5000;
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
@ -567,7 +626,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) {
EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */)); EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
} }
TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) { TEST_F(PickFirstTest, BackOffInitialReconnect) {
ChannelArguments args; ChannelArguments args;
constexpr int kInitialBackOffMs = 100; constexpr int kInitialBackOffMs = 100;
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
@ -599,7 +658,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
0); 0);
} }
TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) { TEST_F(PickFirstTest, BackOffMinReconnect) {
ChannelArguments args; ChannelArguments args;
constexpr int kMinReconnectBackOffMs = 1000; constexpr int kMinReconnectBackOffMs = 1000;
args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs); args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
@ -625,7 +684,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
EXPECT_GE(waited.millis(), kMinReconnectBackOffMs - 1); EXPECT_GE(waited.millis(), kMinReconnectBackOffMs - 1);
} }
TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) { TEST_F(PickFirstTest, ResetConnectionBackoff) {
ChannelArguments args; ChannelArguments args;
constexpr int kInitialBackOffMs = 1000; constexpr int kInitialBackOffMs = 1000;
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
@ -660,7 +719,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) {
} }
TEST_F(ClientLbEnd2endTest, TEST_F(ClientLbEnd2endTest,
PickFirstResetConnectionBackoffNextAttemptStartsImmediately) { ResetConnectionBackoffNextAttemptStartsImmediately) {
ChannelArguments args; ChannelArguments args;
constexpr int kInitialBackOffMs = 1000; constexpr int kInitialBackOffMs = 1000;
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
@ -707,7 +766,7 @@ TEST_F(ClientLbEnd2endTest,
EXPECT_LT(waited.millis(), kWaitMs); EXPECT_LT(waited.millis(), kWaitMs);
} }
TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { TEST_F(PickFirstTest, Updates) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
@ -756,7 +815,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
} }
TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { TEST_F(PickFirstTest, UpdateSuperset) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
@ -789,7 +848,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
} }
TEST_F(ClientLbEnd2endTest, PickFirstUpdateToUnconnected) { TEST_F(PickFirstTest, UpdateToUnconnected) {
const int kNumServers = 2; const int kNumServers = 2;
CreateServers(kNumServers); CreateServers(kNumServers);
StartServer(0); StartServer(0);
@ -822,7 +881,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateToUnconnected) {
EXPECT_TRUE(WaitForChannelReady(channel.get())); EXPECT_TRUE(WaitForChannelReady(channel.get()));
} }
TEST_F(ClientLbEnd2endTest, PickFirstGlobalSubchannelPool) { TEST_F(PickFirstTest, GlobalSubchannelPool) {
// Start one server. // Start one server.
const int kNumServers = 1; const int kNumServers = 1;
StartServers(kNumServers); StartServers(kNumServers);
@ -847,7 +906,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstGlobalSubchannelPool) {
EXPECT_EQ(1UL, servers_[0]->service_.clients().size()); EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
} }
TEST_F(ClientLbEnd2endTest, PickFirstLocalSubchannelPool) { TEST_F(PickFirstTest, LocalSubchannelPool) {
// Start one server. // Start one server.
const int kNumServers = 1; const int kNumServers = 1;
StartServers(kNumServers); StartServers(kNumServers);
@ -874,7 +933,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstLocalSubchannelPool) {
EXPECT_EQ(2UL, servers_[0]->service_.clients().size()); EXPECT_EQ(2UL, servers_[0]->service_.clients().size());
} }
TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { TEST_F(PickFirstTest, ManyUpdates) {
const int kNumUpdates = 1000; const int kNumUpdates = 1000;
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
@ -894,7 +953,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
} }
TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) { TEST_F(PickFirstTest, ReresolutionNoSelected) {
// Prepare the ports for up servers and down servers. // Prepare the ports for up servers and down servers.
const int kNumServers = 3; const int kNumServers = 3;
const int kNumAliveServers = 1; const int kNumAliveServers = 1;
@ -926,7 +985,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
} }
TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) { TEST_F(PickFirstTest, ReconnectWithoutNewResolverResult) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()}; std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports); StartServers(1, ports);
auto response_generator = BuildResolverResponseGenerator(); auto response_generator = BuildResolverResponseGenerator();
@ -943,8 +1002,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
WaitForServer(stub, 0, DEBUG_LOCATION); WaitForServer(stub, 0, DEBUG_LOCATION);
} }
TEST_F(ClientLbEnd2endTest, TEST_F(PickFirstTest, ReconnectWithoutNewResolverResultStartsFromTopOfList) {
PickFirstReconnectWithoutNewResolverResultStartsFromTopOfList) {
std::vector<int> ports = {grpc_pick_unused_port_or_die(), std::vector<int> ports = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die()}; grpc_pick_unused_port_or_die()};
CreateServers(2, ports); CreateServers(2, ports);
@ -963,7 +1021,7 @@ TEST_F(ClientLbEnd2endTest,
WaitForServer(stub, 0, DEBUG_LOCATION); WaitForServer(stub, 0, DEBUG_LOCATION);
} }
TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) { TEST_F(PickFirstTest, CheckStateBeforeStartWatch) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()}; std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports); StartServers(1, ports);
auto response_generator = BuildResolverResponseGenerator(); auto response_generator = BuildResolverResponseGenerator();
@ -1003,7 +1061,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName()); EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName());
} }
TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) { TEST_F(PickFirstTest, IdleOnDisconnect) {
// Start server, send RPC, and make sure channel is READY. // Start server, send RPC, and make sure channel is READY.
const int kNumServers = 1; const int kNumServers = 1;
StartServers(kNumServers); StartServers(kNumServers);
@ -1022,7 +1080,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) {
servers_.clear(); servers_.clear();
} }
TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) { TEST_F(PickFirstTest, PendingUpdateAndSelectedSubchannelFails) {
auto response_generator = BuildResolverResponseGenerator(); auto response_generator = BuildResolverResponseGenerator();
auto channel = auto channel =
BuildChannel("", response_generator); // pick_first is the default. BuildChannel("", response_generator); // pick_first is the default.
@ -1071,7 +1129,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */); WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */);
} }
TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) { TEST_F(PickFirstTest, StaysIdleUponEmptyUpdate) {
// Start server, send RPC, and make sure channel is READY. // Start server, send RPC, and make sure channel is READY.
const int kNumServers = 1; const int kNumServers = 1;
StartServers(kNumServers); StartServers(kNumServers);
@ -1099,8 +1157,8 @@ TEST_F(ClientLbEnd2endTest, PickFirstStaysIdleUponEmptyUpdate) {
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
} }
TEST_F(ClientLbEnd2endTest, TEST_F(PickFirstTest,
PickFirstStaysTransientFailureOnFailedConnectionAttemptUntilReady) { StaysTransientFailureOnFailedConnectionAttemptUntilReady) {
// Allocate 3 ports, with no servers running. // Allocate 3 ports, with no servers running.
std::vector<int> ports = {grpc_pick_unused_port_or_die(), std::vector<int> ports = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die(), grpc_pick_unused_port_or_die(),
@ -1127,7 +1185,13 @@ TEST_F(ClientLbEnd2endTest,
CheckRpcSendOk(stub, DEBUG_LOCATION); CheckRpcSendOk(stub, DEBUG_LOCATION);
} }
TEST_F(ClientLbEnd2endTest, RoundRobin) { //
// round_robin tests
//
using RoundRobinTest = ClientLbEnd2endTest;
TEST_F(RoundRobinTest, Basic) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
@ -1156,7 +1220,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName()); EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
} }
TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) { TEST_F(RoundRobinTest, ProcessPending) {
StartServers(1); // Single server StartServers(1); // Single server
auto response_generator = BuildResolverResponseGenerator(); auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("round_robin", response_generator); auto channel = BuildChannel("round_robin", response_generator);
@ -1175,17 +1239,16 @@ TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
CheckRpcSendOk(second_stub, DEBUG_LOCATION); CheckRpcSendOk(second_stub, DEBUG_LOCATION);
} }
TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { TEST_F(RoundRobinTest, Updates) {
// Start servers and send one RPC per server. // Start servers.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
auto response_generator = BuildResolverResponseGenerator(); auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("round_robin", response_generator); auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel); auto stub = BuildStub(channel);
std::vector<int> ports;
// Start with a single server. // Start with a single server.
gpr_log(GPR_INFO, "*** FIRST BACKEND ***"); gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
ports.emplace_back(servers_[0]->port_); std::vector<int> ports = {servers_[0]->port_};
response_generator.SetNextResolution(ports); response_generator.SetNextResolution(ports);
WaitForServer(stub, 0, DEBUG_LOCATION); WaitForServer(stub, 0, DEBUG_LOCATION);
// Send RPCs. They should all go servers_[0] // Send RPCs. They should all go servers_[0]
@ -1193,7 +1256,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
EXPECT_EQ(10, servers_[0]->service_.request_count()); EXPECT_EQ(10, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count());
servers_[0]->service_.ResetCounters(); ResetCounters();
// And now for the second server. // And now for the second server.
gpr_log(GPR_INFO, "*** SECOND BACKEND ***"); gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
ports.clear(); ports.clear();
@ -1207,7 +1270,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(10, servers_[1]->service_.request_count()); EXPECT_EQ(10, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count());
servers_[1]->service_.ResetCounters(); ResetCounters();
// ... and for the last server. // ... and for the last server.
gpr_log(GPR_INFO, "*** THIRD BACKEND ***"); gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
ports.clear(); ports.clear();
@ -1218,7 +1281,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(10, servers_[2]->service_.request_count()); EXPECT_EQ(10, servers_[2]->service_.request_count());
servers_[2]->service_.ResetCounters(); ResetCounters();
// Back to all servers. // Back to all servers.
gpr_log(GPR_INFO, "*** ALL BACKENDS ***"); gpr_log(GPR_INFO, "*** ALL BACKENDS ***");
ports.clear(); ports.clear();
@ -1226,23 +1289,19 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_); ports.emplace_back(servers_[2]->port_);
response_generator.SetNextResolution(ports); response_generator.SetNextResolution(ports);
WaitForServer(stub, 0, DEBUG_LOCATION); WaitForServers(stub, 0, 3, DEBUG_LOCATION);
WaitForServer(stub, 1, DEBUG_LOCATION);
WaitForServer(stub, 2, DEBUG_LOCATION);
// Send three RPCs, one per server. // Send three RPCs, one per server.
for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(1, servers_[0]->service_.request_count()); EXPECT_EQ(1, servers_[0]->service_.request_count());
EXPECT_EQ(1, servers_[1]->service_.request_count()); EXPECT_EQ(1, servers_[1]->service_.request_count());
EXPECT_EQ(1, servers_[2]->service_.request_count()); EXPECT_EQ(1, servers_[2]->service_.request_count());
ResetCounters();
// An empty update will result in the channel going into TRANSIENT_FAILURE. // An empty update will result in the channel going into TRANSIENT_FAILURE.
gpr_log(GPR_INFO, "*** NO BACKENDS ***"); gpr_log(GPR_INFO, "*** NO BACKENDS ***");
ports.clear(); ports.clear();
response_generator.SetNextResolution(ports); response_generator.SetNextResolution(ports);
grpc_connectivity_state channel_state; WaitForChannelNotReady(channel.get());
do { CheckRpcSendFailure(stub);
channel_state = channel->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
servers_[0]->service_.ResetCounters(); servers_[0]->service_.ResetCounters();
// Next update introduces servers_[1], making the channel recover. // Next update introduces servers_[1], making the channel recover.
gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***"); gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
@ -1250,13 +1309,12 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[1]->port_);
response_generator.SetNextResolution(ports); response_generator.SetNextResolution(ports);
WaitForServer(stub, 1, DEBUG_LOCATION); WaitForServer(stub, 1, DEBUG_LOCATION);
channel_state = channel->GetState(false /* try to connect */); EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false));
ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
// Check LB policy name for the channel. // Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName()); EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
} }
TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { TEST_F(RoundRobinTest, UpdateInError) {
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
auto response_generator = BuildResolverResponseGenerator(); auto response_generator = BuildResolverResponseGenerator();
@ -1286,7 +1344,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count());
} }
TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { TEST_F(RoundRobinTest, ManyUpdates) {
// Start servers and send one RPC per server. // Start servers and send one RPC per server.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
@ -1304,66 +1362,33 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName()); EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
} }
TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) { TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
// TODO(dgq): replicate the way internal testing exercises the concurrent // Start 3 servers.
// update provisions of RR. StartServers(3);
} // Create channel.
TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
std::vector<int> first_ports;
std::vector<int> second_ports;
first_ports.reserve(kNumServers);
for (int i = 0; i < kNumServers; ++i) {
first_ports.push_back(grpc_pick_unused_port_or_die());
}
second_ports.reserve(kNumServers);
for (int i = 0; i < kNumServers; ++i) {
second_ports.push_back(grpc_pick_unused_port_or_die());
}
StartServers(kNumServers, first_ports);
auto response_generator = BuildResolverResponseGenerator(); auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("round_robin", response_generator); auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel); auto stub = BuildStub(channel);
response_generator.SetNextResolution(first_ports); // Initially, tell the channel about only the first two servers.
// Send a number of RPCs, which succeed. std::vector<int> ports = {servers_[0]->port_, servers_[1]->port_};
for (size_t i = 0; i < 100; ++i) { response_generator.SetNextResolution(ports);
CheckRpcSendOk(stub, DEBUG_LOCATION); // Wait for both servers to be seen.
} WaitForServers(stub, 0, 2, DEBUG_LOCATION);
// Kill all servers // Tell the fake resolver to send an update that adds the last server, but
gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******"); // only when the LB policy requests re-resolution.
for (size_t i = 0; i < servers_.size(); ++i) { ports.push_back(servers_[2]->port_);
servers_[i]->Shutdown(); response_generator.SetNextResolutionUponError(ports);
// Have server 0 send a GOAWAY. This should trigger a re-resolution.
gpr_log(GPR_INFO, "****** SENDING GOAWAY FROM SERVER 0 *******");
{
grpc_core::ExecCtx exec_ctx;
grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
} }
gpr_log(GPR_INFO, "****** SERVERS KILLED *******"); // Wait for the client to see server 2.
gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******"); WaitForServer(stub, 2, DEBUG_LOCATION);
// Client requests should fail. Send enough to tickle all subchannels.
for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure(stub);
gpr_log(GPR_INFO, "****** DOOMED REQUESTS SENT *******");
// Bring servers back up on a different set of ports. We need to do this to be
// sure that the eventual success is *not* due to subchannel reconnection
// attempts and that an actual re-resolution has happened as a result of the
// RR policy going into transient failure when all its subchannels become
// unavailable (in transient failure as well).
gpr_log(GPR_INFO, "****** RESTARTING SERVERS *******");
StartServers(kNumServers, second_ports);
// Don't notify of the update. Wait for the LB policy's re-resolution to
// "pull" the new ports.
response_generator.SetNextResolutionUponError(second_ports);
gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
// Client request should eventually (but still fairly soon) succeed.
const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
while (gpr_time_cmp(deadline, now) > 0) {
if (SendRpc(stub)) break;
now = gpr_now(GPR_CLOCK_MONOTONIC);
}
ASSERT_GT(gpr_time_cmp(deadline, now), 0);
} }
TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) { TEST_F(RoundRobinTest, TransientFailure) {
// Start servers and create channel. Channel should go to READY state. // Start servers and create channel. Channel should go to READY state.
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
@ -1391,7 +1416,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) {
EXPECT_TRUE(WaitForChannelState(channel.get(), predicate)); EXPECT_TRUE(WaitForChannelState(channel.get(), predicate));
} }
TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) { TEST_F(RoundRobinTest, TransientFailureAtStartup) {
// Create channel and return servers that don't exist. Channel should // Create channel and return servers that don't exist. Channel should
// quickly transition into TRANSIENT_FAILURE. // quickly transition into TRANSIENT_FAILURE.
// TODO(roth): This test should ideally check that even when the // TODO(roth): This test should ideally check that even when the
@ -1420,7 +1445,146 @@ TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) {
EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true)); EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true));
} }
// Checks that round_robin does not fail RPCs while its only backend
// disconnects and the subchannel reconnects.
//
// A client thread sends RPCs in a loop (each one must succeed) while the main
// thread sends GOAWAYs from the server, shuts it down, holds the resulting
// reconnection attempt, restarts the server, and then lets the attempt
// proceed.
TEST_F(RoundRobinTest, DoesNotFailRpcsUponDisconnection) {
  // A connection attempt injector that allows us to control timing.
  class ConnectionInjector : public ConnectionAttemptInjector {
   public:
    // `port` is the only port whose connection attempts may be intercepted.
    explicit ConnectionInjector(int port) : port_(port) {}

    // Arms the injector: the next attempt to `port_` is queued, not started.
    void InterceptNextAttempt() {
      grpc_core::MutexLock lock(&mu_);
      intercept_next_attempt_ = true;
    }

    // Blocks until an intercepted attempt has been queued.
    void WaitForAttemptToStart() {
      grpc_core::MutexLock lock(&mu_);
      while (queued_attempt_ == nullptr) {
        start_cond_.Wait(&mu_);
      }
    }

    // Lets the queued attempt proceed.  Must only be called once an attempt
    // has been queued (see WaitForAttemptToStart()).
    void ResumeAttempt() {
      grpc_core::ExecCtx exec_ctx;
      std::unique_ptr<QueuedAttempt> attempt;
      {
        grpc_core::MutexLock lock(&mu_);
        attempt = std::move(queued_attempt_);
      }
      // Fail loudly instead of dereferencing null if nothing was queued.
      GPR_ASSERT(attempt != nullptr);
      // Resume outside the lock, since completion re-acquires mu_.
      attempt->Resume();
    }

    // Blocks until the intercepted attempt's completion callback has run.
    void WaitForAttemptComplete() {
      grpc_core::MutexLock lock(&mu_);
      while (!attempt_complete_) {
        complete_cond_.Wait(&mu_);
      }
    }

    // Queues the attempt if it targets `port_` and interception is armed;
    // otherwise starts it right away.
    void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
                          grpc_pollset_set* interested_parties,
                          const grpc_channel_args* channel_args,
                          const grpc_resolved_address* addr,
                          grpc_core::Timestamp deadline) override {
      const int port = grpc_sockaddr_get_port(addr);
      gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
      if (port == port_) {
        grpc_core::MutexLock lock(&mu_);
        if (intercept_next_attempt_) {
          gpr_log(GPR_INFO, "*** INTERCEPTING CONNECTION ATTEMPT");
          // Wrap the caller's closure so that we are notified on completion.
          original_closure_ = closure;
          closure = GRPC_CLOSURE_INIT(&closure_, OnComplete, this, nullptr);
          intercept_next_attempt_ = false;
          queued_attempt_ = absl::make_unique<QueuedAttempt>(
              closure, ep, interested_parties, channel_args, addr, deadline);
          start_cond_.Signal();
          return;
        }
      }
      AttemptConnection(closure, ep, interested_parties, channel_args, addr,
                        deadline);
    }

   private:
    // Completion callback for the intercepted attempt: records completion,
    // then forwards the result to the original closure.
    static void OnComplete(void* arg, grpc_error_handle error) {
      auto* self = static_cast<ConnectionInjector*>(arg);
      {
        grpc_core::MutexLock lock(&self->mu_);
        self->attempt_complete_ = true;
        self->complete_cond_.Signal();
      }
      grpc_core::Closure::Run(DEBUG_LOCATION, self->original_closure_,
                              GRPC_ERROR_REF(error));
    }

    const int port_;
    grpc_core::Mutex mu_;
    bool intercept_next_attempt_ ABSL_GUARDED_BY(mu_) = false;
    grpc_core::CondVar start_cond_;
    std::unique_ptr<QueuedAttempt> queued_attempt_ ABSL_GUARDED_BY(mu_);
    grpc_closure* original_closure_ = nullptr;
    grpc_closure closure_;
    grpc_core::CondVar complete_cond_;
    bool attempt_complete_ ABSL_GUARDED_BY(mu_) = false;
  };

  // Start server.
  StartServers(1);
  ConnectionInjector injector(servers_[0]->port_);
  injector.Start();
  // Create channel.
  auto response_generator = BuildResolverResponseGenerator();
  auto channel = BuildChannel("round_robin", response_generator);
  auto stub = BuildStub(channel);
  response_generator.SetNextResolution(GetServersPorts());
  // Start a thread constantly sending RPCs in a loop.
  gpr_log(GPR_ERROR, "=== STARTING CLIENT THREAD ===");
  std::atomic<bool> shutdown{false};
  gpr_event ev;
  gpr_event_init(&ev);
  std::thread thd([&]() {
    gpr_log(GPR_INFO, "sending first RPC");
    CheckRpcSendOk(stub, DEBUG_LOCATION);
    gpr_event_set(&ev, reinterpret_cast<void*>(1));
    while (!shutdown.load()) {
      gpr_log(GPR_INFO, "sending RPC");
      CheckRpcSendOk(stub, DEBUG_LOCATION);
    }
  });
  // Stop and join the client thread on every exit path.  A failing ASSERT_*
  // below returns from the test early; without this, the still-joinable
  // std::thread would be destroyed and std::terminate() would abort the
  // whole test binary.
  struct ClientThreadJoiner {
    std::atomic<bool>& shutdown;
    std::thread& thread;
    ~ClientThreadJoiner() {
      shutdown.store(true);
      if (thread.joinable()) thread.join();
    }
  } joiner{shutdown, thd};
  // Wait for first RPC to complete.
  gpr_log(GPR_ERROR, "=== WAITING FOR FIRST RPC TO COMPLETE ===");
  ASSERT_EQ(reinterpret_cast<void*>(1),
            gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(1)));
  // Channel should now be READY.
  ASSERT_EQ(GRPC_CHANNEL_READY, channel->GetState(false));
  // Tell injector to intercept the next connection attempt.
  injector.InterceptNextAttempt();
  // Now kill the server.  The subchannel should report IDLE and be
  // immediately reconnected to, but this should not cause any test
  // failures.
  gpr_log(GPR_ERROR, "=== SHUTTING DOWN SERVER ===");
  {
    grpc_core::ExecCtx exec_ctx;
    grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
  }
  // Poor man's graceful shutdown: wait after the GOAWAYs before shutting the
  // server down, to avoid spurious RPC failures.
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
  servers_[0]->Shutdown();
  // Wait for next attempt to start.
  gpr_log(GPR_ERROR, "=== WAITING FOR RECONNECTION ATTEMPT ===");
  injector.WaitForAttemptToStart();
  // Start server and allow attempt to continue.
  gpr_log(GPR_ERROR, "=== RESTARTING SERVER ===");
  StartServer(0);
  injector.ResumeAttempt();
  // Wait for next attempt to complete.
  gpr_log(GPR_ERROR, "=== WAITING FOR RECONNECTION ATTEMPT TO COMPLETE ===");
  injector.WaitForAttemptComplete();
  // Now shut down the thread.
  gpr_log(GPR_ERROR, "=== SHUTTING DOWN CLIENT THREAD ===");
  shutdown.store(true);
  thd.join();
}
TEST_F(RoundRobinTest, SingleReconnect) {
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
const auto ports = GetServersPorts(); const auto ports = GetServersPorts();
@ -1464,8 +1628,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
// If health checking is required by client but health checking service // If health checking is required by client but health checking service
// is not running on the server, the channel should be treated as healthy. // is not running on the server, the channel should be treated as healthy.
TEST_F(ClientLbEnd2endTest, TEST_F(RoundRobinTest, ServersHealthCheckingUnimplementedTreatedAsHealthy) {
RoundRobinServersHealthCheckingUnimplementedTreatedAsHealthy) {
StartServers(1); // Single server StartServers(1); // Single server
ChannelArguments args; ChannelArguments args;
args.SetServiceConfigJSON( args.SetServiceConfigJSON(
@ -1479,7 +1642,7 @@ TEST_F(ClientLbEnd2endTest,
CheckRpcSendOk(stub, DEBUG_LOCATION); CheckRpcSendOk(stub, DEBUG_LOCATION);
} }
TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) { TEST_F(RoundRobinTest, HealthChecking) {
EnableDefaultHealthCheckService(true); EnableDefaultHealthCheckService(true);
// Start servers. // Start servers.
const int kNumServers = 3; const int kNumServers = 3;
@ -1553,8 +1716,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) {
EnableDefaultHealthCheckService(false); EnableDefaultHealthCheckService(false);
} }
TEST_F(ClientLbEnd2endTest, TEST_F(RoundRobinTest, HealthCheckingHandlesSubchannelFailure) {
RoundRobinWithHealthCheckingHandlesSubchannelFailure) {
EnableDefaultHealthCheckService(true); EnableDefaultHealthCheckService(true);
// Start servers. // Start servers.
const int kNumServers = 3; const int kNumServers = 3;
@ -1581,7 +1743,7 @@ TEST_F(ClientLbEnd2endTest,
} }
} }
TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) { TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) {
EnableDefaultHealthCheckService(true); EnableDefaultHealthCheckService(true);
// Start server. // Start server.
const int kNumServers = 1; const int kNumServers = 1;
@ -1618,7 +1780,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) {
EnableDefaultHealthCheckService(false); EnableDefaultHealthCheckService(false);
} }
TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) { TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) {
EnableDefaultHealthCheckService(true); EnableDefaultHealthCheckService(true);
// Start server. // Start server.
const int kNumServers = 1; const int kNumServers = 1;
@ -1661,8 +1823,8 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) {
EnableDefaultHealthCheckService(false); EnableDefaultHealthCheckService(false);
} }
TEST_F(ClientLbEnd2endTest, TEST_F(RoundRobinTest,
RoundRobinWithHealthCheckingServiceNameChangesAfterSubchannelsCreated) { HealthCheckingServiceNameChangesAfterSubchannelsCreated) {
EnableDefaultHealthCheckService(true); EnableDefaultHealthCheckService(true);
// Start server. // Start server.
const int kNumServers = 1; const int kNumServers = 1;
@ -1689,34 +1851,9 @@ TEST_F(ClientLbEnd2endTest,
EnableDefaultHealthCheckService(false); EnableDefaultHealthCheckService(false);
} }
TEST_F(ClientLbEnd2endTest, ChannelIdleness) { //
// Start server. // LB policy pick args
const int kNumServers = 1; //
StartServers(kNumServers);
// Set max idle time and build the channel.
ChannelArguments args;
args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000);
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("", response_generator, args);
auto stub = BuildStub(channel);
// The initial channel state should be IDLE.
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
// After sending RPC, channel state should be READY.
gpr_log(GPR_INFO, "*** SENDING RPC, CHANNEL SHOULD CONNECT ***");
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// After a period time not using the channel, the channel state should switch
// to IDLE.
gpr_log(GPR_INFO, "*** WAITING FOR CHANNEL TO GO IDLE ***");
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200));
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
// Sending a new RPC should awake the IDLE channel.
gpr_log(GPR_INFO, "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***");
response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
}
class ClientLbPickArgsTest : public ClientLbEnd2endTest { class ClientLbPickArgsTest : public ClientLbEnd2endTest {
protected: protected:
@ -1795,6 +1932,10 @@ TEST_F(ClientLbPickArgsTest, Basic) {
<< ArgsSeenListString(pick_args_seen_list); << ArgsSeenListString(pick_args_seen_list);
} }
//
// tests that LB policies can get the call's trailing metadata
//
xds::data::orca::v3::OrcaLoadReport BackendMetricDataToOrcaLoadReport( xds::data::orca::v3::OrcaLoadReport BackendMetricDataToOrcaLoadReport(
const grpc_core::LoadBalancingPolicy::BackendMetricAccessor:: const grpc_core::LoadBalancingPolicy::BackendMetricAccessor::
BackendMetricData& backend_metric_data) { BackendMetricData& backend_metric_data) {
@ -2053,6 +2194,10 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
EXPECT_EQ(kNumRpcs, trailers_intercepted()); EXPECT_EQ(kNumRpcs, trailers_intercepted());
} }
//
// tests that address attributes from the resolver are visible to the LB policy
//
class ClientLbAddressTest : public ClientLbEnd2endTest { class ClientLbAddressTest : public ClientLbEnd2endTest {
protected: protected:
static const char* kAttributeKey; static const char* kAttributeKey;
@ -2131,6 +2276,10 @@ TEST_F(ClientLbAddressTest, Basic) {
EXPECT_EQ(addresses_seen(), expected); EXPECT_EQ(addresses_seen(), expected);
} }
//
// tests OOB backend metric API
//
class OobBackendMetricTest : public ClientLbEnd2endTest { class OobBackendMetricTest : public ClientLbEnd2endTest {
protected: protected:
using BackendMetricReport = using BackendMetricReport =
@ -2239,7 +2388,9 @@ TEST_F(OobBackendMetricTest, Basic) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
grpc::testing::ConnectionAttemptInjector::Init(); grpc::testing::ConnectionAttemptInjector::Init();
const auto result = RUN_ALL_TESTS(); const auto result = RUN_ALL_TESTS();
grpc_shutdown();
return result; return result;
} }

@ -69,6 +69,9 @@ ConnectionAttemptInjector::~ConnectionAttemptInjector() {
} }
void ConnectionAttemptInjector::Start() { void ConnectionAttemptInjector::Start() {
// Fail if ConnectionAttemptInjector::Init() was not called after
// grpc_init() to inject the vtable.
GPR_ASSERT(grpc_tcp_client_impl == &kDelayedConnectVTable);
grpc_core::MutexLock lock(g_mu); grpc_core::MutexLock lock(g_mu);
GPR_ASSERT(g_injector == nullptr); GPR_ASSERT(g_injector == nullptr);
g_injector = this; g_injector = this;

@ -40,7 +40,8 @@ namespace testing {
class ConnectionAttemptInjector { class ConnectionAttemptInjector {
public: public:
// Global initializer. Replaces the iomgr TCP client vtable. // Global initializer. Replaces the iomgr TCP client vtable.
// Must be called exactly once before any TCP connections are established. // Must be called exactly once after grpc_init() but before any TCP
// connections are established.
static void Init(); static void Init();
virtual ~ConnectionAttemptInjector(); virtual ~ConnectionAttemptInjector();

Loading…
Cancel
Save