Merge pull request #16342 from markdroth/pf_keep_unselected_subchannels

Change pick_first to not unref unselected subchannels.
pull/16469/head
Mark D. Roth 7 years ago committed by GitHub
commit a144c13201
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 9
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  3. 13
      src/core/ext/filters/client_channel/resolver.h
  4. 12
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  5. 12
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  6. 18
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  7. 3
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  8. 7
      src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
  9. 117
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  10. 58
      test/core/client_channel/resolvers/fake_resolver_test.cc
  11. 68
      test/cpp/end2end/client_lb_end2end_test.cc

@ -126,7 +126,6 @@ class PickFirst : public LoadBalancingPolicy {
void ShutdownLocked() override;
void StartPickingLocked();
void DestroyUnselectedSubchannelsLocked();
void UpdateChildRefsLocked();
// All our subchannels.
@ -250,14 +249,9 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
void PickFirst::StartPickingLocked() {
started_picking_ = true;
if (subchannel_list_ != nullptr) {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
subchannel_list_->subchannel(i)
->CheckConnectivityStateAndStartWatchingLocked();
break;
}
}
if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels() > 0) {
subchannel_list_->subchannel(0)
->CheckConnectivityStateAndStartWatchingLocked();
}
}
@ -294,15 +288,6 @@ bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
return false;
}
void PickFirst::DestroyUnselectedSubchannelsLocked() {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
PickFirstSubchannelData* sd = subchannel_list_->subchannel(i);
if (selected_ != sd) {
sd->UnrefSubchannelLocked("selected_different_subchannel");
}
}
}
grpc_connectivity_state PickFirst::CheckConnectivityLocked(grpc_error** error) {
return grpc_connectivity_state_get(&state_tracker_, error);
}
@ -419,7 +404,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
selected_ = sd;
subchannel_list_ = std::move(subchannel_list);
DestroyUnselectedSubchannelsLocked();
sd->StartConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
@ -504,7 +488,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
UnrefSubchannelLocked("pf_selected_shutdown");
StopConnectivityWatchLocked();
} else {
grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
@ -535,11 +518,9 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
StopConnectivityWatchLocked();
PickFirstSubchannelData* sd = this;
do {
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
sd = subchannel_list()->subchannel(next_index);
} while (sd->subchannel() == nullptr);
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
sd = subchannel_list()->subchannel(next_index);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
@ -600,8 +581,6 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
}
// Drop all other subchannels, since we are now connected.
p->DestroyUnselectedSubchannelsLocked();
// Update any calls that were waiting for a pick.
PickState* pick;
while ((pick = p->pending_picks_)) {

@ -102,11 +102,6 @@ class SubchannelData {
return pending_connectivity_state_unsafe_;
}
// Unrefs the subchannel. May be used if an individual subchannel is
// no longer needed even though the subchannel list as a whole is not
// being unreffed.
virtual void UnrefSubchannelLocked(const char* reason);
// Resets the connection backoff.
// TODO(roth): This method should go away when we move the backoff
// code out of the subchannel and into the LB policies.
@ -154,6 +149,10 @@ class SubchannelData {
grpc_connectivity_state connectivity_state,
grpc_error* error) GRPC_ABSTRACT;
// Unrefs the subchannel. May be overridden by subclasses that need
// to perform extra cleanup when unreffing the subchannel.
virtual void UnrefSubchannelLocked(const char* reason);
private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
// Returns true if the connectivity state should be reported.

@ -81,18 +81,7 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> {
///
/// If this causes new data to become available, then the currently
/// pending call to \a NextLocked() will return the new result.
///
/// Note: Currently, all resolvers are required to return a new result
/// shortly after this method is called. For pull-based mechanisms, if
/// the implementation decides to delay querying the name service, it
/// should immediately return a new copy of the previously returned
/// result (and it can then return the updated data later, when it
/// actually does query the name service). For push-based mechanisms,
/// the implementation should immediately return a new copy of the
/// last-seen result.
/// TODO(roth): Remove this requirement once we fix pick_first to not
/// throw away unselected subchannels.
virtual void RequestReresolutionLocked() GRPC_ABSTRACT;
virtual void RequestReresolutionLocked() {}
/// Resets the re-resolution backoff, if any.
/// This needs to be implemented only by pull-based implementations;

@ -373,13 +373,7 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
void AresDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) {
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
if (have_next_resolution_timer_) return;
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@ -401,10 +395,6 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
}

@ -247,13 +247,7 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
void NativeDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) {
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
if (have_next_resolution_timer_) return;
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@ -275,10 +269,6 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
}

@ -73,11 +73,6 @@ class FakeResolver : public Resolver {
// Results to use for the pretended re-resolution in
// RequestReresolutionLocked().
grpc_channel_args* reresolution_results_ = nullptr;
// TODO(juanlishen): This can go away once pick_first is changed to not throw
// away its subchannels, since that will eliminate its dependence on
// channel_saw_error_locked() causing an immediate resolver return.
// A copy of the most-recently used resolution results.
grpc_channel_args* last_used_results_ = nullptr;
// pending next completion, or NULL
grpc_closure* next_completion_ = nullptr;
// target result address for next completion
@ -96,7 +91,6 @@ FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) {
FakeResolver::~FakeResolver() {
grpc_channel_args_destroy(next_results_);
grpc_channel_args_destroy(reresolution_results_);
grpc_channel_args_destroy(last_used_results_);
grpc_channel_args_destroy(channel_args_);
}
@ -109,17 +103,11 @@ void FakeResolver::NextLocked(grpc_channel_args** target_result,
}
void FakeResolver::RequestReresolutionLocked() {
// A resolution must have been returned before an error is seen.
GPR_ASSERT(last_used_results_ != nullptr);
grpc_channel_args_destroy(next_results_);
if (reresolution_results_ != nullptr) {
grpc_channel_args_destroy(next_results_);
next_results_ = grpc_channel_args_copy(reresolution_results_);
} else {
// If reresolution_results is unavailable, re-resolve with the most-recently
// used results to avoid a no-op re-resolution.
next_results_ = grpc_channel_args_copy(last_used_results_);
MaybeFinishNextLocked();
}
MaybeFinishNextLocked();
}
void FakeResolver::MaybeFinishNextLocked() {
@ -161,8 +149,6 @@ void FakeResolverResponseGenerator::SetResponseLocked(void* arg,
FakeResolver* resolver = closure_arg->generator->resolver_;
grpc_channel_args_destroy(resolver->next_results_);
resolver->next_results_ = closure_arg->response;
grpc_channel_args_destroy(resolver->last_used_results_);
resolver->last_used_results_ = grpc_channel_args_copy(closure_arg->response);
resolver->MaybeFinishNextLocked();
Delete(closure_arg);
}

@ -53,7 +53,8 @@ class FakeResolverResponseGenerator
// The new re-resolution response replaces any previous re-resolution
// response that may have been set by a previous call.
// If the re-resolution response is set to NULL, then the fake
// resolver will return the last value set via \a SetResponse().
// resolver will not return anything when \a RequestReresolutionLocked()
// is called.
void SetReresolutionResponse(grpc_channel_args* response);
// Tells the resolver to return a transient failure (signalled by

@ -50,8 +50,6 @@ class SockaddrResolver : public Resolver {
void NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) override;
void RequestReresolutionLocked() override;
void ShutdownLocked() override;
private:
@ -90,11 +88,6 @@ void SockaddrResolver::NextLocked(grpc_channel_args** target_result,
MaybeFinishNextLocked();
}
void SockaddrResolver::RequestReresolutionLocked() {
published_ = false;
MaybeFinishNextLocked();
}
void SockaddrResolver::ShutdownLocked() {
if (next_completion_ != nullptr) {
*target_result_ = nullptr;

@ -28,12 +28,16 @@
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "test/core/util/test_config.h"
constexpr int kMinResolutionPeriodMs = 1000;
// Provide some slack when checking intervals, to allow for test timing issues.
constexpr int kMinResolutionPeriodForCheckMs = 900;
extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
static grpc_address_resolver_vtable* default_resolve_address;
static grpc_combiner* g_combiner;
grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
static grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
@ -43,7 +47,7 @@ grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
// times a system-level resolution has happened.
static int g_resolution_count;
struct iomgr_args {
static struct iomgr_args {
gpr_event ev;
gpr_atm done_atm;
gpr_mu* mu;
@ -61,6 +65,16 @@ static void test_resolve_address_impl(const char* name,
default_resolve_address->resolve_address(
name, default_port, g_iomgr_args.pollset_set, on_done, addrs);
++g_resolution_count;
static grpc_millis last_resolution_time = 0;
if (last_resolution_time == 0) {
last_resolution_time =
grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
} else {
grpc_millis now =
grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
GPR_ASSERT(now - last_resolution_time >= kMinResolutionPeriodForCheckMs);
last_resolution_time = now;
}
}
static grpc_error* test_blocking_resolve_address_impl(
@ -73,7 +87,7 @@ static grpc_error* test_blocking_resolve_address_impl(
static grpc_address_resolver_vtable test_resolver = {
test_resolve_address_impl, test_blocking_resolve_address_impl};
grpc_ares_request* test_dns_lookup_ares_locked(
static grpc_ares_request* test_dns_lookup_ares_locked(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
@ -82,6 +96,16 @@ grpc_ares_request* test_dns_lookup_ares_locked(
dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, addrs,
check_grpclb, service_config_json, combiner);
++g_resolution_count;
static grpc_millis last_resolution_time = 0;
if (last_resolution_time == 0) {
last_resolution_time =
grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
} else {
grpc_millis now =
grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
GPR_ASSERT(now - last_resolution_time >= kMinResolutionPeriodForCheckMs);
last_resolution_time = now;
}
return result;
}
@ -91,7 +115,7 @@ static gpr_timespec test_deadline(void) {
static void do_nothing(void* arg, grpc_error* error) {}
void iomgr_args_init(iomgr_args* args) {
static void iomgr_args_init(iomgr_args* args) {
gpr_event_init(&args->ev);
args->pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(args->pollset, &args->mu);
@ -100,7 +124,7 @@ void iomgr_args_init(iomgr_args* args) {
gpr_atm_rel_store(&args->done_atm, 0);
}
void iomgr_args_finish(iomgr_args* args) {
static void iomgr_args_finish(iomgr_args* args) {
GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline()));
grpc_pollset_set_del_pollset(args->pollset_set, args->pollset);
grpc_pollset_set_destroy(args->pollset_set);
@ -146,29 +170,19 @@ struct OnResolutionCallbackArg {
const char* uri_str = nullptr;
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
grpc_channel_args* result = nullptr;
grpc_millis delay_before_second_resolution = 0;
};
// Counter for the number of times a resolution notification callback has been
// invoked.
static int g_on_resolution_invocations_count;
// Set to true by the last callback in the resolution chain.
bool g_all_callbacks_invoked;
static bool g_all_callbacks_invoked;
void on_fourth_resolution(void* arg, grpc_error* error) {
static void on_second_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"4th: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// In this case we expect to have incurred in another system-level resolution
// because on_third_resolution slept for longer than the min resolution
// period.
GPR_ASSERT(g_on_resolution_invocations_count == 4);
GPR_ASSERT(g_resolution_count == 3);
gpr_log(GPR_INFO, "2nd: g_resolution_count: %d", g_resolution_count);
// The resolution callback was not invoked until new data was
// available, which was delayed until after the cooldown period.
GPR_ASSERT(g_resolution_count == 2);
cb_arg->resolver.reset();
gpr_atm_rel_store(&g_iomgr_args.done_atm, 1);
gpr_mu_lock(g_iomgr_args.mu);
@ -179,67 +193,13 @@ void on_fourth_resolution(void* arg, grpc_error* error) {
g_all_callbacks_invoked = true;
}
void on_third_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"3rd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// The timer set because of the previous re-resolution request fires, so a new
// system-level resolution happened.
GPR_ASSERT(g_on_resolution_invocations_count == 3);
GPR_ASSERT(g_resolution_count == 2);
grpc_core::ExecCtx::Get()->TestOnlySetNow(
cb_arg->delay_before_second_resolution * 2);
cb_arg->resolver->NextLocked(
&cb_arg->result,
GRPC_CLOSURE_CREATE(on_fourth_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
cb_arg->resolver->RequestReresolutionLocked();
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
void on_second_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"2nd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// The resolution request for which this function is the callback happened
// before the min resolution period. Therefore, no new system-level
// resolutions happened, as indicated by g_resolution_count. But a resolution
// timer was set to fire when the cooldown finishes.
GPR_ASSERT(g_on_resolution_invocations_count == 2);
GPR_ASSERT(g_resolution_count == 1);
// Register a new callback to capture the timer firing.
cb_arg->resolver->NextLocked(
&cb_arg->result,
GRPC_CLOSURE_CREATE(on_third_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
void on_first_resolution(void* arg, grpc_error* error) {
static void on_first_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"1st: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
gpr_log(GPR_INFO, "1st: g_resolution_count: %d", g_resolution_count);
// There's one initial system-level resolution and one invocation of a
// notification callback (the current function).
GPR_ASSERT(g_on_resolution_invocations_count == 1);
GPR_ASSERT(g_resolution_count == 1);
cb_arg->resolver->NextLocked(
&cb_arg->result,
@ -265,9 +225,7 @@ static void start_test_under_combiner(void* arg, grpc_error* error) {
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
g_on_resolution_invocations_count = 0;
g_resolution_count = 0;
constexpr int kMinResolutionPeriodMs = 1000;
grpc_arg cooldown_arg;
cooldown_arg.key =
@ -280,7 +238,6 @@ static void start_test_under_combiner(void* arg, grpc_error* error) {
res_cb_arg->resolver = factory->CreateResolver(args);
grpc_channel_args_destroy(cooldown_channel_args);
GPR_ASSERT(res_cb_arg->resolver != nullptr);
res_cb_arg->delay_before_second_resolution = kMinResolutionPeriodMs;
// First resolution, would incur in system-level resolution.
res_cb_arg->resolver->NextLocked(
&res_cb_arg->result,

@ -124,8 +124,8 @@ static void test_fake_resolver() {
build_fake_resolver(combiner, response_generator.get());
GPR_ASSERT(resolver.get() != nullptr);
// Test 1: normal resolution.
// next_results != NULL, reresolution_results == NULL, last_used_results ==
// NULL. Expected response is next_results.
// next_results != NULL, reresolution_results == NULL.
// Expected response is next_results.
grpc_channel_args* results = create_new_resolver_result();
on_resolution_arg on_res_arg = create_on_resolution_arg(results);
grpc_closure* on_resolution = GRPC_CLOSURE_CREATE(
@ -137,10 +137,9 @@ static void test_fake_resolver() {
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 2: update resolution.
// next_results != NULL, reresolution_results == NULL, last_used_results !=
// NULL. Expected response is next_results.
// next_results != NULL, reresolution_results == NULL.
// Expected response is next_results.
results = create_new_resolver_result();
grpc_channel_args* last_used_results = grpc_channel_args_copy(results);
on_res_arg = create_on_resolution_arg(results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
@ -150,21 +149,9 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 3: fallback re-resolution.
// next_results == NULL, reresolution_results == NULL, last_used_results !=
// NULL. Expected response is last_used_results.
on_res_arg = create_on_resolution_arg(last_used_results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
// Trigger a re-resolution.
resolver->RequestReresolutionLocked();
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 4: normal re-resolution.
// next_results == NULL, reresolution_results != NULL, last_used_results !=
// NULL. Expected response is reresolution_results.
// Test 3: normal re-resolution.
// next_results == NULL, reresolution_results != NULL.
// Expected response is reresolution_results.
grpc_channel_args* reresolution_results = create_new_resolver_result();
on_res_arg =
create_on_resolution_arg(grpc_channel_args_copy(reresolution_results));
@ -180,9 +167,9 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 5: repeat re-resolution.
// next_results == NULL, reresolution_results != NULL, last_used_results !=
// NULL. Expected response is reresolution_results.
// Test 4: repeat re-resolution.
// next_results == NULL, reresolution_results != NULL.
// Expected response is reresolution_results.
on_res_arg = create_on_resolution_arg(reresolution_results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
@ -192,11 +179,10 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 6: normal resolution.
// next_results != NULL, reresolution_results != NULL, last_used_results !=
// NULL. Expected response is next_results.
// Test 5: normal resolution.
// next_results != NULL, reresolution_results != NULL.
// Expected response is next_results.
results = create_new_resolver_result();
last_used_results = grpc_channel_args_copy(results);
on_res_arg = create_on_resolution_arg(results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
@ -206,23 +192,7 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 7: fallback re-resolution.
// next_results == NULL, reresolution_results == NULL, last_used_results !=
// NULL. Expected response is last_used_results.
on_res_arg = create_on_resolution_arg(last_used_results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
// Reset reresolution_results.
response_generator->SetReresolutionResponse(nullptr);
// Flush here to guarantee that reresolution_results has been reset.
grpc_core::ExecCtx::Get()->Flush();
// Trigger a re-resolution.
resolver->RequestReresolutionLocked();
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 8: no-op.
// Test 6: no-op.
// Requesting a new resolution without setting the response shouldn't trigger
// the resolution callback.
memset(&on_res_arg, 0, sizeof(on_res_arg));

@ -129,12 +129,23 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
}
void StartServers(size_t num_servers,
std::vector<int> ports = std::vector<int>()) {
void CreateServers(size_t num_servers,
std::vector<int> ports = std::vector<int>()) {
servers_.clear();
for (size_t i = 0; i < num_servers; ++i) {
int port = 0;
if (ports.size() == num_servers) port = ports[i];
servers_.emplace_back(new ServerData(server_host_, port));
servers_.emplace_back(new ServerData(port));
}
}
void StartServer(size_t index) { servers_[index]->Start(server_host_); }
void StartServers(size_t num_servers,
std::vector<int> ports = std::vector<int>()) {
CreateServers(num_servers, ports);
for (size_t i = 0; i < num_servers; ++i) {
StartServer(i);
}
}
@ -240,20 +251,23 @@ class ClientLbEnd2endTest : public ::testing::Test {
std::unique_ptr<std::thread> thread_;
bool server_ready_ = false;
explicit ServerData(const grpc::string& server_host, int port = 0) {
explicit ServerData(int port = 0) {
port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
}
void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_);
std::mutex mu;
std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond;
thread_.reset(new std::thread(
std::bind(&ServerData::Start, this, server_host, &mu, &cond)));
std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
cond.wait(lock, [this] { return server_ready_; });
server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete");
}
void Start(const grpc::string& server_host, std::mutex* mu,
void Serve(const grpc::string& server_host, std::mutex* mu,
std::condition_variable* cond) {
std::ostringstream server_address;
server_address << server_host << ":" << port_;
@ -601,6 +615,41 @@ TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports);
auto channel = BuildChannel("pick_first");
auto stub = BuildStub(channel);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
WaitForServer(stub, 0, DEBUG_LOCATION);
gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
servers_[0]->Shutdown();
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
StartServers(1, ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
}
TEST_F(ClientLbEnd2endTest,
PickFirstReconnectWithoutNewResolverResultStartsFromTopOfList) {
std::vector<int> ports = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die()};
CreateServers(2, ports);
StartServer(1);
auto channel = BuildChannel("pick_first");
auto stub = BuildStub(channel);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
WaitForServer(stub, 1, DEBUG_LOCATION);
gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
servers_[1]->Shutdown();
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
StartServers(2, ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
}
TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports);
@ -613,7 +662,6 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
servers_[0]->Shutdown();
// Channel 1 will receive a re-resolution containing the same server. It will
// create a new subchannel and hold a ref to it.
servers_.clear();
StartServers(1, ports);
gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
auto channel_2 = BuildChannel("pick_first");
@ -632,7 +680,6 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
// Channel 2 will also receive a re-resolution containing the same server.
// Both channels will ref the same subchannel that failed.
servers_.clear();
StartServers(1, ports);
gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
@ -860,7 +907,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
// Kill all servers
gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
for (size_t i = 0; i < servers_.size(); ++i) {
servers_[i]->Shutdown(false);
servers_[i]->Shutdown(true);
}
gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
@ -921,7 +968,8 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
// No requests have gone to the deceased server.
EXPECT_EQ(pre_death, post_death);
// Bring the first server back up.
servers_[0].reset(new ServerData(server_host_, ports[0]));
servers_[0].reset(new ServerData(ports[0]));
StartServer(0);
// Requests should start arriving at the first server either right away (if
// the server managed to start before the RR policy retried the subchannel) or
// after the subchannel retry delay otherwise (RR's subchannel retried before

Loading…
Cancel
Save