|
|
|
@ -749,7 +749,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); |
|
|
|
|
GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); |
|
|
|
|
// Empty payload means the LB call was cancelled.
|
|
|
|
|
// Null payload means the LB call was cancelled.
|
|
|
|
|
if (lb_calld != grpclb_policy->lb_calld_.get() || |
|
|
|
|
lb_calld->recv_message_payload_ == nullptr) { |
|
|
|
|
lb_calld->Unref(DEBUG_LOCATION, "on_message_received"); |
|
|
|
@ -803,54 +803,45 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( |
|
|
|
|
gpr_free(ipport); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* update serverlist */ |
|
|
|
|
if (serverlist->num_servers > 0) { |
|
|
|
|
// Start sending client load report only after we start using the
|
|
|
|
|
// serverlist returned from the current LB call.
|
|
|
|
|
if (lb_calld->client_stats_report_interval_ > 0 && |
|
|
|
|
lb_calld->client_stats_ == nullptr) { |
|
|
|
|
lb_calld->client_stats_.reset(New<GrpcLbClientStats>()); |
|
|
|
|
// TODO(roth): We currently track this ref manually. Once the
|
|
|
|
|
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
|
|
|
|
|
// with the callback.
|
|
|
|
|
auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); |
|
|
|
|
self.release(); |
|
|
|
|
lb_calld->ScheduleNextClientLoadReportLocked(); |
|
|
|
|
} |
|
|
|
|
if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, |
|
|
|
|
serverlist)) { |
|
|
|
|
if (grpc_lb_glb_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[grpclb %p] Incoming server list identical to current, " |
|
|
|
|
"ignoring.", |
|
|
|
|
grpclb_policy); |
|
|
|
|
} |
|
|
|
|
grpc_grpclb_destroy_serverlist(serverlist); |
|
|
|
|
} else { /* new serverlist */ |
|
|
|
|
if (grpclb_policy->serverlist_ != nullptr) { |
|
|
|
|
/* dispose of the old serverlist */ |
|
|
|
|
grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); |
|
|
|
|
} else { |
|
|
|
|
/* or dispose of the fallback */ |
|
|
|
|
grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_); |
|
|
|
|
grpclb_policy->fallback_backend_addresses_ = nullptr; |
|
|
|
|
if (grpclb_policy->fallback_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// and update the copy in the GrpcLb instance. This
|
|
|
|
|
// serverlist instance will be destroyed either upon the next
|
|
|
|
|
// update or when the GrpcLb instance is destroyed.
|
|
|
|
|
grpclb_policy->serverlist_ = serverlist; |
|
|
|
|
grpclb_policy->serverlist_index_ = 0; |
|
|
|
|
grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
// Start sending client load report only after we start using the
|
|
|
|
|
// serverlist returned from the current LB call.
|
|
|
|
|
if (lb_calld->client_stats_report_interval_ > 0 && |
|
|
|
|
lb_calld->client_stats_ == nullptr) { |
|
|
|
|
lb_calld->client_stats_.reset(New<GrpcLbClientStats>()); |
|
|
|
|
// TODO(roth): We currently track this ref manually. Once the
|
|
|
|
|
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
|
|
|
|
|
// with the callback.
|
|
|
|
|
auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); |
|
|
|
|
self.release(); |
|
|
|
|
lb_calld->ScheduleNextClientLoadReportLocked(); |
|
|
|
|
} |
|
|
|
|
// Check if the serverlist differs from the previous one.
|
|
|
|
|
if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) { |
|
|
|
|
if (grpc_lb_glb_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.", |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[grpclb %p] Incoming server list identical to current, " |
|
|
|
|
"ignoring.", |
|
|
|
|
grpclb_policy); |
|
|
|
|
} |
|
|
|
|
grpc_grpclb_destroy_serverlist(serverlist); |
|
|
|
|
} else { // New serverlist.
|
|
|
|
|
if (grpclb_policy->serverlist_ != nullptr) { |
|
|
|
|
// Dispose of the old serverlist.
|
|
|
|
|
grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); |
|
|
|
|
} else { |
|
|
|
|
// Dispose of the fallback.
|
|
|
|
|
grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_); |
|
|
|
|
grpclb_policy->fallback_backend_addresses_ = nullptr; |
|
|
|
|
if (grpclb_policy->fallback_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Update the serverlist in the GrpcLb instance. This serverlist
|
|
|
|
|
// instance will be destroyed either upon the next update or when the
|
|
|
|
|
// GrpcLb instance is destroyed.
|
|
|
|
|
grpclb_policy->serverlist_ = serverlist; |
|
|
|
|
grpclb_policy->serverlist_index_ = 0; |
|
|
|
|
grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
// No valid initial response or serverlist found.
|
|
|
|
@ -1583,7 +1574,7 @@ void GrpcLb::AddPendingPick(PendingPick* pp) { |
|
|
|
|
bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, |
|
|
|
|
grpc_error** error) { |
|
|
|
|
// Check for drops if we are not using fallback backend addresses.
|
|
|
|
|
if (serverlist_ != nullptr) { |
|
|
|
|
if (serverlist_ != nullptr && serverlist_->num_servers > 0) { |
|
|
|
|
// Look at the index into the serverlist to see if we should drop this call.
|
|
|
|
|
grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++]; |
|
|
|
|
if (serverlist_index_ == serverlist_->num_servers) { |
|
|
|
@ -1681,7 +1672,6 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { |
|
|
|
|
grpc_lb_addresses* addresses; |
|
|
|
|
bool is_backend_from_grpclb_load_balancer = false; |
|
|
|
|
if (serverlist_ != nullptr) { |
|
|
|
|
GPR_ASSERT(serverlist_->num_servers > 0); |
|
|
|
|
addresses = ProcessServerlist(serverlist_); |
|
|
|
|
is_backend_from_grpclb_load_balancer = true; |
|
|
|
|
} else { |
|
|
|
|