Add fallback-at-startup into xds

Branch: pull/18665/head
Author: Juanli Shen
Parent: 5224e88ee5
Commit: 438cb44378
Changed files:
1. include/grpc/impl/codegen/grpc_types.h (4 lines changed)
2. src/core/ext/filters/client_channel/lb_policy.h (3 lines changed)
3. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (32 lines changed)
4. src/core/ext/filters/client_channel/lb_policy/xds/xds.cc (622 lines changed)
5. test/cpp/end2end/grpclb_end2end_test.cc (68 lines changed)
6. test/cpp/end2end/xds_end2end_test.cc (215 lines changed)

@@ -315,11 +315,11 @@ typedef struct {
#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms"
/* Timeout in milliseconds to wait for the serverlist from the grpclb load
balancer before using fallback backend addresses from the resolver.
If 0, fallback will never be used. Default value is 10000. */
If 0, enter fallback mode immediately. Default value is 10000. */
#define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms"
/* Timeout in milliseconds to wait for the serverlist from the xDS load
balancer before using fallback backend addresses from the resolver.
If 0, fallback will never be used. Default value is 10000. */
If 0, enter fallback mode immediately. Default value is 10000. */
#define GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS "grpc.xds_fallback_timeout_ms"
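(Usage sketch, not part of this diff: a client can tune this timeout through a
channel argument. The helper name below is hypothetical, and insecure
credentials are assumed purely for brevity.)

#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>

// Create a channel whose xds policy falls back to the resolver-provided
// backends if no serverlist arrives within 5 seconds.
std::shared_ptr<grpc::Channel> MakeChannelWithXdsFallback(
    const std::string& target) {
  grpc::ChannelArguments args;
  args.SetInt(GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, 5000);  // 0 => fallback now
  return grpc::CreateCustomChannel(target,
                                   grpc::InsecureChannelCredentials(), args);
}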
/** If non-zero, grpc server's cronet compression workaround will be enabled */
#define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \

@@ -167,6 +167,9 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// A proxy object used by the LB policy to communicate with the client
/// channel.
// TODO(juanlishen): Consider adding a mid-layer subclass that helps handle
// things like swapping in pending policy when it's ready. Currently, we are
// duplicating the logic in many subclasses.
class ChannelControlHelper {
public:
ChannelControlHelper() = default;

@@ -1129,13 +1129,13 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
if (lb_calld == grpclb_policy->lb_calld_.get()) {
// If we did not receive a serverlist and the fallback-at-startup checks
// are pending, go into fallback mode immediately. This short-circuits
// the timeout for the fallback-at-startup case.
if (!lb_calld->seen_serverlist_ &&
grpclb_policy->fallback_at_startup_checks_pending_) {
// If the fallback-at-startup checks are pending, go into fallback mode
// immediately. This short-circuits the timeout for the fallback-at-startup
// case.
if (grpclb_policy->fallback_at_startup_checks_pending_) {
GPR_ASSERT(!lb_calld->seen_serverlist_);
gpr_log(GPR_INFO,
"[grpclb %p] balancer call finished without receiving "
"[grpclb %p] Balancer call finished without receiving "
"serverlist; entering fallback mode",
grpclb_policy);
grpclb_policy->fallback_at_startup_checks_pending_ = false;
@@ -1628,20 +1628,16 @@ void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
bool is_backend_from_grpclb_load_balancer) {
grpc_arg args_to_add[2] = {
// A channel arg indicating if the target is a backend inferred from a
// grpclb load balancer.
grpc_channel_arg_integer_create(
const_cast<char*>(
GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
is_backend_from_grpclb_load_balancer),
};
size_t num_args_to_add = 1;
InlinedVector<grpc_arg, 2> args_to_add;
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
is_backend_from_grpclb_load_balancer));
if (is_backend_from_grpclb_load_balancer) {
args_to_add[num_args_to_add++] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
}
return grpc_channel_args_copy_and_add(args_, args_to_add, num_args_to_add);
return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
args_to_add.size());
}
OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(

@@ -240,6 +240,10 @@ class XdsLb : public LoadBalancingPolicy {
static void OnCallRetryTimerLocked(void* arg, grpc_error* error);
void StartCallLocked();
void StartConnectivityWatchLocked();
void CancelConnectivityWatchLocked();
static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
private:
// The owning LB policy.
RefCountedPtr<XdsLb> xdslb_policy_;
@@ -247,6 +251,8 @@ class XdsLb : public LoadBalancingPolicy {
// The channel and its status.
grpc_channel* channel_;
bool shutting_down_ = false;
grpc_connectivity_state connectivity_ = GRPC_CHANNEL_IDLE;
grpc_closure on_connectivity_changed_;
// The data associated with the current LB call. It holds a ref to this LB
// channel. It's instantiated every time we query for backends. It's reset
@@ -299,6 +305,28 @@ class XdsLb : public LoadBalancingPolicy {
PickerList pickers_;
};
class FallbackHelper : public ChannelControlHelper {
public:
explicit FallbackHelper(RefCountedPtr<XdsLb> parent)
: parent_(std::move(parent)) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingFallback() const;
bool CalledByCurrentFallback() const;
RefCountedPtr<XdsLb> parent_;
LoadBalancingPolicy* child_ = nullptr;
};
class LocalityMap {
public:
class LocalityEntry : public InternallyRefCounted<LocalityEntry> {
@@ -402,8 +430,13 @@ class XdsLb : public LoadBalancingPolicy {
: lb_chand_.get();
}
// Callback to enter fallback mode.
// Methods for dealing with fallback state.
void MaybeCancelFallbackAtStartupChecks();
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
void UpdateFallbackPolicyLocked();
OrphanablePtr<LoadBalancingPolicy> CreateFallbackPolicyLocked(
const char* name, const grpc_channel_args* args);
void MaybeExitFallbackMode();
// Who the client is trying to communicate with.
const char* server_name_ = nullptr;
@@ -428,17 +461,33 @@ class XdsLb : public LoadBalancingPolicy {
// Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0;
// Whether the checks for fallback at startup are ALL pending. There are
// several cases where this can be reset:
// 1. The fallback timer fires, we enter fallback mode.
// 2. Before the fallback timer fires, the LB channel becomes
// TRANSIENT_FAILURE or the LB call fails, we enter fallback mode.
// 3. Before the fallback timer fires, we receive a response from the
// balancer, we cancel the fallback timer and use the response to update the
// locality map.
bool fallback_at_startup_checks_pending_ = false;
// Timeout in milliseconds before using fallback backend addresses.
// 0 means not using fallback.
RefCountedPtr<Config> fallback_policy_config_;
int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver.
UniquePtr<ServerAddressList> fallback_backend_addresses_;
ServerAddressList fallback_backend_addresses_;
// Fallback timer.
bool fallback_timer_callback_pending_ = false;
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
// The policy to use for the fallback backends.
RefCountedPtr<Config> fallback_policy_config_;
// Lock held when modifying the value of fallback_policy_ or
// pending_fallback_policy_.
Mutex fallback_policy_mu_;
// Non-null iff we are in fallback mode.
OrphanablePtr<LoadBalancingPolicy> fallback_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_fallback_policy_;
// The policy to use for the backends.
RefCountedPtr<Config> child_policy_config_;
// Map of policies to use in the backend
@@ -494,17 +543,90 @@ XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key,
return pickers_[index].second->Pick(pick, error);
}
//
// XdsLb::FallbackHelper
//
bool XdsLb::FallbackHelper::CalledByPendingFallback() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->pending_fallback_policy_.get();
}
bool XdsLb::FallbackHelper::CalledByCurrentFallback() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->fallback_policy_.get();
}
Subchannel* XdsLb::FallbackHelper::CreateSubchannel(
const grpc_channel_args& args) {
if (parent_->shutting_down_ ||
(!CalledByPendingFallback() && !CalledByCurrentFallback())) {
return nullptr;
}
return parent_->channel_control_helper()->CreateSubchannel(args);
}
grpc_channel* XdsLb::FallbackHelper::CreateChannel(
const char* target, const grpc_channel_args& args) {
if (parent_->shutting_down_ ||
(!CalledByPendingFallback() && !CalledByCurrentFallback())) {
return nullptr;
}
return parent_->channel_control_helper()->CreateChannel(target, args);
}
void XdsLb::FallbackHelper::UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) {
if (parent_->shutting_down_) return;
// If this request is from the pending fallback policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingFallback()) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(
GPR_INFO,
"[xdslb %p helper %p] pending fallback policy %p reports state=%s",
parent_.get(), this, parent_->pending_fallback_policy_.get(),
grpc_connectivity_state_name(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
parent_->fallback_policy_->interested_parties(),
parent_->interested_parties());
MutexLock lock(&parent_->fallback_policy_mu_);
parent_->fallback_policy_ = std::move(parent_->pending_fallback_policy_);
} else if (!CalledByCurrentFallback()) {
// This request is from an outdated fallback policy, so ignore it.
return;
}
parent_->channel_control_helper()->UpdateState(state, std::move(picker));
}
void XdsLb::FallbackHelper::RequestReresolution() {
if (parent_->shutting_down_) return;
const LoadBalancingPolicy* latest_fallback_policy =
parent_->pending_fallback_policy_ != nullptr
? parent_->pending_fallback_policy_.get()
: parent_->fallback_policy_.get();
if (child_ != latest_fallback_policy) return;
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Re-resolution requested from the fallback policy (%p).",
parent_.get(), child_);
}
GPR_ASSERT(parent_->lb_chand_ != nullptr);
parent_->channel_control_helper()->RequestReresolution();
}
//
// serverlist parsing code
//
// Returns the backend addresses extracted from the given addresses.
UniquePtr<ServerAddressList> ExtractBackendAddresses(
const ServerAddressList& addresses) {
auto backend_addresses = MakeUnique<ServerAddressList>();
ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) {
ServerAddressList backend_addresses;
for (size_t i = 0; i < addresses.size(); ++i) {
if (!addresses[i].IsBalancer()) {
backend_addresses->emplace_back(addresses[i]);
backend_addresses.emplace_back(addresses[i]);
}
}
return backend_addresses;
@@ -584,6 +706,9 @@ XdsLb::BalancerChannelState::BalancerChannelState(
.set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_XDS_RECONNECT_JITTER)
.set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
GRPC_CLOSURE_INIT(&on_connectivity_changed_,
&XdsLb::BalancerChannelState::OnConnectivityChangedLocked,
this, grpc_combiner_scheduler(xdslb_policy_->combiner()));
channel_ = xdslb_policy_->channel_control_helper()->CreateChannel(
balancer_name, args);
GPR_ASSERT(channel_ != nullptr);
@@ -652,6 +777,62 @@ void XdsLb::BalancerChannelState::StartCallLocked() {
lb_calld_->StartQuery();
}
void XdsLb::BalancerChannelState::StartConnectivityWatchLocked() {
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
// Ref held by callback.
Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
xdslb_policy_->interested_parties()),
&connectivity_, &on_connectivity_changed_, nullptr);
}
void XdsLb::BalancerChannelState::CancelConnectivityWatchLocked() {
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
xdslb_policy_->interested_parties()),
nullptr, &on_connectivity_changed_, nullptr);
}
void XdsLb::BalancerChannelState::OnConnectivityChangedLocked(
void* arg, grpc_error* error) {
BalancerChannelState* self = static_cast<BalancerChannelState*>(arg);
if (!self->shutting_down_ &&
self->xdslb_policy_->fallback_at_startup_checks_pending_) {
if (self->connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Not in TRANSIENT_FAILURE. Renew connectivity watch.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(self->channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
self->xdslb_policy_->interested_parties()),
&self->connectivity_, &self->on_connectivity_changed_, nullptr);
return; // Early out so we don't drop the ref below.
}
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into
// fallback mode immediately.
gpr_log(GPR_INFO,
"[xdslb %p] Balancer channel in state TRANSIENT_FAILURE; "
"entering fallback mode",
self);
self->xdslb_policy_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&self->xdslb_policy_->lb_fallback_timer_);
self->xdslb_policy_->UpdateFallbackPolicyLocked();
}
// Done watching connectivity state, so drop ref.
self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
}
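(Illustrative sketch, not part of this diff: the renew-or-fallback watch
pattern above, reduced to a dependency-free form. Every name here is made up;
the real code uses grpc_client_channel_watch_connectivity_state and combiner
closures.)

#include <functional>

enum class ConnectivityState { kIdle, kConnecting, kReady, kTransientFailure };

struct WatchState {
  bool fallback_checks_pending = true;
  std::function<void()> enter_fallback;  // cancel timer + start fallback policy
  std::function<void(std::function<void(ConnectivityState)>)> renew_watch;
};

void OnConnectivityChanged(WatchState* w, ConnectivityState state) {
  if (!w->fallback_checks_pending) return;  // Watch no longer needed.
  if (state != ConnectivityState::kTransientFailure) {
    // Not failing yet: re-arm the watch and wait for the next transition.
    w->renew_watch([w](ConnectivityState s) { OnConnectivityChanged(w, s); });
    return;
  }
  // TRANSIENT_FAILURE before the fallback timer fired: short-circuit into
  // fallback mode immediately.
  w->fallback_checks_pending = false;
  w->enter_fallback();
}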
//
// XdsLb::BalancerChannelState::BalancerCallState
//
@@ -897,6 +1078,14 @@ void XdsLb::BalancerChannelState::BalancerCallState::
(initial_response = xds_grpclb_initial_response_parse(response_slice)) !=
nullptr) {
// Have NOT seen initial response, look for initial response.
// TODO(juanlishen): When we convert this to use the xds protocol, the
// balancer will send us a fallback timeout such that we should go into
// fallback mode if we have lost contact with the balancer after a certain
// period of time. We will need to save the timeout value here, and then
// when the balancer call ends, we will need to start a timer for the
// specified period of time, and if the timer fires, we go into fallback
// mode. We will also need to cancel the timer when we receive a serverlist
// from the balancer.
if (initial_response->has_client_stats_report_interval) {
const grpc_millis interval = xds_grpclb_duration_to_millis(
&initial_response->client_stats_report_interval);
@@ -938,81 +1127,69 @@ void XdsLb::BalancerChannelState::BalancerCallState::
gpr_free(ipport);
}
}
/* update serverlist */
// TODO(juanlishen): Don't ignore empty serverlist.
if (serverlist->num_servers > 0) {
// Pending LB channel receives a serverlist; promote it.
// Note that this call can't be on a discarded pending channel, because
// such channels don't have any current call but we have checked this call
// is a current call.
if (!lb_calld->lb_chand_->IsCurrentChannel()) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Promoting pending LB channel %p to replace "
"current LB channel %p",
xdslb_policy, lb_calld->lb_chand_.get(),
lb_calld->xdslb_policy()->lb_chand_.get());
}
lb_calld->xdslb_policy()->lb_chand_ =
std::move(lb_calld->xdslb_policy()->pending_lb_chand_);
}
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
if (lb_calld->client_stats_report_interval_ > 0 &&
lb_calld->client_stats_ == nullptr) {
lb_calld->client_stats_ = MakeRefCounted<XdsLbClientStats>();
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
self.release();
lb_calld->ScheduleNextClientLoadReportLocked();
}
if (!xdslb_policy->locality_serverlist_.empty() &&
xds_grpclb_serverlist_equals(
xdslb_policy->locality_serverlist_[0]->serverlist, serverlist)) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Incoming server list identical to current, "
"ignoring.",
xdslb_policy);
}
xds_grpclb_destroy_serverlist(serverlist);
} else { /* new serverlist */
if (!xdslb_policy->locality_serverlist_.empty()) {
/* dispose of the old serverlist */
xds_grpclb_destroy_serverlist(
xdslb_policy->locality_serverlist_[0]->serverlist);
} else {
/* or dispose of the fallback */
xdslb_policy->fallback_backend_addresses_.reset();
if (xdslb_policy->fallback_timer_callback_pending_) {
grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_);
}
/* Initialize locality serverlist, currently the list only handles
* one child */
xdslb_policy->locality_serverlist_.emplace_back(
MakeUnique<LocalityServerlistEntry>());
xdslb_policy->locality_serverlist_[0]->locality_name =
static_cast<char*>(gpr_strdup(kDefaultLocalityName));
xdslb_policy->locality_serverlist_[0]->locality_weight =
kDefaultLocalityWeight;
}
// and update the copy in the XdsLb instance. This
// serverlist instance will be destroyed either upon the next
// update or when the XdsLb instance is destroyed.
xdslb_policy->locality_serverlist_[0]->serverlist = serverlist;
xdslb_policy->locality_map_.UpdateLocked(
xdslb_policy->locality_serverlist_,
xdslb_policy->child_policy_config_.get(), xdslb_policy->args_,
xdslb_policy);
// Pending LB channel receives a serverlist; promote it.
// Note that this call can't be on a discarded pending channel, because
// such channels don't have any current call but we have checked this call
// is a current call.
if (!lb_calld->lb_chand_->IsCurrentChannel()) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Promoting pending LB channel %p to replace "
"current LB channel %p",
xdslb_policy, lb_calld->lb_chand_.get(),
lb_calld->xdslb_policy()->lb_chand_.get());
}
lb_calld->xdslb_policy()->lb_chand_ =
std::move(lb_calld->xdslb_policy()->pending_lb_chand_);
}
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
if (lb_calld->client_stats_report_interval_ > 0 &&
lb_calld->client_stats_ == nullptr) {
lb_calld->client_stats_ = MakeRefCounted<XdsLbClientStats>();
lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
lb_calld->ScheduleNextClientLoadReportLocked();
}
if (!xdslb_policy->locality_serverlist_.empty() &&
xds_grpclb_serverlist_equals(
xdslb_policy->locality_serverlist_[0]->serverlist, serverlist)) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Received empty server list, ignoring.",
gpr_log(GPR_INFO,
"[xdslb %p] Incoming server list identical to current, "
"ignoring.",
xdslb_policy);
}
xds_grpclb_destroy_serverlist(serverlist);
} else { // New serverlist.
// If the balancer tells us to drop all the calls, we should exit fallback
// mode immediately.
// TODO(juanlishen): When we add EDS drop, we should change to check
// drop_percentage.
if (serverlist->num_servers == 0) xdslb_policy->MaybeExitFallbackMode();
if (!xdslb_policy->locality_serverlist_.empty()) {
xds_grpclb_destroy_serverlist(
xdslb_policy->locality_serverlist_[0]->serverlist);
} else {
// This is the first serverlist we've received, don't enter fallback
// mode.
xdslb_policy->MaybeCancelFallbackAtStartupChecks();
// Initialize locality serverlist, currently the list only handles
// one child.
xdslb_policy->locality_serverlist_.emplace_back(
MakeUnique<LocalityServerlistEntry>());
xdslb_policy->locality_serverlist_[0]->locality_name =
static_cast<char*>(gpr_strdup(kDefaultLocalityName));
xdslb_policy->locality_serverlist_[0]->locality_weight =
kDefaultLocalityWeight;
}
// Update the serverlist in the XdsLb instance. This serverlist
// instance will be destroyed either upon the next update or when the
// XdsLb instance is destroyed.
xdslb_policy->locality_serverlist_[0]->serverlist = serverlist;
xdslb_policy->locality_map_.UpdateLocked(
xdslb_policy->locality_serverlist_,
xdslb_policy->child_policy_config_.get(), xdslb_policy->args_,
xdslb_policy);
}
} else {
// No valid initial response or serverlist found.
@@ -1089,6 +1266,18 @@ void XdsLb::BalancerChannelState::BalancerCallState::
lb_chand->StartCallRetryTimerLocked();
}
xdslb_policy->channel_control_helper()->RequestReresolution();
// If the fallback-at-startup checks are pending, go into fallback mode
// immediately. This short-circuits the timeout for the
// fallback-at-startup case.
if (xdslb_policy->fallback_at_startup_checks_pending_) {
gpr_log(GPR_INFO,
"[xdslb %p] Balancer call finished; entering fallback mode",
xdslb_policy);
xdslb_policy->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_);
lb_chand->CancelConnectivityWatchLocked();
xdslb_policy->UpdateFallbackPolicyLocked();
}
}
}
lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
@@ -1164,7 +1353,7 @@ XdsLb::XdsLb(Args args)
arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
// Record fallback timeout.
arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
arg = grpc_channel_args_find(args.args, GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS);
lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
arg, {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
}
@@ -1177,14 +1366,25 @@ XdsLb::~XdsLb() {
void XdsLb::ShutdownLocked() {
shutting_down_ = true;
if (fallback_timer_callback_pending_) {
if (fallback_at_startup_checks_pending_) {
grpc_timer_cancel(&lb_fallback_timer_);
}
locality_map_.ShutdownLocked();
// We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
// alive when that callback is invoked.
if (fallback_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(),
interested_parties());
}
if (pending_fallback_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(
pending_fallback_policy_->interested_parties(), interested_parties());
}
{
MutexLock lock(&fallback_policy_mu_);
fallback_policy_.reset();
pending_fallback_policy_.reset();
}
// We reset the LB channels here instead of in our destructor because they
// hold refs to XdsLb.
{
MutexLock lock(&lb_chand_mu_);
lb_chand_.reset();
@@ -1204,12 +1404,31 @@ void XdsLb::ResetBackoffLocked() {
grpc_channel_reset_connect_backoff(pending_lb_chand_->channel());
}
locality_map_.ResetBackoffLocked();
if (fallback_policy_ != nullptr) {
fallback_policy_->ResetBackoffLocked();
}
if (pending_fallback_policy_ != nullptr) {
pending_fallback_policy_->ResetBackoffLocked();
}
}
void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
// Delegate to the child_policy_ to fill the children subchannels.
// Delegate to the locality_map_ to fill the children subchannels.
locality_map_.FillChildRefsForChannelz(child_subchannels, child_channels);
{
// This must be done holding fallback_policy_mu_, since this method does not
// run in the combiner.
MutexLock lock(&fallback_policy_mu_);
if (fallback_policy_ != nullptr) {
fallback_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
if (pending_fallback_policy_ != nullptr) {
pending_fallback_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
MutexLock lock(&lb_chand_mu_);
if (lb_chand_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
@@ -1301,57 +1520,213 @@ void XdsLb::ParseLbConfig(Config* xds_config) {
void XdsLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = lb_chand_ == nullptr;
ParseLbConfig(args.config.get());
// TODO(juanlishen): Pass fallback policy config update after fallback policy
// is added.
if (balancer_name_ == nullptr) {
gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing fails.", this);
return;
}
ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args);
// Update the existing child policy.
// Note: We have disabled fallback mode in the code, so this child policy must
// have been created from a serverlist.
// TODO(vpowar): Handle the fallback_address changes when we add support for
// fallback in xDS.
locality_map_.UpdateLocked(locality_serverlist_, child_policy_config_.get(),
args_, this);
// If this is the initial update, start the fallback timer.
// Update the existing fallback policy. The fallback policy config and/or the
// fallback addresses may be new.
if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked();
// If this is the initial update, start the fallback-at-startup checks.
if (is_initial_update) {
if (lb_fallback_timeout_ms_ > 0 && locality_serverlist_.empty() &&
!fallback_timer_callback_pending_) {
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
// TODO(juanlishen): Monitor the connectivity state of the balancer
// channel. If the channel reports TRANSIENT_FAILURE before the
// fallback timeout expires, go into fallback mode early.
}
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
fallback_at_startup_checks_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE, we go into fallback mode even if
// the fallback timeout has not elapsed.
lb_chand_->StartConnectivityWatchLocked();
}
}
//
// code for balancer channel and call
// fallback-related methods
//
void XdsLb::MaybeCancelFallbackAtStartupChecks() {
if (!fallback_at_startup_checks_pending_) return;
gpr_log(GPR_INFO,
"[xdslb %p] Cancelling fallback timer and LB channel connectivity "
"watch",
this);
grpc_timer_cancel(&lb_fallback_timer_);
lb_chand_->CancelConnectivityWatchLocked();
fallback_at_startup_checks_pending_ = false;
}
void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
xdslb_policy->fallback_timer_callback_pending_ = false;
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (xdslb_policy->locality_serverlist_.empty() &&
// If some fallback-at-startup check is done after the timer fires but before
// this callback actually runs, don't fall back.
if (xdslb_policy->fallback_at_startup_checks_pending_ &&
!xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Fallback timer fired. Not using fallback backends",
"[xdslb %p] Child policy not ready after fallback timeout; "
"entering fallback mode",
xdslb_policy);
}
xdslb_policy->fallback_at_startup_checks_pending_ = false;
xdslb_policy->UpdateFallbackPolicyLocked();
xdslb_policy->lb_chand_->CancelConnectivityWatchLocked();
}
xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}
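(Illustrative sketch, not part of this diff: the guarded one-shot fallback
timer above, modeled with std::thread. Names are invented; the real code uses
grpc_timer with combiner-scheduled closures and refcounting, and this sketch
assumes the timer object outlives the detached thread.)

#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

class FallbackTimer {
 public:
  void Start(int timeout_ms, std::function<void()> enter_fallback) {
    pending_ = true;
    std::thread([this, timeout_ms, fb = std::move(enter_fallback)] {
      std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms));
      // A serverlist (or an earlier failure) may have resolved the checks
      // while we slept; act only if they are still pending.
      if (pending_.exchange(false)) fb();
    }).detach();
  }
  void Cancel() { pending_ = false; }  // e.g. on the first serverlist

 private:
  std::atomic<bool> pending_{false};
};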
void XdsLb::UpdateFallbackPolicyLocked() {
if (shutting_down_) return;
// Construct update args.
UpdateArgs update_args;
update_args.addresses = fallback_backend_addresses_;
update_args.config = fallback_policy_config_ == nullptr
? nullptr
: fallback_policy_config_->Ref();
update_args.args = grpc_channel_args_copy(args_);
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* fallback_policy_name = fallback_policy_config_ == nullptr
? "round_robin"
: fallback_policy_config_->name();
const bool create_policy =
// case 1
fallback_policy_ == nullptr ||
// case 2b
(pending_fallback_policy_ == nullptr &&
strcmp(fallback_policy_->name(), fallback_policy_name) != 0) ||
// case 3b
(pending_fallback_policy_ != nullptr &&
strcmp(pending_fallback_policy_->name(), fallback_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Creating new %sfallback policy %s", this,
fallback_policy_ == nullptr ? "" : "pending ",
fallback_policy_name);
}
auto new_policy =
CreateFallbackPolicyLocked(fallback_policy_name, update_args.args);
auto& lb_policy = fallback_policy_ == nullptr ? fallback_policy_
: pending_fallback_policy_;
{
MutexLock lock(&fallback_policy_mu_);
lb_policy = std::move(new_policy);
}
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_fallback_policy_ != nullptr
? pending_fallback_policy_.get()
: fallback_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (grpc_lb_xds_trace.enabled()) {
gpr_log(
GPR_INFO, "[xdslb %p] Updating %sfallback policy %p", this,
policy_to_update == pending_fallback_policy_.get() ? "pending " : "",
policy_to_update);
}
policy_to_update->UpdateLocked(std::move(update_args));
}
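(Illustrative sketch, not part of this diff: the create-vs-update decision
described by cases 1/2/3 above, stripped down to plain C++. Policy and
UpdateFallback are stand-ins, not real gRPC types.)

#include <memory>
#include <string>

struct Policy {
  explicit Policy(std::string n) : name(std::move(n)) {}
  std::string name;
  void Update() { /* apply new addresses/config here */ }
};

void UpdateFallback(std::unique_ptr<Policy>& current,
                    std::unique_ptr<Policy>& pending,
                    const std::string& wanted_name) {
  const bool create =
      current == nullptr ||                                    // case 1
      (pending == nullptr && current->name != wanted_name) ||  // case 2b
      (pending != nullptr && pending->name != wanted_name);    // case 3b
  Policy* to_update;
  if (create) {
    // Case 1 fills current directly; cases 2b/3b stage the new policy in
    // pending until it reports READY (3b drops the stale pending policy).
    auto& slot = (current == nullptr) ? current : pending;
    slot = std::make_unique<Policy>(wanted_name);
    to_update = slot.get();
  } else {
    // Cases 2a/3a: update the most recently created policy.
    to_update = (pending != nullptr) ? pending.get() : current.get();
  }
  to_update->Update();
}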
OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateFallbackPolicyLocked(
const char* name, const grpc_channel_args* args) {
FallbackHelper* helper = New<FallbackHelper>(Ref());
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(helper);
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating fallback policy %s", this,
name);
return nullptr;
}
helper->set_child(lb_policy.get());
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Created new fallback policy %s (%p)", this,
name, lb_policy.get());
}
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on xDS
// LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
void XdsLb::MaybeExitFallbackMode() {
if (fallback_policy_ == nullptr) return;
gpr_log(GPR_INFO, "[xdslb %p] Exiting fallback mode", this);
fallback_policy_.reset();
pending_fallback_policy_.reset();
}
//
// XdsLb::LocalityMap
//
void XdsLb::LocalityMap::PruneLocalities(const LocalityList& locality_list) {
for (auto iter = map_.begin(); iter != map_.end();) {
bool found = false;
@@ -1391,18 +1766,18 @@ void XdsLb::LocalityMap::UpdateLocked(
PruneLocalities(locality_serverlist);
}
void grpc_core::XdsLb::LocalityMap::ShutdownLocked() {
void XdsLb::LocalityMap::ShutdownLocked() {
MutexLock lock(&child_refs_mu_);
map_.clear();
}
void grpc_core::XdsLb::LocalityMap::ResetBackoffLocked() {
void XdsLb::LocalityMap::ResetBackoffLocked() {
for (auto& p : map_) {
p.second->ResetBackoffLocked();
}
}
void grpc_core::XdsLb::LocalityMap::FillChildRefsForChannelz(
void XdsLb::LocalityMap::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
MutexLock lock(&child_refs_mu_);
@@ -1411,7 +1786,9 @@ void grpc_core::XdsLb::LocalityMap::FillChildRefsForChannelz(
}
}
// Locality Entry child policy methods
//
// XdsLb::LocalityMap::LocalityEntry
//
grpc_channel_args*
XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyArgsLocked(
@@ -1466,18 +1843,12 @@ void XdsLb::LocalityMap::LocalityEntry::UpdateLocked(
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args_in) {
if (parent_->shutting_down_) return;
// This should never be invoked if we do not have serverlist_, as fallback
// mode is disabled for xDS plugin.
// TODO(juanlishen): Change this as part of implementing fallback mode.
GPR_ASSERT(serverlist != nullptr);
GPR_ASSERT(serverlist->num_servers > 0);
// Construct update args.
UpdateArgs update_args;
update_args.addresses = ProcessServerlist(serverlist);
update_args.config =
child_policy_config == nullptr ? nullptr : child_policy_config->Ref();
update_args.args = CreateChildPolicyArgsLocked(args_in);
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
@@ -1618,7 +1989,7 @@ void XdsLb::LocalityMap::LocalityEntry::Orphan() {
}
//
// LocalityEntry::Helper implementation
// XdsLb::LocalityEntry::Helper
//
bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByPendingChild() const {
@@ -1671,9 +2042,10 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
// This request is from an outdated child, so ignore it.
return;
}
// TODO(juanlishen): When in fallback mode, pass the child picker
// through without wrapping it. (Or maybe use a different helper for
// the fallback policy?)
// At this point, child_ must be the current child policy.
if (state == GRPC_CHANNEL_READY) entry_->parent_->MaybeExitFallbackMode();
// If we are in fallback mode, ignore update request from the child policy.
if (entry_->parent_->fallback_policy_ != nullptr) return;
GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr);
RefCountedPtr<XdsLbClientStats> client_stats =
entry_->parent_->lb_chand_->lb_calld() == nullptr

@@ -1034,12 +1034,12 @@ TEST_F(SingleBalancerTest, Fallback) {
SetNextResolutionAllBalancers();
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendInResolution = backends_.size() / 2;
const size_t kNumBackendsInResolution = backends_.size() / 2;
ResetStub(kFallbackTimeoutMs);
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""});
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
addresses.emplace_back(AddressData{backends_[i]->port_, false, ""});
}
SetNextResolution(addresses);
@@ -1048,45 +1048,45 @@ TEST_F(SingleBalancerTest, Fallback) {
ScheduleResponseForBalancer(
0,
BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(kNumBackendInResolution /* start_index */), {}),
GetBackendPorts(kNumBackendsInResolution /* start_index */), {}),
kServerlistDelayMs);
// Wait until all the fallback backends are reachable.
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
WaitForBackend(i);
}
// The first request.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(kNumBackendInResolution);
CheckRpcSendOk(kNumBackendsInResolution);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// Fallback is used: each backend returned by the resolver should have
// gotten one request.
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
EXPECT_EQ(1U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
// Wait until the serverlist reception has been processed and all backends
// in the serverlist are reachable.
for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
WaitForBackend(i);
}
// Send out the second request.
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(backends_.size() - kNumBackendInResolution);
CheckRpcSendOk(backends_.size() - kNumBackendsInResolution);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// Serverlist is used: each backend returned by the balancer should
// have gotten one request.
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
EXPECT_EQ(1U, backends_[i]->service_.request_count());
}
@@ -1101,13 +1101,13 @@ TEST_F(SingleBalancerTest, FallbackUpdate) {
SetNextResolutionAllBalancers();
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendInResolution = backends_.size() / 3;
const size_t kNumBackendInResolutionUpdate = backends_.size() / 3;
const size_t kNumBackendsInResolution = backends_.size() / 3;
const size_t kNumBackendsInResolutionUpdate = backends_.size() / 3;
ResetStub(kFallbackTimeoutMs);
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""});
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
addresses.emplace_back(AddressData{backends_[i]->port_, false, ""});
}
SetNextResolution(addresses);
@@ -1116,84 +1116,84 @@ TEST_F(SingleBalancerTest, FallbackUpdate) {
ScheduleResponseForBalancer(
0,
BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(kNumBackendInResolution +
kNumBackendInResolutionUpdate /* start_index */),
GetBackendPorts(kNumBackendsInResolution +
kNumBackendsInResolutionUpdate /* start_index */),
{}),
kServerlistDelayMs);
// Wait until all the fallback backends are reachable.
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
WaitForBackend(i);
}
// The first request.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(kNumBackendInResolution);
CheckRpcSendOk(kNumBackendsInResolution);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// Fallback is used: each backend returned by the resolver should have
// gotten one request.
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
EXPECT_EQ(1U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
addresses.clear();
addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""});
for (size_t i = kNumBackendInResolution;
i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
for (size_t i = kNumBackendsInResolution;
i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
addresses.emplace_back(AddressData{backends_[i]->port_, false, ""});
}
SetNextResolution(addresses);
// Wait until the resolution update has been processed and all the new
// fallback backends are reachable.
for (size_t i = kNumBackendInResolution;
i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
for (size_t i = kNumBackendsInResolution;
i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
WaitForBackend(i);
}
// Send out the second request.
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(kNumBackendInResolutionUpdate);
CheckRpcSendOk(kNumBackendsInResolutionUpdate);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// The resolution update is used: each backend in the resolution update should
// have gotten one request.
for (size_t i = 0; i < kNumBackendInResolution; ++i) {
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendInResolution;
i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
for (size_t i = kNumBackendsInResolution;
i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
EXPECT_EQ(1U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate;
for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
i < backends_.size(); ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
// Wait until the serverlist reception has been processed and all backends
// in the serverlist are reachable.
for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate;
for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
i < backends_.size(); ++i) {
WaitForBackend(i);
}
// Send out the third request.
gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
CheckRpcSendOk(backends_.size() - kNumBackendInResolution -
kNumBackendInResolutionUpdate);
CheckRpcSendOk(backends_.size() - kNumBackendsInResolution -
kNumBackendsInResolutionUpdate);
gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
// Serverlist is used: each backend returned by the balancer should
// have gotten one request.
for (size_t i = 0;
i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate;
for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
i < backends_.size(); ++i) {
EXPECT_EQ(1U, backends_[i]->service_.request_count());
}

@@ -413,7 +413,9 @@ class XdsEnd2endTest : public ::testing::Test {
const grpc::string& expected_targets = "") {
ChannelArguments args;
// TODO(juanlishen): Add setter to ChannelArguments.
args.SetInt(GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, fallback_timeout);
if (fallback_timeout > 0) {
args.SetInt(GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, fallback_timeout);
}
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_.get());
if (!expected_targets.empty()) {
@@ -855,15 +857,214 @@ TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
EXPECT_EQ(1U, balancers_[0]->service_.response_count());
}
// The fallback tests are deferred because the fallback mode hasn't been
// supported yet.
TEST_F(SingleBalancerTest, Fallback) {
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendsInResolution = backends_.size() / 2;
ResetStub(kFallbackTimeoutMs);
SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution),
kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
// Send non-empty serverlist only after kServerlistDelayMs.
ScheduleResponseForBalancer(
0,
BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(kNumBackendsInResolution /* start_index */), {}),
kServerlistDelayMs);
// Wait until all the fallback backends are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
kNumBackendsInResolution /* stop_index */);
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(kNumBackendsInResolution);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// Fallback is used: each backend returned by the resolver should have
// gotten one request.
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
EXPECT_EQ(1U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
// Wait until the serverlist reception has been processed and all backends
// in the serverlist are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */,
kNumBackendsInResolution /* start_index */);
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(backends_.size() - kNumBackendsInResolution);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// Serverlist is used: each backend returned by the balancer should
// have gotten one request.
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
EXPECT_EQ(1U, backends_[i]->service_.request_count());
}
// The balancer got a single request.
EXPECT_EQ(1U, balancers_[0]->service_.request_count());
// and sent a single response.
EXPECT_EQ(1U, balancers_[0]->service_.response_count());
}
TEST_F(SingleBalancerTest, FallbackUpdate) {
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendsInResolution = backends_.size() / 3;
const size_t kNumBackendsInResolutionUpdate = backends_.size() / 3;
ResetStub(kFallbackTimeoutMs);
SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution),
kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
// Send non-empty serverlist only after kServerlistDelayMs.
ScheduleResponseForBalancer(
0,
BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(kNumBackendsInResolution +
kNumBackendsInResolutionUpdate /* start_index */),
{}),
kServerlistDelayMs);
// Wait until all the fallback backends are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
kNumBackendsInResolution /* stop_index */);
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(kNumBackendsInResolution);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// Fallback is used: each backend returned by the resolver should have
// gotten one request.
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
EXPECT_EQ(1U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
SetNextResolution(GetBackendPorts(kNumBackendsInResolution,
kNumBackendsInResolution +
kNumBackendsInResolutionUpdate),
kDefaultServiceConfig_.c_str());
// Wait until the resolution update has been processed and all the new
// fallback backends are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */,
kNumBackendsInResolution /* start_index */,
kNumBackendsInResolution +
kNumBackendsInResolutionUpdate /* stop_index */);
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(kNumBackendsInResolutionUpdate);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// The resolution update is used: each backend in the resolution update should
// have gotten one request.
for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendsInResolution;
i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
EXPECT_EQ(1U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
i < backends_.size(); ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
// Wait until the serverlist reception has been processed and all backends
// in the serverlist are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */,
kNumBackendsInResolution +
kNumBackendsInResolutionUpdate /* start_index */);
gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
CheckRpcSendOk(backends_.size() - kNumBackendsInResolution -
kNumBackendsInResolutionUpdate);
gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
// Serverlist is used: each backend returned by the balancer should
// have gotten one request.
for (size_t i = 0;
i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
EXPECT_EQ(0U, backends_[i]->service_.request_count());
}
for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
i < backends_.size(); ++i) {
EXPECT_EQ(1U, backends_[i]->service_.request_count());
}
// The balancer got a single request.
EXPECT_EQ(1U, balancers_[0]->service_.request_count());
// and sent a single response.
EXPECT_EQ(1U, balancers_[0]->service_.response_count());
}
TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
// Return an unreachable balancer and one fallback backend.
SetNextResolution({backends_[0]->port_}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()});
// Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 1000,
/* wait_for_ready */ false);
}
// TODO(juanlishen): Add TEST_F(SingleBalancerTest, Fallback)
TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerCallFails) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
// Return one balancer and one fallback backend.
SetNextResolution({backends_[0]->port_}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
// Balancer drops call without sending a serverlist.
balancers_[0]->service_.NotifyDoneWithServerlists();
// Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 1000,
/* wait_for_ready */ false);
}
// TODO(juanlishen): Add TEST_F(SingleBalancerTest, FallbackUpdate)
TEST_F(SingleBalancerTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) {
// Return an unreachable balancer and one fallback backend.
SetNextResolution({backends_[0]->port_}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()});
// Enter fallback mode because the LB channel fails to connect.
WaitForBackend(0);
// Return a new balancer that sends an empty serverlist.
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends({}, {}), 0);
SetNextResolutionForLbChannelAllBalancers();
// Send RPCs until failure.
gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(5000, GPR_TIMESPAN));
do {
auto status = SendRpc();
if (!status.ok()) break;
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
CheckRpcSendFailure();
}
// TODO(juanlishen): Add TEST_F(SingleBalancerTest,
// FallbackEarlyWhenBalancerChannelFails)
TEST_F(SingleBalancerTest, FallbackModeIsExitedAfterChildReady) {
// Return an unreachable balancer and one fallback backend.
SetNextResolution({backends_[0]->port_}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()});
// Enter fallback mode because the LB channel fails to connect.
WaitForBackend(0);
// Return a new balancer that sends a dead backend.
ShutdownBackend(1);
ScheduleResponseForBalancer(
0,
BalancerServiceImpl::BuildResponseForBackends({backends_[1]->port_}, {}),
0);
SetNextResolutionForLbChannelAllBalancers();
// The state (TRANSIENT_FAILURE) update from the child policy will be ignored
// because we are still in fallback mode.
gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(5000, GPR_TIMESPAN));
// Send 5 seconds worth of RPCs.
do {
CheckRpcSendOk();
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
// After the backend is restarted, the child policy will eventually be READY,
// and we will exit fallback mode.
StartBackend(1);
WaitForBackend(1);
// We have exited fallback mode, so calls will go to the child policy
// exclusively.
CheckRpcSendOk(100);
EXPECT_EQ(0U, backends_[0]->service_.request_count());
EXPECT_EQ(100U, backends_[1]->service_.request_count());
}
TEST_F(SingleBalancerTest, BackendsRestart) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
