Merge pull request #18353 from markdroth/grpclb_fallback_when_balancer_call_fails

grpclb fallback-at-startup improvements
reviewable/pr18437/r1
Mark D. Roth 6 years ago committed by GitHub
commit a41a381302
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 133
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 22
      test/cpp/end2end/grpclb_end2end_test.cc

@ -299,9 +299,10 @@ class GrpcLb : public LoadBalancingPolicy {
void ParseLbConfig(Config* grpclb_config);
static void OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* error);
void CancelBalancerChannelConnectivityWatchLocked();
// Methods for dealing with fallback state.
void MaybeEnterFallbackMode();
void MaybeEnterFallbackModeAfterStartup();
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
// Methods for dealing with the balancer call.
@ -330,9 +331,6 @@ class GrpcLb : public LoadBalancingPolicy {
gpr_atm lb_channel_uuid_ = 0;
// Response generator to inject address updates into lb_channel_.
RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
// Connectivity state notification.
grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
grpc_closure lb_channel_on_connectivity_changed_;
// The data associated with the current LB call. It holds a ref to this LB
// policy. It's initialized every time we query for backends. It's reset to
@ -354,15 +352,17 @@ class GrpcLb : public LoadBalancingPolicy {
// Whether we're in fallback mode.
bool fallback_mode_ = false;
// Timeout in milliseconds for before using fallback backend addresses.
// 0 means not using fallback.
int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver.
ServerAddressList fallback_backend_addresses_;
// Fallback timer.
bool fallback_timer_callback_pending_ = false;
// State for fallback-at-startup checks.
// Timeout after startup after which we will go into fallback mode if
// we have not received a serverlist from the balancer.
int fallback_at_startup_timeout_ = 0;
bool fallback_at_startup_checks_pending_ = false;
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
grpc_closure lb_channel_on_connectivity_changed_;
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
@ -647,7 +647,7 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
// Record whether child policy reports READY.
parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
// Enter fallback mode if needed.
parent_->MaybeEnterFallbackMode();
parent_->MaybeEnterFallbackModeAfterStartup();
// There are three cases to consider here:
// 1. We're in fallback mode. In this case, we're always going to use
// the child policy's result, so we pass its picker through as-is.
@ -804,7 +804,8 @@ void GrpcLb::BalancerCallState::StartQuery() {
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
op->reserved = nullptr;
op++;
// Op: send request message.
@ -1073,8 +1074,10 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
grpclb_policy);
grpclb_policy->fallback_mode_ = false;
}
if (grpclb_policy->fallback_timer_callback_pending_) {
if (grpclb_policy->fallback_at_startup_checks_pending_) {
grpclb_policy->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
}
// Update the serverlist in the GrpcLb instance. This serverlist
// instance will be destroyed either upon the next update or when the
@ -1130,7 +1133,24 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
if (lb_calld == grpclb_policy->lb_calld_.get()) {
grpclb_policy->MaybeEnterFallbackMode();
// If we did not receive a serverlist and the fallback-at-startup checks
// are pending, go into fallback mode immediately. This short-circuits
// the timeout for the fallback-at-startup case.
if (!lb_calld->seen_serverlist_ &&
grpclb_policy->fallback_at_startup_checks_pending_) {
gpr_log(GPR_INFO,
"[grpclb %p] balancer call finished without receiving "
"serverlist; entering fallback mode",
grpclb_policy);
grpclb_policy->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy->fallback_mode_ = true;
grpclb_policy->CreateOrUpdateChildPolicyLocked();
} else {
// This handles the fallback-after-startup case.
grpclb_policy->MaybeEnterFallbackModeAfterStartup();
}
grpclb_policy->lb_calld_.reset();
GPR_ASSERT(!grpclb_policy->shutting_down_);
grpclb_policy->channel_control_helper()->RequestReresolution();
@ -1262,6 +1282,8 @@ GrpcLb::GrpcLb(Args args)
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1000)) {
// Initialization.
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
grpc_combiner_scheduler(args.combiner));
@ -1282,9 +1304,9 @@ GrpcLb::GrpcLb(Args args)
// Record LB call timeout.
arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
// Record fallback timeout.
// Record fallback-at-startup timeout.
arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
fallback_at_startup_timeout_ = grpc_channel_arg_get_integer(
arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
}
@ -1300,8 +1322,9 @@ void GrpcLb::ShutdownLocked() {
if (retry_timer_callback_pending_) {
grpc_timer_cancel(&lb_call_retry_timer_);
}
if (fallback_timer_callback_pending_) {
if (fallback_at_startup_checks_pending_) {
grpc_timer_cancel(&lb_fallback_timer_);
CancelBalancerChannelConnectivityWatchLocked();
}
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
@ -1373,31 +1396,28 @@ void GrpcLb::UpdateLocked(const grpc_channel_args& args,
ProcessChannelArgsLocked(args);
// Update the existing child policy.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
// If this is the initial update, start the fallback timer.
// If this is the initial update, start the fallback-at-startup checks
// and the balancer call.
if (is_initial_update) {
if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
!fallback_timer_callback_pending_) {
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE, we go into fallback mode even if
// the fallback timeout has not elapsed.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
// Ref held by callback.
Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(interested_parties()),
&lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
nullptr);
}
fallback_at_startup_checks_pending_ = true;
// Start timer.
grpc_millis deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE before the timer fires, we go into
// fallback mode even if the fallback timeout has not elapsed.
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
// Ref held by callback.
Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(interested_parties()),
&lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
nullptr);
// Start balancer call.
StartBalancerCallLocked();
}
}
@ -1490,7 +1510,7 @@ void GrpcLb::ParseLbConfig(Config* grpclb_config) {
void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* error) {
GrpcLb* self = static_cast<GrpcLb*>(arg);
if (!self->shutting_down_ && self->fallback_timer_callback_pending_) {
if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Not in TRANSIENT_FAILURE. Renew connectivity watch.
grpc_channel_element* client_channel_elem =
@ -1511,6 +1531,7 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
"[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
"entering fallback mode",
self);
self->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&self->lb_fallback_timer_);
self->fallback_mode_ = true;
self->CreateOrUpdateChildPolicyLocked();
@ -1519,6 +1540,16 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
}
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(interested_parties()),
nullptr, &lb_channel_on_connectivity_changed_, nullptr);
}
//
// code for balancer channel and call
//
@ -1579,13 +1610,13 @@ void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
// code for handling fallback mode
//
void GrpcLb::MaybeEnterFallbackMode() {
void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
// Enter fallback mode if all of the following are true:
// - We are not currently in fallback mode.
// - We are not currently waiting for the initial fallback timeout.
// - We are not currently in contact with the balancer.
// - The child policy is not in state READY.
if (!fallback_mode_ && !fallback_timer_callback_pending_ &&
if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
(lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
!child_policy_ready_) {
gpr_log(GPR_INFO,
@ -1599,26 +1630,18 @@ void GrpcLb::MaybeEnterFallbackMode() {
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->fallback_timer_callback_pending_ = false;
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
error == GRPC_ERROR_NONE) {
if (grpclb_policy->fallback_at_startup_checks_pending_ &&
!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
grpclb_policy);
grpclb_policy->fallback_at_startup_checks_pending_ = false;
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy->fallback_mode_ = true;
grpclb_policy->CreateOrUpdateChildPolicyLocked();
// Cancel connectivity watch, since we no longer need it.
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
grpclb_policy->interested_parties()),
nullptr, &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}

@ -406,7 +406,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
void ResetStub(int fallback_timeout = 0,
const grpc::string& expected_targets = "") {
ChannelArguments args;
args.SetGrpclbFallbackTimeout(fallback_timeout);
if (fallback_timeout > 0) args.SetGrpclbFallbackTimeout(fallback_timeout);
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_.get());
if (!expected_targets.empty()) {
@ -1321,6 +1321,22 @@ TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) {
/* wait_for_ready */ false);
}
TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerCallFails) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
// Return an unreachable balancer and one fallback backend.
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""});
addresses.emplace_back(AddressData{backends_[0]->port_, false, ""});
SetNextResolution(addresses);
// Balancer drops call without sending a serverlist.
balancers_[0]->service_.NotifyDoneWithServerlists();
// Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 1000,
/* wait_for_ready */ false);
}
TEST_F(SingleBalancerTest, BackendsRestart) {
SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;
@ -1336,7 +1352,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
CheckRpcSendFailure();
// Restart backends. RPCs should start succeeding again.
StartAllBackends();
CheckRpcSendOk(1 /* times */, 1000 /* timeout_ms */,
CheckRpcSendOk(1 /* times */, 2000 /* timeout_ms */,
true /* wait_for_ready */);
// The balancer got a single request.
EXPECT_EQ(1U, balancers_[0]->service_.request_count());
@ -1867,8 +1883,6 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
// Send one RPC per backend.
CheckRpcSendOk(kNumBackendsSecondPass);
balancers_[0]->service_.NotifyDoneWithServerlists();
EXPECT_EQ(2U, balancers_[0]->service_.request_count());
EXPECT_EQ(2U, balancers_[0]->service_.response_count());
// Check client stats.
client_stats = WaitForLoadReports();
EXPECT_EQ(kNumBackendsSecondPass + 1, client_stats.num_calls_started);

Loading…
Cancel
Save