|
|
|
@ -90,9 +90,6 @@ class PickFirst : public LoadBalancingPolicy { |
|
|
|
|
const grpc_channel_args& args) |
|
|
|
|
: SubchannelList(policy, tracer, addresses, combiner, |
|
|
|
|
client_channel_factory, args) {} |
|
|
|
|
|
|
|
|
|
void RefForConnectivityWatch(const char* reason); |
|
|
|
|
void UnrefForConnectivityWatch(const char* reason); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
void ShutdownLocked() override; |
|
|
|
@ -220,9 +217,7 @@ void PickFirst::StartPickingLocked() { |
|
|
|
|
if (subchannel_list_ != nullptr) { |
|
|
|
|
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { |
|
|
|
|
if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { |
|
|
|
|
subchannel_list_->RefForConnectivityWatch( |
|
|
|
|
"connectivity_watch+start_picking"); |
|
|
|
|
subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); |
|
|
|
|
subchannel_list_->subchannel(i)->StartOrRenewConnectivityWatchLocked(); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -332,8 +327,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { |
|
|
|
|
// If we've started picking, start trying to connect to the first
|
|
|
|
|
// subchannel in the new list.
|
|
|
|
|
if (started_picking_) { |
|
|
|
|
subchannel_list_->RefForConnectivityWatch("connectivity_watch+update"); |
|
|
|
|
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); |
|
|
|
|
subchannel_list_->subchannel(0)->StartOrRenewConnectivityWatchLocked(); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
// We do have a selected subchannel.
|
|
|
|
@ -359,9 +353,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { |
|
|
|
|
} |
|
|
|
|
subchannel_list_ = std::move(subchannel_list); |
|
|
|
|
DestroyUnselectedSubchannelsLocked(); |
|
|
|
|
subchannel_list_->RefForConnectivityWatch( |
|
|
|
|
"connectivity_watch+replace_selected"); |
|
|
|
|
sd->StartConnectivityWatchLocked(); |
|
|
|
|
sd->StartOrRenewConnectivityWatchLocked(); |
|
|
|
|
// If there was a previously pending update (which may or may
|
|
|
|
|
// not have contained the currently selected subchannel), drop
|
|
|
|
|
// it, so that it doesn't override what we've done here.
|
|
|
|
@ -391,35 +383,12 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { |
|
|
|
|
// If we've started picking, start trying to connect to the first
|
|
|
|
|
// subchannel in the new list.
|
|
|
|
|
if (started_picking_) { |
|
|
|
|
latest_pending_subchannel_list_->RefForConnectivityWatch( |
|
|
|
|
"connectivity_watch+update"); |
|
|
|
|
latest_pending_subchannel_list_->subchannel(0) |
|
|
|
|
->StartConnectivityWatchLocked(); |
|
|
|
|
->StartOrRenewConnectivityWatchLocked(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void PickFirst::PickFirstSubchannelList::RefForConnectivityWatch( |
|
|
|
|
const char* reason) { |
|
|
|
|
// TODO(roth): We currently track these refs manually. Once the new
|
|
|
|
|
// ClosureRef API is ready, find a way to pass the RefCountedPtr<>
|
|
|
|
|
// along with the closures instead of doing this manually.
|
|
|
|
|
// Ref subchannel list.
|
|
|
|
|
Ref(DEBUG_LOCATION, reason).release(); |
|
|
|
|
// Ref LB policy.
|
|
|
|
|
PickFirst* p = static_cast<PickFirst*>(policy()); |
|
|
|
|
p->Ref(DEBUG_LOCATION, reason).release(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void PickFirst::PickFirstSubchannelList::UnrefForConnectivityWatch( |
|
|
|
|
const char* reason) { |
|
|
|
|
// Unref LB policy.
|
|
|
|
|
PickFirst* p = static_cast<PickFirst*>(policy()); |
|
|
|
|
p->Unref(DEBUG_LOCATION, reason); |
|
|
|
|
// Unref subchannel list.
|
|
|
|
|
Unref(DEBUG_LOCATION, reason); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( |
|
|
|
|
grpc_error* error) { |
|
|
|
|
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); |
|
|
|
@ -434,17 +403,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( |
|
|
|
|
grpc_connectivity_state_name(connectivity_state()), p->shutdown_, |
|
|
|
|
subchannel_list()->shutting_down(), grpc_error_string(error)); |
|
|
|
|
} |
|
|
|
|
// FIXME: move this to SubchannelData::OnConnectivityChangedLocked()
|
|
|
|
|
// If the subchannel list is shutting down, stop watching.
|
|
|
|
|
if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) { |
|
|
|
|
StopConnectivityWatchLocked(); |
|
|
|
|
UnrefSubchannelLocked("pf_sl_shutdown"); |
|
|
|
|
subchannel_list()->UnrefForConnectivityWatch("pf_sl_shutdown"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// If we're still here, the notification must be for a subchannel in
|
|
|
|
|
// either the current or latest pending subchannel lists.
|
|
|
|
|
// The notification must be for a subchannel in either the current or
|
|
|
|
|
// latest pending subchannel lists.
|
|
|
|
|
GPR_ASSERT(p->subchannel_list_ == subchannel_list() || |
|
|
|
|
p->latest_pending_subchannel_list_ == subchannel_list()); |
|
|
|
|
// Handle updates for the currently selected subchannel.
|
|
|
|
@ -455,8 +415,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( |
|
|
|
|
p->latest_pending_subchannel_list_ != nullptr) { |
|
|
|
|
p->selected_ = nullptr; |
|
|
|
|
StopConnectivityWatchLocked(); |
|
|
|
|
subchannel_list()->UnrefForConnectivityWatch( |
|
|
|
|
"selected_not_ready+switch_to_update"); |
|
|
|
|
subchannel_list()->ShutdownLocked("selected_not_ready+switch_to_update"); |
|
|
|
|
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); |
|
|
|
|
grpc_connectivity_state_set( |
|
|
|
@ -478,14 +436,13 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( |
|
|
|
|
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE); |
|
|
|
|
// In transient failure. Rely on re-resolution to recover.
|
|
|
|
|
p->selected_ = nullptr; |
|
|
|
|
StopConnectivityWatchLocked(); |
|
|
|
|
subchannel_list()->UnrefForConnectivityWatch("pf_selected_shutdown"); |
|
|
|
|
UnrefSubchannelLocked("pf_selected_shutdown"); |
|
|
|
|
StopConnectivityWatchLocked(); |
|
|
|
|
} else { |
|
|
|
|
grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(), |
|
|
|
|
GRPC_ERROR_REF(error), "selected_changed"); |
|
|
|
|
// Renew notification.
|
|
|
|
|
StartConnectivityWatchLocked(); |
|
|
|
|
StartOrRenewConnectivityWatchLocked(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
@ -533,7 +490,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( |
|
|
|
|
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
// Renew notification.
|
|
|
|
|
StartConnectivityWatchLocked(); |
|
|
|
|
StartOrRenewConnectivityWatchLocked(); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case GRPC_CHANNEL_TRANSIENT_FAILURE: { |
|
|
|
@ -551,8 +508,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( |
|
|
|
|
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
|
GRPC_ERROR_REF(error), "connecting_transient_failure"); |
|
|
|
|
} |
|
|
|
|
// Reuses the connectivity refs from the previous watch.
|
|
|
|
|
sd->StartConnectivityWatchLocked(); |
|
|
|
|
sd->StartOrRenewConnectivityWatchLocked(); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case GRPC_CHANNEL_CONNECTING: |
|
|
|
@ -564,7 +520,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( |
|
|
|
|
"connecting_changed"); |
|
|
|
|
} |
|
|
|
|
// Renew notification.
|
|
|
|
|
StartConnectivityWatchLocked(); |
|
|
|
|
StartOrRenewConnectivityWatchLocked(); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case GRPC_CHANNEL_SHUTDOWN: |
|
|
|
|