Explicitly delete fd from pollset set after c-ares is done

pull/15828/head
Alexander Polcyn 7 years ago
parent 9fcfbb07bd
commit 0220a998db
  1. 10
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
  2. 11
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
  3. 5
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  4. 43
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  5. 4
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
  6. 2
      src/core/lib/iomgr/iomgr.cc
  7. 5
      src/core/lib/iomgr/iomgr.h
  8. 46
      test/cpp/naming/cancel_ares_query_test.cc

@ -74,6 +74,8 @@ struct grpc_ares_ev_driver {
bool working; bool working;
/** is this event driver being shut down */ /** is this event driver being shut down */
bool shutting_down; bool shutting_down;
/** request object that's using this ev driver */
grpc_ares_request* request;
}; };
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver);
@ -92,6 +94,7 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
GPR_ASSERT(ev_driver->fds == nullptr); GPR_ASSERT(ev_driver->fds == nullptr);
GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
ares_destroy(ev_driver->channel); ares_destroy(ev_driver->channel);
grpc_ares_complete_request_locked(ev_driver->request);
gpr_free(ev_driver); gpr_free(ev_driver);
} }
} }
@ -115,7 +118,8 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set, grpc_pollset_set* pollset_set,
grpc_combiner* combiner) { grpc_combiner* combiner,
grpc_ares_request* request) {
*ev_driver = static_cast<grpc_ares_ev_driver*>( *ev_driver = static_cast<grpc_ares_ev_driver*>(
gpr_malloc(sizeof(grpc_ares_ev_driver))); gpr_malloc(sizeof(grpc_ares_ev_driver)));
ares_options opts; ares_options opts;
@ -139,10 +143,12 @@ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
(*ev_driver)->fds = nullptr; (*ev_driver)->fds = nullptr;
(*ev_driver)->working = false; (*ev_driver)->working = false;
(*ev_driver)->shutting_down = false; (*ev_driver)->shutting_down = false;
(*ev_driver)->request = request;
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) { void grpc_ares_ev_driver_on_queries_complete_locked(
grpc_ares_ev_driver* ev_driver) {
// We mark the event driver as being shut down. If the event driver // We mark the event driver as being shut down. If the event driver
// is working, grpc_ares_notify_on_event_locked will shut down the // is working, grpc_ares_notify_on_event_locked will shut down the
// fds; if it's not working, there are no fds to shut down. // fds; if it's not working, there are no fds to shut down.

@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <ares.h> #include <ares.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/pollset_set.h"
@ -42,12 +43,12 @@ ares_channel* grpc_ares_ev_driver_get_channel_locked(
created successfully. */ created successfully. */
grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set, grpc_pollset_set* pollset_set,
grpc_combiner* combiner); grpc_combiner* combiner,
grpc_ares_request* request);
/* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver /* Called back when all DNS lookups have completed. */
will be cancelled and their on_done callbacks will be invoked with a status void grpc_ares_ev_driver_on_queries_complete_locked(
of ARES_ECANCELLED. */ grpc_ares_ev_driver* ev_driver);
void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver);
/* Shutdown all the grpc_fds used by \a ev_driver */ /* Shutdown all the grpc_fds used by \a ev_driver */
void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver); void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver);

@ -44,11 +44,13 @@ class GrpcPolledFdPosix : public GrpcPolledFd {
: as_(as) { : as_(as) {
gpr_asprintf(&name_, "c-ares fd: %d", (int)as); gpr_asprintf(&name_, "c-ares fd: %d", (int)as);
fd_ = grpc_fd_create((int)as, name_, false); fd_ = grpc_fd_create((int)as, name_, false);
grpc_pollset_set_add_fd(driver_pollset_set, fd_); driver_pollset_set_ = driver_pollset_set;
grpc_pollset_set_add_fd(driver_pollset_set_, fd_);
} }
~GrpcPolledFdPosix() { ~GrpcPolledFdPosix() {
gpr_free(name_); gpr_free(name_);
grpc_pollset_set_del_fd(driver_pollset_set_, fd_);
/* c-ares library will close the fd inside grpc_fd. This fd may be picked up /* c-ares library will close the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */ grpc_fd_orphan. */
@ -81,6 +83,7 @@ class GrpcPolledFdPosix : public GrpcPolledFd {
char* name_; char* name_;
ares_socket_t as_; ares_socket_t as_;
grpc_fd* fd_; grpc_fd* fd_;
grpc_pollset_set* driver_pollset_set_;
}; };
GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as, GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,

@ -63,7 +63,7 @@ struct grpc_ares_request {
/** the evernt driver used by this request */ /** the evernt driver used by this request */
grpc_ares_ev_driver* ev_driver; grpc_ares_ev_driver* ev_driver;
/** number of ongoing queries */ /** number of ongoing queries */
gpr_refcount pending_queries; size_t pending_queries;
/** is there at least one successful query, set in on_done_cb */ /** is there at least one successful query, set in on_done_cb */
bool success; bool success;
@ -145,21 +145,25 @@ void grpc_cares_wrapper_test_only_address_sorting_sort(
} }
static void grpc_ares_request_ref_locked(grpc_ares_request* r) { static void grpc_ares_request_ref_locked(grpc_ares_request* r) {
gpr_ref(&r->pending_queries); r->pending_queries++;
} }
static void grpc_ares_request_unref_locked(grpc_ares_request* r) { static void grpc_ares_request_unref_locked(grpc_ares_request* r) {
/* If there are no pending queries, invoke on_done callback and destroy the r->pending_queries--;
if (r->pending_queries == 0u) {
grpc_ares_ev_driver_on_queries_complete_locked(r->ev_driver);
}
}
void grpc_ares_complete_request_locked(grpc_ares_request* r) {
/* Invoke on_done callback and destroy the
request */ request */
if (gpr_unref(&r->pending_queries)) { grpc_lb_addresses* lb_addrs = *(r->lb_addrs_out);
grpc_lb_addresses* lb_addrs = *(r->lb_addrs_out); if (lb_addrs != nullptr) {
if (lb_addrs != nullptr) { grpc_cares_wrapper_address_sorting_sort(lb_addrs);
grpc_cares_wrapper_address_sorting_sort(lb_addrs);
}
GRPC_CLOSURE_SCHED(r->on_done, r->error);
grpc_ares_ev_driver_destroy_locked(r->ev_driver);
gpr_free(r);
} }
GRPC_CLOSURE_SCHED(r->on_done, r->error);
gpr_free(r);
} }
static grpc_ares_hostbyname_request* create_hostbyname_request_locked( static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
@ -399,20 +403,18 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
} }
port = gpr_strdup(default_port); port = gpr_strdup(default_port);
} }
grpc_ares_ev_driver* ev_driver;
error = grpc_ares_ev_driver_create_locked(&ev_driver, interested_parties,
combiner);
if (error != GRPC_ERROR_NONE) goto error_cleanup;
r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request))); r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
r->ev_driver = ev_driver; r->ev_driver = nullptr;
r->on_done = on_done; r->on_done = on_done;
r->lb_addrs_out = addrs; r->lb_addrs_out = addrs;
r->service_config_json_out = service_config_json; r->service_config_json_out = service_config_json;
r->success = false; r->success = false;
r->error = GRPC_ERROR_NONE; r->error = GRPC_ERROR_NONE;
r->pending_queries = 0;
error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties,
combiner, r);
if (error != GRPC_ERROR_NONE) goto error_cleanup;
channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver); channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver);
// If dns_server is specified, use it. // If dns_server is specified, use it.
if (dns_server != nullptr) { if (dns_server != nullptr) {
gpr_log(GPR_INFO, "Using DNS server %s", dns_server); gpr_log(GPR_INFO, "Using DNS server %s", dns_server);
@ -437,7 +439,6 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
error = grpc_error_set_str( error = grpc_error_set_str(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("cannot parse authority"), GRPC_ERROR_CREATE_FROM_STATIC_STRING("cannot parse authority"),
GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name));
gpr_free(r);
goto error_cleanup; goto error_cleanup;
} }
int status = ares_set_servers_ports(*channel, &r->dns_server_addr); int status = ares_set_servers_ports(*channel, &r->dns_server_addr);
@ -447,11 +448,10 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
ares_strerror(status)); ares_strerror(status));
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg); error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg); gpr_free(error_msg);
gpr_free(r);
goto error_cleanup; goto error_cleanup;
} }
} }
gpr_ref_init(&r->pending_queries, 1); r->pending_queries = 1;
if (grpc_ipv6_loopback_available()) { if (grpc_ipv6_loopback_available()) {
hr = create_hostbyname_request_locked(r, host, strhtons(port), hr = create_hostbyname_request_locked(r, host, strhtons(port),
false /* is_balancer */); false /* is_balancer */);
@ -487,6 +487,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
error_cleanup: error_cleanup:
GRPC_CLOSURE_SCHED(on_done, error); GRPC_CLOSURE_SCHED(on_done, error);
gpr_free(r);
gpr_free(host); gpr_free(host);
gpr_free(port); gpr_free(port);
return nullptr; return nullptr;

@ -66,6 +66,10 @@ grpc_error* grpc_ares_init(void);
it has been called the same number of times as grpc_ares_init(). */ it has been called the same number of times as grpc_ares_init(). */
void grpc_ares_cleanup(void); void grpc_ares_cleanup(void);
/** Schedules the desired callback for request completion
* and destroys the grpc_ares_request */
void grpc_ares_complete_request_locked(grpc_ares_request* request);
/* Exposed only for testing */ /* Exposed only for testing */
void grpc_cares_wrapper_test_only_address_sorting_sort( void grpc_cares_wrapper_test_only_address_sorting_sort(
grpc_lb_addresses* lb_addrs); grpc_lb_addresses* lb_addrs);

@ -70,6 +70,8 @@ static size_t count_objects(void) {
return n; return n;
} }
size_t grpc_iomgr_count_objects_for_testing(void) { return count_objects(); }
static void dump_objects(const char* kind) { static void dump_objects(const char* kind) {
grpc_iomgr_object* obj; grpc_iomgr_object* obj;
for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) { for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {

@ -23,6 +23,8 @@
#include "src/core/lib/iomgr/port.h" #include "src/core/lib/iomgr/port.h"
#include <stdlib.h>
/** Initializes the iomgr. */ /** Initializes the iomgr. */
void grpc_iomgr_init(); void grpc_iomgr_init();
@ -33,4 +35,7 @@ void grpc_iomgr_start();
* exec_ctx. */ * exec_ctx. */
void grpc_iomgr_shutdown(); void grpc_iomgr_shutdown();
/* Exposed only for testing */
size_t grpc_iomgr_count_objects_for_testing();
#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */ #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */

@ -160,10 +160,7 @@ void CheckResolverResultAssertFailureLocked(void* arg, grpc_error* error) {
gpr_mu_unlock(args->mu); gpr_mu_unlock(args->mu);
} }
TEST(CancelDuringAresQuery, TestCancelActiveDNSQuery) { void TestCancelActiveDNSQuery(ArgsStruct* args) {
grpc_core::ExecCtx exec_ctx;
ArgsStruct args;
ArgsInit(&args);
int fake_dns_port = grpc_pick_unused_port_or_die(); int fake_dns_port = grpc_pick_unused_port_or_die();
FakeNonResponsiveDNSServer fake_dns_server(fake_dns_port); FakeNonResponsiveDNSServer fake_dns_server(fake_dns_port);
char* client_target; char* client_target;
@ -173,20 +170,47 @@ TEST(CancelDuringAresQuery, TestCancelActiveDNSQuery) {
fake_dns_port)); fake_dns_port));
// create resolver and resolve // create resolver and resolve
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver = grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
grpc_core::ResolverRegistry::CreateResolver(client_target, nullptr, grpc_core::ResolverRegistry::CreateResolver(
args.pollset_set, args.lock); client_target, nullptr, args->pollset_set, args->lock);
gpr_free(client_target); gpr_free(client_target);
grpc_closure on_resolver_result_changed; grpc_closure on_resolver_result_changed;
GRPC_CLOSURE_INIT(&on_resolver_result_changed, GRPC_CLOSURE_INIT(&on_resolver_result_changed,
CheckResolverResultAssertFailureLocked, (void*)&args, CheckResolverResultAssertFailureLocked, (void*)args,
grpc_combiner_scheduler(args.lock)); grpc_combiner_scheduler(args->lock));
resolver->NextLocked(&args.channel_args, &on_resolver_result_changed); resolver->NextLocked(&args->channel_args, &on_resolver_result_changed);
// Without resetting and causing resolver shutdown, the // Without resetting and causing resolver shutdown, the
// PollPollsetUntilRequestDone call should never finish. // PollPollsetUntilRequestDone call should never finish.
resolver.reset(); resolver.reset();
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone(&args); PollPollsetUntilRequestDone(args);
ArgsFinish(&args); ArgsFinish(args);
}
TEST(CancelDuringAresQuery, TestCancelActiveDNSQuery) {
grpc_core::ExecCtx exec_ctx;
ArgsStruct args;
ArgsInit(&args);
TestCancelActiveDNSQuery(&args);
}
TEST(CancelDuringAresQuery, TestFdsAreDeletedFromPollsetSet) {
grpc_core::ExecCtx exec_ctx;
ArgsStruct args;
ArgsInit(&args);
// Add fake_other_pollset_set into the mix to test
// that we're explicitly deleting fd's from their pollset.
// If we aren't doing so, then the remaining presence of
// "fake_other_pollset_set" after the request is done and the resolver
// pollset set is destroyed should keep the resolver's fd alive and
// fail the test.
grpc_pollset_set* fake_other_pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset_set(fake_other_pollset_set, args.pollset_set);
// Note that running the cancellation c-ares test is somewhat irrelevant for
// this test. This test only cares about what happens to fd's that c-ares
// opens.
TestCancelActiveDNSQuery(&args);
EXPECT_EQ(grpc_iomgr_count_objects_for_testing(), 0u);
grpc_pollset_set_destroy(fake_other_pollset_set);
} }
TEST(CancelDuringAresQuery, TEST(CancelDuringAresQuery,

Loading…
Cancel
Save