pull/15225/head
Ben Sykes 7 years ago
commit 2bfa59fb95
Files changed:
  1. src/core/ext/filters/client_channel/client_channel.cc (2 changed lines)
  2. src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc (30 changed lines)
  3. src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc (30 changed lines)
  4. src/core/lib/debug/trace.h (11 changed lines)
  5. src/core/lib/iomgr/closure.h (6 changed lines)
  6. src/core/lib/iomgr/tcp_posix.cc (2 changed lines)
  7. src/core/lib/surface/call.cc (4 changed lines)
  8. test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc (115 changed lines)
  9. test/cpp/naming/address_sorting_test.cc (23 changed lines)
  10. third_party/address_sorting/address_sorting.c (18 changed lines)

@@ -3252,6 +3252,8 @@ static void watch_connectivity_state_locked(void* arg,
external_connectivity_watcher* found = nullptr;
if (w->state != nullptr) {
external_connectivity_watcher_list_append(w->chand, w);
// An assumption is being made that the closure is scheduled on the exec ctx
// scheduler and that GRPC_CLOSURE_RUN would run the closure immediately.
GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
grpc_combiner_scheduler(w->chand->combiner));

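For context on the new comment: running a closure with GRPC_CLOSURE_RUN executes it through the scheduler's run hook right away, whereas GRPC_CLOSURE_SCHED queues it to be drained later. A minimal, self-contained sketch of that distinction, using hypothetical Closure/ExecCtx stand-ins rather than the gRPC types above:

    #include <functional>
    #include <iostream>
    #include <vector>

    // Hypothetical stand-ins for grpc_closure and the exec ctx, for illustration only.
    using Closure = std::function<void()>;

    struct ExecCtx {
      std::vector<Closure> queue;
      void Sched(Closure c) { queue.push_back(std::move(c)); }  // deferred until Flush()
      void Run(Closure c) { c(); }                              // runs immediately, the assumption above
      void Flush() {
        for (auto& c : queue) c();
        queue.clear();
      }
    };

    int main() {
      ExecCtx ctx;
      ctx.Run([] { std::cout << "watcher_timer_init analogue: runs before we return\n"; });
      ctx.Sched([] { std::cout << "deferred closure: runs only at flush time\n"; });
      ctx.Flush();
    }

If GRPC_CLOSURE_RUN deferred the closure instead of running it inline, watcher_timer_init would only complete at some later flush rather than at this point in the setup, which is the assumption the new comment documents.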
@@ -363,6 +363,15 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
}
void AresDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) {
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@@ -375,17 +384,15 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
"In cooldown from last resolution (from %" PRIdPTR
" ms ago). Will resolve again in %" PRIdPTR " ms",
last_resolution_ago, ms_until_next_resolution);
if (!have_next_resolution_timer_) {
have_next_resolution_timer_ = true;
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
RefCountedPtr<Resolver> self =
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
}
have_next_resolution_timer_ = true;
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
RefCountedPtr<Resolver> self =
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
@@ -397,6 +404,7 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
}
void AresDnsResolver::StartResolvingLocked() {
gpr_log(GPR_DEBUG, "Start resolving.");
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.

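The cooldown check above (and its twin in the native resolver below) reduces to: if a previous resolution happened less than min_time_between_resolutions_ ago, arm a timer for the remainder; otherwise resolve now. A standalone sketch of that arithmetic using std::chrono in place of grpc_millis (hypothetical names, not the resolver API):

    #include <chrono>
    #include <iostream>
    #include <optional>

    using Millis = std::chrono::milliseconds;

    // Returns how long to wait before the next resolution may start, or
    // std::nullopt if it may start immediately.
    std::optional<Millis> TimeUntilNextResolution(std::optional<Millis> last_resolution,
                                                  Millis now,
                                                  Millis min_time_between_resolutions) {
      if (!last_resolution.has_value()) return std::nullopt;  // never resolved before
      const Millis earliest = *last_resolution + min_time_between_resolutions;
      if (earliest <= now) return std::nullopt;               // cooldown already elapsed
      return earliest - now;  // still cooling down: caller arms a timer for this long
    }

    int main() {
      auto wait = TimeUntilNextResolution(Millis(1000), Millis(1200), Millis(500));
      std::cout << (wait ? wait->count() : 0) << " ms until next resolution\n";  // prints 300
    }

The behavioral change in this hunk is that the timer is now armed unconditionally once the function decides it is in cooldown, because the new early return at the top already handles the case where a timer is pending.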
@@ -236,6 +236,15 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
}
void NativeDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) {
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@@ -248,17 +257,15 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
"In cooldown from last resolution (from %" PRIdPTR
" ms ago). Will resolve again in %" PRIdPTR " ms",
last_resolution_ago, ms_until_next_resolution);
if (!have_next_resolution_timer_) {
have_next_resolution_timer_ = true;
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
RefCountedPtr<Resolver> self =
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
}
have_next_resolution_timer_ = true;
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
RefCountedPtr<Resolver> self =
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
@@ -270,6 +277,7 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
}
void NativeDnsResolver::StartResolvingLocked() {
gpr_log(GPR_DEBUG, "Start resolving.");
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.

@@ -57,13 +57,22 @@ class TraceFlag {
const char* name() const { return name_; }
// This following define may be commented out to ensure that the compiler
// deletes any "if (tracer.enabled()) {...}" codeblocks. This is useful to
// test the performance impact tracers have on the system.
//
// #define COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD
#ifdef COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD
bool enabled() { return false; }
#else
bool enabled() {
#ifdef GRPC_THREADSAFE_TRACER
return gpr_atm_no_barrier_load(&value_) != 0;
#else
return value_;
#endif
#endif // GRPC_THREADSAFE_TRACER
}
#endif // COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD
private:
friend void grpc_core::testing::grpc_tracer_enable_flag(TraceFlag* flag);

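The new COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD define works because a constant-false enabled() lets the optimizer delete every "if (tracer.enabled()) { ... }" block at compile time. A self-contained sketch of the same pattern with a hypothetical Trace class (not grpc_core::TraceFlag):

    #include <cstdio>

    // Uncomment to give the compiler a constant false so it can dead-strip the
    // tracing block in main() (analogous to COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD).
    // #define COMPILE_OUT_TRACERS

    class Trace {
     public:
      explicit Trace(bool on) : on_(on) {}
    #ifdef COMPILE_OUT_TRACERS
      bool enabled() const { return false; }  // constant: the guarded block vanishes
    #else
      bool enabled() const { return on_; }    // normal runtime check
    #endif
     private:
      bool on_;
    };

    int main() {
      Trace http_trace(true);
      if (http_trace.enabled()) {  // whole block is removed when compiled out
        std::printf("tracing enabled\n");
      }
    }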
@@ -253,8 +253,8 @@ inline void grpc_closure_run(grpc_closure* c, grpc_error* error) {
c->file_initiated = file;
c->line_initiated = line;
c->run = true;
GPR_ASSERT(c->cb != nullptr);
#endif
assert(c->cb);
c->scheduler->vtable->run(c, error);
} else {
GRPC_ERROR_UNREF(error);
@@ -292,8 +292,8 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) {
c->file_initiated = file;
c->line_initiated = line;
c->run = false;
GPR_ASSERT(c->cb != nullptr);
#endif
assert(c->cb);
c->scheduler->vtable->sched(c, error);
} else {
GRPC_ERROR_UNREF(error);
@@ -330,8 +330,8 @@ inline void grpc_closure_list_sched(grpc_closure_list* list) {
c->file_initiated = file;
c->line_initiated = line;
c->run = false;
GPR_ASSERT(c->cb != nullptr);
#endif
assert(c->cb);
c->scheduler->vtable->sched(c, c->error_data.error);
c = next;
}

@@ -371,7 +371,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
#define MAX_READ_IOVEC 4
static void tcp_do_read(grpc_tcp* tcp) {
GPR_TIMER_SCOPE("tcp_continue_read", 0);
GPR_TIMER_SCOPE("tcp_do_read", 0);
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;

@@ -610,7 +610,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
// This is called via the call combiner to start sending a batch down
// the filter stack.
static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) {
GPR_TIMER_SCOPE("execute_batch", 0);
GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
@@ -1539,7 +1539,7 @@ static void free_no_op_completion(void* p, grpc_cq_completion* completion) {
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* notify_tag,
int is_notify_tag_closure) {
GPR_TIMER_SCOPE("grpc_call_start_batch", 0);
GPR_TIMER_SCOPE("call_start_batch", 0);
size_t i;
const grpc_op* op;

@@ -145,7 +145,6 @@ struct OnResolutionCallbackArg {
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
grpc_channel_args* result = nullptr;
grpc_millis delay_before_second_resolution = 0;
bool using_cares = false;
};
// Counter for the number of times a resolution notification callback has been
@@ -155,81 +154,100 @@ static int g_on_resolution_invocations_count;
// Set to true by the last callback in the resolution chain.
bool g_all_callbacks_invoked;
void on_third_resolution(void* arg, grpc_error* error) {
void on_fourth_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
grpc_channel_args_destroy(cb_arg->result);
gpr_log(GPR_INFO,
"3rd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
"4th: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// In this case we expect to have incurred in another system-level resolution
// because on_second_resolution slept for longer than the min resolution
// because on_third_resolution slept for longer than the min resolution
// period.
GPR_ASSERT(g_on_resolution_invocations_count == 3);
GPR_ASSERT(g_resolution_count == 2);
GPR_ASSERT(g_on_resolution_invocations_count == 4);
GPR_ASSERT(g_resolution_count == 3);
cb_arg->resolver.reset();
if (cb_arg->using_cares) {
gpr_atm_rel_store(&g_iomgr_args.done_atm, 1);
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
gpr_atm_rel_store(&g_iomgr_args.done_atm, 1);
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
grpc_core::Delete(cb_arg);
g_all_callbacks_invoked = true;
}
void on_second_resolution(void* arg, grpc_error* error) {
void on_third_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
++g_on_resolution_invocations_count;
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"3rd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// The timer set because of the previous re-resolution request fires, so a new
// system-level resolution happened.
GPR_ASSERT(g_on_resolution_invocations_count == 3);
GPR_ASSERT(g_resolution_count == 2);
grpc_core::ExecCtx::Get()->TestOnlySetNow(
cb_arg->delay_before_second_resolution * 2);
cb_arg->resolver->NextLocked(
&cb_arg->result,
GRPC_CLOSURE_CREATE(on_fourth_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
cb_arg->resolver->RequestReresolutionLocked();
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
void on_second_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"2nd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// The resolution request for which this function is the callback happened
// before the min resolution period. Therefore, no new system-level
// resolutions happened, as indicated by g_resolution_count.
// resolutions happened, as indicated by g_resolution_count. But a resolution
// timer was set to fire when the cooldown finishes.
GPR_ASSERT(g_on_resolution_invocations_count == 2);
GPR_ASSERT(g_resolution_count == 1);
grpc_core::ExecCtx::Get()->TestOnlySetNow(
cb_arg->delay_before_second_resolution * 2);
// Register a new callback to capture the timer firing.
cb_arg->resolver->NextLocked(
&cb_arg->result,
GRPC_CLOSURE_CREATE(on_third_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
cb_arg->resolver->RequestReresolutionLocked();
if (cb_arg->using_cares) {
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
void on_first_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
++g_on_resolution_invocations_count;
grpc_channel_args_destroy(cb_arg->result);
cb_arg->resolver->NextLocked(
&cb_arg->result,
GRPC_CLOSURE_CREATE(on_second_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
cb_arg->resolver->RequestReresolutionLocked();
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"1st: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// Theres one initial system-level resolution and one invocation of a
// There's one initial system-level resolution and one invocation of a
// notification callback (the current function).
GPR_ASSERT(g_on_resolution_invocations_count == 1);
GPR_ASSERT(g_resolution_count == 1);
if (cb_arg->using_cares) {
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
cb_arg->resolver->NextLocked(
&cb_arg->result,
GRPC_CLOSURE_CREATE(on_second_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
cb_arg->resolver->RequestReresolutionLocked();
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
static void start_test_under_combiner(void* arg, grpc_error* error) {
@@ -269,22 +287,19 @@ static void start_test_under_combiner(void* arg, grpc_error* error) {
grpc_uri_destroy(uri);
}
static void test_cooldown(bool using_cares) {
static void test_cooldown() {
grpc_core::ExecCtx exec_ctx;
if (using_cares) iomgr_args_init(&g_iomgr_args);
iomgr_args_init(&g_iomgr_args);
OnResolutionCallbackArg* res_cb_arg =
grpc_core::New<OnResolutionCallbackArg>();
res_cb_arg->uri_str = "dns:127.0.0.1";
res_cb_arg->using_cares = using_cares;
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(start_test_under_combiner, res_cb_arg,
grpc_combiner_scheduler(g_combiner)),
GRPC_ERROR_NONE);
if (using_cares) {
grpc_core::ExecCtx::Get()->Flush();
poll_pollset_until_request_done(&g_iomgr_args);
iomgr_args_finish(&g_iomgr_args);
}
grpc_core::ExecCtx::Get()->Flush();
poll_pollset_until_request_done(&g_iomgr_args);
iomgr_args_finish(&g_iomgr_args);
}
int main(int argc, char** argv) {
@@ -293,16 +308,12 @@ int main(int argc, char** argv) {
g_combiner = grpc_combiner_create();
bool using_cares = false;
#if GRPC_ARES == 1
using_cares = true;
#endif
g_default_dns_lookup_ares = grpc_dns_lookup_ares;
grpc_dns_lookup_ares = test_dns_lookup_ares;
default_resolve_address = grpc_resolve_address_impl;
grpc_set_resolver_impl(&test_resolver);
test_cooldown(using_cares);
test_cooldown();
{
grpc_core::ExecCtx exec_ctx;

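The reworked test drives the cooldown deterministically: each callback in the chain calls NextLocked() to register the next one, and TestOnlySetNow() jumps the clock past the cooldown so the deferred resolution timer fires without real waiting. The same idea in a standalone sketch with a hypothetical cooldown struct and an injected fake clock (none of this is the test's API):

    #include <cassert>
    #include <chrono>

    using Millis = std::chrono::milliseconds;

    // Hypothetical cooldown state keyed off an injectable "now", mirroring how
    // the test advances time with ExecCtx::TestOnlySetNow() instead of sleeping.
    struct Cooldown {
      Millis last{-1};               // -1 means no resolution has happened yet
      Millis min_between{1000};
      bool MayResolve(Millis now) const {
        return last.count() < 0 || now >= last + min_between;
      }
    };

    int main() {
      Cooldown cd;
      Millis fake_now{0};
      assert(cd.MayResolve(fake_now));    // first resolution is always allowed
      cd.last = fake_now;
      fake_now = Millis(200);
      assert(!cd.MayResolve(fake_now));   // inside the cooldown: a timer would be armed
      fake_now = Millis(2000);            // the TestOnlySetNow() analogue: jump forward
      assert(cd.MayResolve(fake_now));    // the timer would have fired; resolution proceeds
    }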
@@ -298,6 +298,29 @@ TEST(AddressSortingTest, TestUsesLabelFromDefaultTable) {
});
}
/* Flip the input on the test above to reorder the sort function's
* comparator's inputs. */
TEST(AddressSortingTest, TestUsesLabelFromDefaultTableInputFlipped) {
bool ipv4_supported = true;
bool ipv6_supported = true;
OverrideAddressSortingSourceAddrFactory(
ipv4_supported, ipv6_supported,
{
{"[2002::5001]:443", {"[2001::5002]:0", AF_INET6}},
{"[2001::5001]:443",
{"[2001::5002]:0", AF_INET6}}, // matching labels
});
grpc_lb_addresses* lb_addrs = BuildLbAddrInputs({
{"[2001::5001]:443", AF_INET6},
{"[2002::5001]:443", AF_INET6},
});
grpc_cares_wrapper_test_only_address_sorting_sort(lb_addrs);
VerifyLbAddrOutputs(lb_addrs, {
"[2001::5001]:443",
"[2002::5001]:443",
});
}
/* Tests for rule 6 */
TEST(AddressSortingTest,

@@ -225,15 +225,15 @@ static int compare_source_addr_exists(const address_sorting_sortable* first,
static int compare_source_dest_scope_matches(
const address_sorting_sortable* first,
const address_sorting_sortable* second) {
int first_src_dst_scope_matches = 0;
bool first_src_dst_scope_matches = false;
if (sockaddr_get_scope(&first->dest_addr) ==
sockaddr_get_scope(&first->source_addr)) {
first_src_dst_scope_matches = 1;
first_src_dst_scope_matches = true;
}
int second_src_dst_scope_matches = 0;
bool second_src_dst_scope_matches = false;
if (sockaddr_get_scope(&second->dest_addr) ==
sockaddr_get_scope(&second->source_addr)) {
second_src_dst_scope_matches = 1;
second_src_dst_scope_matches = true;
}
if (first_src_dst_scope_matches != second_src_dst_scope_matches) {
return first_src_dst_scope_matches ? -1 : 1;
@@ -244,18 +244,18 @@ static int compare_source_dest_scope_matches(
static int compare_source_dest_labels_match(
const address_sorting_sortable* first,
const address_sorting_sortable* second) {
int first_label_matches = 0;
bool first_label_matches = false;
if (get_label_value(&first->dest_addr) ==
get_label_value(&first->source_addr)) {
first_label_matches = 1;
first_label_matches = true;
}
int second_label_matches = 0;
bool second_label_matches = false;
if (get_label_value(&second->dest_addr) ==
get_label_value(&second->source_addr)) {
second_label_matches = 1;
second_label_matches = true;
}
if (first_label_matches != second_label_matches) {
return first_label_matches ? 1 : 1;
return first_label_matches ? -1 : 1;
}
return 0;
}

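The one-character fix just above (returning -1 instead of 1 when the first address's labels match) is what the new "InputFlipped" test exercises: a comparator that returns 1 on both branches can never prefer the matching-label address, so the sort result depends on input order. A standalone sketch of the corrected comparator shape with a hypothetical Entry type (not address_sorting_sortable):

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    // Hypothetical stand-in for the sortable entries in address_sorting.c.
    struct Entry {
      const char* addr;
      bool label_matches;  // source and destination labels agree
    };

    // Negative: first sorts earlier; positive: second sorts earlier; zero: tie.
    int CompareLabelsMatch(const Entry& first, const Entry& second) {
      if (first.label_matches != second.label_matches) {
        return first.label_matches ? -1 : 1;  // the fixed branch: prefer the match
      }
      return 0;
    }

    int main() {
      std::vector<Entry> addrs = {{"[2002::5001]:443", false}, {"[2001::5001]:443", true}};
      std::stable_sort(addrs.begin(), addrs.end(), [](const Entry& a, const Entry& b) {
        return CompareLabelsMatch(a, b) < 0;
      });
      std::printf("%s sorts first\n", addrs[0].addr);  // prints [2001::5001]:443
    }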