From adc2163038e563ed326f48db1d921ef3bb9da168 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 18 Mar 2019 09:08:38 -0700 Subject: [PATCH] Go into fallback mode when losing contact with balancer and backends. --- .../client_channel/lb_policy/grpclb/grpclb.cc | 162 +++++++++++------- test/cpp/end2end/grpclb_end2end_test.cc | 133 ++++++++++++-- 2 files changed, 225 insertions(+), 70 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 34fe88215fe..cca490fef98 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -148,6 +148,7 @@ class GrpcLb : public LoadBalancingPolicy { GrpcLbClientStats* client_stats() const { return client_stats_.get(); } bool seen_initial_response() const { return seen_initial_response_; } + bool seen_serverlist() const { return seen_serverlist_; } private: // So Delete() can access our private dtor. @@ -188,6 +189,7 @@ class GrpcLb : public LoadBalancingPolicy { grpc_byte_buffer* recv_message_payload_ = nullptr; grpc_closure lb_on_balancer_message_received_; bool seen_initial_response_ = false; + bool seen_serverlist_ = false; // recv_trailing_metadata grpc_closure lb_on_balancer_status_received_; @@ -298,9 +300,12 @@ class GrpcLb : public LoadBalancingPolicy { static void OnBalancerChannelConnectivityChangedLocked(void* arg, grpc_error* error); + // Methods for dealing with fallback state. + void MaybeEnterFallbackMode(); + static void OnFallbackTimerLocked(void* arg, grpc_error* error); + // Methods for dealing with the balancer call. void StartBalancerCallLocked(); - static void OnFallbackTimerLocked(void* arg, grpc_error* error); void StartBalancerCallRetryTimerLocked(); static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error); @@ -347,11 +352,13 @@ class GrpcLb : public LoadBalancingPolicy { // such response has arrived. RefCountedPtr serverlist_; + // Whether we're in fallback mode. + bool fallback_mode_ = false; // Timeout in milliseconds for before using fallback backend addresses. // 0 means not using fallback. int lb_fallback_timeout_ms_ = 0; // The backend addresses from the resolver. - UniquePtr fallback_backend_addresses_; + ServerAddressList fallback_backend_addresses_; // Fallback timer. bool fallback_timer_callback_pending_ = false; grpc_timer lb_fallback_timer_; @@ -367,6 +374,8 @@ class GrpcLb : public LoadBalancingPolicy { OrphanablePtr pending_child_policy_; // The child policy config. RefCountedPtr child_policy_config_; + // Child policy in state READY. + bool child_policy_ready_ = false; }; // @@ -635,6 +644,10 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, GRPC_ERROR_UNREF(state_error); return; } + // Record whether child policy reports READY. + parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY; + // Enter fallback mode if needed. + parent_->MaybeEnterFallbackMode(); // There are three cases to consider here: // 1. We're in fallback mode. In this case, we're always going to use // the child policy's result, so we pass its picker through as-is. @@ -1014,16 +1027,14 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( grpclb_policy, lb_calld, serverlist->num_servers, serverlist_text.get()); } + lb_calld->seen_serverlist_ = true; // Start sending client load report only after we start using the // serverlist returned from the current LB call. 
if (lb_calld->client_stats_report_interval_ > 0 && lb_calld->client_stats_ == nullptr) { lb_calld->client_stats_ = MakeRefCounted(); - // TODO(roth): We currently track this ref manually. Once the - // ClosureRef API is ready, we should pass the RefCountedPtr<> along - // with the callback. - auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); - self.release(); + // Ref held by callback. + lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release(); lb_calld->ScheduleNextClientLoadReportLocked(); } // Check if the serverlist differs from the previous one. @@ -1036,18 +1047,34 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( grpclb_policy, lb_calld); } } else { // New serverlist. - if (grpclb_policy->serverlist_ == nullptr) { - // Dispose of the fallback. - if (grpclb_policy->child_policy_ != nullptr) { - gpr_log(GPR_INFO, - "[grpclb %p] Received response from balancer; exiting " - "fallback mode", - grpclb_policy); - } - grpclb_policy->fallback_backend_addresses_.reset(); - if (grpclb_policy->fallback_timer_callback_pending_) { - grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); - } + // Dispose of the fallback. + // TODO(roth): Ideally, we should stay in fallback mode until we + // know that we can reach at least one of the backends in the new + // serverlist. Unfortunately, we can't do that, since we need to + // send the new addresses to the child policy in order to determine + // if they are reachable, and if we don't exit fallback mode now, + // CreateOrUpdateChildPolicyLocked() will use the fallback + // addresses instead of the addresses from the new serverlist. + // However, if we can't reach any of the servers in the new + // serverlist, then the child policy will never switch away from + // the fallback addresses, but the grpclb policy will still think + // that we're not in fallback mode, which means that we won't send + // updates to the child policy when the fallback addresses are + // updated by the resolver. This is sub-optimal, but the only way + // to fix it is to maintain a completely separate child policy for + // fallback mode, and that's more work than we want to put into + // the grpclb implementation at this point, since we're deprecating + // it in favor of the xds policy. We will implement this the + // right way in the xds policy instead. + if (grpclb_policy->fallback_mode_) { + gpr_log(GPR_INFO, + "[grpclb %p] Received response from balancer; exiting " + "fallback mode", + grpclb_policy); + grpclb_policy->fallback_mode_ = false; + } + if (grpclb_policy->fallback_timer_callback_pending_) { + grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); } // Update the serverlist in the GrpcLb instance. This serverlist // instance will be destroyed either upon the next update or when the @@ -1103,6 +1130,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( // we want to retry connecting. Otherwise, we have deliberately ended this // call and no further action is required. if (lb_calld == grpclb_policy->lb_calld_.get()) { + grpclb_policy->MaybeEnterFallbackMode(); grpclb_policy->lb_calld_.reset(); GPR_ASSERT(!grpclb_policy->shutting_down_); grpclb_policy->channel_control_helper()->RequestReresolution(); @@ -1379,16 +1407,15 @@ void GrpcLb::UpdateLocked(const grpc_channel_args& args, // // Returns the backend addresses extracted from the given addresses. 
-UniquePtr ExtractBackendAddresses( - const ServerAddressList& addresses) { +ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) { void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; grpc_arg arg = grpc_channel_arg_pointer_create( const_cast(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token, &lb_token_arg_vtable); - auto backend_addresses = MakeUnique(); + ServerAddressList backend_addresses; for (size_t i = 0; i < addresses.size(); ++i) { if (!addresses[i].IsBalancer()) { - backend_addresses->emplace_back( + backend_addresses.emplace_back( addresses[i].address(), grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1)); } @@ -1485,6 +1512,7 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg, "entering fallback mode", self); grpc_timer_cancel(&self->lb_fallback_timer_); + self->fallback_mode_ = true; self->CreateOrUpdateChildPolicyLocked(); } // Done watching connectivity state, so drop ref. @@ -1509,32 +1537,6 @@ void GrpcLb::StartBalancerCallLocked() { lb_calld_->StartQuery(); } -void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { - GrpcLb* grpclb_policy = static_cast(arg); - grpclb_policy->fallback_timer_callback_pending_ = false; - // If we receive a serverlist after the timer fires but before this callback - // actually runs, don't fall back. - if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ && - error == GRPC_ERROR_NONE) { - gpr_log(GPR_INFO, - "[grpclb %p] No response from balancer after fallback timeout; " - "entering fallback mode", - grpclb_policy); - GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr); - grpclb_policy->CreateOrUpdateChildPolicyLocked(); - // Cancel connectivity watch, since we no longer need it. - grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(grpclb_policy->lb_channel_)); - GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - grpc_client_channel_watch_connectivity_state( - client_channel_elem, - grpc_polling_entity_create_from_pollset_set( - grpclb_policy->interested_parties()), - nullptr, &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr); - } - grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); -} - void GrpcLb::StartBalancerCallRetryTimerLocked() { grpc_millis next_try = lb_call_backoff_.NextAttemptTime(); if (grpc_lb_glb_trace.enabled()) { @@ -1573,6 +1575,54 @@ void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) { grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); } +// +// code for handling fallback mode +// + +void GrpcLb::MaybeEnterFallbackMode() { + // Enter fallback mode if all of the following are true: + // - We are not currently in fallback mode. + // - We are not currently waiting for the initial fallback timeout. + // - We are not currently in contact with the balancer. + // - The child policy is not in state READY. 
+ if (!fallback_mode_ && !fallback_timer_callback_pending_ && + (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) && + !child_policy_ready_) { + gpr_log(GPR_INFO, + "[grpclb %p] lost contact with balancer and backends from " + "most recent serverlist; entering fallback mode", + this); + fallback_mode_ = true; + CreateOrUpdateChildPolicyLocked(); + } +} + +void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { + GrpcLb* grpclb_policy = static_cast(arg); + grpclb_policy->fallback_timer_callback_pending_ = false; + // If we receive a serverlist after the timer fires but before this callback + // actually runs, don't fall back. + if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ && + error == GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, + "[grpclb %p] No response from balancer after fallback timeout; " + "entering fallback mode", + grpclb_policy); + grpclb_policy->fallback_mode_ = true; + grpclb_policy->CreateOrUpdateChildPolicyLocked(); + // Cancel connectivity watch, since we no longer need it. + grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(grpclb_policy->lb_channel_)); + GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + grpc_client_channel_watch_connectivity_state( + client_channel_elem, + grpc_polling_entity_create_from_pollset_set( + grpclb_policy->interested_parties()), + nullptr, &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr); + } + grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); +} + // // code for interacting with the child policy // @@ -1581,18 +1631,14 @@ grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() { ServerAddressList tmp_addresses; ServerAddressList* addresses = &tmp_addresses; bool is_backend_from_grpclb_load_balancer = false; - if (serverlist_ != nullptr) { + if (fallback_mode_) { + // Note: If fallback backend address list is empty, the child policy + // will go into state TRANSIENT_FAILURE. + addresses = &fallback_backend_addresses_; + } else { tmp_addresses = serverlist_->GetServerAddressList( lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats()); is_backend_from_grpclb_load_balancer = true; - } else { - // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't - // received any serverlist from the balancer, we use the fallback backends - // returned by the resolver. Note that the fallback backend list may be - // empty, in which case the new round_robin policy will keep the requested - // picks pending. 
- GPR_ASSERT(fallback_backend_addresses_ != nullptr); - addresses = fallback_backend_addresses_.get(); } GPR_ASSERT(addresses != nullptr); // Replace the server address list in the channel args that we pass down to diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 761b6ec39d3..1eb43266182 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -142,6 +142,8 @@ class BackendServiceImpl : public BackendService { return status; } + void Start() {} + void Shutdown() {} std::set clients() { @@ -278,11 +280,16 @@ class BalancerServiceImpl : public BalancerService { responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); } - void Shutdown() { - std::unique_lock lock(mu_); - NotifyDoneWithServerlistsLocked(); + void Start() { + std::lock_guard lock(mu_); + serverlist_done_ = false; + load_report_ready_ = false; responses_and_delays_.clear(); client_stats_.Reset(); + } + + void Shutdown() { + NotifyDoneWithServerlists(); gpr_log(GPR_INFO, "LB[%p]: shut down", this); } @@ -319,10 +326,6 @@ class BalancerServiceImpl : public BalancerService { void NotifyDoneWithServerlists() { std::lock_guard lock(mu_); - NotifyDoneWithServerlistsLocked(); - } - - void NotifyDoneWithServerlistsLocked() { if (!serverlist_done_) { serverlist_done_ = true; serverlist_cond_.notify_all(); @@ -617,6 +620,7 @@ class GrpclbEnd2endTest : public ::testing::Test { gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_); GPR_ASSERT(!running_); running_ = true; + service_.Start(); std::mutex mu; // We need to acquire the lock here in order to prevent the notify_one // by ServerThread::Serve from firing before the wait below is hit. @@ -1197,6 +1201,112 @@ TEST_F(SingleBalancerTest, FallbackUpdate) { EXPECT_EQ(1U, balancers_[0]->service_.response_count()); } +TEST_F(SingleBalancerTest, + FallbackAfterStartup_LoseContactWithBalancerThenBackends) { + // First two backends are fallback, last two are pointed to by balancer. + const size_t kNumFallbackBackends = 2; + const size_t kNumBalancerBackends = backends_.size() - kNumFallbackBackends; + std::vector addresses; + for (size_t i = 0; i < kNumFallbackBackends; ++i) { + addresses.emplace_back(AddressData{backends_[i]->port_, false, ""}); + } + for (size_t i = 0; i < balancers_.size(); ++i) { + addresses.emplace_back(AddressData{balancers_[i]->port_, true, ""}); + } + SetNextResolution(addresses); + ScheduleResponseForBalancer(0, + BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(kNumFallbackBackends), {}), + 0); + // Try to connect. + channel_->GetState(true /* try_to_connect */); + WaitForAllBackends(1 /* num_requests_multiple_of */, + kNumFallbackBackends /* start_index */); + // Stop balancer. RPCs should continue going to backends from balancer. + balancers_[0]->Shutdown(); + CheckRpcSendOk(100 * kNumBalancerBackends); + for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) { + EXPECT_EQ(100UL, backends_[i]->service_.request_count()); + } + // Stop backends from balancer. This should put us in fallback mode. + for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) { + ShutdownBackend(i); + } + WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */, + kNumFallbackBackends /* stop_index */); + // Restart the backends from the balancer. We should *not* start + // sending traffic back to them at this point (although the behavior + // in xds may be different). 
+ for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) { + StartBackend(i); + } + CheckRpcSendOk(100 * kNumBalancerBackends); + for (size_t i = 0; i < kNumFallbackBackends; ++i) { + EXPECT_EQ(100UL, backends_[i]->service_.request_count()); + } + // Now start the balancer again. This should cause us to exit + // fallback mode. + balancers_[0]->Start(server_host_); + ScheduleResponseForBalancer(0, + BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(kNumFallbackBackends), {}), + 0); + WaitForAllBackends(1 /* num_requests_multiple_of */, + kNumFallbackBackends /* start_index */); +} + +TEST_F(SingleBalancerTest, + FallbackAfterStartup_LoseContactWithBackendsThenBalancer) { + // First two backends are fallback, last two are pointed to by balancer. + const size_t kNumFallbackBackends = 2; + const size_t kNumBalancerBackends = backends_.size() - kNumFallbackBackends; + std::vector addresses; + for (size_t i = 0; i < kNumFallbackBackends; ++i) { + addresses.emplace_back(AddressData{backends_[i]->port_, false, ""}); + } + for (size_t i = 0; i < balancers_.size(); ++i) { + addresses.emplace_back(AddressData{balancers_[i]->port_, true, ""}); + } + SetNextResolution(addresses); + ScheduleResponseForBalancer(0, + BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(kNumFallbackBackends), {}), + 0); + // Try to connect. + channel_->GetState(true /* try_to_connect */); + WaitForAllBackends(1 /* num_requests_multiple_of */, + kNumFallbackBackends /* start_index */); + // Stop backends from balancer. Since we are still in contact with + // the balancer at this point, RPCs should be failing. + for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) { + ShutdownBackend(i); + } + CheckRpcSendFailure(); + // Stop balancer. This should put us in fallback mode. + balancers_[0]->Shutdown(); + WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */, + kNumFallbackBackends /* stop_index */); + // Restart the backends from the balancer. We should *not* start + // sending traffic back to them at this point (although the behavior + // in xds may be different). + for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) { + StartBackend(i); + } + CheckRpcSendOk(100 * kNumBalancerBackends); + for (size_t i = 0; i < kNumFallbackBackends; ++i) { + EXPECT_EQ(100UL, backends_[i]->service_.request_count()); + } + // Now start the balancer again. This should cause us to exit + // fallback mode. + balancers_[0]->Start(server_host_); + ScheduleResponseForBalancer(0, + BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(kNumFallbackBackends), {}), + 0); + WaitForAllBackends(1 /* num_requests_multiple_of */, + kNumFallbackBackends /* start_index */); +} + TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) { const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); @@ -1221,11 +1331,6 @@ TEST_F(SingleBalancerTest, BackendsRestart) { channel_->GetState(true /* try_to_connect */); // Send kNumRpcsPerAddress RPCs per server. CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); - balancers_[0]->service_.NotifyDoneWithServerlists(); - // The balancer got a single request. - EXPECT_EQ(1U, balancers_[0]->service_.request_count()); - // and sent a single response. - EXPECT_EQ(1U, balancers_[0]->service_.response_count()); // Stop backends. RPCs should fail. 
ShutdownAllBackends(); CheckRpcSendFailure(); @@ -1233,6 +1338,10 @@ TEST_F(SingleBalancerTest, BackendsRestart) { StartAllBackends(); CheckRpcSendOk(1 /* times */, 1000 /* timeout_ms */, true /* wait_for_ready */); + // The balancer got a single request. + EXPECT_EQ(1U, balancers_[0]->service_.request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancers_[0]->service_.response_count()); } class UpdatesTest : public GrpclbEnd2endTest {
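// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch above): the fallback-entry rule.
// The conditions checked by the new GrpcLb::MaybeEnterFallbackMode() are easy
// to lose in the diff, so this standalone snippet restates them. The struct
// and function names are placeholders, not real GrpcLb members; only the four
// booleans correspond to state shown in the patch (fallback_mode_,
// fallback_timer_callback_pending_, lb_calld_->seen_serverlist(), and
// child_policy_ready_).

struct FallbackInputs {
  bool fallback_mode = false;           // already using fallback addresses
  bool fallback_timer_pending = false;  // initial fallback timeout still running
  bool balancer_call_seen_serverlist = false;  // a current LB call exists AND
                                               // it has delivered a serverlist
  bool child_policy_ready = false;      // child policy reported READY
};

// Mirrors the if() in MaybeEnterFallbackMode(): enter fallback only when we
// are not already in it, the startup fallback timer has resolved, the balancer
// has not supplied a serverlist on the current call (or there is no call), and
// the backends from the most recent serverlist are unreachable.
inline bool ShouldEnterFallback(const FallbackInputs& s) {
  return !s.fallback_mode && !s.fallback_timer_pending &&
         !s.balancer_call_seen_serverlist && !s.child_policy_ready;
}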
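// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch above): which addresses feed the
// child policy. CreateChildPolicyArgsLocked() now keys off fallback_mode_
// instead of "serverlist_ == nullptr". The types below are simplified
// stand-ins (std::vector<std::string> instead of ServerAddressList) and the
// function name is hypothetical.
#include <string>
#include <vector>

struct ChildPolicyInputs {
  bool fallback_mode = false;
  std::vector<std::string> fallback_backend_addresses;  // from the resolver
  std::vector<std::string> serverlist_addresses;        // from the balancer
};

// In fallback mode the resolver-provided backends are used even if the list is
// empty (the patch notes that an empty list drives the child policy into
// TRANSIENT_FAILURE); otherwise the balancer's serverlist is used.
inline const std::vector<std::string>& AddressesForChildPolicy(
    const ChildPolicyInputs& s) {
  return s.fallback_mode ? s.fallback_backend_addresses
                         : s.serverlist_addresses;
}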
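// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch above): the routing transitions
// that the two new FallbackAfterStartup_* tests assert, reduced to a plain
// state walk-through. Scenario and RoutedTo() are hypothetical helpers, not
// part of the test fixture.
#include <cassert>

enum class BackendSet { kBalancerBackends, kFallbackBackends, kNone };

struct Scenario {
  bool balancer_reachable = true;    // balancer channel / LB call healthy
  bool balancer_backends_up = true;  // backends from the last serverlist up
};

// Fallback is entered only once *both* the balancer and the backends from its
// most recent serverlist are lost. Losing just those backends (second test)
// leaves RPCs failing, and losing just the balancer (first test) keeps traffic
// on the serverlist backends.
inline BackendSet RoutedTo(const Scenario& s) {
  if (!s.balancer_reachable && !s.balancer_backends_up) {
    return BackendSet::kFallbackBackends;
  }
  return s.balancer_backends_up ? BackendSet::kBalancerBackends
                                : BackendSet::kNone;  // RPCs fail
}

// The order in which the failures happen does not change the end state:
inline void CheckScenarios() {
  assert(RoutedTo({true, true}) == BackendSet::kBalancerBackends);
  assert(RoutedTo({false, true}) == BackendSet::kBalancerBackends);   // balancer lost first
  assert(RoutedTo({true, false}) == BackendSet::kNone);               // backends lost first
  assert(RoutedTo({false, false}) == BackendSet::kFallbackBackends);  // both lost
}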