Merge pull request #15156 from AspirinSJL/resolution_timer_fix

Check retry timer before starting resolving
pull/15232/merge
Juanli Shen 7 years ago committed by GitHub
commit b61c42ea31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc (30 changed lines)
  2. src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc (30 changed lines)
  3. test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc (115 changed lines)

@ -363,6 +363,15 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
} }
void AresDnsResolver::MaybeStartResolvingLocked() { void AresDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) {
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
if (last_resolution_timestamp_ >= 0) { if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution = const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_; last_resolution_timestamp_ + min_time_between_resolutions_;
@ -375,17 +384,15 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
"In cooldown from last resolution (from %" PRIdPTR "In cooldown from last resolution (from %" PRIdPTR
" ms ago). Will resolve again in %" PRIdPTR " ms", " ms ago). Will resolve again in %" PRIdPTR " ms",
last_resolution_ago, ms_until_next_resolution); last_resolution_ago, ms_until_next_resolution);
if (!have_next_resolution_timer_) { have_next_resolution_timer_ = true;
have_next_resolution_timer_ = true; // TODO(roth): We currently deal with this ref manually. Once the
// TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer
// new closure API is done, find a way to track this ref with the timer // callback as part of the type system.
// callback as part of the type system. RefCountedPtr<Resolver> self =
RefCountedPtr<Resolver> self = Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown"); self.release();
self.release(); grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution, &on_next_resolution_);
&on_next_resolution_);
}
// TODO(dgq): remove the following two lines once Pick First stops // TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting. // discarding subchannels after selecting.
++resolved_version_; ++resolved_version_;
@ -397,6 +404,7 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
} }
void AresDnsResolver::StartResolvingLocked() { void AresDnsResolver::StartResolvingLocked() {
gpr_log(GPR_DEBUG, "Start resolving.");
// TODO(roth): We currently deal with this ref manually. Once the // TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer // new closure API is done, find a way to track this ref with the timer
// callback as part of the type system. // callback as part of the type system.

@ -236,6 +236,15 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
} }
void NativeDnsResolver::MaybeStartResolvingLocked() { void NativeDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) {
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
if (last_resolution_timestamp_ >= 0) { if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution = const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_; last_resolution_timestamp_ + min_time_between_resolutions_;
@ -248,17 +257,15 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
"In cooldown from last resolution (from %" PRIdPTR "In cooldown from last resolution (from %" PRIdPTR
" ms ago). Will resolve again in %" PRIdPTR " ms", " ms ago). Will resolve again in %" PRIdPTR " ms",
last_resolution_ago, ms_until_next_resolution); last_resolution_ago, ms_until_next_resolution);
if (!have_next_resolution_timer_) { have_next_resolution_timer_ = true;
have_next_resolution_timer_ = true; // TODO(roth): We currently deal with this ref manually. Once the
// TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer
// new closure API is done, find a way to track this ref with the timer // callback as part of the type system.
// callback as part of the type system. RefCountedPtr<Resolver> self =
RefCountedPtr<Resolver> self = Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown"); self.release();
self.release(); grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution, &on_next_resolution_);
&on_next_resolution_);
}
// TODO(dgq): remove the following two lines once Pick First stops // TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting. // discarding subchannels after selecting.
++resolved_version_; ++resolved_version_;
@ -270,6 +277,7 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
} }
void NativeDnsResolver::StartResolvingLocked() { void NativeDnsResolver::StartResolvingLocked() {
gpr_log(GPR_DEBUG, "Start resolving.");
// TODO(roth): We currently deal with this ref manually. Once the // TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer // new closure API is done, find a way to track this ref with the timer
// callback as part of the type system. // callback as part of the type system.

@ -145,7 +145,6 @@ struct OnResolutionCallbackArg {
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver; grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
grpc_channel_args* result = nullptr; grpc_channel_args* result = nullptr;
grpc_millis delay_before_second_resolution = 0; grpc_millis delay_before_second_resolution = 0;
bool using_cares = false;
}; };
// Counter for the number of times a resolution notification callback has been // Counter for the number of times a resolution notification callback has been
@ -155,81 +154,100 @@ static int g_on_resolution_invocations_count;
// Set to true by the last callback in the resolution chain. // Set to true by the last callback in the resolution chain.
bool g_all_callbacks_invoked; bool g_all_callbacks_invoked;
void on_third_resolution(void* arg, grpc_error* error) { void on_fourth_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg); OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count; ++g_on_resolution_invocations_count;
grpc_channel_args_destroy(cb_arg->result);
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"3rd: g_on_resolution_invocations_count: %d, g_resolution_count: %d", "4th: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count); g_on_resolution_invocations_count, g_resolution_count);
// In this case we expect to have incurred in another system-level resolution // In this case we expect to have incurred in another system-level resolution
// because on_second_resolution slept for longer than the min resolution // because on_third_resolution slept for longer than the min resolution
// period. // period.
GPR_ASSERT(g_on_resolution_invocations_count == 3); GPR_ASSERT(g_on_resolution_invocations_count == 4);
GPR_ASSERT(g_resolution_count == 2); GPR_ASSERT(g_resolution_count == 3);
cb_arg->resolver.reset(); cb_arg->resolver.reset();
if (cb_arg->using_cares) { gpr_atm_rel_store(&g_iomgr_args.done_atm, 1);
gpr_atm_rel_store(&g_iomgr_args.done_atm, 1); gpr_mu_lock(g_iomgr_args.mu);
gpr_mu_lock(g_iomgr_args.mu); GRPC_LOG_IF_ERROR("pollset_kick",
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
grpc_pollset_kick(g_iomgr_args.pollset, nullptr)); gpr_mu_unlock(g_iomgr_args.mu);
gpr_mu_unlock(g_iomgr_args.mu);
}
grpc_core::Delete(cb_arg); grpc_core::Delete(cb_arg);
g_all_callbacks_invoked = true; g_all_callbacks_invoked = true;
} }
void on_second_resolution(void* arg, grpc_error* error) { void on_third_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg); OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
++g_on_resolution_invocations_count;
grpc_channel_args_destroy(cb_arg->result); grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"3rd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// The timer set because of the previous re-resolution request fires, so a new
// system-level resolution happened.
GPR_ASSERT(g_on_resolution_invocations_count == 3);
GPR_ASSERT(g_resolution_count == 2);
grpc_core::ExecCtx::Get()->TestOnlySetNow(
cb_arg->delay_before_second_resolution * 2);
cb_arg->resolver->NextLocked(
&cb_arg->result,
GRPC_CLOSURE_CREATE(on_fourth_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
cb_arg->resolver->RequestReresolutionLocked();
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
void on_second_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"2nd: g_on_resolution_invocations_count: %d, g_resolution_count: %d", "2nd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count); g_on_resolution_invocations_count, g_resolution_count);
// The resolution request for which this function is the callback happened // The resolution request for which this function is the callback happened
// before the min resolution period. Therefore, no new system-level // before the min resolution period. Therefore, no new system-level
// resolutions happened, as indicated by g_resolution_count. // resolutions happened, as indicated by g_resolution_count. But a resolution
// timer was set to fire when the cooldown finishes.
GPR_ASSERT(g_on_resolution_invocations_count == 2); GPR_ASSERT(g_on_resolution_invocations_count == 2);
GPR_ASSERT(g_resolution_count == 1); GPR_ASSERT(g_resolution_count == 1);
grpc_core::ExecCtx::Get()->TestOnlySetNow( // Register a new callback to capture the timer firing.
cb_arg->delay_before_second_resolution * 2);
cb_arg->resolver->NextLocked( cb_arg->resolver->NextLocked(
&cb_arg->result, &cb_arg->result,
GRPC_CLOSURE_CREATE(on_third_resolution, arg, GRPC_CLOSURE_CREATE(on_third_resolution, arg,
grpc_combiner_scheduler(g_combiner))); grpc_combiner_scheduler(g_combiner)));
cb_arg->resolver->RequestReresolutionLocked(); gpr_mu_lock(g_iomgr_args.mu);
if (cb_arg->using_cares) { GRPC_LOG_IF_ERROR("pollset_kick",
gpr_mu_lock(g_iomgr_args.mu); grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
GRPC_LOG_IF_ERROR("pollset_kick", gpr_mu_unlock(g_iomgr_args.mu);
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
} }
void on_first_resolution(void* arg, grpc_error* error) { void on_first_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg); OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
++g_on_resolution_invocations_count;
grpc_channel_args_destroy(cb_arg->result); grpc_channel_args_destroy(cb_arg->result);
cb_arg->resolver->NextLocked( GPR_ASSERT(error == GRPC_ERROR_NONE);
&cb_arg->result, ++g_on_resolution_invocations_count;
GRPC_CLOSURE_CREATE(on_second_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
cb_arg->resolver->RequestReresolutionLocked();
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"1st: g_on_resolution_invocations_count: %d, g_resolution_count: %d", "1st: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count); g_on_resolution_invocations_count, g_resolution_count);
// Theres one initial system-level resolution and one invocation of a // There's one initial system-level resolution and one invocation of a
// notification callback (the current function). // notification callback (the current function).
GPR_ASSERT(g_on_resolution_invocations_count == 1); GPR_ASSERT(g_on_resolution_invocations_count == 1);
GPR_ASSERT(g_resolution_count == 1); GPR_ASSERT(g_resolution_count == 1);
if (cb_arg->using_cares) { cb_arg->resolver->NextLocked(
gpr_mu_lock(g_iomgr_args.mu); &cb_arg->result,
GRPC_LOG_IF_ERROR("pollset_kick", GRPC_CLOSURE_CREATE(on_second_resolution, arg,
grpc_pollset_kick(g_iomgr_args.pollset, nullptr)); grpc_combiner_scheduler(g_combiner)));
gpr_mu_unlock(g_iomgr_args.mu); cb_arg->resolver->RequestReresolutionLocked();
} gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
} }
static void start_test_under_combiner(void* arg, grpc_error* error) { static void start_test_under_combiner(void* arg, grpc_error* error) {
@ -269,22 +287,19 @@ static void start_test_under_combiner(void* arg, grpc_error* error) {
grpc_uri_destroy(uri); grpc_uri_destroy(uri);
} }
static void test_cooldown(bool using_cares) { static void test_cooldown() {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
if (using_cares) iomgr_args_init(&g_iomgr_args); iomgr_args_init(&g_iomgr_args);
OnResolutionCallbackArg* res_cb_arg = OnResolutionCallbackArg* res_cb_arg =
grpc_core::New<OnResolutionCallbackArg>(); grpc_core::New<OnResolutionCallbackArg>();
res_cb_arg->uri_str = "dns:127.0.0.1"; res_cb_arg->uri_str = "dns:127.0.0.1";
res_cb_arg->using_cares = using_cares;
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(start_test_under_combiner, res_cb_arg, GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(start_test_under_combiner, res_cb_arg,
grpc_combiner_scheduler(g_combiner)), grpc_combiner_scheduler(g_combiner)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
if (using_cares) { grpc_core::ExecCtx::Get()->Flush();
grpc_core::ExecCtx::Get()->Flush(); poll_pollset_until_request_done(&g_iomgr_args);
poll_pollset_until_request_done(&g_iomgr_args); iomgr_args_finish(&g_iomgr_args);
iomgr_args_finish(&g_iomgr_args);
}
} }
int main(int argc, char** argv) { int main(int argc, char** argv) {
@ -293,16 +308,12 @@ int main(int argc, char** argv) {
g_combiner = grpc_combiner_create(); g_combiner = grpc_combiner_create();
bool using_cares = false;
#if GRPC_ARES == 1
using_cares = true;
#endif
g_default_dns_lookup_ares = grpc_dns_lookup_ares; g_default_dns_lookup_ares = grpc_dns_lookup_ares;
grpc_dns_lookup_ares = test_dns_lookup_ares; grpc_dns_lookup_ares = test_dns_lookup_ares;
default_resolve_address = grpc_resolve_address_impl; default_resolve_address = grpc_resolve_address_impl;
grpc_set_resolver_impl(&test_resolver); grpc_set_resolver_impl(&test_resolver);
test_cooldown(using_cares); test_cooldown();
{ {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;

Loading…
Cancel
Save