Fix pick_first to not throw away unused subchannels.

reviewable/pr16342/r2
Mark D. Roth 6 years ago
parent 186df431de
commit 4e1e6ceda9
  1. 22
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 9
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  3. 13
      src/core/ext/filters/client_channel/resolver.h
  4. 12
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  5. 12
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  6. 16
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  7. 3
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  8. 7
      src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
  9. 17
      test/cpp/end2end/client_lb_end2end_test.cc

@ -125,7 +125,6 @@ class PickFirst : public LoadBalancingPolicy {
void ShutdownLocked() override;
void StartPickingLocked();
void DestroyUnselectedSubchannelsLocked();
void UpdateChildRefsLocked();
// All our subchannels.
@ -293,15 +292,6 @@ bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
return false;
}
void PickFirst::DestroyUnselectedSubchannelsLocked() {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
PickFirstSubchannelData* sd = subchannel_list_->subchannel(i);
if (selected_ != sd) {
sd->UnrefSubchannelLocked("selected_different_subchannel");
}
}
}
grpc_connectivity_state PickFirst::CheckConnectivityLocked(grpc_error** error) {
return grpc_connectivity_state_get(&state_tracker_, error);
}
@ -418,7 +408,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
selected_ = sd;
subchannel_list_ = std::move(subchannel_list);
DestroyUnselectedSubchannelsLocked();
sd->StartConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
@ -503,7 +492,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
UnrefSubchannelLocked("pf_selected_shutdown");
StopConnectivityWatchLocked();
} else {
grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
@ -534,11 +522,9 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
StopConnectivityWatchLocked();
PickFirstSubchannelData* sd = this;
do {
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
sd = subchannel_list()->subchannel(next_index);
} while (sd->subchannel() == nullptr);
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
sd = subchannel_list()->subchannel(next_index);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
@ -608,8 +594,6 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
}
// Drop all other subchannels, since we are now connected.
p->DestroyUnselectedSubchannelsLocked();
// Update any calls that were waiting for a pick.
PickState* pick;
while ((pick = p->pending_picks_)) {

@ -102,11 +102,6 @@ class SubchannelData {
return pending_connectivity_state_unsafe_;
}
// Unrefs the subchannel. May be used if an individual subchannel is
// no longer needed even though the subchannel list as a whole is not
// being unreffed.
virtual void UnrefSubchannelLocked(const char* reason);
// Resets the connection backoff.
// TODO(roth): This method should go away when we move the backoff
// code out of the subchannel and into the LB policies.
@ -154,6 +149,10 @@ class SubchannelData {
grpc_connectivity_state connectivity_state,
grpc_error* error) GRPC_ABSTRACT;
// Unrefs the subchannel. May be overridden by subclasses that need
// to perform extra cleanup when unreffing the subchannel.
virtual void UnrefSubchannelLocked(const char* reason);
private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
// Returns true if the connectivity state should be reported.

@ -81,18 +81,7 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> {
///
/// If this causes new data to become available, then the currently
/// pending call to \a NextLocked() will return the new result.
///
/// Note: Currently, all resolvers are required to return a new result
/// shortly after this method is called. For pull-based mechanisms, if
/// the implementation decides to delay querying the name service, it
/// should immediately return a new copy of the previously returned
/// result (and it can then return the updated data later, when it
/// actually does query the name service). For push-based mechanisms,
/// the implementation should immediately return a new copy of the
/// last-seen result.
/// TODO(roth): Remove this requirement once we fix pick_first to not
/// throw away unselected subchannels.
virtual void RequestReresolutionLocked() GRPC_ABSTRACT;
virtual void RequestReresolutionLocked() {}
/// Resets the re-resolution backoff, if any.
/// This needs to be implemented only by pull-based implementations;

@ -373,13 +373,7 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
void AresDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) {
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
if (have_next_resolution_timer_) return;
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@ -401,10 +395,6 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
}

@ -247,13 +247,7 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
void NativeDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) {
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
if (have_next_resolution_timer_) return;
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@ -275,10 +269,6 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
}

@ -73,11 +73,6 @@ class FakeResolver : public Resolver {
// Results to use for the pretended re-resolution in
// RequestReresolutionLocked().
grpc_channel_args* reresolution_results_ = nullptr;
// TODO(juanlishen): This can go away once pick_first is changed to not throw
// away its subchannels, since that will eliminate its dependence on
// channel_saw_error_locked() causing an immediate resolver return.
// A copy of the most-recently used resolution results.
grpc_channel_args* last_used_results_ = nullptr;
// pending next completion, or NULL
grpc_closure* next_completion_ = nullptr;
// target result address for next completion
@ -96,7 +91,6 @@ FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) {
FakeResolver::~FakeResolver() {
grpc_channel_args_destroy(next_results_);
grpc_channel_args_destroy(reresolution_results_);
grpc_channel_args_destroy(last_used_results_);
grpc_channel_args_destroy(channel_args_);
}
@ -109,17 +103,11 @@ void FakeResolver::NextLocked(grpc_channel_args** target_result,
}
void FakeResolver::RequestReresolutionLocked() {
// A resolution must have been returned before an error is seen.
GPR_ASSERT(last_used_results_ != nullptr);
grpc_channel_args_destroy(next_results_);
if (reresolution_results_ != nullptr) {
next_results_ = grpc_channel_args_copy(reresolution_results_);
} else {
// If reresolution_results is unavailable, re-resolve with the most-recently
// used results to avoid a no-op re-resolution.
next_results_ = grpc_channel_args_copy(last_used_results_);
MaybeFinishNextLocked();
}
MaybeFinishNextLocked();
}
void FakeResolver::MaybeFinishNextLocked() {
@ -161,8 +149,6 @@ void FakeResolverResponseGenerator::SetResponseLocked(void* arg,
FakeResolver* resolver = closure_arg->generator->resolver_;
grpc_channel_args_destroy(resolver->next_results_);
resolver->next_results_ = closure_arg->response;
grpc_channel_args_destroy(resolver->last_used_results_);
resolver->last_used_results_ = grpc_channel_args_copy(closure_arg->response);
resolver->MaybeFinishNextLocked();
Delete(closure_arg);
}

@ -53,7 +53,8 @@ class FakeResolverResponseGenerator
// The new re-resolution response replaces any previous re-resolution
// response that may have been set by a previous call.
// If the re-resolution response is set to NULL, then the fake
// resolver will return the last value set via \a SetResponse().
// resolver will not return anything when \a RequestReresolutionLocked()
// is called.
void SetReresolutionResponse(grpc_channel_args* response);
// Tells the resolver to return a transient failure (signalled by

@ -50,8 +50,6 @@ class SockaddrResolver : public Resolver {
void NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) override;
void RequestReresolutionLocked() override;
void ShutdownLocked() override;
private:
@ -90,11 +88,6 @@ void SockaddrResolver::NextLocked(grpc_channel_args** target_result,
MaybeFinishNextLocked();
}
void SockaddrResolver::RequestReresolutionLocked() {
published_ = false;
MaybeFinishNextLocked();
}
void SockaddrResolver::ShutdownLocked() {
if (next_completion_ != nullptr) {
*target_result_ = nullptr;

@ -584,6 +584,23 @@ TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports);
auto channel = BuildChannel("pick_first");
auto stub = BuildStub(channel);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
WaitForServer(stub, 0, DEBUG_LOCATION);
gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
servers_[0]->Shutdown();
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
servers_.clear();
StartServers(1, ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
}
TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports);

Loading…
Cancel
Save