Go into fallback mode when losing contact with balancer and backends.

pull/18344/head
Mark D. Roth 6 years ago
parent 8c3d4a7dfd
commit adc2163038
  1. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (162 changed lines)
  2. test/cpp/end2end/grpclb_end2end_test.cc (133 changed lines)
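With this change, grpclb enters fallback mode not only when the initial fallback timer fires at startup, but whenever it loses contact with both the balancer and the backends from the most recent serverlist, and it leaves fallback mode as soon as a new serverlist arrives. The fallback addresses are the non-balancer addresses returned by the resolver. For context, the fallback timeout itself is set by the client through a channel argument; the sketch below is illustrative only, and the target URI, credentials, and timeout value are assumptions rather than part of this commit:

// Client-side sketch: setting the grpclb fallback timeout on a channel.
#include <grpcpp/grpcpp.h>

std::shared_ptr<grpc::Channel> MakeChannelWithFallback() {
  grpc::ChannelArguments args;
  // If no serverlist arrives from the balancer within this many milliseconds,
  // grpclb falls back to the backend addresses supplied by the resolver.
  args.SetGrpclbFallbackTimeout(10000);  // illustrative value
  return grpc::CreateCustomChannel("dns:///my.service.example.com:443",
                                   grpc::InsecureChannelCredentials(), args);
}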

@@ -148,6 +148,7 @@ class GrpcLb : public LoadBalancingPolicy {
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
bool seen_initial_response() const { return seen_initial_response_; }
bool seen_serverlist() const { return seen_serverlist_; }
private:
// So Delete() can access our private dtor.
@@ -188,6 +189,7 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure lb_on_balancer_message_received_;
bool seen_initial_response_ = false;
bool seen_serverlist_ = false;
// recv_trailing_metadata
grpc_closure lb_on_balancer_status_received_;
@@ -298,9 +300,12 @@ class GrpcLb : public LoadBalancingPolicy {
static void OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* error);
// Methods for dealing with fallback state.
void MaybeEnterFallbackMode();
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
// Methods for dealing with the balancer call.
void StartBalancerCallLocked();
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
void StartBalancerCallRetryTimerLocked();
static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
@@ -347,11 +352,13 @@ class GrpcLb : public LoadBalancingPolicy {
// such response has arrived.
RefCountedPtr<Serverlist> serverlist_;
// Whether we're in fallback mode.
bool fallback_mode_ = false;
// Timeout in milliseconds before using fallback backend addresses.
// 0 means not using fallback.
int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver.
UniquePtr<ServerAddressList> fallback_backend_addresses_;
ServerAddressList fallback_backend_addresses_;
// Fallback timer.
bool fallback_timer_callback_pending_ = false;
grpc_timer lb_fallback_timer_;
@@ -367,6 +374,8 @@ class GrpcLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
// The child policy config.
RefCountedPtr<Config> child_policy_config_;
// Whether the child policy is in state READY.
bool child_policy_ready_ = false;
};
//
@@ -635,6 +644,10 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
GRPC_ERROR_UNREF(state_error);
return;
}
// Record whether child policy reports READY.
parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
// Enter fallback mode if needed.
parent_->MaybeEnterFallbackMode();
// There are three cases to consider here:
// 1. We're in fallback mode. In this case, we're always going to use
// the child policy's result, so we pass its picker through as-is.
@@ -1014,16 +1027,14 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
grpclb_policy, lb_calld, serverlist->num_servers,
serverlist_text.get());
}
lb_calld->seen_serverlist_ = true;
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
if (lb_calld->client_stats_report_interval_ > 0 &&
lb_calld->client_stats_ == nullptr) {
lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
self.release();
// Ref held by callback.
lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
lb_calld->ScheduleNextClientLoadReportLocked();
}
// Check if the serverlist differs from the previous one.
@@ -1036,18 +1047,34 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
grpclb_policy, lb_calld);
}
} else { // New serverlist.
if (grpclb_policy->serverlist_ == nullptr) {
// Dispose of the fallback.
if (grpclb_policy->child_policy_ != nullptr) {
gpr_log(GPR_INFO,
"[grpclb %p] Received response from balancer; exiting "
"fallback mode",
grpclb_policy);
}
grpclb_policy->fallback_backend_addresses_.reset();
if (grpclb_policy->fallback_timer_callback_pending_) {
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
}
// Dispose of the fallback.
// TODO(roth): Ideally, we should stay in fallback mode until we
// know that we can reach at least one of the backends in the new
// serverlist. Unfortunately, we can't do that, since we need to
// send the new addresses to the child policy in order to determine
// if they are reachable, and if we don't exit fallback mode now,
// CreateOrUpdateChildPolicyLocked() will use the fallback
// addresses instead of the addresses from the new serverlist.
// However, if we can't reach any of the servers in the new
// serverlist, then the child policy will never switch away from
// the fallback addresses, but the grpclb policy will still think
// that we're not in fallback mode, which means that we won't send
// updates to the child policy when the fallback addresses are
// updated by the resolver. This is sub-optimal, but the only way
// to fix it is to maintain a completely separate child policy for
// fallback mode, and that's more work than we want to put into
// the grpclb implementation at this point, since we're deprecating
// it in favor of the xds policy. We will implement this the
// right way in the xds policy instead.
if (grpclb_policy->fallback_mode_) {
gpr_log(GPR_INFO,
"[grpclb %p] Received response from balancer; exiting "
"fallback mode",
grpclb_policy);
grpclb_policy->fallback_mode_ = false;
}
if (grpclb_policy->fallback_timer_callback_pending_) {
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
}
// Update the serverlist in the GrpcLb instance. This serverlist
// instance will be destroyed either upon the next update or when the
@@ -1103,6 +1130,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
if (lb_calld == grpclb_policy->lb_calld_.get()) {
grpclb_policy->MaybeEnterFallbackMode();
grpclb_policy->lb_calld_.reset();
GPR_ASSERT(!grpclb_policy->shutting_down_);
grpclb_policy->channel_control_helper()->RequestReresolution();
@@ -1379,16 +1407,15 @@ void GrpcLb::UpdateLocked(const grpc_channel_args& args,
//
// Returns the backend addresses extracted from the given addresses.
UniquePtr<ServerAddressList> ExtractBackendAddresses(
const ServerAddressList& addresses) {
ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) {
void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
grpc_arg arg = grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
&lb_token_arg_vtable);
auto backend_addresses = MakeUnique<ServerAddressList>();
ServerAddressList backend_addresses;
for (size_t i = 0; i < addresses.size(); ++i) {
if (!addresses[i].IsBalancer()) {
backend_addresses->emplace_back(
backend_addresses.emplace_back(
addresses[i].address(),
grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
}
@@ -1485,6 +1512,7 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
"entering fallback mode",
self);
grpc_timer_cancel(&self->lb_fallback_timer_);
self->fallback_mode_ = true;
self->CreateOrUpdateChildPolicyLocked();
}
// Done watching connectivity state, so drop ref.
@@ -1509,32 +1537,6 @@ void GrpcLb::StartBalancerCallLocked() {
lb_calld_->StartQuery();
}
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->fallback_timer_callback_pending_ = false;
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
grpclb_policy);
GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
grpclb_policy->CreateOrUpdateChildPolicyLocked();
// Cancel connectivity watch, since we no longer need it.
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
grpclb_policy->interested_parties()),
nullptr, &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}
void GrpcLb::StartBalancerCallRetryTimerLocked() {
grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
if (grpc_lb_glb_trace.enabled()) {
@@ -1573,6 +1575,54 @@ void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}
//
// code for handling fallback mode
//
void GrpcLb::MaybeEnterFallbackMode() {
// Enter fallback mode if all of the following are true:
// - We are not currently in fallback mode.
// - We are not currently waiting for the initial fallback timeout.
// - We are not currently in contact with the balancer.
// - The child policy is not in state READY.
if (!fallback_mode_ && !fallback_timer_callback_pending_ &&
(lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
!child_policy_ready_) {
gpr_log(GPR_INFO,
"[grpclb %p] lost contact with balancer and backends from "
"most recent serverlist; entering fallback mode",
this);
fallback_mode_ = true;
CreateOrUpdateChildPolicyLocked();
}
}
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->fallback_timer_callback_pending_ = false;
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
grpclb_policy);
grpclb_policy->fallback_mode_ = true;
grpclb_policy->CreateOrUpdateChildPolicyLocked();
// Cancel connectivity watch, since we no longer need it.
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
grpclb_policy->interested_parties()),
nullptr, &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
}
grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}
//
// code for interacting with the child policy
//
@@ -1581,18 +1631,14 @@ grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() {
ServerAddressList tmp_addresses;
ServerAddressList* addresses = &tmp_addresses;
bool is_backend_from_grpclb_load_balancer = false;
if (serverlist_ != nullptr) {
if (fallback_mode_) {
// Note: If fallback backend address list is empty, the child policy
// will go into state TRANSIENT_FAILURE.
addresses = &fallback_backend_addresses_;
} else {
tmp_addresses = serverlist_->GetServerAddressList(
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
is_backend_from_grpclb_load_balancer = true;
} else {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new round_robin policy will keep the requested
// picks pending.
GPR_ASSERT(fallback_backend_addresses_ != nullptr);
addresses = fallback_backend_addresses_.get();
}
GPR_ASSERT(addresses != nullptr);
// Replace the server address list in the channel args that we pass down to
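Taken together, the grpclb.cc changes above reduce to a small decision: enter fallback mode when we are not already in it, the initial fallback timer is no longer pending, the current balancer call (if any) has not yet delivered a serverlist, and the child policy is not in state READY; exit fallback mode as soon as a new serverlist arrives. Below is a simplified, standalone restatement of that entry condition; the struct and names are hypothetical and merely mirror the fields in the diff:

// Illustrative restatement only; this is not the real GrpcLb class.
struct FallbackInputs {
  bool fallback_mode;            // already in fallback?
  bool fallback_timer_pending;   // initial fallback timer still running?
  bool seen_serverlist;          // serverlist received on the current LB call?
  bool child_policy_ready;       // child policy reported READY?
};

bool ShouldEnterFallback(const FallbackInputs& in) {
  return !in.fallback_mode && !in.fallback_timer_pending &&
         !in.seen_serverlist && !in.child_policy_ready;
}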

@@ -142,6 +142,8 @@ class BackendServiceImpl : public BackendService {
return status;
}
void Start() {}
void Shutdown() {}
std::set<grpc::string> clients() {
@@ -278,11 +280,16 @@ class BalancerServiceImpl : public BalancerService {
responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
}
void Shutdown() {
std::unique_lock<std::mutex> lock(mu_);
NotifyDoneWithServerlistsLocked();
void Start() {
std::lock_guard<std::mutex> lock(mu_);
serverlist_done_ = false;
load_report_ready_ = false;
responses_and_delays_.clear();
client_stats_.Reset();
}
void Shutdown() {
NotifyDoneWithServerlists();
gpr_log(GPR_INFO, "LB[%p]: shut down", this);
}
@@ -319,10 +326,6 @@ class BalancerServiceImpl : public BalancerService {
void NotifyDoneWithServerlists() {
std::lock_guard<std::mutex> lock(mu_);
NotifyDoneWithServerlistsLocked();
}
void NotifyDoneWithServerlistsLocked() {
if (!serverlist_done_) {
serverlist_done_ = true;
serverlist_cond_.notify_all();
@@ -617,6 +620,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
GPR_ASSERT(!running_);
running_ = true;
service_.Start();
std::mutex mu;
// We need to acquire the lock here in order to prevent the notify_one
// by ServerThread::Serve from firing before the wait below is hit.
@@ -1197,6 +1201,112 @@ TEST_F(SingleBalancerTest, FallbackUpdate) {
EXPECT_EQ(1U, balancers_[0]->service_.response_count());
}
TEST_F(SingleBalancerTest,
FallbackAfterStartup_LoseContactWithBalancerThenBackends) {
// First two backends are fallback, last two are pointed to by balancer.
const size_t kNumFallbackBackends = 2;
const size_t kNumBalancerBackends = backends_.size() - kNumFallbackBackends;
std::vector<AddressData> addresses;
for (size_t i = 0; i < kNumFallbackBackends; ++i) {
addresses.emplace_back(AddressData{backends_[i]->port_, false, ""});
}
for (size_t i = 0; i < balancers_.size(); ++i) {
addresses.emplace_back(AddressData{balancers_[i]->port_, true, ""});
}
SetNextResolution(addresses);
ScheduleResponseForBalancer(0,
BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(kNumFallbackBackends), {}),
0);
// Try to connect.
channel_->GetState(true /* try_to_connect */);
WaitForAllBackends(1 /* num_requests_multiple_of */,
kNumFallbackBackends /* start_index */);
// Stop balancer. RPCs should continue going to backends from balancer.
balancers_[0]->Shutdown();
CheckRpcSendOk(100 * kNumBalancerBackends);
for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
EXPECT_EQ(100UL, backends_[i]->service_.request_count());
}
// Stop backends from balancer. This should put us in fallback mode.
for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
ShutdownBackend(i);
}
WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
kNumFallbackBackends /* stop_index */);
// Restart the backends from the balancer. We should *not* start
// sending traffic back to them at this point (although the behavior
// in xds may be different).
for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
StartBackend(i);
}
CheckRpcSendOk(100 * kNumBalancerBackends);
for (size_t i = 0; i < kNumFallbackBackends; ++i) {
EXPECT_EQ(100UL, backends_[i]->service_.request_count());
}
// Now start the balancer again. This should cause us to exit
// fallback mode.
balancers_[0]->Start(server_host_);
ScheduleResponseForBalancer(0,
BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(kNumFallbackBackends), {}),
0);
WaitForAllBackends(1 /* num_requests_multiple_of */,
kNumFallbackBackends /* start_index */);
}
TEST_F(SingleBalancerTest,
FallbackAfterStartup_LoseContactWithBackendsThenBalancer) {
// First two backends are fallback, last two are pointed to by balancer.
const size_t kNumFallbackBackends = 2;
const size_t kNumBalancerBackends = backends_.size() - kNumFallbackBackends;
std::vector<AddressData> addresses;
for (size_t i = 0; i < kNumFallbackBackends; ++i) {
addresses.emplace_back(AddressData{backends_[i]->port_, false, ""});
}
for (size_t i = 0; i < balancers_.size(); ++i) {
addresses.emplace_back(AddressData{balancers_[i]->port_, true, ""});
}
SetNextResolution(addresses);
ScheduleResponseForBalancer(0,
BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(kNumFallbackBackends), {}),
0);
// Try to connect.
channel_->GetState(true /* try_to_connect */);
WaitForAllBackends(1 /* num_requests_multiple_of */,
kNumFallbackBackends /* start_index */);
// Stop backends from balancer. Since we are still in contact with
// the balancer at this point, RPCs should be failing.
for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
ShutdownBackend(i);
}
CheckRpcSendFailure();
// Stop balancer. This should put us in fallback mode.
balancers_[0]->Shutdown();
WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
kNumFallbackBackends /* stop_index */);
// Restart the backends from the balancer. We should *not* start
// sending traffic back to them at this point (although the behavior
// in xds may be different).
for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
StartBackend(i);
}
CheckRpcSendOk(100 * kNumBalancerBackends);
for (size_t i = 0; i < kNumFallbackBackends; ++i) {
EXPECT_EQ(100UL, backends_[i]->service_.request_count());
}
// Now start the balancer again. This should cause us to exit
// fallback mode.
balancers_[0]->Start(server_host_);
ScheduleResponseForBalancer(0,
BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(kNumFallbackBackends), {}),
0);
WaitForAllBackends(1 /* num_requests_multiple_of */,
kNumFallbackBackends /* start_index */);
}
TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
@@ -1221,11 +1331,6 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
channel_->GetState(true /* try_to_connect */);
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
balancers_[0]->service_.NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancers_[0]->service_.request_count());
// and sent a single response.
EXPECT_EQ(1U, balancers_[0]->service_.response_count());
// Stop backends. RPCs should fail.
ShutdownAllBackends();
CheckRpcSendFailure();
@@ -1233,6 +1338,10 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
StartAllBackends();
CheckRpcSendOk(1 /* times */, 1000 /* timeout_ms */,
true /* wait_for_ready */);
// The balancer got a single request.
EXPECT_EQ(1U, balancers_[0]->service_.request_count());
// and sent a single response.
EXPECT_EQ(1U, balancers_[0]->service_.response_count());
}
class UpdatesTest : public GrpclbEnd2endTest {
