From 217de8908526c58d4a6c2c7c403e0d94287da815 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 27 Nov 2018 14:32:42 -0800 Subject: [PATCH] Don't ignore empty serverlists from the grpclb balancer. --- .../client_channel/lb_policy/grpclb/grpclb.cc | 84 ++++++++----------- test/cpp/end2end/grpclb_end2end_test.cc | 8 +- 2 files changed, 41 insertions(+), 51 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index dc0e1f89ce6..7e70f1c28bd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -749,7 +749,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); - // Empty payload means the LB call was cancelled. + // Null payload means the LB call was cancelled. if (lb_calld != grpclb_policy->lb_calld_.get() || lb_calld->recv_message_payload_ == nullptr) { lb_calld->Unref(DEBUG_LOCATION, "on_message_received"); @@ -803,54 +803,45 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( gpr_free(ipport); } } - /* update serverlist */ - if (serverlist->num_servers > 0) { - // Start sending client load report only after we start using the - // serverlist returned from the current LB call. - if (lb_calld->client_stats_report_interval_ > 0 && - lb_calld->client_stats_ == nullptr) { - lb_calld->client_stats_.reset(New()); - // TODO(roth): We currently track this ref manually. Once the - // ClosureRef API is ready, we should pass the RefCountedPtr<> along - // with the callback. - auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); - self.release(); - lb_calld->ScheduleNextClientLoadReportLocked(); - } - if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, - serverlist)) { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Incoming server list identical to current, " - "ignoring.", - grpclb_policy); - } - grpc_grpclb_destroy_serverlist(serverlist); - } else { /* new serverlist */ - if (grpclb_policy->serverlist_ != nullptr) { - /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); - } else { - /* or dispose of the fallback */ - grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_); - grpclb_policy->fallback_backend_addresses_ = nullptr; - if (grpclb_policy->fallback_timer_callback_pending_) { - grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); - } - } - // and update the copy in the GrpcLb instance. This - // serverlist instance will be destroyed either upon the next - // update or when the GrpcLb instance is destroyed. - grpclb_policy->serverlist_ = serverlist; - grpclb_policy->serverlist_index_ = 0; - grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); - } - } else { + // Start sending client load report only after we start using the + // serverlist returned from the current LB call. + if (lb_calld->client_stats_report_interval_ > 0 && + lb_calld->client_stats_ == nullptr) { + lb_calld->client_stats_.reset(New()); + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); + self.release(); + lb_calld->ScheduleNextClientLoadReportLocked(); + } + // Check if the serverlist differs from the previous one. + if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) { if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.", + gpr_log(GPR_INFO, + "[grpclb %p] Incoming server list identical to current, " + "ignoring.", grpclb_policy); } grpc_grpclb_destroy_serverlist(serverlist); + } else { // New serverlist. + if (grpclb_policy->serverlist_ != nullptr) { + // Dispose of the old serverlist. + grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); + } else { + // Dispose of the fallback. + grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_); + grpclb_policy->fallback_backend_addresses_ = nullptr; + if (grpclb_policy->fallback_timer_callback_pending_) { + grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); + } + } + // Update the serverlist in the GrpcLb instance. This serverlist + // instance will be destroyed either upon the next update or when the + // GrpcLb instance is destroyed. + grpclb_policy->serverlist_ = serverlist; + grpclb_policy->serverlist_index_ = 0; + grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); } } else { // No valid initial response or serverlist found. @@ -1583,7 +1574,7 @@ void GrpcLb::AddPendingPick(PendingPick* pp) { bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, grpc_error** error) { // Check for drops if we are not using fallback backend addresses. - if (serverlist_ != nullptr) { + if (serverlist_ != nullptr && serverlist_->num_servers > 0) { // Look at the index into the serverlist to see if we should drop this call. grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++]; if (serverlist_index_ == serverlist_->num_servers) { @@ -1681,7 +1672,6 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { grpc_lb_addresses* addresses; bool is_backend_from_grpclb_load_balancer = false; if (serverlist_ != nullptr) { - GPR_ASSERT(serverlist_->num_servers > 0); addresses = ProcessServerlist(serverlist_); is_backend_from_grpclb_load_balancer = true; } else { diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 6ce0696114e..5b304dc16b1 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -553,10 +553,11 @@ class GrpclbEnd2endTest : public ::testing::Test { return status; } - void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000) { + void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000, + bool wait_for_ready = false) { for (size_t i = 0; i < times; ++i) { EchoResponse response; - const Status status = SendRpc(&response, timeout_ms); + const Status status = SendRpc(&response, timeout_ms, wait_for_ready); EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); EXPECT_EQ(response.message(), kRequestMessage_); @@ -717,10 +718,9 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), kServerlistDelayMs); - const auto t0 = system_clock::now(); // Client will block: LB will initially send empty serverlist. - CheckRpcSendOk(1, kCallDeadlineMs); + CheckRpcSendOk(1, kCallDeadlineMs, true /* wait_for_ready */); const auto ellapsed_ms = std::chrono::duration_cast( system_clock::now() - t0);