From 233d3e27ff0f7fe1cc8ad5a3e9c271123e4f0bc3 Mon Sep 17 00:00:00 2001
From: "Mark D. Roth"
Date: Mon, 18 Mar 2019 11:45:14 -0700
Subject: [PATCH] grpclb fallback-at-startup improvements

---
 .../client_channel/lb_policy/grpclb/grpclb.cc | 133 ++++++++++--------
 test/cpp/end2end/grpclb_end2end_test.cc       |  22 ++-
 2 files changed, 96 insertions(+), 59 deletions(-)

diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 22c62661c5c..5906ecafc2a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -299,9 +299,10 @@ class GrpcLb : public LoadBalancingPolicy {
   void ParseLbConfig(Config* grpclb_config);
   static void OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                          grpc_error* error);
+  void CancelBalancerChannelConnectivityWatchLocked();
 
   // Methods for dealing with fallback state.
-  void MaybeEnterFallbackMode();
+  void MaybeEnterFallbackModeAfterStartup();
   static void OnFallbackTimerLocked(void* arg, grpc_error* error);
 
   // Methods for dealing with the balancer call.
@@ -330,9 +331,6 @@ class GrpcLb : public LoadBalancingPolicy {
   gpr_atm lb_channel_uuid_ = 0;
   // Response generator to inject address updates into lb_channel_.
   RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
-  // Connectivity state notification.
-  grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
-  grpc_closure lb_channel_on_connectivity_changed_;
 
   // The data associated with the current LB call. It holds a ref to this LB
   // policy. It's initialized every time we query for backends. It's reset to
@@ -354,15 +352,17 @@ class GrpcLb : public LoadBalancingPolicy {
 
   // Whether we're in fallback mode.
   bool fallback_mode_ = false;
-  // Timeout in milliseconds for before using fallback backend addresses.
-  // 0 means not using fallback.
-  int lb_fallback_timeout_ms_ = 0;
   // The backend addresses from the resolver.
   ServerAddressList fallback_backend_addresses_;
-  // Fallback timer.
-  bool fallback_timer_callback_pending_ = false;
+  // State for fallback-at-startup checks.
+  // Timeout after startup after which we will go into fallback mode if
+  // we have not received a serverlist from the balancer.
+  int fallback_at_startup_timeout_ = 0;
+  bool fallback_at_startup_checks_pending_ = false;
   grpc_timer lb_fallback_timer_;
   grpc_closure lb_on_fallback_;
+  grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
+  grpc_closure lb_channel_on_connectivity_changed_;
 
   // Lock held when modifying the value of child_policy_ or
   // pending_child_policy_.
@@ -647,7 +647,7 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
   // Record whether child policy reports READY.
   parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
   // Enter fallback mode if needed.
-  parent_->MaybeEnterFallbackMode();
+  parent_->MaybeEnterFallbackModeAfterStartup();
   // There are three cases to consider here:
   // 1. We're in fallback mode. In this case, we're always going to use
   //    the child policy's result, so we pass its picker through as-is.
@@ -804,7 +804,8 @@ void GrpcLb::BalancerCallState::StartQuery() {
   grpc_op* op = ops;
   op->op = GRPC_OP_SEND_INITIAL_METADATA;
   op->data.send_initial_metadata.count = 0;
-  op->flags = 0;
+  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
+              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
   op->reserved = nullptr;
   op++;
   // Op: send request message.
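The WAIT_FOR_READY flags on the send-initial-metadata op above keep the balancer RPC queued while the balancer channel is still connecting, instead of failing the call immediately; the fallback-at-startup checks added later in this patch then decide when to stop waiting and fall back. A minimal sketch of the equivalent per-call setting at the public C++ API level (the helper name and the 10-second deadline are illustrative only, not part of the patch):

#include <chrono>

#include <grpcpp/grpcpp.h>

void ConfigureBalancerStyleCall(grpc::ClientContext* ctx) {
  // Queue the RPC until the channel reaches READY instead of failing fast
  // while the channel is still connecting (the behavior the patch wants for
  // the balancer call).
  ctx->set_wait_for_ready(true);
  // Bound the call with a deadline, analogous to the LB call timeout that
  // grpclb reads from GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS.
  ctx->set_deadline(std::chrono::system_clock::now() +
                    std::chrono::seconds(10));
}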
@@ -1073,8 +1074,10 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
               grpclb_policy);
       grpclb_policy->fallback_mode_ = false;
     }
-    if (grpclb_policy->fallback_timer_callback_pending_) {
+    if (grpclb_policy->fallback_at_startup_checks_pending_) {
+      grpclb_policy->fallback_at_startup_checks_pending_ = false;
       grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
+      grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
     }
     // Update the serverlist in the GrpcLb instance. This serverlist
     // instance will be destroyed either upon the next update or when the
@@ -1130,7 +1133,24 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
   // we want to retry connecting. Otherwise, we have deliberately ended this
   // call and no further action is required.
   if (lb_calld == grpclb_policy->lb_calld_.get()) {
-    grpclb_policy->MaybeEnterFallbackMode();
+    // If we did not receive a serverlist and the fallback-at-startup checks
+    // are pending, go into fallback mode immediately. This short-circuits
+    // the timeout for the fallback-at-startup case.
+    if (!lb_calld->seen_serverlist_ &&
+        grpclb_policy->fallback_at_startup_checks_pending_) {
+      gpr_log(GPR_INFO,
+              "[grpclb %p] balancer call finished without receiving "
+              "serverlist; entering fallback mode",
+              grpclb_policy);
+      grpclb_policy->fallback_at_startup_checks_pending_ = false;
+      grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
+      grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
+      grpclb_policy->fallback_mode_ = true;
+      grpclb_policy->CreateOrUpdateChildPolicyLocked();
+    } else {
+      // This handles the fallback-after-startup case.
+      grpclb_policy->MaybeEnterFallbackModeAfterStartup();
+    }
     grpclb_policy->lb_calld_.reset();
     GPR_ASSERT(!grpclb_policy->shutting_down_);
     grpclb_policy->channel_control_helper()->RequestReresolution();
@@ -1262,6 +1282,8 @@ GrpcLb::GrpcLb(Args args)
                .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
                                 1000)) {
   // Initialization.
+  GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
+                    grpc_combiner_scheduler(combiner()));
   GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
                     &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
                     grpc_combiner_scheduler(args.combiner));
@@ -1282,9 +1304,9 @@ GrpcLb::GrpcLb(Args args)
   // Record LB call timeout.
   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
   lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
-  // Record fallback timeout.
+  // Record fallback-at-startup timeout.
   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
-  lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
+  fallback_at_startup_timeout_ = grpc_channel_arg_get_integer(
       arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
 }
 
@@ -1300,8 +1322,9 @@ void GrpcLb::ShutdownLocked() {
   if (retry_timer_callback_pending_) {
     grpc_timer_cancel(&lb_call_retry_timer_);
   }
-  if (fallback_timer_callback_pending_) {
+  if (fallback_at_startup_checks_pending_) {
     grpc_timer_cancel(&lb_fallback_timer_);
+    CancelBalancerChannelConnectivityWatchLocked();
   }
   if (child_policy_ != nullptr) {
     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
@@ -1373,31 +1396,28 @@ void GrpcLb::UpdateLocked(const grpc_channel_args& args,
   ProcessChannelArgsLocked(args);
   // Update the existing child policy.
   if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
-  // If this is the initial update, start the fallback timer.
+  // If this is the initial update, start the fallback-at-startup checks
+  // and the balancer call.
   if (is_initial_update) {
-    if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
-        !fallback_timer_callback_pending_) {
-      grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
-      Ref(DEBUG_LOCATION, "on_fallback_timer").release();  // Ref for callback
-      GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
-                        grpc_combiner_scheduler(combiner()));
-      fallback_timer_callback_pending_ = true;
-      grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
-      // Start watching the channel's connectivity state. If the channel
-      // goes into state TRANSIENT_FAILURE, we go into fallback mode even if
-      // the fallback timeout has not elapsed.
-      grpc_channel_element* client_channel_elem =
-          grpc_channel_stack_last_element(
-              grpc_channel_get_channel_stack(lb_channel_));
-      GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
-      // Ref held by callback.
-      Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
-      grpc_client_channel_watch_connectivity_state(
-          client_channel_elem,
-          grpc_polling_entity_create_from_pollset_set(interested_parties()),
-          &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
-          nullptr);
-    }
+    fallback_at_startup_checks_pending_ = true;
+    // Start timer.
+    grpc_millis deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
+    Ref(DEBUG_LOCATION, "on_fallback_timer").release();  // Ref for callback
+    grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
+    // Start watching the channel's connectivity state. If the channel
+    // goes into state TRANSIENT_FAILURE before the timer fires, we go into
+    // fallback mode even if the fallback timeout has not elapsed.
+    grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
+        grpc_channel_get_channel_stack(lb_channel_));
+    GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+    // Ref held by callback.
+    Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
+    grpc_client_channel_watch_connectivity_state(
+        client_channel_elem,
+        grpc_polling_entity_create_from_pollset_set(interested_parties()),
+        &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
+        nullptr);
+    // Start balancer call.
     StartBalancerCallLocked();
   }
 }
@@ -1490,7 +1510,7 @@ void GrpcLb::ParseLbConfig(Config* grpclb_config) {
 void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                         grpc_error* error) {
   GrpcLb* self = static_cast<GrpcLb*>(arg);
-  if (!self->shutting_down_ && self->fallback_timer_callback_pending_) {
+  if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
     if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
       // Not in TRANSIENT_FAILURE. Renew connectivity watch.
      grpc_channel_element* client_channel_elem =
@@ -1511,6 +1531,7 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
             "[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
             "entering fallback mode",
             self);
+    self->fallback_at_startup_checks_pending_ = false;
     grpc_timer_cancel(&self->lb_fallback_timer_);
     self->fallback_mode_ = true;
     self->CreateOrUpdateChildPolicyLocked();
@@ -1519,6 +1540,16 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
   self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
 }
 
+void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
+  grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
+      grpc_channel_get_channel_stack(lb_channel_));
+  GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+  grpc_client_channel_watch_connectivity_state(
+      client_channel_elem,
+      grpc_polling_entity_create_from_pollset_set(interested_parties()),
+      nullptr, &lb_channel_on_connectivity_changed_, nullptr);
+}
+
 //
 // code for balancer channel and call
 //
@@ -1579,13 +1610,13 @@ void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
 // code for handling fallback mode
 //
 
-void GrpcLb::MaybeEnterFallbackMode() {
+void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
   // Enter fallback mode if all of the following are true:
   // - We are not currently in fallback mode.
   // - We are not currently waiting for the initial fallback timeout.
   // - We are not currently in contact with the balancer.
   // - The child policy is not in state READY.
-  if (!fallback_mode_ && !fallback_timer_callback_pending_ &&
+  if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
      (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
      !child_policy_ready_) {
    gpr_log(GPR_INFO,
@@ -1599,26 +1630,18 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
 
 void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
-  grpclb_policy->fallback_timer_callback_pending_ = false;
   // If we receive a serverlist after the timer fires but before this callback
   // actually runs, don't fall back.
-  if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
-      error == GRPC_ERROR_NONE) {
+  if (grpclb_policy->fallback_at_startup_checks_pending_ &&
+      !grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
     gpr_log(GPR_INFO,
             "[grpclb %p] No response from balancer after fallback timeout; "
            "entering fallback mode",
            grpclb_policy);
+    grpclb_policy->fallback_at_startup_checks_pending_ = false;
+    grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
     grpclb_policy->fallback_mode_ = true;
    grpclb_policy->CreateOrUpdateChildPolicyLocked();
-    // Cancel connectivity watch, since we no longer need it.
-    grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
-        grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
-    GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
-    grpc_client_channel_watch_connectivity_state(
-        client_channel_elem,
-        grpc_polling_entity_create_from_pollset_set(
-            grpclb_policy->interested_parties()),
-        nullptr, &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
   }
   grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
 }
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 1eb43266182..3afcd0c578f 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -406,7 +406,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
   void ResetStub(int fallback_timeout = 0,
                  const grpc::string& expected_targets = "") {
     ChannelArguments args;
-    args.SetGrpclbFallbackTimeout(fallback_timeout);
+    if (fallback_timeout > 0) args.SetGrpclbFallbackTimeout(fallback_timeout);
     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
                     response_generator_.get());
     if (!expected_targets.empty()) {
@@ -1321,6 +1321,22 @@ TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) {
                  /* wait_for_ready */ false);
 }
 
+TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerCallFails) {
+  const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
+  ResetStub(kFallbackTimeoutMs);
+  // Return an unreachable balancer and one fallback backend.
+  std::vector<AddressData> addresses;
+  addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""});
+  addresses.emplace_back(AddressData{backends_[0]->port_, false, ""});
+  SetNextResolution(addresses);
+  // Balancer drops call without sending a serverlist.
+  balancers_[0]->service_.NotifyDoneWithServerlists();
+  // Send RPC with deadline less than the fallback timeout and make sure it
+  // succeeds.
+  CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 1000,
+                 /* wait_for_ready */ false);
+}
+
 TEST_F(SingleBalancerTest, BackendsRestart) {
   SetNextResolutionAllBalancers();
   const size_t kNumRpcsPerAddress = 100;
@@ -1336,7 +1352,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
   CheckRpcSendFailure();
   // Restart backends. RPCs should start succeeding again.
   StartAllBackends();
-  CheckRpcSendOk(1 /* times */, 1000 /* timeout_ms */,
+  CheckRpcSendOk(1 /* times */, 2000 /* timeout_ms */,
                  true /* wait_for_ready */);
   // The balancer got a single request.
   EXPECT_EQ(1U, balancers_[0]->service_.request_count());
@@ -1867,8 +1883,6 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
   // Send one RPC per backend.
   CheckRpcSendOk(kNumBackendsSecondPass);
   balancers_[0]->service_.NotifyDoneWithServerlists();
-  EXPECT_EQ(2U, balancers_[0]->service_.request_count());
-  EXPECT_EQ(2U, balancers_[0]->service_.response_count());
   // Check client stats.
   client_stats = WaitForLoadReports();
   EXPECT_EQ(kNumBackendsSecondPass + 1, client_stats.num_calls_started
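Taken together, the patch replaces the single fallback timer with a set of fallback-at-startup checks: while they are pending, the first of three events (the fallback timer firing, the balancer channel going TRANSIENT_FAILURE, or the balancer call ending without ever delivering a serverlist) puts the policy into fallback mode, and receiving a serverlist cancels all of them. A minimal sketch of that decision logic, with an invented class name and none of the timer or channel plumbing (a simplified model, not the GrpcLb code):

#include <functional>
#include <utility>

// Tracks whether the startup checks are still pending and enters fallback
// exactly once, whichever trigger fires first.
class FallbackAtStartupState {
 public:
  explicit FallbackAtStartupState(std::function<void()> enter_fallback)
      : enter_fallback_(std::move(enter_fallback)) {}

  // Trigger 1: the fallback-at-startup timer fired before a serverlist arrived.
  void OnFallbackTimer() { MaybeEnterFallback(); }

  // Trigger 2: the balancer channel reported TRANSIENT_FAILURE.
  void OnBalancerChannelTransientFailure() { MaybeEnterFallback(); }

  // Trigger 3: the balancer call ended without ever sending a serverlist.
  void OnBalancerCallFinished(bool seen_serverlist) {
    if (!seen_serverlist) MaybeEnterFallback();
  }

  // A serverlist arrived: the startup checks are over; fallback no longer
  // applies at startup.
  void OnServerlistReceived() { checks_pending_ = false; }

 private:
  void MaybeEnterFallback() {
    if (!checks_pending_) return;  // already resolved one way or the other
    checks_pending_ = false;       // also disarms the remaining triggers
    enter_fallback_();
  }

  bool checks_pending_ = true;
  std::function<void()> enter_fallback_;
};

In the real code each trigger additionally cancels the other outstanding checks, which is what the paired grpc_timer_cancel and CancelBalancerChannelConnectivityWatchLocked calls in the diff do before setting fallback_mode_ and updating the child policy.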