[pick_first] implement Happy Eyeballs (#34426)

pull/33567/head
Mark D. Roth 2 years ago committed by GitHub
parent 153824a21c
commit 835775e347
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      bazel/experiments.bzl
  2. 4
      include/grpc/impl/channel_arg_names.h
  3. 6
      src/core/BUILD
  4. 253
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  5. 5
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  6. 15
      src/core/lib/experiments/experiments.cc
  7. 11
      src/core/lib/experiments/experiments.h
  8. 6
      src/core/lib/experiments/experiments.yaml
  9. 2
      src/core/lib/experiments/rollouts.yaml
  10. 4
      src/core/lib/security/transport/security_handshaker.cc
  11. 24
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  12. 4
      test/core/client_channel/lb_policy/outlier_detection_test.cc
  13. 179
      test/core/client_channel/lb_policy/pick_first_test.cc
  14. 24
      test/cpp/end2end/channelz_service_test.cc
  15. 77
      test/cpp/end2end/client_lb_end2end_test.cc
  16. 10
      test/cpp/end2end/tls_key_export_test.cc

@ -64,6 +64,7 @@ EXPERIMENTS = {
},
"on": {
"cpp_lb_end2end_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
"wrr_delegate_to_pick_first",
],
@ -71,6 +72,7 @@ EXPERIMENTS = {
"lazier_stream_updates",
],
"lb_unit_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
"wrr_delegate_to_pick_first",
],
@ -78,6 +80,7 @@ EXPERIMENTS = {
"registered_method_lookup_in_transport",
],
"xds_end2end_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
"wrr_delegate_to_pick_first",
],
@ -130,6 +133,7 @@ EXPERIMENTS = {
},
"on": {
"cpp_lb_end2end_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
"wrr_delegate_to_pick_first",
],
@ -137,6 +141,7 @@ EXPERIMENTS = {
"lazier_stream_updates",
],
"lb_unit_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
"wrr_delegate_to_pick_first",
],
@ -144,6 +149,7 @@ EXPERIMENTS = {
"registered_method_lookup_in_transport",
],
"xds_end2end_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
"wrr_delegate_to_pick_first",
],
@ -206,6 +212,7 @@ EXPERIMENTS = {
},
"on": {
"cpp_lb_end2end_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
"wrr_delegate_to_pick_first",
],
@ -213,6 +220,7 @@ EXPERIMENTS = {
"lazier_stream_updates",
],
"lb_unit_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
"wrr_delegate_to_pick_first",
],
@ -220,6 +228,7 @@ EXPERIMENTS = {
"registered_method_lookup_in_transport",
],
"xds_end2end_test": [
"pick_first_happy_eyeballs",
"round_robin_delegate_to_pick_first",
"wrr_delegate_to_pick_first",
],

@ -370,6 +370,10 @@
/** Configure the Differentiated Services Code Point used on outgoing packets.
* Integer value ranging from 0 to 63. */
#define GRPC_ARG_DSCP "grpc.dscp"
/** Connection Attempt Delay for use in Happy Eyeballs, in milliseconds.
* Defaults to 250ms. The pick_first policy clamps the configured value
* to the range [100, 2000] ms. */
#define GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS \
"grpc.happy_eyeballs_connection_attempt_delay_ms"
/** \} */
#endif /* GRPC_IMPL_CHANNEL_ARG_NAMES_H */

@ -4846,6 +4846,7 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"experiments",
"health_check_client",
"iomgr_fwd",
"json",
@ -4854,14 +4855,19 @@ grpc_cc_library(
"lb_policy",
"lb_policy_factory",
"subchannel_interface",
"time",
"useful",
"//:channel_arg_names",
"//:config",
"//:debug_location",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
"//:server_address",
"//:work_serializer",
],
)

@ -24,6 +24,7 @@
#include <algorithm>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
@ -35,6 +36,8 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
@ -42,10 +45,15 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
@ -110,6 +118,9 @@ class PickFirst : public LoadBalancingPolicy {
// Returns connectivity_state_. The optional may be empty; presumably that
// means no connectivity notification has been seen yet for this
// subchannel -- TODO confirm against where connectivity_state_ is set.
absl::optional<grpc_connectivity_state> connectivity_state() const {
return connectivity_state_;
}
// Returns a reference to connectivity_status_. The subchannel list uses
// this to build the "last error" text when it reports TRANSIENT_FAILURE.
const absl::Status& connectivity_status() const {
return connectivity_status_;
}
// Returns the index into the subchannel list of this object.
size_t Index() const {
@ -122,6 +133,13 @@ class PickFirst : public LoadBalancingPolicy {
if (subchannel_ != nullptr) subchannel_->ResetBackoff();
}
// Forwards to subchannel_->RequestConnection().
// NOTE(review): no null check on subchannel_ here; presumably callers only
// invoke this before the subchannel has been released -- confirm.
void RequestConnection() { subchannel_->RequestConnection(); }
// Requests a connection attempt to start on this subchannel,
// with appropriate Connection Attempt Delay.
// Used only during the Happy Eyeballs pass.
void RequestConnectionWithTimer();
// Cancels any pending connectivity watch and unrefs the subchannel.
void ShutdownLocked();
@ -165,6 +183,7 @@ class PickFirst : public LoadBalancingPolicy {
void ProcessUnselectedReadyLocked();
// Reacts to the current connectivity state while trying to connect.
// TODO(roth): Remove this when we remove the Happy Eyeballs experiment.
void ReactToConnectivityStateLocked();
// Backpointer to owning subchannel list. Not owned.
@ -197,6 +216,14 @@ class PickFirst : public LoadBalancingPolicy {
// connectivity state notifications.
bool AllSubchannelsSeenInitialState();
// Looks through subchannels_ starting from attempting_index_ to
// find the first one not currently in TRANSIENT_FAILURE, then
// triggers a connection attempt for that subchannel. If there are
// no more subchannels not in TRANSIENT_FAILURE (i.e., the Happy
// Eyeballs pass is complete), transitions to a mode where we
// try to connect to all subchannels in parallel.
void StartConnectingNextSubchannel();
// Backpointer to owning policy.
RefCountedPtr<PickFirst> policy_;
@ -210,8 +237,20 @@ class PickFirst : public LoadBalancingPolicy {
// finished processing.
bool shutting_down_ = false;
// TODO(roth): Remove this when we remove the Happy Eyeballs experiment.
bool in_transient_failure_ = false;
// The index into subchannels_ to which we are currently attempting
// to connect during the initial Happy Eyeballs pass. Once the
// initial pass is over, this will be equal to size().
size_t attempting_index_ = 0;
// Happy Eyeballs timer handle.
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
timer_handle_;
// After the initial Happy Eyeballs pass, the number of failures
// we've seen. Every size() failures, we trigger re-resolution.
size_t num_failures_ = 0;
};
class HealthWatcher
@ -261,6 +300,8 @@ class PickFirst : public LoadBalancingPolicy {
const bool enable_health_watch_;
// Whether we should omit our status message prefix.
const bool omit_status_message_prefix_;
// Connection Attempt Delay for Happy Eyeballs.
const Duration connection_attempt_delay_;
// Latest update args.
UpdateArgs latest_update_args_;
@ -291,7 +332,12 @@ PickFirst::PickFirst(Args args)
omit_status_message_prefix_(
channel_args()
.GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)
.value_or(false)) {
.value_or(false)),
connection_attempt_delay_(Duration::Milliseconds(
Clamp(channel_args()
.GetInt(GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS)
.value_or(250),
100, 2000))) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p created.", this);
}
@ -562,7 +608,10 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
p->UnsetSelectedSubchannel();
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
if (p->subchannel_list_->in_transient_failure_) {
if (IsPickFirstHappyEyeballsEnabled()
? (p->subchannel_list_->attempting_index_ ==
p->subchannel_list_->size())
: p->subchannel_list_->in_transient_failure_) {
absl::Status status = absl::UnavailableError(absl::StrCat(
"selected subchannel failed; switching to pending update; "
"last failure: ",
@ -595,7 +644,9 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
// select in place of the current one.
// If the subchannel is READY, use it.
if (new_state == GRPC_CHANNEL_READY) {
subchannel_list_->in_transient_failure_ = false;
if (!IsPickFirstHappyEyeballsEnabled()) {
subchannel_list_->in_transient_failure_ = false;
}
ProcessUnselectedReadyLocked();
return;
}
@ -607,14 +658,81 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
// see its initial notification. Start trying to connect, starting
// with the first subchannel.
if (!old_state.has_value()) {
subchannel_list_->subchannels_.front().ReactToConnectivityStateLocked();
if (!IsPickFirstHappyEyeballsEnabled()) {
subchannel_list_->subchannels_.front().ReactToConnectivityStateLocked();
return;
}
subchannel_list_->StartConnectingNextSubchannel();
return;
}
if (!IsPickFirstHappyEyeballsEnabled()) {
// Ignore any other updates for subchannels we're not currently trying to
// connect to.
if (Index() != subchannel_list_->attempting_index_) return;
// React to the connectivity state.
ReactToConnectivityStateLocked();
return;
}
// Ignore any other updates for subchannels we're not currently trying to
// connect to.
if (Index() != subchannel_list_->attempting_index_) return;
// React to the connectivity state.
ReactToConnectivityStateLocked();
// Otherwise, process connectivity state change.
switch (*connectivity_state_) {
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
// If a connection attempt fails before the timer fires, then
// cancel the timer and start connecting on the next subchannel.
if (Index() == subchannel_list_->attempting_index_) {
if (subchannel_list_->timer_handle_.has_value()) {
p->channel_control_helper()->GetEventEngine()->Cancel(
*subchannel_list_->timer_handle_);
}
++subchannel_list_->attempting_index_;
subchannel_list_->StartConnectingNextSubchannel();
} else if (subchannel_list_->attempting_index_ ==
subchannel_list_->size()) {
// We're done with the initial Happy Eyeballs pass and in a mode
// where we're attempting to connect to every subchannel in
// parallel. We count the number of failed connection attempts,
// and when that is equal to the number of subchannels, request
// re-resolution and report TRANSIENT_FAILURE again, so that the
// caller has the most recent status message. Note that this
// isn't necessarily the same as saying that we've seen one
// failure for each subchannel in the list, because the backoff
// state may be different in each subchannel, so we may have seen
// one subchannel fail more than once and another subchannel not
// fail at all. But it's a good enough heuristic.
++subchannel_list_->num_failures_;
if (subchannel_list_->num_failures_ % subchannel_list_->size() == 0) {
p->channel_control_helper()->RequestReresolution();
absl::Status status = absl::UnavailableError(absl::StrCat(
(p->omit_status_message_prefix_
? ""
: "failed to connect to all addresses; last error: "),
connectivity_status_.ToString()));
p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
}
}
break;
}
case GRPC_CHANNEL_IDLE:
// If we've finished the first Happy Eyeballs pass, then we go
// into a mode where we immediately try to connect to every
// subchannel in parallel.
if (subchannel_list_->attempting_index_ == subchannel_list_->size()) {
subchannel_->RequestConnection();
}
break;
case GRPC_CHANNEL_CONNECTING:
// Only update connectivity state in case 1, and only if we're not
// already in TRANSIENT_FAILURE.
if (subchannel_list_ == p->subchannel_list_.get() &&
p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(nullptr));
}
break;
default:
// We handled READY above, and we should never see SHUTDOWN.
GPR_UNREACHABLE_CODE(break);
}
}
void PickFirst::SubchannelList::SubchannelData::
@ -710,8 +828,60 @@ void PickFirst::SubchannelList::SubchannelData::
}
}
void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() {
GPR_ASSERT(connectivity_state_.has_value());
if (connectivity_state_ == GRPC_CHANNEL_IDLE) {
subchannel_->RequestConnection();
} else {
GPR_ASSERT(connectivity_state_ == GRPC_CHANNEL_CONNECTING);
}
// If this is not the last subchannel in the list, start the timer.
if (Index() != subchannel_list_->size() - 1) {
PickFirst* p = subchannel_list_->policy_.get();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p subchannel list %p: starting Connection "
"Attempt Delay timer for %" PRId64 "ms for index %" PRIuPTR,
p, subchannel_list_, p->connection_attempt_delay_.millis(),
Index());
}
subchannel_list_->timer_handle_ =
p->channel_control_helper()->GetEventEngine()->RunAfter(
p->connection_attempt_delay_,
[subchannel_list =
subchannel_list_->Ref(DEBUG_LOCATION, "timer")]() mutable {
ApplicationCallbackExecCtx application_exec_ctx;
ExecCtx exec_ctx;
auto* sl = subchannel_list.get();
sl->policy_->work_serializer()->Run(
[subchannel_list = std::move(subchannel_list)]() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p subchannel list %p: Connection "
"Attempt Delay timer fired (shutting_down=%d, "
"selected=%p)",
subchannel_list->policy_.get(),
subchannel_list.get(),
subchannel_list->shutting_down_,
subchannel_list->policy_->selected_);
}
if (subchannel_list->shutting_down_) return;
if (subchannel_list->policy_->selected_ != nullptr) return;
++subchannel_list->attempting_index_;
subchannel_list->StartConnectingNextSubchannel();
},
DEBUG_LOCATION);
});
}
}
void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
PickFirst* p = subchannel_list_->policy_.get();
// Cancel Happy Eyeballs timer, if any.
if (subchannel_list_->timer_handle_.has_value()) {
p->channel_control_helper()->GetEventEngine()->Cancel(
*subchannel_list_->timer_handle_);
}
// If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is
// for a subchannel in p->subchannel_list_ that we're trying to
@ -829,6 +999,9 @@ void PickFirst::SubchannelList::Orphan() {
for (auto& sd : subchannels_) {
sd.ShutdownLocked();
}
if (timer_handle_.has_value()) {
policy_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
}
Unref();
}
@ -845,6 +1018,68 @@ bool PickFirst::SubchannelList::AllSubchannelsSeenInitialState() {
return true;
}
// Continues the Happy Eyeballs pass from attempting_index_.
//
// Scans forward for the first subchannel whose state is not
// TRANSIENT_FAILURE and starts a (timer-guarded) connection attempt on it.
// If no such subchannel remains, the pass is over: attempting_index_ is left
// equal to size(), a pending list is promoted to be the current list if
// applicable, TRANSIENT_FAILURE is reported (with a re-resolution request)
// when this is the current list, and every subchannel currently in IDLE is
// asked to connect so that all addresses are tried in parallel.
void PickFirst::SubchannelList::StartConnectingNextSubchannel() {
  // Find the next subchannel not in state TRANSIENT_FAILURE.
  // We skip subchannels in state TRANSIENT_FAILURE to avoid a
  // large recursion that could overflow the stack.
  for (; attempting_index_ < size(); ++attempting_index_) {
    SubchannelData* sc = &subchannels_[attempting_index_];
    GPR_ASSERT(sc->connectivity_state().has_value());
    if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) {
      // Found a subchannel not in TRANSIENT_FAILURE, so trigger a
      // connection attempt.
      sc->RequestConnectionWithTimer();
      return;
    }
  }
  // We didn't find another subchannel not in state TRANSIENT_FAILURE,
  // so report TRANSIENT_FAILURE and switch to a mode in which we try to
  // connect to all addresses in parallel.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
    gpr_log(GPR_INFO,
            "Pick First %p subchannel list %p failed to connect to "
            "all subchannels",
            policy_.get(), this);
  }
  // In case 2, swap to the new subchannel list.  This means reporting
  // TRANSIENT_FAILURE and dropping the existing (working) connection,
  // but we can't ignore what the control plane has told us.
  if (policy_->latest_pending_subchannel_list_.get() == this) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
      // Log the list being replaced (the policy's current list), not
      // `this`: `this` is the pending list and is already printed as the
      // first list pointer.
      gpr_log(GPR_INFO,
              "Pick First %p promoting pending subchannel list %p to "
              "replace %p",
              policy_.get(), policy_->latest_pending_subchannel_list_.get(),
              policy_->subchannel_list_.get());
    }
    policy_->UnsetSelectedSubchannel();
    policy_->subchannel_list_ =
        std::move(policy_->latest_pending_subchannel_list_);
  }
  // If this is the current subchannel list (either because we were
  // in case 1 or because we were in case 2 and just promoted it to
  // be the current list), re-resolve and report new state.
  if (policy_->subchannel_list_.get() == this) {
    policy_->channel_control_helper()->RequestReresolution();
    // The status of the last subchannel in the list is used as the
    // "last error" text.
    absl::Status status = absl::UnavailableError(
        absl::StrCat((policy_->omit_status_message_prefix_
                          ? ""
                          : "failed to connect to all addresses; last error: "),
                     subchannels_.back().connectivity_status().ToString()));
    policy_->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
                         MakeRefCounted<TransientFailurePicker>(status));
  }
  // We now transition into a mode where we try to connect to all
  // subchannels in parallel.  For any subchannel currently in IDLE,
  // trigger a connection attempt.  For any subchannel not currently in
  // IDLE, we will trigger a connection attempt when it does report IDLE.
  for (SubchannelData& sd : subchannels_) {
    if (sd.connectivity_state() == GRPC_CHANNEL_IDLE) {
      sd.RequestConnection();
    }
  }
}
//
// factory
//

@ -456,7 +456,6 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
MutexLock connection_lock(&self->connection_->mu_);
if (!error.ok() || self->connection_->shutdown_) {
std::string error_str = StatusToString(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str());
cleanup_connection = true;
if (error.ok() && args->endpoint != nullptr) {
// We were shut down or stopped serving after handshaking completed
@ -825,16 +824,12 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
absl::StatusOr<ChannelArgs> args_result =
connection_manager->UpdateChannelArgsForConnection(args, tcp);
if (!args_result.ok()) {
gpr_log(GPR_DEBUG, "Closing connection: %s",
args_result.status().ToString().c_str());
endpoint_cleanup(GRPC_ERROR_CREATE(args_result.status().ToString()));
return;
}
grpc_error_handle error;
args = self->args_modifier_(*args_result, &error);
if (!error.ok()) {
gpr_log(GPR_DEBUG, "Closing connection: %s",
StatusToString(error).c_str());
endpoint_cleanup(error);
return;
}

@ -111,6 +111,9 @@ const char* const description_wrr_delegate_to_pick_first =
"Change WRR code to delegate to pick_first as per dualstack backend "
"design.";
const char* const additional_constraints_wrr_delegate_to_pick_first = "{}";
const char* const description_pick_first_happy_eyeballs =
"Use Happy Eyeballs in pick_first.";
const char* const additional_constraints_pick_first_happy_eyeballs = "{}";
const char* const description_combiner_offload_to_event_engine =
"Offload Combiner work onto the EventEngine instead of the Executor.";
const char* const additional_constraints_combiner_offload_to_event_engine =
@ -175,6 +178,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_round_robin_delegate_to_pick_first, true, true},
{"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first,
additional_constraints_wrr_delegate_to_pick_first, true, true},
{"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs,
additional_constraints_pick_first_happy_eyeballs, true, true},
{"combiner_offload_to_event_engine",
description_combiner_offload_to_event_engine,
additional_constraints_combiner_offload_to_event_engine, true, true},
@ -276,6 +281,9 @@ const char* const description_wrr_delegate_to_pick_first =
"Change WRR code to delegate to pick_first as per dualstack backend "
"design.";
const char* const additional_constraints_wrr_delegate_to_pick_first = "{}";
const char* const description_pick_first_happy_eyeballs =
"Use Happy Eyeballs in pick_first.";
const char* const additional_constraints_pick_first_happy_eyeballs = "{}";
const char* const description_combiner_offload_to_event_engine =
"Offload Combiner work onto the EventEngine instead of the Executor.";
const char* const additional_constraints_combiner_offload_to_event_engine =
@ -340,6 +348,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_round_robin_delegate_to_pick_first, true, true},
{"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first,
additional_constraints_wrr_delegate_to_pick_first, true, true},
{"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs,
additional_constraints_pick_first_happy_eyeballs, true, true},
{"combiner_offload_to_event_engine",
description_combiner_offload_to_event_engine,
additional_constraints_combiner_offload_to_event_engine, true, true},
@ -441,6 +451,9 @@ const char* const description_wrr_delegate_to_pick_first =
"Change WRR code to delegate to pick_first as per dualstack backend "
"design.";
const char* const additional_constraints_wrr_delegate_to_pick_first = "{}";
const char* const description_pick_first_happy_eyeballs =
"Use Happy Eyeballs in pick_first.";
const char* const additional_constraints_pick_first_happy_eyeballs = "{}";
const char* const description_combiner_offload_to_event_engine =
"Offload Combiner work onto the EventEngine instead of the Executor.";
const char* const additional_constraints_combiner_offload_to_event_engine =
@ -505,6 +518,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_round_robin_delegate_to_pick_first, true, true},
{"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first,
additional_constraints_wrr_delegate_to_pick_first, true, true},
{"pick_first_happy_eyeballs", description_pick_first_happy_eyeballs,
additional_constraints_pick_first_happy_eyeballs, true, true},
{"combiner_offload_to_event_engine",
description_combiner_offload_to_event_engine,
additional_constraints_combiner_offload_to_event_engine, true, true},

@ -86,6 +86,8 @@ inline bool IsJitterMaxIdleEnabled() { return true; }
inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_COMBINER_OFFLOAD_TO_EVENT_ENGINE
inline bool IsCombinerOffloadToEventEngineEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT
@ -121,6 +123,8 @@ inline bool IsJitterMaxIdleEnabled() { return true; }
inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_COMBINER_OFFLOAD_TO_EVENT_ENGINE
inline bool IsCombinerOffloadToEventEngineEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT
@ -156,6 +160,8 @@ inline bool IsJitterMaxIdleEnabled() { return true; }
inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_COMBINER_OFFLOAD_TO_EVENT_ENGINE
inline bool IsCombinerOffloadToEventEngineEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT
@ -188,6 +194,7 @@ enum ExperimentIds {
kExperimentIdJitterMaxIdle,
kExperimentIdRoundRobinDelegateToPickFirst,
kExperimentIdWrrDelegateToPickFirst,
kExperimentIdPickFirstHappyEyeballs,
kExperimentIdCombinerOffloadToEventEngine,
kExperimentIdRegisteredMethodLookupInTransport,
kNumExperiments
@ -288,6 +295,10 @@ inline bool IsRoundRobinDelegateToPickFirstEnabled() {
inline bool IsWrrDelegateToPickFirstEnabled() {
return IsExperimentEnabled(kExperimentIdWrrDelegateToPickFirst);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_HAPPY_EYEBALLS
inline bool IsPickFirstHappyEyeballsEnabled() {
return IsExperimentEnabled(kExperimentIdPickFirstHappyEyeballs);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_COMBINER_OFFLOAD_TO_EVENT_ENGINE
inline bool IsCombinerOffloadToEventEngineEnabled() {
return IsExperimentEnabled(kExperimentIdCombinerOffloadToEventEngine);

@ -191,6 +191,12 @@
expiry: 2023/11/15
owner: roth@google.com
test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"]
- name: pick_first_happy_eyeballs
description:
Use Happy Eyeballs in pick_first.
expiry: 2023/12/15
owner: roth@google.com
test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"]
- name: combiner_offload_to_event_engine
description:
Offload Combiner work onto the EventEngine instead of the Executor.

@ -96,6 +96,8 @@
default: true
- name: wrr_delegate_to_pick_first
default: true
- name: pick_first_happy_eyeballs
default: true
- name: combiner_offload_to_event_engine
default: true
- name: registered_method_lookup_in_transport

@ -49,7 +49,6 @@
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/iomgr/closure.h"
@ -205,8 +204,6 @@ void SecurityHandshaker::HandshakeFailedLocked(grpc_error_handle error) {
// endpoint callback was invoked, we need to generate our own error.
error = GRPC_ERROR_CREATE("Handshaker shutdown");
}
gpr_log(GPR_DEBUG, "Security handshake failed: %s",
StatusToString(error).c_str());
if (!is_shutdown_) {
tsi_handshaker_shutdown(handshaker_);
// TODO(ctiller): It is currently necessary to shutdown endpoints
@ -379,6 +376,7 @@ grpc_error_handle SecurityHandshaker::OnHandshakeNextDoneLocked(
grpc_error_handle error;
// Handshaker was shutdown.
if (is_shutdown_) {
tsi_handshaker_result_destroy(handshaker_result);
return GRPC_ERROR_CREATE("Handshaker shutdown");
}
// Read more if we need to.

@ -882,6 +882,21 @@ class LoadBalancingPolicyTest : public ::testing::Test {
return final_picker;
}
// Calls ExpectState() for TRANSIENT_FAILURE with expected_status, asserts
// that a picker came back, and then checks via ExpectPickFail() that the
// picker fails picks with exactly expected_status. Failure messages carry
// the caller's source location.
void ExpectTransientFailureUpdate(
    absl::Status expected_status,
    SourceLocation location = SourceLocation()) {
  auto tf_picker =
      ExpectState(GRPC_CHANNEL_TRANSIENT_FAILURE, expected_status, location);
  ASSERT_NE(tf_picker, nullptr);
  // Verifies the status attached to the failed pick.
  auto verify_pick_status = [&](const absl::Status& pick_status) {
    EXPECT_EQ(pick_status, expected_status)
        << location.file() << ":" << location.line();
  };
  ExpectPickFail(tf_picker.get(), verify_pick_status, location);
}
// Waits for the LB policy to fail a connection attempt. There can be
// any number of CONNECTING updates, each of which must return a picker
// that queues picks, followed by one update for state TRANSIENT_FAILURE,
@ -1143,6 +1158,15 @@ class LoadBalancingPolicyTest : public ::testing::Test {
return picker;
}
// Consumes every update currently queued in the helper, requiring each one
// to satisfy ExpectConnectingUpdate(). Does nothing beyond logging when the
// queue is already empty.
void DrainConnectingUpdates(SourceLocation location = SourceLocation()) {
  gpr_log(GPR_INFO, "Draining CONNECTING updates...");
  for (;;) {
    if (helper_->QueueEmpty()) break;
    ExpectConnectingUpdate(location);
  }
  gpr_log(GPR_INFO, "Done draining CONNECTING updates");
}
// Requests a picker on picker and expects a Fail result.
// The failing status is passed to check_status.
void ExpectPickFail(LoadBalancingPolicy::SubchannelPicker* picker,

@ -229,6 +229,10 @@ TEST_F(OutlierDetectionTest, FailurePercentage) {
}
TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
// Can't use timer duration expectation here, because the Happy
// Eyeballs timer inside pick_first will use a different duration than
// the timer in outlier_detection.
SetExpectedTimerDuration(absl::nullopt);
constexpr std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
// Send initial update.

@ -18,6 +18,7 @@
#include <algorithm>
#include <array>
#include <chrono>
#include <map>
#include <memory>
#include <utility>
@ -34,8 +35,10 @@
#include <grpc/grpc.h>
#include <grpc/support/json.h>
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/json/json.h"
@ -51,6 +54,11 @@ class PickFirstTest : public LoadBalancingPolicyTest {
protected:
PickFirstTest() : LoadBalancingPolicyTest("pick_first") {}
// Runs the base fixture's SetUp(), then sets the expected timer duration
// to 250ms, which matches the default Happy Eyeballs Connection Attempt
// Delay used by pick_first.
void SetUp() override {
LoadBalancingPolicyTest::SetUp();
SetExpectedTimerDuration(std::chrono::milliseconds(250));
}
static RefCountedPtr<LoadBalancingPolicy::Config> MakePickFirstConfig(
absl::optional<bool> shuffle_address_list = absl::nullopt) {
return MakeConfig(Json::FromArray({Json::FromObject(
@ -291,9 +299,8 @@ TEST_F(PickFirstTest, AllAddressesInTransientFailureAtStart) {
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// The connection attempt succeeds.
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report CONNECTING some number of times (doesn't
// matter how many) and then report READY.
auto picker = WaitForConnected();
// The LB policy will report READY.
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Picker should return the same subchannel repeatedly.
for (size_t i = 0; i < 3; ++i) {
@ -354,6 +361,172 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) {
}
}
// Walks through a Happy Eyeballs pass over three addresses: the timer
// moves the policy from the first subchannel to the second, a failure on
// the second moves it to the third, and the first subchannel finally
// becomes READY and is the one picks are routed to.
TEST_F(PickFirstTest, HappyEyeballs) {
// Nothing to check when the experiment is disabled.
if (!IsPickFirstHappyEyeballsEnabled()) return;
// Send an update containing three addresses.
constexpr std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for each of the three
// addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
auto* subchannel2 = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel2, nullptr);
auto* subchannel3 = FindSubchannel(kAddresses[2]);
ASSERT_NE(subchannel3, nullptr);
// When the LB policy receives the first subchannel's initial connectivity
// state notification (IDLE), it will request a connection.
EXPECT_TRUE(subchannel->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports
// CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// The second subchannel should not be connecting.
EXPECT_FALSE(subchannel2->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the second subchannel.
EXPECT_TRUE(subchannel2->ConnectionRequested());
subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// The second subchannel fails before the timer fires.
subchannel2->SetConnectivityState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
// This causes the LB policy to start connecting to the third subchannel.
EXPECT_TRUE(subchannel3->ConnectionRequested());
subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Incrementing the time here has no effect, because the LB policy
// does not use a timer for the last subchannel in the list.
// So if there are any queued updates at this point, they will be
// CONNECTING state.
IncrementTimeBy(Duration::Milliseconds(250));
DrainConnectingUpdates();
// The first subchannel becomes connected.
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report CONNECTING some number of times (doesn't
// matter how many) and then report READY.
auto picker = WaitForConnected();
ASSERT_NE(picker, nullptr);
// Picker should return the same subchannel repeatedly.
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
}
}
// Exercises the case where the initial Happy Eyeballs pass over three
// addresses ends without any subchannel becoming READY. The test checks
// that the LB policy then requests re-resolution, reports
// TRANSIENT_FAILURE, and requests a new connection on each subchannel as
// it reports IDLE; that a further three failures trigger another
// re-resolution request and TRANSIENT_FAILURE report; and that a later
// successful attempt on the second address yields READY.
// The test body is a no-op when IsPickFirstHappyEyeballsEnabled() is false.
TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) {
if (!IsPickFirstHappyEyeballsEnabled()) return;
// Send an update containing three addresses.
constexpr std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for each of the three
// addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
auto* subchannel2 = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel2, nullptr);
auto* subchannel3 = FindSubchannel(kAddresses[2]);
ASSERT_NE(subchannel3, nullptr);
// When the LB policy receives the first subchannel's initial connectivity
// state notification (IDLE), it will request a connection.
EXPECT_TRUE(subchannel->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports
// CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// The second subchannel should not be connecting.
EXPECT_FALSE(subchannel2->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the second subchannel.
EXPECT_TRUE(subchannel2->ConnectionRequested());
subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// The second subchannel fails before the timer fires.
subchannel2->SetConnectivityState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
// This causes the LB policy to start connecting to the third subchannel.
EXPECT_TRUE(subchannel3->ConnectionRequested());
subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Incrementing the time here has no effect, because the LB policy
// does not use a timer for the last subchannel in the list.
// So if there are any queued updates at this point, they will be
// CONNECTING state.
IncrementTimeBy(Duration::Milliseconds(250));
DrainConnectingUpdates();
// Set subchannel 2 back to IDLE, so it's already in that state when
// Happy Eyeballs fails.
subchannel2->SetConnectivityState(GRPC_CHANNEL_IDLE);
// Third subchannel fails to connect.
subchannel3->SetConnectivityState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
// The LB policy should request re-resolution.
ExpectReresolutionRequest();
// The LB policy should report TRANSIENT_FAILURE.
WaitForConnectionFailed([&](const absl::Status& status) {
EXPECT_EQ(status, absl::UnavailableError(
"failed to connect to all addresses; "
"last error: UNAVAILABLE: failed to connect"));
});
// We are now done with the Happy Eyeballs pass, and we move into a
// mode where we try to connect to all subchannels in parallel.
// Subchannel 2 was already in state IDLE, so the LB policy will
// immediately trigger a connection request on it. It will not do so
// for subchannels 1 (in CONNECTING) or 3 (in TRANSIENT_FAILURE).
EXPECT_FALSE(subchannel->ConnectionRequested());
EXPECT_TRUE(subchannel2->ConnectionRequested());
EXPECT_FALSE(subchannel3->ConnectionRequested());
// Subchannel 2 reports CONNECTING.
subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Now subchannel 1 reports TF. This is the first failure since we
// finished Happy Eyeballs.
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
EXPECT_FALSE(subchannel->ConnectionRequested());
// Now subchannel 3 reports IDLE. This should trigger another
// connection attempt.
subchannel3->SetConnectivityState(GRPC_CHANNEL_IDLE);
EXPECT_TRUE(subchannel3->ConnectionRequested());
subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Subchannel 2 reports TF. This is the second failure since we
// finished Happy Eyeballs.
subchannel2->SetConnectivityState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
EXPECT_FALSE(subchannel2->ConnectionRequested());
// Finally, subchannel 3 reports TF. This is the third failure since
// we finished Happy Eyeballs, so the LB policy will request
// re-resolution and report TF again.
subchannel3->SetConnectivityState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
EXPECT_FALSE(subchannel3->ConnectionRequested());
ExpectReresolutionRequest();
ExpectTransientFailureUpdate(
absl::UnavailableError("failed to connect to all addresses; "
"last error: UNAVAILABLE: failed to connect"));
// Now the second subchannel goes IDLE.
subchannel2->SetConnectivityState(GRPC_CHANNEL_IDLE);
// The LB policy asks it to connect.
EXPECT_TRUE(subchannel2->ConnectionRequested());
subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// This time, the connection attempt succeeds.
subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report READY.
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Picker should return the same subchannel repeatedly.
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
}
}
TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) {
// Send an update containing two addresses.
constexpr std::array<absl::string_view, 2> kAddresses = {

@ -43,6 +43,7 @@
#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/resolve_localhost_ip46.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/test_credentials_provider.h"
@ -190,13 +191,21 @@ class ChannelzServerTest : public ::testing::TestWithParam<CredentialsType> {
#endif
}
void SetUp() override {
grpc_init();
bool localhost_resolves_to_ipv4 = false;
bool localhost_resolves_to_ipv6 = false;
grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4,
&localhost_resolves_to_ipv6);
ipv6_only_ = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6;
// Ensure the channelz service is brought up on all servers we build.
grpc::channelz::experimental::InitChannelzService();
// We set up a proxy server with channelz enabled.
proxy_port_ = grpc_pick_unused_port_or_die();
ServerBuilder proxy_builder;
std::string proxy_server_address = "localhost:" + to_string(proxy_port_);
std::string proxy_server_address =
absl::StrCat(LocalIp(), ":", proxy_port_);
proxy_builder.AddListeningPort(proxy_server_address,
GetServerCredentials(GetParam()));
// forces channelz and channel tracing to be enabled.
@ -212,6 +221,11 @@ class ChannelzServerTest : public ::testing::TestWithParam<CredentialsType> {
backend.server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
}
proxy_server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
grpc_shutdown();
}
// Returns the loopback address literal that the test servers listen on and
// that channels dial. The bracketed IPv6 form is used only when ipv6_only_
// is set (localhost resolves to IPv6 but not IPv4); otherwise IPv4 is used.
// The IPv6 literal is bracketed so that callers can append ":<port>".
absl::string_view LocalIp() const {
  return ipv6_only_ ? "[::1]" : "127.0.0.1";
}
// Sets the proxy up to have an arbitrary number of backends.
@ -222,7 +236,7 @@ class ChannelzServerTest : public ::testing::TestWithParam<CredentialsType> {
backends_[i].port = grpc_pick_unused_port_or_die();
ServerBuilder backend_builder;
std::string backend_server_address =
"localhost:" + to_string(backends_[i].port);
absl::StrCat(LocalIp(), ":", backends_[i].port);
backend_builder.AddListeningPort(backend_server_address,
GetServerCredentials(GetParam()));
backends_[i].service = std::make_unique<TestServiceImpl>();
@ -244,7 +258,7 @@ class ChannelzServerTest : public ::testing::TestWithParam<CredentialsType> {
}
void ResetStubs() {
string target = "dns:localhost:" + to_string(proxy_port_);
string target = absl::StrCat("dns:", LocalIp(), ":", proxy_port_);
ChannelArguments args;
// disable channelz. We only want to focus on proxy to backend outbound.
args.SetInt(GRPC_ARG_ENABLE_CHANNELZ, 0);
@ -255,7 +269,7 @@ class ChannelzServerTest : public ::testing::TestWithParam<CredentialsType> {
}
std::unique_ptr<grpc::testing::EchoTestService::Stub> NewEchoStub() {
string target = "dns:localhost:" + to_string(proxy_port_);
string target = absl::StrCat("dns:", LocalIp(), ":", proxy_port_);
ChannelArguments args;
// disable channelz. We only want to focus on proxy to backend outbound.
args.SetInt(GRPC_ARG_ENABLE_CHANNELZ, 0);
@ -332,6 +346,8 @@ class ChannelzServerTest : public ::testing::TestWithParam<CredentialsType> {
std::unique_ptr<TestServiceImpl> service;
};
bool ipv6_only_;
std::unique_ptr<grpc::channelz::v1::Channelz::Stub> channelz_stub_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> echo_stub_;

@ -977,79 +977,6 @@ TEST_F(ClientLbEnd2endTest,
EXPECT_LT(waited.millis(), 1000 * grpc_test_slowdown_factor());
}
// Two pick_first channels are given address lists that end in the same
// port (ports1[2]). Connection-attempt holds are used to interleave the two
// channels' attempts, and the test checks that channel 1 reports
// TRANSIENT_FAILURE while channel 2 still reports CONNECTING, and that
// channel 2 reaches TRANSIENT_FAILURE only after its own held attempt is
// resumed.
TEST_F(
PickFirstTest,
TriesAllSubchannelsBeforeReportingTransientFailureWithSubchannelSharing) {
// Start connection injector.
ConnectionAttemptInjector injector;
// Get 5 unused ports. Each channel will have 2 unique ports followed
// by a common port.
std::vector<int> ports1 = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die()};
std::vector<int> ports2 = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die(), ports1[2]};
// Create channel 1.
auto response_generator1 = BuildResolverResponseGenerator();
auto channel1 = BuildChannel("pick_first", response_generator1);
// NOTE(review): stub1 is not referenced again in this test.
auto stub1 = BuildStub(channel1);
response_generator1.SetNextResolution(ports1);
// Allow the connection attempts for ports 0 and 1 to fail normally.
// Inject a hold for the connection attempt to port 2.
auto hold_channel1_port2 = injector.AddHold(ports1[2]);
// Trigger connection attempt.
gpr_log(GPR_INFO, "=== START CONNECTING CHANNEL 1 ===");
channel1->GetState(/*try_to_connect=*/true);
// Wait for connection attempt to port 2.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 1 PORT 2 TO START ===");
hold_channel1_port2->Wait();
gpr_log(GPR_INFO, "=== CHANNEL 1 PORT 2 STARTED ===");
// Now create channel 2.
auto response_generator2 = BuildResolverResponseGenerator();
auto channel2 = BuildChannel("pick_first", response_generator2);
response_generator2.SetNextResolution(ports2);
// Inject a hold for port 0.
auto hold_channel2_port0 = injector.AddHold(ports2[0]);
// Trigger connection attempt.
gpr_log(GPR_INFO, "=== START CONNECTING CHANNEL 2 ===");
channel2->GetState(/*try_to_connect=*/true);
// Wait for connection attempt to port 0.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 2 PORT 0 TO START ===");
hold_channel2_port0->Wait();
gpr_log(GPR_INFO, "=== CHANNEL 2 PORT 0 STARTED ===");
// Inject a hold for port 0, which will be retried by channel 1.
auto hold_channel1_port0 = injector.AddHold(ports1[0]);
// Now allow the connection attempt to port 2 to complete. The subchannel
// will deliver a TRANSIENT_FAILURE notification to both channels.
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 1 PORT 2 ===");
hold_channel1_port2->Resume();
// Wait for channel 1 to retry port 0, so that we know it's seen the
// connectivity state notification for port 2.
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL 1 PORT 0 ===");
hold_channel1_port0->Wait();
gpr_log(GPR_INFO, "=== CHANNEL 1 PORT 0 STARTED ===");
// Channel 1 should now report TRANSIENT_FAILURE.
// Channel 2 should continue to report CONNECTING.
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel1->GetState(false));
EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel2->GetState(false));
// Allow channel 2 to resume port 0. Port 0 will fail, as will port 1.
// When it gets to port 2, it will see it already in state
// TRANSIENT_FAILURE due to being shared with channel 1, so it won't
// trigger another connection attempt.
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 2 PORT 0 ===");
hold_channel2_port0->Resume();
// Channel 2 should soon report TRANSIENT_FAILURE.
// While waiting, the only other state it is allowed to report is
// CONNECTING.
EXPECT_TRUE(
WaitForChannelState(channel2.get(), [](grpc_connectivity_state state) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) return true;
EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING);
return false;
}));
// Clean up.
gpr_log(GPR_INFO, "=== RESUMING CHANNEL 1 PORT 0 ===");
hold_channel1_port0->Resume();
}
TEST_F(PickFirstTest, Updates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
@ -1823,7 +1750,7 @@ TEST_F(RoundRobinTest, StaysInTransientFailureInSubsequentConnecting) {
TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
// Start connection injector.
ConnectionAttemptInjector injector;
// Get port.
// Get ports.
const std::vector<int> ports = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die()};
// Create channel.
@ -1842,7 +1769,6 @@ TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
hold1->Wait();
hold2->Wait();
// Inject a custom failure message.
hold1->Wait();
hold1->Fail(GRPC_ERROR_CREATE("Survey says... Bzzzzt!"));
// Wait until RPC fails with the right message.
absl::Time deadline =
@ -1856,6 +1782,7 @@ TEST_F(RoundRobinTest, ReportsLatestStatusInTransientFailure) {
"Survey says... Bzzzzt!"))(status.error_message())) {
break;
}
gpr_log(GPR_INFO, "STATUS MESSAGE: %s", status.error_message().c_str());
EXPECT_THAT(status.error_message(),
::testing::MatchesRegex(MakeConnectionFailureRegex(
"connections to all backends failing")));

@ -33,6 +33,7 @@
#include "src/core/lib/gpr/tmpfile.h"
#include "src/cpp/client/secure_credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/resolve_localhost_ip46.h"
#include "test/core/util/test_config.h"
#include "test/core/util/tls_utils.h"
@ -194,9 +195,16 @@ class TlsKeyLoggingEnd2EndTest : public ::testing::TestWithParam<TestScenario> {
server_thread_ =
std::thread(&TlsKeyLoggingEnd2EndTest::RunServerLoop, this);
// Pick the loopback literal that matches what "localhost" resolves to:
// the bracketed IPv6 form only when localhost resolves to IPv6 but not
// IPv4, and the IPv4 address otherwise.
bool localhost_resolves_to_ipv4 = false;
bool localhost_resolves_to_ipv6 = false;
grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4,
&localhost_resolves_to_ipv6);
bool ipv6_only = !localhost_resolves_to_ipv4 && localhost_resolves_to_ipv6;
absl::string_view local_ip = ipv6_only ? "[::1]" : "127.0.0.1";
for (int i = 0; i < GetParam().num_listening_ports(); i++) {
ASSERT_NE(0, ports_[i]);
server_addresses_.push_back(absl::StrCat("localhost:", ports_[i]));
server_addresses_.push_back(absl::StrCat(local_ip, ":", ports_[i]));
// Configure tls credential options for each stub. Each stub connects to
// a separate port on the server.

Loading…
Cancel
Save