Reland DNSResolver cancellation (#29581 and #29631) (#29670)

* Reland DNSResolver cancellation (#29581 and #29631)

* add failing test: pollset_set deletion after DNSResolver cancellation

* Allow interested_parties to make like a pumpkin after cancellation

* cleanup

* iwyu

* Automated change: Fix sanity tests

* get the pollset_set join correct

* reviewer feedback

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/29705/head
AJ Heller 3 years ago committed by GitHub
parent baf7c108ff
commit c11f66faef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      BUILD
  2. 133
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  3. 9
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  4. 26
      src/core/lib/http/httpcli.cc
  5. 3
      src/core/lib/http/httpcli.h
  6. 2
      src/core/lib/iomgr/event_engine/resolver.h
  7. 8
      src/core/lib/iomgr/resolve_address.cc
  8. 35
      src/core/lib/iomgr/resolve_address.h
  9. 5
      src/core/lib/iomgr/resolve_address_impl.h
  10. 22
      src/core/lib/iomgr/resolve_address_posix.cc
  11. 7
      src/core/lib/iomgr/resolve_address_posix.h
  12. 21
      src/core/lib/iomgr/resolve_address_windows.cc
  13. 7
      src/core/lib/iomgr/resolve_address_windows.h
  14. 5
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  15. 21
      test/core/end2end/fuzzers/api_fuzzer.cc
  16. 66
      test/core/end2end/goaway_server_test.cc
  17. 3
      test/core/iomgr/resolve_address_posix_test.cc
  18. 137
      test/core/iomgr/resolve_address_test.cc

@ -3991,12 +3991,13 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/container:flat_hash_set",
"absl/container:inlined_vector",
"absl/memory",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/strings:str_format",
"absl/container:inlined_vector",
"address_sorting",
"cares",
],
@ -4005,6 +4006,7 @@ grpc_cc_library(
"config",
"debug_location",
"error",
"event_engine_common",
"gpr_base",
"grpc_base",
"grpc_client_channel",

@ -16,6 +16,7 @@
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <stdlib.h>
#include <algorithm>
@ -34,6 +35,7 @@
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -50,6 +52,7 @@
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/resolver/resolver.h"
#include "src/core/lib/resolver/resolver_factory.h"
@ -64,6 +67,7 @@
#include <address_sorting/address_sorting.h>
#include "absl/container/flat_hash_set.h"
#include "absl/container/inlined_vector.h"
#include "absl/strings/str_cat.h"
@ -73,6 +77,7 @@
#include "src/core/ext/filters/client_channel/resolver/polling_resolver.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/gethostname.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -358,74 +363,86 @@ class AresClientChannelDNSResolverFactory : public ResolverFactory {
class AresDNSResolver : public DNSResolver {
public:
class AresRequest : public DNSResolver::Request {
class AresRequest {
public:
AresRequest(
absl::string_view name, absl::string_view default_port,
grpc_pollset_set* interested_parties,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_resolve_address_done)
on_resolve_address_done,
AresDNSResolver* resolver, intptr_t aba_token)
: name_(std::string(name)),
default_port_(std::string(default_port)),
interested_parties_(interested_parties),
on_resolve_address_done_(std::move(on_resolve_address_done)) {
pollset_set_(grpc_pollset_set_create()),
on_resolve_address_done_(std::move(on_resolve_address_done)),
completed_(false),
resolver_(resolver),
aba_token_(aba_token) {
GRPC_CARES_TRACE_LOG("AresRequest:%p ctor", this);
GRPC_CLOSURE_INIT(&on_dns_lookup_done_, OnDnsLookupDone, this,
grpc_schedule_on_exec_ctx);
MutexLock lock(&mu_);
grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
ares_request_ = std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_ares(
/*dns_server=*/"", name_.c_str(), default_port_.c_str(), pollset_set_,
&on_dns_lookup_done_, &addresses_,
/*balancer_addresses=*/nullptr, /*service_config_json=*/nullptr,
GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS));
GRPC_CARES_TRACE_LOG("AresRequest:%p Start ares_request_:%p", this,
ares_request_.get());
}
~AresRequest() override {
~AresRequest() {
GRPC_CARES_TRACE_LOG("AresRequest:%p dtor ares_request_:%p", this,
ares_request_.get());
resolver_->UnregisterRequest(task_handle());
grpc_pollset_set_destroy(pollset_set_);
}
void Start() override {
bool Cancel() {
MutexLock lock(&mu_);
Ref().release(); // ref held by resolution
ares_request_ = std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_ares(
"" /* dns_server */, name_.c_str(), default_port_.c_str(),
interested_parties_, &on_dns_lookup_done_, &addresses_,
nullptr /* balancer_addresses */, nullptr /* service_config_json */,
GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS));
GRPC_CARES_TRACE_LOG("AresRequest:%p Start ares_request_:%p", this,
GRPC_CARES_TRACE_LOG("AresRequest:%p Cancel ares_request_:%p", this,
ares_request_.get());
if (completed_) return false;
// OnDnsLookupDone will still be run
grpc_cancel_ares_request(ares_request_.get());
completed_ = true;
grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties_);
return true;
}
void Orphan() override {
{
MutexLock lock(&mu_);
GRPC_CARES_TRACE_LOG("AresRequest:%p Orphan ares_request_:%p", this,
ares_request_.get());
if (ares_request_ != nullptr) {
grpc_cancel_ares_request(ares_request_.get());
}
}
Unref();
TaskHandle task_handle() {
return {reinterpret_cast<intptr_t>(this), aba_token_};
}
private:
// Called by ares when lookup has completed or when cancelled. It is always
// called exactly once.
static void OnDnsLookupDone(void* arg, grpc_error_handle error) {
AresRequest* r = static_cast<AresRequest*>(arg);
AresRequest* request = static_cast<AresRequest*>(arg);
GRPC_CARES_TRACE_LOG("AresRequest:%p OnDnsLookupDone", request);
// This request is deleted and unregistered upon any exit.
std::unique_ptr<AresRequest> deleter(request);
std::vector<grpc_resolved_address> resolved_addresses;
{
MutexLock lock(&r->mu_);
GRPC_CARES_TRACE_LOG("AresRequest:%p OnDnsLookupDone error:%s", r,
grpc_error_std_string(error).c_str());
if (r->addresses_ != nullptr) {
resolved_addresses.reserve(r->addresses_->size());
for (const auto& server_address : *r->addresses_) {
MutexLock lock(&request->mu_);
if (request->completed_) return;
request->completed_ = true;
if (request->addresses_ != nullptr) {
resolved_addresses.reserve(request->addresses_->size());
for (const auto& server_address : *request->addresses_) {
resolved_addresses.push_back(server_address.address());
}
}
}
if (error == GRPC_ERROR_NONE) {
// it's safe to run this inline since we've already been scheduled
// on the ExecCtx
r->on_resolve_address_done_(std::move(resolved_addresses));
} else {
r->on_resolve_address_done_(grpc_error_to_absl_status(error));
grpc_pollset_set_del_pollset_set(request->pollset_set_,
request->interested_parties_);
if (error != GRPC_ERROR_NONE) {
request->on_resolve_address_done_(grpc_error_to_absl_status(error));
return;
}
r->Unref();
request->on_resolve_address_done_(std::move(resolved_addresses));
}
// mutex to synchronize access to this object (but not to the ares_request
@ -437,6 +454,9 @@ class AresDNSResolver : public DNSResolver {
const std::string default_port_;
// parties interested in our I/O
grpc_pollset_set* const interested_parties_;
// locally owned pollset_set, required to support cancellation of requests
// while ares still needs a valid pollset_set.
grpc_pollset_set* pollset_set_;
// user-provided completion callback
const std::function<void(
absl::StatusOr<std::vector<grpc_resolved_address>>)>
@ -449,6 +469,12 @@ class AresDNSResolver : public DNSResolver {
grpc_closure on_dns_lookup_done_ ABSL_GUARDED_BY(mu_);
// underlying ares_request that the query is performed on
std::unique_ptr<grpc_ares_request> ares_request_ ABSL_GUARDED_BY(mu_);
bool completed_ ABSL_GUARDED_BY(mu_);
// Parent resolver that created this request
AresDNSResolver* resolver_;
// Unique token to help distinguish this request from others that may later
// be created in the same memory location.
intptr_t aba_token_;
};
// gets the singleton instance, possibly creating it first
@ -457,13 +483,17 @@ class AresDNSResolver : public DNSResolver {
return instance;
}
OrphanablePtr<DNSResolver::Request> ResolveName(
TaskHandle ResolveName(
absl::string_view name, absl::string_view default_port,
grpc_pollset_set* interested_parties,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done) override {
return MakeOrphanable<AresRequest>(name, default_port, interested_parties,
std::move(on_done));
MutexLock lock(&mu_);
auto* request = new AresRequest(name, default_port, interested_parties,
std::move(on_done), this, aba_token_++);
auto handle = request->task_handle();
open_requests_.insert(handle);
return handle;
}
absl::StatusOr<std::vector<grpc_resolved_address>> ResolveNameBlocking(
@ -473,9 +503,34 @@ class AresDNSResolver : public DNSResolver {
return default_resolver_->ResolveNameBlocking(name, default_port);
}
bool Cancel(TaskHandle handle) override {
MutexLock lock(&mu_);
if (!open_requests_.contains(handle)) {
// Unknown request, possibly completed already, or an invalid handle.
GRPC_CARES_TRACE_LOG(
"AresDNSResolver:%p attempt to cancel unknown TaskHandle:%s", this,
HandleToString(handle).c_str());
return false;
}
auto* request = reinterpret_cast<AresRequest*>(handle.keys[0]);
GRPC_CARES_TRACE_LOG("AresDNSResolver:%p cancel ares_request:%p", this,
request);
return request->Cancel();
}
private:
// Called exclusively from the AresRequest destructor.
void UnregisterRequest(TaskHandle handle) {
MutexLock lock(&mu_);
open_requests_.erase(handle);
}
// the previous default DNS resolver, used to delegate blocking DNS calls to
DNSResolver* default_resolver_ = GetDNSResolver();
Mutex mu_;
grpc_event_engine::experimental::LookupTaskHandleSet open_requests_
ABSL_GUARDED_BY(mu_);
intptr_t aba_token_ ABSL_GUARDED_BY(mu_) = 0;
};
bool ShouldUseAres(const char* resolver_env) {

@ -108,16 +108,15 @@ NativeClientChannelDNSResolver::~NativeClientChannelDNSResolver() {
OrphanablePtr<Orphanable> NativeClientChannelDNSResolver::StartRequest() {
Ref(DEBUG_LOCATION, "dns_request").release();
auto dns_request = GetDNSResolver()->ResolveName(
auto dns_request_handle = GetDNSResolver()->ResolveName(
name_to_resolve(), kDefaultSecurePort, interested_parties(),
absl::bind_front(&NativeClientChannelDNSResolver::OnResolved, this));
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_dns_resolver)) {
gpr_log(GPR_DEBUG, "[dns_resolver=%p] starting request=%p", this,
dns_request.get());
DNSResolver::HandleToString(dns_request_handle).c_str());
}
dns_request->Start();
// Explicit type conversion to work around issue with older compilers.
return OrphanablePtr<Orphanable>(dns_request.release());
// Not cancellable.
return nullptr;
}
void NativeClientChannelDNSResolver::OnResolved(

@ -180,10 +180,6 @@ HttpRequest::HttpRequest(
grpc_schedule_on_exec_ctx);
GPR_ASSERT(pollent);
grpc_polling_entity_add_to_pollset_set(pollent, pollset_set_);
// Create the DNS resolver. We'll start resolving when Start is called.
dns_request_ = GetDNSResolver()->ResolveName(
uri_.authority(), uri_.scheme(), pollset_set_,
absl::bind_front(&HttpRequest::OnResolved, this));
}
HttpRequest::~HttpRequest() {
@ -207,7 +203,9 @@ void HttpRequest::Start() {
return;
}
Ref().release(); // ref held by pending DNS resolution
dns_request_->Start();
dns_request_handle_ = GetDNSResolver()->ResolveName(
uri_.authority(), uri_.scheme(), pollset_set_,
absl::bind_front(&HttpRequest::OnResolved, this));
}
void HttpRequest::Orphan() {
@ -215,7 +213,13 @@ void HttpRequest::Orphan() {
MutexLock lock(&mu_);
GPR_ASSERT(!cancelled_);
cancelled_ = true;
dns_request_.reset(); // cancel potentially pending DNS resolution
// cancel potentially pending DNS resolution.
if (dns_request_handle_.has_value() &&
GetDNSResolver()->Cancel(dns_request_handle_.value())) {
Finish(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"cancelled during DNS resolution"));
Unref();
}
if (handshake_mgr_ != nullptr) {
// Shutdown will cancel any ongoing tcp connect.
handshake_mgr_->Shutdown(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@ -381,16 +385,16 @@ void HttpRequest::OnResolved(
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) {
RefCountedPtr<HttpRequest> unreffer(this);
MutexLock lock(&mu_);
dns_request_.reset();
if (!addresses_or.ok()) {
Finish(absl_status_to_grpc_error(addresses_or.status()));
return;
}
dns_request_handle_.reset();
if (cancelled_) {
Finish(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"cancelled during DNS resolution"));
return;
}
if (!addresses_or.ok()) {
Finish(absl_status_to_grpc_error(addresses_or.status()));
return;
}
addresses_ = std::move(*addresses_or);
next_address_ = 0;
NextAddress(GRPC_ERROR_NONE);

@ -246,7 +246,8 @@ class HttpRequest : public InternallyRefCounted<HttpRequest> {
grpc_slice_buffer incoming_ ABSL_GUARDED_BY(mu_);
grpc_slice_buffer outgoing_ ABSL_GUARDED_BY(mu_);
grpc_error_handle overall_error_ ABSL_GUARDED_BY(mu_) = GRPC_ERROR_NONE;
OrphanablePtr<DNSResolver::Request> dns_request_ ABSL_GUARDED_BY(mu_);
absl::optional<DNSResolver::TaskHandle> dns_request_handle_
ABSL_GUARDED_BY(mu_) = DNSResolver::kNullHandle;
};
} // namespace grpc_core

@ -35,6 +35,7 @@
namespace grpc_core {
namespace experimental {
#ifdef GRPC_USE_EVENT_ENGINE
class EventEngineDNSResolver : public DNSResolver {
public:
// Gets the singleton instance, creating it first if it doesn't exist
@ -49,6 +50,7 @@ class EventEngineDNSResolver : public DNSResolver {
absl::StatusOr<std::vector<grpc_resolved_address>> ResolveNameBlocking(
absl::string_view name, absl::string_view default_port) override;
};
#endif // GRPC_USE_EVENT_ENGINE
} // namespace experimental
} // namespace grpc_core

@ -19,6 +19,8 @@
#include "src/core/lib/iomgr/resolve_address.h"
#include "absl/strings/str_cat.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/alloc.h>
@ -29,8 +31,14 @@ namespace {
DNSResolver* g_dns_resolver;
}
constexpr DNSResolver::TaskHandle DNSResolver::kNullHandle;
void SetDNSResolver(DNSResolver* resolver) { g_dns_resolver = resolver; }
DNSResolver* GetDNSResolver() { return g_dns_resolver; }
std::string DNSResolver::HandleToString(TaskHandle handle) {
return absl::StrCat("{", handle.keys[0], ",", handle.keys[1], "}");
}
} // namespace grpc_core

@ -25,6 +25,8 @@
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/port.h"
@ -39,37 +41,42 @@ constexpr int kDefaultSecurePortInt = 443;
// A singleton class used for async and blocking DNS resolution
class DNSResolver {
public:
// Tracks a single asynchronous DNS resolution attempt. The DNS
// resolution should be arranged to be cancelled as soon as possible
// when Orphan is called.
class Request : public InternallyRefCounted<Request> {
public:
// Begins async DNS resolution
virtual void Start() = 0;
};
using TaskHandle = ::grpc_event_engine::experimental::EventEngine::
DNSResolver::LookupTaskHandle;
static constexpr TaskHandle kNullHandle{0, 0};
virtual ~DNSResolver() {}
static std::string HandleToString(TaskHandle handle);
// Asynchronously resolve name. Use \a default_port if a port isn't designated
// in \a name, otherwise use the port in \a name. On completion, \a on_done is
// invoked with the result.
//
// Note for implementations: calls may acquire locks in \a on_done which
// were previously held while calling Request::Start(). Therefore,
// implementations must not invoke \a on_done inline from the call to
// Request::Start(). The DNSCallbackExecCtxScheduler utility may help address
// this.
virtual OrphanablePtr<Request> ResolveName(
// were previously held while starting the request. Therefore,
// implementations must not invoke \a on_done inline from the call site that
// starts the request. The DNSCallbackExecCtxScheduler utility may help
// address this.
//
// \a interested_parties may be deleted after a request is cancelled.
virtual TaskHandle ResolveName(
absl::string_view name, absl::string_view default_port,
grpc_pollset_set* interested_parties,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done) GRPC_MUST_USE_RESULT = 0;
on_done) = 0;
// Resolve name in a blocking fashion. Use \a default_port if a port isn't
// designated in \a name, otherwise use the port in \a name.
virtual absl::StatusOr<std::vector<grpc_resolved_address>>
ResolveNameBlocking(absl::string_view name,
absl::string_view default_port) = 0;
// This shares the same semantics with \a EventEngine::Cancel: successfully
// cancelled lookups will not have their callbacks executed, and this
// method returns true. If a TaskHandle is unknown, this method should return
// false.
virtual bool Cancel(TaskHandle handle) = 0;
};
// Override the active DNS resolver which should be used for all DNS

@ -26,9 +26,8 @@
namespace grpc_core {
// A fire and forget class used by DNSResolver::Request implementations to
// schedule DNS resolution callbacks on the ExecCtx, which is frequently
// necessary to avoid lock inversion related problems.
// A fire and forget class to schedule DNS resolution callbacks on the ExecCtx,
// which is frequently necessary to avoid lock inversion related problems.
class DNSCallbackExecCtxScheduler {
public:
DNSCallbackExecCtxScheduler(

@ -45,7 +45,7 @@
namespace grpc_core {
namespace {
class NativeDNSRequest : public DNSResolver::Request {
class NativeDNSRequest {
public:
NativeDNSRequest(
absl::string_view name, absl::string_view default_port,
@ -53,18 +53,9 @@ class NativeDNSRequest : public DNSResolver::Request {
on_done)
: name_(name), default_port_(default_port), on_done_(std::move(on_done)) {
GRPC_CLOSURE_INIT(&request_closure_, DoRequestThread, this, nullptr);
}
// Starts the resolution
void Start() override {
Ref().release(); // ref held by callback
Executor::Run(&request_closure_, GRPC_ERROR_NONE, ExecutorType::RESOLVER);
}
// This is a no-op for the native resolver. Note
// that no I/O polling is required for the resolution to finish.
void Orphan() override { Unref(); }
private:
// Callback to be passed to grpc Executor to asynch-ify
// ResolveNameBlocking
@ -74,7 +65,7 @@ class NativeDNSRequest : public DNSResolver::Request {
GetDNSResolver()->ResolveNameBlocking(r->name_, r->default_port_);
// running inline is safe since we've already been scheduled on the executor
r->on_done_(std::move(result));
r->Unref();
delete r;
}
const std::string name_;
@ -91,13 +82,14 @@ NativeDNSResolver* NativeDNSResolver::GetOrCreate() {
return instance;
}
OrphanablePtr<DNSResolver::Request> NativeDNSResolver::ResolveName(
DNSResolver::TaskHandle NativeDNSResolver::ResolveName(
absl::string_view name, absl::string_view default_port,
grpc_pollset_set* /* interested_parties */,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done) {
return MakeOrphanable<NativeDNSRequest>(name, default_port,
std::move(on_done));
// self-deleting class
new NativeDNSRequest(name, default_port, std::move(on_done));
return kNullHandle;
}
absl::StatusOr<std::vector<grpc_resolved_address>>
@ -181,6 +173,8 @@ done:
return error_result;
}
bool NativeDNSResolver::Cancel(TaskHandle /*handle*/) { return false; }
} // namespace grpc_core
#endif

@ -32,14 +32,17 @@ class NativeDNSResolver : public DNSResolver {
// Gets the singleton instance, creating it first if it doesn't exist
static NativeDNSResolver* GetOrCreate();
OrphanablePtr<DNSResolver::Request> ResolveName(
TaskHandle ResolveName(
absl::string_view name, absl::string_view default_port,
grpc_pollset_set* interested_parties,
grpc_pollset_set* /* interested_parties */,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done) override;
absl::StatusOr<std::vector<grpc_resolved_address>> ResolveNameBlocking(
absl::string_view name, absl::string_view default_port) override;
// NativeDNSResolver does not support cancellation.
bool Cancel(TaskHandle handle) override;
};
} // namespace grpc_core

@ -48,7 +48,7 @@
namespace grpc_core {
namespace {
class NativeDNSRequest : public DNSResolver::Request {
class NativeDNSRequest {
public:
NativeDNSRequest(
absl::string_view name, absl::string_view default_port,
@ -56,18 +56,9 @@ class NativeDNSRequest : public DNSResolver::Request {
on_done)
: name_(name), default_port_(default_port), on_done_(std::move(on_done)) {
GRPC_CLOSURE_INIT(&request_closure_, DoRequestThread, this, nullptr);
}
// Starts the resolution
void Start() override {
Ref().release(); // ref held by callback
Executor::Run(&request_closure_, GRPC_ERROR_NONE, ExecutorType::RESOLVER);
}
// This is a no-op for the native resolver. Note
// that no I/O polling is required for the resolution to finish.
void Orphan() override { Unref(); }
private:
// Callback to be passed to grpc Executor to asynch-ify
// ResolveNameBlocking
@ -77,7 +68,7 @@ class NativeDNSRequest : public DNSResolver::Request {
GetDNSResolver()->ResolveNameBlocking(r->name_, r->default_port_);
// running inline is safe since we've already been scheduled on the executor
r->on_done_(std::move(result));
r->Unref();
delete r;
}
const std::string name_;
@ -94,13 +85,13 @@ NativeDNSResolver* NativeDNSResolver::GetOrCreate() {
return instance;
}
OrphanablePtr<DNSResolver::Request> NativeDNSResolver::ResolveName(
DNSResolver::TaskHandle NativeDNSResolver::ResolveName(
absl::string_view name, absl::string_view default_port,
grpc_pollset_set* /* interested_parties */,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done) {
return MakeOrphanable<NativeDNSRequest>(name, default_port,
std::move(on_done));
new NativeDNSRequest(name, default_port, std::move(on_done));
return kNullHandle;
}
absl::StatusOr<std::vector<grpc_resolved_address>>
@ -166,6 +157,8 @@ done:
return error_result;
}
bool NativeDNSResolver::Cancel(TaskHandle /*handle*/) { return false; }
} // namespace grpc_core
#endif

@ -32,14 +32,17 @@ class NativeDNSResolver : public DNSResolver {
// Gets the singleton instance, creating it first if it doesn't exist
static NativeDNSResolver* GetOrCreate();
OrphanablePtr<DNSResolver::Request> ResolveName(
TaskHandle ResolveName(
absl::string_view name, absl::string_view default_port,
grpc_pollset_set* interested_parties,
grpc_pollset_set* /* interested_parties */,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done) override;
absl::StatusOr<std::vector<grpc_resolved_address>> ResolveNameBlocking(
absl::string_view name, absl::string_view default_port) override;
// NativeDNSResolver does not support cancellation.
bool Cancel(TaskHandle handle) override;
};
} // namespace grpc_core

@ -66,7 +66,7 @@ class TestDNSResolver : public grpc_core::DNSResolver {
public:
// Wrapper around default resolve_address in order to count the number of
// times we incur in a system-level name resolution.
grpc_core::OrphanablePtr<grpc_core::DNSResolver::Request> ResolveName(
TaskHandle ResolveName(
absl::string_view name, absl::string_view default_port,
grpc_pollset_set* interested_parties,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
@ -100,6 +100,9 @@ class TestDNSResolver : public grpc_core::DNSResolver {
absl::string_view name, absl::string_view default_port) override {
return g_default_dns_resolver->ResolveNameBlocking(name, default_port);
}
// Not cancellable
bool Cancel(TaskHandle /*handle*/) override { return false; }
};
} // namespace

@ -114,25 +114,19 @@ namespace {
class FuzzerDNSResolver : public grpc_core::DNSResolver {
public:
class FuzzerDNSRequest : public grpc_core::DNSResolver::Request {
class FuzzerDNSRequest {
public:
FuzzerDNSRequest(
absl::string_view name,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done)
: name_(std::string(name)), on_done_(std::move(on_done)) {}
void Start() override {
Ref().release(); // ref held by timer callback
: name_(std::string(name)), on_done_(std::move(on_done)) {
grpc_timer_init(
&timer_,
grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
GRPC_CLOSURE_CREATE(FinishResolve, this, grpc_schedule_on_exec_ctx));
}
// cancellation not implemented
void Orphan() override { Unref(); }
private:
static void FinishResolve(void* arg, grpc_error_handle error) {
FuzzerDNSRequest* self = static_cast<FuzzerDNSRequest*>(arg);
@ -145,7 +139,7 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver {
} else {
self->on_done_(absl::UnknownError("Resolution failed"));
}
self->Unref();
delete self;
}
const std::string name_;
@ -161,13 +155,13 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver {
return instance;
}
grpc_core::OrphanablePtr<grpc_core::DNSResolver::Request> ResolveName(
TaskHandle ResolveName(
absl::string_view name, absl::string_view /* default_port */,
grpc_pollset_set* /* interested_parties */,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done) override {
return grpc_core::MakeOrphanable<FuzzerDNSRequest>(name,
std::move(on_done));
new FuzzerDNSRequest(name, std::move(on_done));
return kNullHandle;
}
absl::StatusOr<std::vector<grpc_resolved_address>> ResolveNameBlocking(
@ -175,6 +169,9 @@ class FuzzerDNSResolver : public grpc_core::DNSResolver {
absl::string_view /* default_port */) override {
GPR_ASSERT(0);
}
// FuzzerDNSResolver does not support cancellation.
bool Cancel(TaskHandle /*handle*/) override { return false; }
};
} // namespace

@ -63,42 +63,7 @@ grpc_core::DNSResolver* g_default_dns_resolver;
class TestDNSResolver : public grpc_core::DNSResolver {
public:
class TestDNSRequest : public grpc_core::DNSResolver::Request {
public:
explicit TestDNSRequest(
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done)
: on_done_(std::move(on_done)) {}
void Start() override {
gpr_mu_lock(&g_mu);
if (g_resolve_port < 0) {
gpr_mu_unlock(&g_mu);
new grpc_core::DNSCallbackExecCtxScheduler(
std::move(on_done_), absl::UnknownError("Forced Failure"));
} else {
std::vector<grpc_resolved_address> addrs;
grpc_resolved_address addr;
grpc_sockaddr_in* sa = reinterpret_cast<grpc_sockaddr_in*>(&addr);
sa->sin_family = GRPC_AF_INET;
sa->sin_addr.s_addr = 0x100007f;
sa->sin_port = grpc_htons(static_cast<uint16_t>(g_resolve_port));
addr.len = static_cast<socklen_t>(sizeof(*sa));
addrs.push_back(addr);
gpr_mu_unlock(&g_mu);
new grpc_core::DNSCallbackExecCtxScheduler(std::move(on_done_),
std::move(addrs));
}
}
void Orphan() override { Unref(); }
private:
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done_;
};
grpc_core::OrphanablePtr<grpc_core::DNSResolver::Request> ResolveName(
TaskHandle ResolveName(
absl::string_view name, absl::string_view default_port,
grpc_pollset_set* interested_parties,
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
@ -107,13 +72,40 @@ class TestDNSResolver : public grpc_core::DNSResolver {
return g_default_dns_resolver->ResolveName(
name, default_port, interested_parties, std::move(on_done));
}
return grpc_core::MakeOrphanable<TestDNSRequest>(std::move(on_done));
MakeDNSRequest(std::move(on_done));
return kNullHandle;
}
absl::StatusOr<std::vector<grpc_resolved_address>> ResolveNameBlocking(
absl::string_view name, absl::string_view default_port) override {
return g_default_dns_resolver->ResolveNameBlocking(name, default_port);
}
bool Cancel(TaskHandle /*handle*/) override { return false; }
private:
void MakeDNSRequest(
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_done) {
gpr_mu_lock(&g_mu);
if (g_resolve_port < 0) {
gpr_mu_unlock(&g_mu);
new grpc_core::DNSCallbackExecCtxScheduler(
std::move(on_done), absl::UnknownError("Forced Failure"));
} else {
std::vector<grpc_resolved_address> addrs;
grpc_resolved_address addr;
grpc_sockaddr_in* sa = reinterpret_cast<grpc_sockaddr_in*>(&addr);
sa->sin_family = GRPC_AF_INET;
sa->sin_addr.s_addr = 0x100007f;
sa->sin_port = grpc_htons(static_cast<uint16_t>(g_resolve_port));
addr.len = static_cast<socklen_t>(sizeof(*sa));
addrs.push_back(addr);
gpr_mu_unlock(&g_mu);
new grpc_core::DNSCallbackExecCtxScheduler(std::move(on_done),
std::move(addrs));
}
}
};
} // namespace

@ -135,12 +135,11 @@ static void resolve_address_must_succeed(const char* target) {
args_struct args;
args_init(&args);
poll_pollset_until_request_done(&args);
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
target, "1" /* port number */, args.pollset_set,
[&args](absl::StatusOr<std::vector<grpc_resolved_address>> result) {
MustSucceed(&args, std::move(result));
});
r->Start();
grpc_core::ExecCtx::Get()->Flush();
args_finish(&args);
}

@ -154,10 +154,10 @@ class ResolveAddressTest : public ::testing::Test {
Finish();
}
grpc_pollset_set* pollset_set() const { return pollset_set_; }
private:
static void DoNothing(void* /*arg*/, grpc_error_handle /*error*/) {}
void MustNotBeCalled(
absl::StatusOr<std::vector<grpc_resolved_address>> /*result*/) {
FAIL() << "This should never be called";
}
void Finish() {
grpc_core::MutexLockForGprMu lock(mu_);
@ -165,6 +165,11 @@ class ResolveAddressTest : public ::testing::Test {
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(pollset_, nullptr));
}
grpc_pollset_set* pollset_set() const { return pollset_set_; }
private:
static void DoNothing(void* /*arg*/, grpc_error_handle /*error*/) {}
gpr_mu* mu_;
bool done_ = false; // guarded by mu
grpc_pollset* pollset_; // guarded by mu
@ -178,20 +183,18 @@ class ResolveAddressTest : public ::testing::Test {
TEST_F(ResolveAddressTest, Localhost) {
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
"localhost:1", "", pollset_set(),
absl::bind_front(&ResolveAddressTest::MustSucceed, this));
r->Start();
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone();
}
TEST_F(ResolveAddressTest, DefaultPort) {
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
"localhost", "1", pollset_set(),
absl::bind_front(&ResolveAddressTest::MustSucceed, this));
r->Start();
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone();
}
@ -201,10 +204,9 @@ TEST_F(ResolveAddressTest, LocalhostResultHasIPv6First) {
GTEST_SKIP() << "this test is only valid with the c-ares resolver";
}
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
"localhost:1", "", pollset_set(),
absl::bind_front(&ResolveAddressTest::MustSucceedWithIPv6First, this));
r->Start();
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone();
}
@ -249,50 +251,45 @@ TEST_F(ResolveAddressTest, LocalhostResultHasIPv4FirstWhenIPv6IsntAvalailable) {
address_sorting_override_source_addr_factory_for_testing(mock);
// run the test
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
"localhost:1", "", pollset_set(),
absl::bind_front(&ResolveAddressTest::MustSucceedWithIPv4First, this));
r->Start();
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone();
}
TEST_F(ResolveAddressTest, NonNumericDefaultPort) {
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
"localhost", "http", pollset_set(),
absl::bind_front(&ResolveAddressTest::MustSucceed, this));
r->Start();
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone();
}
TEST_F(ResolveAddressTest, MissingDefaultPort) {
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
"localhost", "", pollset_set(),
absl::bind_front(&ResolveAddressTest::MustFail, this));
r->Start();
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone();
}
TEST_F(ResolveAddressTest, IPv6WithPort) {
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
"[2001:db8::1]:1", "", pollset_set(),
absl::bind_front(&ResolveAddressTest::MustSucceed, this));
r->Start();
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone();
}
void TestIPv6WithoutPort(ResolveAddressTest* test, const char* target) {
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
target, "80", test->pollset_set(),
absl::bind_front(&ResolveAddressTest::MustSucceed, test));
r->Start();
grpc_core::ExecCtx::Get()->Flush();
test->PollPollsetUntilRequestDone();
}
@ -311,10 +308,9 @@ TEST_F(ResolveAddressTest, IPv6WithoutPortV4MappedV6) {
void TestInvalidIPAddress(ResolveAddressTest* test, const char* target) {
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
target, "", test->pollset_set(),
absl::bind_front(&ResolveAddressTest::MustFail, test));
r->Start();
grpc_core::ExecCtx::Get()->Flush();
test->PollPollsetUntilRequestDone();
}
@ -329,10 +325,9 @@ TEST_F(ResolveAddressTest, InvalidIPv6Addresses) {
void TestUnparseableHostPort(ResolveAddressTest* test, const char* target) {
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
grpc_core::GetDNSResolver()->ResolveName(
target, "1", test->pollset_set(),
absl::bind_front(&ResolveAddressTest::MustFail, test));
r->Start();
grpc_core::ExecCtx::Get()->Flush();
test->PollPollsetUntilRequestDone();
}
@ -365,15 +360,27 @@ TEST_F(ResolveAddressTest, UnparseableHostPortsBadLocalhostWithPort) {
// test doesn't care what the result is, just that we don't crash etc.
TEST_F(ResolveAddressTest, ImmediateCancel) {
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
auto request_handle = grpc_core::GetDNSResolver()->ResolveName(
"localhost:1", "1", pollset_set(),
absl::bind_front(&ResolveAddressTest::DontCare, this));
r->Start();
r.reset(); // cancel the resolution
if (grpc_core::GetDNSResolver()->Cancel(request_handle)) {
Finish();
}
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone();
}
// Attempt to cancel a request after it has completed.
TEST_F(ResolveAddressTest, CancelDoesNotSucceed) {
grpc_core::ExecCtx exec_ctx;
auto request_handle = grpc_core::GetDNSResolver()->ResolveName(
"localhost:1", "1", pollset_set(),
absl::bind_front(&ResolveAddressTest::MustSucceed, this));
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone();
ASSERT_FALSE(grpc_core::GetDNSResolver()->Cancel(request_handle));
}
namespace {
int g_fake_non_responsive_dns_server_port;
@ -410,14 +417,78 @@ TEST_F(ResolveAddressTest, CancelWithNonResponsiveDNSServer) {
grpc_ares_test_only_inject_config = InjectNonResponsiveDNSServer;
// Run the test
grpc_core::ExecCtx exec_ctx;
auto r = grpc_core::GetDNSResolver()->ResolveName(
auto request_handle = grpc_core::GetDNSResolver()->ResolveName(
"foo.bar.com:1", "1", pollset_set(),
absl::bind_front(&ResolveAddressTest::MustFailExpectCancelledErrorMessage,
this));
r->Start();
absl::bind_front(&ResolveAddressTest::MustNotBeCalled, this));
grpc_core::ExecCtx::Get()->Flush(); // initiate DNS requests
r.reset(); // cancel the resolution
grpc_core::ExecCtx::Get()->Flush(); // let cancellation work finish
ASSERT_TRUE(grpc_core::GetDNSResolver()->Cancel(request_handle));
Finish();
// let cancellation work finish to ensure the callback is not called
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone();
}
// RAII class for pollset and pollset_set creation
class PollsetSetWrapper {
public:
static std::unique_ptr<PollsetSetWrapper> Create() {
return absl::WrapUnique<PollsetSetWrapper>(new PollsetSetWrapper());
}
~PollsetSetWrapper() {
grpc_pollset_set_del_pollset(pss_, ps_);
grpc_pollset_destroy(ps_);
gpr_free(ps_);
grpc_pollset_set_destroy(pss_);
gpr_log(GPR_DEBUG, "PollsetSetWrapper:%p deleted", this);
}
grpc_pollset_set* pollset_set() { return pss_; }
private:
PollsetSetWrapper() {
ps_ = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(ps_, &mu_);
pss_ = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(pss_, ps_);
gpr_log(GPR_DEBUG, "PollsetSetWrapper:%p created", this);
}
gpr_mu* mu_;
grpc_pollset* ps_;
grpc_pollset_set* pss_;
};
TEST_F(ResolveAddressTest, DeleteInterestedPartiesAfterCancellation) {
// Regression test for race around interested_party deletion after
// cancellation.
if (absl::string_view(g_resolver_type) != "ares") {
GTEST_SKIP() << "the native resolver doesn't support cancellation, so we "
"can only test this with c-ares";
}
// Inject an unresponsive DNS server into the resolver's DNS server config
grpc_core::testing::FakeUdpAndTcpServer fake_dns_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
g_fake_non_responsive_dns_server_port = fake_dns_server.port();
grpc_ares_test_only_inject_config = InjectNonResponsiveDNSServer;
{
grpc_core::ExecCtx exec_ctx;
// Create a pollset_set, destroyed immediately after cancellation
std::unique_ptr<PollsetSetWrapper> pss = PollsetSetWrapper::Create();
// Run the test
auto request_handle = grpc_core::GetDNSResolver()->ResolveName(
"foo.bar.com:1", "1", pss->pollset_set(),
absl::bind_front(&ResolveAddressTest::MustNotBeCalled, this));
grpc_core::ExecCtx::Get()->Flush(); // initiate DNS requests
ASSERT_TRUE(grpc_core::GetDNSResolver()->Cancel(request_handle));
}
{
// let cancellation work finish to ensure the callback is not called
grpc_core::ExecCtx ctx;
Finish();
}
PollPollsetUntilRequestDone();
}

Loading…
Cancel
Save