Use fallback before timeout if balancer channel reports TRANSIENT_FAILURE.

pull/18275/head
Mark D. Roth 6 years ago
parent 5fb10c1949
commit 827c77bd24
  1. 80
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 3
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  3. 14
      test/cpp/end2end/grpclb_end2end_test.cc
  4. 3
      test/cpp/end2end/xds_end2end_test.cc

@ -295,8 +295,10 @@ class GrpcLb : public LoadBalancingPolicy {
// Helper functions used in UpdateLocked().
void ProcessChannelArgsLocked(const grpc_channel_args& args);
void ParseLbConfig(Config* grpclb_config);
static void OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* error);
// Methods for dealing with the balancer channel and call.
// Methods for dealing with the balancer call.
void StartBalancerCallLocked();
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
void StartBalancerCallRetryTimerLocked();
@ -323,6 +325,9 @@ class GrpcLb : public LoadBalancingPolicy {
gpr_atm lb_channel_uuid_ = 0;
// Response generator to inject address updates into lb_channel_.
RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
// Connectivity state notification.
grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
grpc_closure lb_channel_on_connectivity_changed_;
// The data associated with the current LB call. It holds a ref to this LB
// policy. It's initialized every time we query for backends. It's reset to
@ -1030,6 +1035,12 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
} else { // New serverlist.
if (grpclb_policy->serverlist_ == nullptr) {
// Dispose of the fallback.
if (grpclb_policy->child_policy_ != nullptr) {
gpr_log(GPR_INFO,
"[grpclb %p] Received response from balancer; exiting "
"fallback mode",
grpclb_policy);
}
grpclb_policy->fallback_backend_addresses_.reset();
if (grpclb_policy->fallback_timer_callback_pending_) {
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
@ -1219,6 +1230,10 @@ GrpcLb::GrpcLb(Args args)
.set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1000)) {
// Initialization.
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
grpc_combiner_scheduler(args.combiner));
gpr_mu_init(&child_policy_mu_);
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
@ -1329,6 +1344,20 @@ void GrpcLb::UpdateLocked(const grpc_channel_args& args,
grpc_combiner_scheduler(combiner()));
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE, we go into fallback mode even if
// the fallback timeout has not elapsed.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
// Ref held by callback.
Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(interested_parties()),
&lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
nullptr);
}
StartBalancerCallLocked();
}
@ -1420,6 +1449,37 @@ void GrpcLb::ParseLbConfig(Config* grpclb_config) {
}
}
void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* error) {
GrpcLb* self = static_cast<GrpcLb*>(arg);
if (!self->shutting_down_ && self->fallback_timer_callback_pending_) {
if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Not in TRANSIENT_FAILURE. Renew connectivity watch.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(self->lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
self->interested_parties()),
&self->lb_channel_connectivity_,
&self->lb_channel_on_connectivity_changed_, nullptr);
return; // Early out so we don't drop the ref below.
}
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into
// fallback mode immediately.
gpr_log(GPR_INFO,
"[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
"entering fallback mode",
self);
grpc_timer_cancel(&self->lb_fallback_timer_);
self->CreateOrUpdateChildPolicyLocked();
}
// Done watching connectivity state, so drop ref.
self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
}
//
// code for balancer channel and call
//
@ -1445,13 +1505,21 @@ void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
// actually runs, don't fall back.
if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
error == GRPC_ERROR_NONE) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Falling back to use backends from resolver",
grpclb_policy);
}
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
grpclb_policy);
GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
grpclb_policy->CreateOrUpdateChildPolicyLocked();
// Cancel connectivity watch, since we no longer need it.
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
grpclb_policy->interested_parties()),
nullptr, &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}

@ -1254,6 +1254,9 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args,
grpc_combiner_scheduler(combiner()));
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
// TODO(juanlishen): Monitor the connectivity state of the balancer
// channel. If the channel reports TRANSIENT_FAILURE before the
// fallback timeout expires, go into fallback mode early.
}
}
}

@ -1194,6 +1194,20 @@ TEST_F(SingleBalancerTest, FallbackUpdate) {
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
}
TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
// Return an unreachable balancer and one fallback backend.
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{grpc_pick_unused_port_or_die(), true, ""});
addresses.emplace_back(AddressData{backend_servers_[0].port_, false, ""});
SetNextResolution(addresses);
// Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 1000,
/* wait_for_ready */ false);
}
TEST_F(SingleBalancerTest, BackendsRestart) {
SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;

@ -868,6 +868,9 @@ TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
// TODO(juanlishen): Add TEST_F(SingleBalancerTest, FallbackUpdate)
// TODO(juanlishen): Add TEST_F(SingleBalancerTest,
// FallbackEarlyWhenBalancerChannelFails)
TEST_F(SingleBalancerTest, BackendsRestart) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();

Loading…
Cancel
Save