Merge pull request #17987 from markdroth/resolver_api

Add ResultHandler to Resolver API
reviewable/pr18357/r2^2
Mark D. Roth 6 years ago committed by GitHub
commit 4222ba53d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/core/ext/filters/client_channel/resolver.cc
  2. 54
      src/core/ext/filters/client_channel/resolver.h
  3. 97
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  4. 104
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  5. 88
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  6. 3
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  7. 89
      src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
  8. 4
      src/core/ext/filters/client_channel/resolver_factory.h
  9. 7
      src/core/ext/filters/client_channel/resolver_registry.cc
  10. 8
      src/core/ext/filters/client_channel/resolver_registry.h
  11. 216
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  12. 14
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  13. 3
      src/core/lib/channel/channel_args.h
  14. 111
      test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc
  15. 84
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  16. 8
      test/core/client_channel/resolvers/dns_resolver_test.cc
  17. 172
      test/core/client_channel/resolvers/fake_resolver_test.cc
  18. 36
      test/core/client_channel/resolvers/sockaddr_resolver_test.cc
  19. 39
      test/cpp/naming/cancel_ares_query_test.cc
  20. 134
      test/cpp/naming/resolver_component_test.cc

@ -26,8 +26,10 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_resolver_refcount(false,
namespace grpc_core {
Resolver::Resolver(grpc_combiner* combiner)
Resolver::Resolver(grpc_combiner* combiner,
UniquePtr<ResultHandler> result_handler)
: InternallyRefCounted(&grpc_trace_resolver_refcount),
result_handler_(std::move(result_handler)),
combiner_(GRPC_COMBINER_REF(combiner, "resolver")) {}
Resolver::~Resolver() { GRPC_COMBINER_UNREF(combiner_, "resolver"); }

@ -46,27 +46,34 @@ namespace grpc_core {
/// combiner passed to the constructor.
class Resolver : public InternallyRefCounted<Resolver> {
public:
/// A proxy object used by the resolver to return results to the
/// client channel.
class ResultHandler {
public:
virtual ~ResultHandler() {}
/// Returns a result to the channel.
/// The list of addresses will be in GRPC_ARG_SERVER_ADDRESS_LIST.
/// The service config (if any) will be in GRPC_ARG_SERVICE_CONFIG.
/// Takes ownership of \a result.
// TODO(roth): Change this API so that addresses and service config are
// passed explicitly instead of being in channel args.
virtual void ReturnResult(const grpc_channel_args* result) GRPC_ABSTRACT;
/// Returns a transient error to the channel.
/// If the resolver does not set the GRPC_ERROR_INT_GRPC_STATUS
/// attribute on the error, calls will be failed with status UNKNOWN.
virtual void ReturnError(grpc_error* error) GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
// Not copyable nor movable.
Resolver(const Resolver&) = delete;
Resolver& operator=(const Resolver&) = delete;
/// Requests a callback when a new result becomes available.
/// When the new result is available, sets \a *result to the new result
/// and schedules \a on_complete for execution.
/// Upon transient failure, sets \a *result to nullptr and schedules
/// \a on_complete with no error.
/// If resolution is fatally broken, sets \a *result to nullptr and
/// schedules \a on_complete with an error.
/// TODO(roth): When we have time, improve the way this API represents
/// transient failure vs. shutdown.
///
/// Note that the client channel will almost always have a request
/// to \a NextLocked() pending. When it gets the callback, it will
/// process the new result and then immediately make another call to
/// \a NextLocked(). This allows push-based resolvers to provide new
/// data as soon as it becomes available.
virtual void NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) GRPC_ABSTRACT;
/// Starts resolving.
virtual void StartLocked() GRPC_ABSTRACT;
/// Asks the resolver to obtain an updated resolver result, if
/// applicable.
@ -79,8 +86,8 @@ class Resolver : public InternallyRefCounted<Resolver> {
///
/// For push-based implementations, this may be a no-op.
///
/// If this causes new data to become available, then the currently
/// pending call to \a NextLocked() will return the new result.
/// Note: Implementations must not invoke any method on the
/// ResultHandler from within this call.
virtual void RequestReresolutionLocked() {}
/// Resets the re-resolution backoff, if any.
@ -108,16 +115,18 @@ class Resolver : public InternallyRefCounted<Resolver> {
// TODO(roth): Once we have a C++-like interface for combiners, this
// API should change to take a RefCountedPtr<>, so that we always take
// ownership of a new ref.
explicit Resolver(grpc_combiner* combiner);
explicit Resolver(grpc_combiner* combiner,
UniquePtr<ResultHandler> result_handler);
virtual ~Resolver();
/// Shuts down the resolver. If there is a pending call to
/// NextLocked(), the callback will be scheduled with an error.
/// Shuts down the resolver.
virtual void ShutdownLocked() GRPC_ABSTRACT;
grpc_combiner* combiner() const { return combiner_; }
ResultHandler* result_handler() const { return result_handler_.get(); }
private:
static void ShutdownAndUnrefLocked(void* arg, grpc_error* ignored) {
Resolver* resolver = static_cast<Resolver*>(arg);
@ -125,6 +134,7 @@ class Resolver : public InternallyRefCounted<Resolver> {
resolver->Unref();
}
UniquePtr<ResultHandler> result_handler_;
grpc_combiner* combiner_;
};

@ -60,10 +60,9 @@ const char kDefaultPort[] = "https";
class AresDnsResolver : public Resolver {
public:
explicit AresDnsResolver(const ResolverArgs& args);
explicit AresDnsResolver(ResolverArgs args);
void NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) override;
void StartLocked() override;
void RequestReresolutionLocked() override;
@ -76,7 +75,6 @@ class AresDnsResolver : public Resolver {
void MaybeStartResolvingLocked();
void StartResolvingLocked();
void MaybeFinishNextLocked();
static void OnNextResolutionLocked(void* arg, grpc_error* error);
static void OnResolvedLocked(void* arg, grpc_error* error);
@ -98,16 +96,6 @@ class AresDnsResolver : public Resolver {
bool resolving_ = false;
/// the pending resolving request
grpc_ares_request* pending_request_ = nullptr;
/// which version of the result have we published?
int published_version_ = 0;
/// which version of the result is current?
int resolved_version_ = 0;
/// pending next completion, or NULL
grpc_closure* next_completion_ = nullptr;
/// target result address for next completion
grpc_channel_args** target_result_ = nullptr;
/// current (fully resolved) result
grpc_channel_args* resolved_result_ = nullptr;
/// next resolution timer
bool have_next_resolution_timer_ = false;
grpc_timer next_resolution_timer_;
@ -129,8 +117,8 @@ class AresDnsResolver : public Resolver {
bool enable_srv_queries_;
};
AresDnsResolver::AresDnsResolver(const ResolverArgs& args)
: Resolver(args.combiner),
AresDnsResolver::AresDnsResolver(ResolverArgs args)
: Resolver(args.combiner, std::move(args.result_handler)),
backoff_(
BackOff::Options()
.set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS *
@ -177,27 +165,16 @@ AresDnsResolver::AresDnsResolver(const ResolverArgs& args)
AresDnsResolver::~AresDnsResolver() {
GRPC_CARES_TRACE_LOG("resolver:%p destroying AresDnsResolver", this);
if (resolved_result_ != nullptr) {
grpc_channel_args_destroy(resolved_result_);
}
grpc_pollset_set_destroy(interested_parties_);
gpr_free(dns_server_);
gpr_free(name_to_resolve_);
grpc_channel_args_destroy(channel_args_);
}
void AresDnsResolver::NextLocked(grpc_channel_args** target_result,
grpc_closure* on_complete) {
GRPC_CARES_TRACE_LOG("resolver:%p AresDnsResolver::NextLocked() is called.",
void AresDnsResolver::StartLocked() {
GRPC_CARES_TRACE_LOG("resolver:%p AresDnsResolver::StartLocked() is called.",
this);
GPR_ASSERT(next_completion_ == nullptr);
next_completion_ = on_complete;
target_result_ = target_result;
if (resolved_version_ == 0 && !resolving_) {
MaybeStartResolvingLocked();
} else {
MaybeFinishNextLocked();
}
MaybeStartResolvingLocked();
}
void AresDnsResolver::RequestReresolutionLocked() {
@ -221,12 +198,6 @@ void AresDnsResolver::ShutdownLocked() {
if (pending_request_ != nullptr) {
grpc_cancel_ares_request_locked(pending_request_);
}
if (next_completion_ != nullptr) {
*target_result_ = nullptr;
GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Resolver Shutdown"));
next_completion_ = nullptr;
}
}
void AresDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) {
@ -319,11 +290,14 @@ char* ChooseServiceConfig(char* service_config_choice_json) {
void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
AresDnsResolver* r = static_cast<AresDnsResolver*>(arg);
grpc_channel_args* result = nullptr;
GPR_ASSERT(r->resolving_);
r->resolving_ = false;
gpr_free(r->pending_request_);
r->pending_request_ = nullptr;
if (r->shutdown_initiated_) {
r->Unref(DEBUG_LOCATION, "OnResolvedLocked() shutdown");
return;
}
if (r->addresses_ != nullptr) {
static const char* args_to_remove[1];
size_t num_args_to_remove = 0;
@ -343,17 +317,22 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
(char*)GRPC_ARG_SERVICE_CONFIG, service_config_string);
}
}
result = grpc_channel_args_copy_and_add_and_remove(
r->result_handler()->ReturnResult(grpc_channel_args_copy_and_add_and_remove(
r->channel_args_, args_to_remove, num_args_to_remove, args_to_add,
num_args_to_add);
num_args_to_add));
gpr_free(service_config_string);
r->addresses_.reset();
// Reset backoff state so that we start from the beginning when the
// next request gets triggered.
r->backoff_.Reset();
} else if (!r->shutdown_initiated_) {
const char* msg = grpc_error_string(error);
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", r, msg);
} else {
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", r,
grpc_error_string(error));
r->result_handler()->ReturnError(grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"DNS resolution failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
// Set retry timer.
grpc_millis next_try = r->backoff_.NextAttemptTime();
grpc_millis timeout = next_try - ExecCtx::Get()->Now();
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed (will retry): %s",
@ -363,8 +342,7 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
RefCountedPtr<Resolver> self = r->Ref(DEBUG_LOCATION, "retry-timer");
self.release();
r->Ref(DEBUG_LOCATION, "retry-timer").release();
if (timeout > 0) {
GRPC_CARES_TRACE_LOG("resolver:%p retrying in %" PRId64 " milliseconds",
r, timeout);
@ -374,12 +352,6 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
grpc_timer_init(&r->next_resolution_timer_, next_try,
&r->on_next_resolution_);
}
if (r->resolved_result_ != nullptr) {
grpc_channel_args_destroy(r->resolved_result_);
}
r->resolved_result_ = result;
++r->resolved_version_;
r->MaybeFinishNextLocked();
r->Unref(DEBUG_LOCATION, "dns-resolving");
}
@ -403,9 +375,7 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
RefCountedPtr<Resolver> self =
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
self.release();
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
return;
@ -418,8 +388,7 @@ void AresDnsResolver::StartResolvingLocked() {
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
RefCountedPtr<Resolver> self = Ref(DEBUG_LOCATION, "dns-resolving");
self.release();
Ref(DEBUG_LOCATION, "dns-resolving").release();
GPR_ASSERT(!resolving_);
resolving_ = true;
service_config_json_ = nullptr;
@ -433,28 +402,14 @@ void AresDnsResolver::StartResolvingLocked() {
this, pending_request_);
}
void AresDnsResolver::MaybeFinishNextLocked() {
if (next_completion_ != nullptr && resolved_version_ != published_version_) {
*target_result_ = resolved_result_ == nullptr
? nullptr
: grpc_channel_args_copy(resolved_result_);
GRPC_CARES_TRACE_LOG("resolver:%p AresDnsResolver::MaybeFinishNextLocked()",
this);
GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE);
next_completion_ = nullptr;
published_version_ = resolved_version_;
}
}
//
// Factory
//
class AresDnsResolverFactory : public ResolverFactory {
public:
OrphanablePtr<Resolver> CreateResolver(
const ResolverArgs& args) const override {
return OrphanablePtr<Resolver>(New<AresDnsResolver>(args));
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return OrphanablePtr<Resolver>(New<AresDnsResolver>(std::move(args)));
}
const char* scheme() const override { return "dns"; }

@ -51,10 +51,9 @@ const char kDefaultPort[] = "https";
class NativeDnsResolver : public Resolver {
public:
explicit NativeDnsResolver(const ResolverArgs& args);
explicit NativeDnsResolver(ResolverArgs args);
void NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) override;
void StartLocked() override;
void RequestReresolutionLocked() override;
@ -67,7 +66,6 @@ class NativeDnsResolver : public Resolver {
void MaybeStartResolvingLocked();
void StartResolvingLocked();
void MaybeFinishNextLocked();
static void OnNextResolutionLocked(void* arg, grpc_error* error);
static void OnResolvedLocked(void* arg, grpc_error* error);
@ -78,19 +76,11 @@ class NativeDnsResolver : public Resolver {
grpc_channel_args* channel_args_ = nullptr;
/// pollset_set to drive the name resolution process
grpc_pollset_set* interested_parties_ = nullptr;
/// are we shutting down?
bool shutdown_ = false;
/// are we currently resolving?
bool resolving_ = false;
grpc_closure on_resolved_;
/// which version of the result have we published?
int published_version_ = 0;
/// which version of the result is current?
int resolved_version_ = 0;
/// pending next completion, or nullptr
grpc_closure* next_completion_ = nullptr;
/// target result address for next completion
grpc_channel_args** target_result_ = nullptr;
/// current (fully resolved) result
grpc_channel_args* resolved_result_ = nullptr;
/// next resolution timer
bool have_next_resolution_timer_ = false;
grpc_timer next_resolution_timer_;
@ -105,8 +95,8 @@ class NativeDnsResolver : public Resolver {
grpc_resolved_addresses* addresses_ = nullptr;
};
NativeDnsResolver::NativeDnsResolver(const ResolverArgs& args)
: Resolver(args.combiner),
NativeDnsResolver::NativeDnsResolver(ResolverArgs args)
: Resolver(args.combiner, std::move(args.result_handler)),
backoff_(
BackOff::Options()
.set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS *
@ -134,25 +124,12 @@ NativeDnsResolver::NativeDnsResolver(const ResolverArgs& args)
}
NativeDnsResolver::~NativeDnsResolver() {
if (resolved_result_ != nullptr) {
grpc_channel_args_destroy(resolved_result_);
}
grpc_channel_args_destroy(channel_args_);
grpc_pollset_set_destroy(interested_parties_);
gpr_free(name_to_resolve_);
grpc_channel_args_destroy(channel_args_);
}
void NativeDnsResolver::NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) {
GPR_ASSERT(next_completion_ == nullptr);
next_completion_ = on_complete;
target_result_ = result;
if (resolved_version_ == 0 && !resolving_) {
MaybeStartResolvingLocked();
} else {
MaybeFinishNextLocked();
}
}
void NativeDnsResolver::StartLocked() { MaybeStartResolvingLocked(); }
void NativeDnsResolver::RequestReresolutionLocked() {
if (!resolving_) {
@ -168,15 +145,10 @@ void NativeDnsResolver::ResetBackoffLocked() {
}
void NativeDnsResolver::ShutdownLocked() {
shutdown_ = true;
if (have_next_resolution_timer_) {
grpc_timer_cancel(&next_resolution_timer_);
}
if (next_completion_ != nullptr) {
*target_result_ = nullptr;
GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Resolver Shutdown"));
next_completion_ = nullptr;
}
}
void NativeDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) {
@ -190,38 +162,42 @@ void NativeDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) {
void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
NativeDnsResolver* r = static_cast<NativeDnsResolver*>(arg);
grpc_channel_args* result = nullptr;
GPR_ASSERT(r->resolving_);
r->resolving_ = false;
GRPC_ERROR_REF(error);
error =
grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
grpc_slice_from_copied_string(r->name_to_resolve_));
if (r->shutdown_) {
r->Unref(DEBUG_LOCATION, "dns-resolving");
return;
}
if (r->addresses_ != nullptr) {
ServerAddressList addresses;
for (size_t i = 0; i < r->addresses_->naddrs; ++i) {
addresses.emplace_back(&r->addresses_->addrs[i].addr,
r->addresses_->addrs[i].len, nullptr /* args */);
}
grpc_arg new_arg = CreateServerAddressListChannelArg(&addresses);
result = grpc_channel_args_copy_and_add(r->channel_args_, &new_arg, 1);
grpc_resolved_addresses_destroy(r->addresses_);
grpc_arg new_arg = CreateServerAddressListChannelArg(&addresses);
r->result_handler()->ReturnResult(
grpc_channel_args_copy_and_add(r->channel_args_, &new_arg, 1));
// Reset backoff state so that we start from the beginning when the
// next request gets triggered.
r->backoff_.Reset();
} else {
grpc_millis next_try = r->backoff_.NextAttemptTime();
grpc_millis timeout = next_try - ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
// Return transient error.
r->result_handler()->ReturnError(grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"DNS resolution failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
// Set up for retry.
grpc_millis next_try = r->backoff_.NextAttemptTime();
grpc_millis timeout = next_try - ExecCtx::Get()->Now();
GPR_ASSERT(!r->have_next_resolution_timer_);
r->have_next_resolution_timer_ = true;
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
RefCountedPtr<Resolver> self =
r->Ref(DEBUG_LOCATION, "next_resolution_timer");
self.release();
r->Ref(DEBUG_LOCATION, "next_resolution_timer").release();
if (timeout > 0) {
gpr_log(GPR_DEBUG, "retrying in %" PRId64 " milliseconds", timeout);
} else {
@ -230,13 +206,6 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
grpc_timer_init(&r->next_resolution_timer_, next_try,
&r->on_next_resolution_);
}
if (r->resolved_result_ != nullptr) {
grpc_channel_args_destroy(r->resolved_result_);
}
r->resolved_result_ = result;
++r->resolved_version_;
r->MaybeFinishNextLocked();
GRPC_ERROR_UNREF(error);
r->Unref(DEBUG_LOCATION, "dns-resolving");
}
@ -260,9 +229,7 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
RefCountedPtr<Resolver> self =
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
self.release();
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
return;
@ -276,8 +243,7 @@ void NativeDnsResolver::StartResolvingLocked() {
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
RefCountedPtr<Resolver> self = Ref(DEBUG_LOCATION, "dns-resolving");
self.release();
Ref(DEBUG_LOCATION, "dns-resolving").release();
GPR_ASSERT(!resolving_);
resolving_ = true;
addresses_ = nullptr;
@ -286,30 +252,18 @@ void NativeDnsResolver::StartResolvingLocked() {
last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now();
}
void NativeDnsResolver::MaybeFinishNextLocked() {
if (next_completion_ != nullptr && resolved_version_ != published_version_) {
*target_result_ = resolved_result_ == nullptr
? nullptr
: grpc_channel_args_copy(resolved_result_);
GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE);
next_completion_ = nullptr;
published_version_ = resolved_version_;
}
}
//
// Factory
//
class NativeDnsResolverFactory : public ResolverFactory {
public:
OrphanablePtr<Resolver> CreateResolver(
const ResolverArgs& args) const override {
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
if (GPR_UNLIKELY(0 != strcmp(args.uri->authority, ""))) {
gpr_log(GPR_ERROR, "authority based dns uri's not supported");
return OrphanablePtr<Resolver>(nullptr);
}
return OrphanablePtr<Resolver>(New<NativeDnsResolver>(args));
return OrphanablePtr<Resolver>(New<NativeDnsResolver>(std::move(args)));
}
const char* scheme() const override { return "dns"; }

@ -50,10 +50,9 @@ namespace grpc_core {
// FakeResolverResponseGenerator.
class FakeResolver : public Resolver {
public:
explicit FakeResolver(const ResolverArgs& args);
explicit FakeResolver(ResolverArgs args);
void NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) override;
void StartLocked() override;
void RequestReresolutionLocked() override;
@ -62,27 +61,32 @@ class FakeResolver : public Resolver {
virtual ~FakeResolver();
void MaybeFinishNextLocked();
void ShutdownLocked() override { active_ = false; }
void ShutdownLocked() override;
void MaybeSendResultLocked();
static void ReturnReresolutionResult(void* arg, grpc_error* error);
// passed-in parameters
grpc_channel_args* channel_args_ = nullptr;
// If not NULL, the next set of resolution results to be returned to
// NextLocked()'s closure.
// If not NULL, the next set of resolution results to be returned.
grpc_channel_args* next_results_ = nullptr;
// Results to use for the pretended re-resolution in
// RequestReresolutionLocked().
grpc_channel_args* reresolution_results_ = nullptr;
// pending next completion, or NULL
grpc_closure* next_completion_ = nullptr;
// target result address for next completion
grpc_channel_args** target_result_ = nullptr;
// True between the calls to StartLocked() ShutdownLocked().
bool active_ = false;
// if true, return failure
bool return_failure_ = false;
// pending re-resolution
grpc_closure reresolution_closure_;
bool reresolution_closure_pending_ = false;
};
FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) {
FakeResolver::FakeResolver(ResolverArgs args)
: Resolver(args.combiner, std::move(args.result_handler)) {
GRPC_CLOSURE_INIT(&reresolution_closure_, ReturnReresolutionResult, this,
grpc_combiner_scheduler(combiner()));
channel_args_ = grpc_channel_args_copy(args.args);
FakeResolverResponseGenerator* response_generator =
FakeResolverResponseGenerator::GetFromArgs(args.args);
@ -102,46 +106,51 @@ FakeResolver::~FakeResolver() {
grpc_channel_args_destroy(channel_args_);
}
void FakeResolver::NextLocked(grpc_channel_args** target_result,
grpc_closure* on_complete) {
GPR_ASSERT(next_completion_ == nullptr);
next_completion_ = on_complete;
target_result_ = target_result;
MaybeFinishNextLocked();
void FakeResolver::StartLocked() {
active_ = true;
MaybeSendResultLocked();
}
void FakeResolver::RequestReresolutionLocked() {
if (reresolution_results_ != nullptr || return_failure_) {
grpc_channel_args_destroy(next_results_);
next_results_ = grpc_channel_args_copy(reresolution_results_);
MaybeFinishNextLocked();
// Return the result in a different closure, so that we don't call
// back into the LB policy while it's still processing the previous
// update.
if (!reresolution_closure_pending_) {
reresolution_closure_pending_ = true;
Ref().release(); // ref held by closure
GRPC_CLOSURE_SCHED(&reresolution_closure_, GRPC_ERROR_NONE);
}
}
}
void FakeResolver::MaybeFinishNextLocked() {
if (next_completion_ != nullptr &&
(next_results_ != nullptr || return_failure_)) {
void FakeResolver::MaybeSendResultLocked() {
if (!active_) return;
if (return_failure_) {
// TODO(roth): Change resolver result generator to be able to inject
// the error to be returned.
result_handler()->ReturnError(grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver transient failure"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
return_failure_ = false;
} else if (next_results_ != nullptr) {
// When both next_results_ and channel_args_ contain an arg with the same
// name, only the one in next_results_ will be kept since next_results_ is
// before channel_args_.
*target_result_ =
return_failure_ ? nullptr
: grpc_channel_args_union(next_results_, channel_args_);
result_handler()->ReturnResult(
grpc_channel_args_union(next_results_, channel_args_));
grpc_channel_args_destroy(next_results_);
next_results_ = nullptr;
GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE);
next_completion_ = nullptr;
return_failure_ = false;
}
}
void FakeResolver::ShutdownLocked() {
if (next_completion_ != nullptr) {
*target_result_ = nullptr;
GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Resolver Shutdown"));
next_completion_ = nullptr;
}
void FakeResolver::ReturnReresolutionResult(void* arg, grpc_error* error) {
FakeResolver* self = static_cast<FakeResolver*>(arg);
self->reresolution_closure_pending_ = false;
self->MaybeSendResultLocked();
self->Unref();
}
//
@ -161,7 +170,7 @@ void FakeResolverResponseGenerator::SetResponseLocked(void* arg,
FakeResolver* resolver = closure_arg->generator->resolver_;
grpc_channel_args_destroy(resolver->next_results_);
resolver->next_results_ = closure_arg->response;
resolver->MaybeFinishNextLocked();
resolver->MaybeSendResultLocked();
Delete(closure_arg);
}
@ -210,7 +219,7 @@ void FakeResolverResponseGenerator::SetFailureLocked(void* arg,
SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
FakeResolver* resolver = closure_arg->generator->resolver_;
resolver->return_failure_ = true;
if (closure_arg->immediate) resolver->MaybeFinishNextLocked();
if (closure_arg->immediate) resolver->MaybeSendResultLocked();
Delete(closure_arg);
}
@ -290,9 +299,8 @@ namespace {
class FakeResolverFactory : public ResolverFactory {
public:
OrphanablePtr<Resolver> CreateResolver(
const ResolverArgs& args) const override {
return OrphanablePtr<Resolver>(New<FakeResolver>(args));
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return OrphanablePtr<Resolver>(New<FakeResolver>(std::move(args)));
}
const char* scheme() const override { return "fake"; }

@ -58,8 +58,7 @@ class FakeResolverResponseGenerator
// is called.
void SetReresolutionResponse(grpc_channel_args* response);
// Tells the resolver to return a transient failure (signalled by
// returning a null result with no error).
// Tells the resolver to return a transient failure.
void SetFailure();
// Same as SetFailure(), but instead of returning the error

@ -45,66 +45,29 @@ namespace {
class SockaddrResolver : public Resolver {
public:
/// Takes ownership of \a addresses.
SockaddrResolver(const ResolverArgs& args,
UniquePtr<ServerAddressList> addresses);
explicit SockaddrResolver(ResolverArgs args);
~SockaddrResolver() override;
void NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) override;
void StartLocked() override;
void ShutdownLocked() override;
void ShutdownLocked() override {}
private:
virtual ~SockaddrResolver();
void MaybeFinishNextLocked();
/// the addresses that we've "resolved"
UniquePtr<ServerAddressList> addresses_;
/// channel args
grpc_channel_args* channel_args_ = nullptr;
/// have we published?
bool published_ = false;
/// pending next completion, or NULL
grpc_closure* next_completion_ = nullptr;
/// target result address for next completion
grpc_channel_args** target_result_ = nullptr;
const grpc_channel_args* channel_args_ = nullptr;
};
SockaddrResolver::SockaddrResolver(const ResolverArgs& args,
UniquePtr<ServerAddressList> addresses)
: Resolver(args.combiner),
addresses_(std::move(addresses)),
channel_args_(grpc_channel_args_copy(args.args)) {}
SockaddrResolver::SockaddrResolver(ResolverArgs args)
: Resolver(args.combiner, std::move(args.result_handler)),
channel_args_(args.args) {}
SockaddrResolver::~SockaddrResolver() {
grpc_channel_args_destroy(channel_args_);
}
void SockaddrResolver::NextLocked(grpc_channel_args** target_result,
grpc_closure* on_complete) {
GPR_ASSERT(!next_completion_);
next_completion_ = on_complete;
target_result_ = target_result;
MaybeFinishNextLocked();
}
void SockaddrResolver::ShutdownLocked() {
if (next_completion_ != nullptr) {
*target_result_ = nullptr;
GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Resolver Shutdown"));
next_completion_ = nullptr;
}
}
void SockaddrResolver::MaybeFinishNextLocked() {
if (next_completion_ != nullptr && !published_) {
published_ = true;
grpc_arg arg = CreateServerAddressListChannelArg(addresses_.get());
*target_result_ = grpc_channel_args_copy_and_add(channel_args_, &arg, 1);
GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE);
next_completion_ = nullptr;
}
void SockaddrResolver::StartLocked() {
result_handler()->ReturnResult(channel_args_);
channel_args_ = nullptr;
}
//
@ -114,7 +77,7 @@ void SockaddrResolver::MaybeFinishNextLocked() {
void DoNothing(void* ignored) {}
OrphanablePtr<Resolver> CreateSockaddrResolver(
const ResolverArgs& args,
ResolverArgs args,
bool parse(const grpc_uri* uri, grpc_resolved_address* dst)) {
if (0 != strcmp(args.uri->authority, "")) {
gpr_log(GPR_ERROR, "authority-based URIs not supported by the %s scheme",
@ -127,7 +90,7 @@ OrphanablePtr<Resolver> CreateSockaddrResolver(
grpc_slice_buffer path_parts;
grpc_slice_buffer_init(&path_parts);
grpc_slice_split(path_slice, ",", &path_parts);
auto addresses = MakeUnique<ServerAddressList>();
ServerAddressList addresses;
bool errors_found = false;
for (size_t i = 0; i < path_parts.count; i++) {
grpc_uri ith_uri = *args.uri;
@ -135,26 +98,28 @@ OrphanablePtr<Resolver> CreateSockaddrResolver(
ith_uri.path = part_str.get();
grpc_resolved_address addr;
if (!parse(&ith_uri, &addr)) {
errors_found = true; /* GPR_TRUE */
errors_found = true;
break;
}
addresses->emplace_back(addr, nullptr /* args */);
addresses.emplace_back(addr, nullptr /* args */);
}
grpc_slice_buffer_destroy_internal(&path_parts);
grpc_slice_unref_internal(path_slice);
if (errors_found) {
return OrphanablePtr<Resolver>(nullptr);
}
// Add addresses to channel args.
// Note: SockaddrResolver takes ownership of channel args.
grpc_arg arg = CreateServerAddressListChannelArg(&addresses);
args.args = grpc_channel_args_copy_and_add(args.args, &arg, 1);
// Instantiate resolver.
return OrphanablePtr<Resolver>(
New<SockaddrResolver>(args, std::move(addresses)));
return OrphanablePtr<Resolver>(New<SockaddrResolver>(std::move(args)));
}
class IPv4ResolverFactory : public ResolverFactory {
public:
OrphanablePtr<Resolver> CreateResolver(
const ResolverArgs& args) const override {
return CreateSockaddrResolver(args, grpc_parse_ipv4);
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return CreateSockaddrResolver(std::move(args), grpc_parse_ipv4);
}
const char* scheme() const override { return "ipv4"; }
@ -162,9 +127,8 @@ class IPv4ResolverFactory : public ResolverFactory {
class IPv6ResolverFactory : public ResolverFactory {
public:
OrphanablePtr<Resolver> CreateResolver(
const ResolverArgs& args) const override {
return CreateSockaddrResolver(args, grpc_parse_ipv6);
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return CreateSockaddrResolver(std::move(args), grpc_parse_ipv6);
}
const char* scheme() const override { return "ipv6"; }
@ -173,9 +137,8 @@ class IPv6ResolverFactory : public ResolverFactory {
#ifdef GRPC_HAVE_UNIX_SOCKET
class UnixResolverFactory : public ResolverFactory {
public:
OrphanablePtr<Resolver> CreateResolver(
const ResolverArgs& args) const override {
return CreateSockaddrResolver(args, grpc_parse_unix);
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return CreateSockaddrResolver(std::move(args), grpc_parse_unix);
}
UniquePtr<char> GetDefaultAuthority(grpc_uri* uri) const override {

@ -41,12 +41,14 @@ struct ResolverArgs {
grpc_pollset_set* pollset_set = nullptr;
/// The combiner under which all resolver calls will be run.
grpc_combiner* combiner = nullptr;
/// The result handler to be used by the resolver.
UniquePtr<Resolver::ResultHandler> result_handler;
};
class ResolverFactory {
public:
/// Returns a new resolver instance.
virtual OrphanablePtr<Resolver> CreateResolver(const ResolverArgs& args) const
virtual OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const
GRPC_ABSTRACT;
/// Returns a string representing the default authority to use for this

@ -134,7 +134,8 @@ ResolverFactory* ResolverRegistry::LookupResolverFactory(const char* scheme) {
OrphanablePtr<Resolver> ResolverRegistry::CreateResolver(
const char* target, const grpc_channel_args* args,
grpc_pollset_set* pollset_set, grpc_combiner* combiner) {
grpc_pollset_set* pollset_set, grpc_combiner* combiner,
UniquePtr<Resolver::ResultHandler> result_handler) {
GPR_ASSERT(g_state != nullptr);
grpc_uri* uri = nullptr;
char* canonical_target = nullptr;
@ -145,8 +146,10 @@ OrphanablePtr<Resolver> ResolverRegistry::CreateResolver(
resolver_args.args = args;
resolver_args.pollset_set = pollset_set;
resolver_args.combiner = combiner;
resolver_args.result_handler = std::move(result_handler);
OrphanablePtr<Resolver> resolver =
factory == nullptr ? nullptr : factory->CreateResolver(resolver_args);
factory == nullptr ? nullptr
: factory->CreateResolver(std::move(resolver_args));
grpc_uri_destroy(uri);
gpr_free(canonical_target);
return resolver;

@ -62,10 +62,10 @@ class ResolverRegistry {
/// \a args are the channel args to be included in resolver results.
/// \a pollset_set is used to drive I/O in the name resolution process.
/// \a combiner is the combiner under which all resolver calls will be run.
static OrphanablePtr<Resolver> CreateResolver(const char* target,
const grpc_channel_args* args,
grpc_pollset_set* pollset_set,
grpc_combiner* combiner);
static OrphanablePtr<Resolver> CreateResolver(
const char* target, const grpc_channel_args* args,
grpc_pollset_set* pollset_set, grpc_combiner* combiner,
UniquePtr<Resolver::ResultHandler> result_handler);
/// Returns the default authority to pass from a client for \a target.
static UniquePtr<char> GetDefaultAuthority(const char* target);

@ -65,6 +65,36 @@
namespace grpc_core {
//
// ResolvingLoadBalancingPolicy::ResolverResultHandler
//
class ResolvingLoadBalancingPolicy::ResolverResultHandler
: public Resolver::ResultHandler {
public:
explicit ResolverResultHandler(
RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
: parent_(std::move(parent)) {}
~ResolverResultHandler() {
if (parent_->tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
parent_.get());
}
}
void ReturnResult(const grpc_channel_args* result) override {
parent_->OnResolverResultChangedLocked(result);
}
void ReturnError(grpc_error* error) override {
parent_->OnResolverError(error);
}
private:
RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
};
//
// ResolvingLoadBalancingPolicy::ResolvingControlHelper
//
@ -196,12 +226,9 @@ ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
}
grpc_error* ResolvingLoadBalancingPolicy::Init(const grpc_channel_args& args) {
GRPC_CLOSURE_INIT(
&on_resolver_result_changed_,
&ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked, this,
grpc_combiner_scheduler(combiner()));
resolver_ = ResolverRegistry::CreateResolver(
target_uri_.get(), &args, interested_parties(), combiner());
target_uri_.get(), &args, interested_parties(), combiner(),
UniquePtr<Resolver::ResultHandler>(New<ResolverResultHandler>(Ref())));
if (resolver_ == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
}
@ -288,62 +315,34 @@ void ResolvingLoadBalancingPolicy::StartResolvingLocked() {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
Ref().release();
resolver_->NextLocked(&resolver_result_, &on_resolver_result_changed_);
resolver_->StartLocked();
}
// Invoked from the resolver NextLocked() callback when the resolver
// is shutting down.
void ResolvingLoadBalancingPolicy::OnResolverShutdownLocked(grpc_error* error) {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down", this);
void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
if (resolver_ == nullptr) {
GRPC_ERROR_UNREF(error);
return;
}
{
MutexLock lock(&lb_policy_mu_);
if (lb_policy_ != nullptr) {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
interested_parties());
lb_policy_.reset();
}
if (pending_lb_policy_ != nullptr) {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p",
this, pending_lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(),
interested_parties());
pending_lb_policy_.reset();
}
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
grpc_error_string(error));
}
if (resolver_ != nullptr) {
// This should never happen; it can only be triggered by a resolver
// implementation spotaneously deciding to report shutdown without
// being orphaned. This code is included just to be defensive.
if (tracer_->enabled()) {
gpr_log(GPR_INFO,
"resolving_lb=%p: spontaneous shutdown from resolver %p", this,
resolver_.get());
}
resolver_.reset();
grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolver spontaneous shutdown", &error, 1);
// If we already have an LB policy from a previous resolution
// result, then we continue to let it set the connectivity state.
// Otherwise, we go into TRANSIENT_FAILURE.
if (lb_policy_ == nullptr) {
grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolver transient failure", &error, 1);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(state_error),
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(state_error)));
}
grpc_channel_args_destroy(resolver_result_);
resolver_result_ = nullptr;
GRPC_ERROR_UNREF(error);
Unref();
}
void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
const char* lb_policy_name, RefCountedPtr<Config> lb_policy_config,
TraceStringVector* trace_strings) {
const grpc_channel_args& args, TraceStringVector* trace_strings) {
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
@ -411,7 +410,7 @@ void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this,
lb_policy_ == nullptr ? "" : "pending ", lb_policy_name);
}
auto new_policy = CreateLbPolicyLocked(lb_policy_name, trace_strings);
auto new_policy = CreateLbPolicyLocked(lb_policy_name, args, trace_strings);
auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_;
{
MutexLock lock(&lb_policy_mu_);
@ -432,21 +431,21 @@ void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
policy_to_update == pending_lb_policy_.get() ? "pending " : "",
policy_to_update);
}
policy_to_update->UpdateLocked(*resolver_result_,
std::move(lb_policy_config));
policy_to_update->UpdateLocked(args, std::move(lb_policy_config));
}
// Creates a new LB policy.
// Updates trace_strings to indicate what was done.
OrphanablePtr<LoadBalancingPolicy>
ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
const char* lb_policy_name, TraceStringVector* trace_strings) {
const char* lb_policy_name, const grpc_channel_args& args,
TraceStringVector* trace_strings) {
ResolvingControlHelper* helper = New<ResolvingControlHelper>(Ref());
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(helper);
lb_policy_args.args = resolver_result_;
lb_policy_args.args = &args;
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
lb_policy_name, std::move(lb_policy_args));
@ -480,9 +479,10 @@ ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
}
void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
const grpc_channel_args& resolver_result,
TraceStringVector* trace_strings) {
const ServerAddressList* addresses =
FindServerAddressListChannelArg(resolver_result_);
FindServerAddressListChannelArg(&resolver_result);
const bool resolution_contains_addresses =
addresses != nullptr && addresses->size() > 0;
if (!resolution_contains_addresses &&
@ -516,27 +516,16 @@ void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
}
}
// Callback invoked when a resolver result is available.
void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
void* arg, grpc_error* error) {
auto* self = static_cast<ResolvingLoadBalancingPolicy*>(arg);
if (self->tracer_->enabled()) {
const char* disposition =
self->resolver_result_ != nullptr
? ""
: (error == GRPC_ERROR_NONE ? " (transient error)"
: " (resolver shutdown)");
gpr_log(GPR_INFO,
"resolving_lb=%p: got resolver result: resolver_result=%p "
"error=%s%s",
self, self->resolver_result_, grpc_error_string(error),
disposition);
}
// Handle shutdown.
if (error != GRPC_ERROR_NONE || self->resolver_ == nullptr) {
self->OnResolverShutdownLocked(GRPC_ERROR_REF(error));
const grpc_channel_args* result) {
// Handle race conditions.
if (resolver_ == nullptr) {
grpc_channel_args_destroy(result);
return;
}
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result %p", this, result);
}
// We only want to trace the address resolution in the follow cases:
// (a) Address resolution resulted in service config change.
// (b) Address resolution that causes number of backends to go from
@ -547,63 +536,34 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
//
// we track a list of strings to eventually be concatenated and traced.
TraceStringVector trace_strings;
// resolver_result_ will be null in the case of a transient
// resolution error. In that case, we don't have any new result to
// process, which means that we keep using the previous result (if any).
if (self->resolver_result_ == nullptr) {
if (self->tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure", self);
}
// If we already have an LB policy from a previous resolution
// result, then we continue to let it set the connectivity state.
// Otherwise, we go into TRANSIENT_FAILURE.
if (self->lb_policy_ == nullptr) {
// TODO(roth): When we change the resolver API to be able to
// return transient errors in a cleaner way, we should make it the
// resolver's responsibility to attach a status to the error,
// rather than doing it centrally here.
grpc_error* state_error = grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolver transient failure", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
self->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(state_error),
UniquePtr<SubchannelPicker>(
New<TransientFailurePicker>(state_error)));
}
// Parse the resolver result.
const char* lb_policy_name = nullptr;
RefCountedPtr<Config> lb_policy_config;
bool service_config_changed = false;
if (process_resolver_result_ != nullptr) {
service_config_changed =
process_resolver_result_(process_resolver_result_user_data_, *result,
&lb_policy_name, &lb_policy_config);
} else {
// Parse the resolver result.
const char* lb_policy_name = nullptr;
RefCountedPtr<Config> lb_policy_config;
bool service_config_changed = false;
if (self->process_resolver_result_ != nullptr) {
service_config_changed = self->process_resolver_result_(
self->process_resolver_result_user_data_, *self->resolver_result_,
&lb_policy_name, &lb_policy_config);
} else {
lb_policy_name = self->child_policy_name_.get();
lb_policy_config = self->child_lb_config_;
}
GPR_ASSERT(lb_policy_name != nullptr);
self->CreateOrUpdateLbPolicyLocked(
lb_policy_name, std::move(lb_policy_config), &trace_strings);
// Add channel trace event.
if (self->channelz_node() != nullptr) {
if (service_config_changed) {
// TODO(ncteisen): might be worth somehow including a snippet of the
// config in the trace, at the risk of bloating the trace logs.
trace_strings.push_back(gpr_strdup("Service config changed"));
}
self->MaybeAddTraceMessagesForAddressChangesLocked(&trace_strings);
self->ConcatenateAndAddChannelTraceLocked(&trace_strings);
lb_policy_name = child_policy_name_.get();
lb_policy_config = child_lb_config_;
}
GPR_ASSERT(lb_policy_name != nullptr);
// Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked(lb_policy_name, std::move(lb_policy_config),
*result, &trace_strings);
// Add channel trace event.
if (channelz_node() != nullptr) {
if (service_config_changed) {
// TODO(ncteisen): might be worth somehow including a snippet of the
// config in the trace, at the risk of bloating the trace logs.
trace_strings.push_back(gpr_strdup("Service config changed"));
}
// Clean up.
grpc_channel_args_destroy(self->resolver_result_);
self->resolver_result_ = nullptr;
MaybeAddTraceMessagesForAddressChangesLocked(*result, &trace_strings);
ConcatenateAndAddChannelTraceLocked(&trace_strings);
}
// Renew resolver callback.
self->resolver_->NextLocked(&self->resolver_result_,
&self->on_resolver_result_changed_);
// Clean up.
grpc_channel_args_destroy(result);
}
} // namespace grpc_core

@ -93,6 +93,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
private:
using TraceStringVector = InlinedVector<char*, 3>;
class ResolverResultHandler;
class ResolvingControlHelper;
~ResolvingLoadBalancingPolicy();
@ -101,17 +102,20 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
void ShutdownLocked() override;
void StartResolvingLocked();
void OnResolverShutdownLocked(grpc_error* error);
void OnResolverError(grpc_error* error);
void CreateOrUpdateLbPolicyLocked(const char* lb_policy_name,
RefCountedPtr<Config>,
RefCountedPtr<Config> lb_policy_config,
const grpc_channel_args& args,
TraceStringVector* trace_strings);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const char* lb_policy_name, TraceStringVector* trace_strings);
const char* lb_policy_name, const grpc_channel_args& args,
TraceStringVector* trace_strings);
void MaybeAddTraceMessagesForAddressChangesLocked(
const grpc_channel_args& resolver_result,
TraceStringVector* trace_strings);
void ConcatenateAndAddChannelTraceLocked(
TraceStringVector* trace_strings) const;
static void OnResolverResultChangedLocked(void* arg, grpc_error* error);
void OnResolverResultChangedLocked(const grpc_channel_args* result);
// Passed in from caller at construction time.
TraceFlag* tracer_;
@ -124,9 +128,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// Resolver and associated state.
OrphanablePtr<Resolver> resolver_;
bool started_resolving_ = false;
grpc_channel_args* resolver_result_ = nullptr;
bool previous_resolution_contained_addresses_ = false;
grpc_closure on_resolver_result_changed_;
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> lb_policy_;

@ -56,6 +56,9 @@ grpc_channel_args* grpc_channel_args_union(const grpc_channel_args* a,
/** Destroy arguments created by \a grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args* a);
inline void grpc_channel_args_destroy(const grpc_channel_args* a) {
grpc_channel_args_destroy(const_cast<grpc_channel_args*>(a));
}
/** Returns the compression algorithm set in \a a. */
grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(

@ -90,7 +90,8 @@ static void my_cancel_ares_request_locked(grpc_ares_request* request) {
}
static grpc_core::OrphanablePtr<grpc_core::Resolver> create_resolver(
const char* name) {
const char* name,
grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler> result_handler) {
grpc_core::ResolverFactory* factory =
grpc_core::ResolverRegistry::LookupResolverFactory("dns");
grpc_uri* uri = grpc_uri_parse(name, 0);
@ -98,15 +99,52 @@ static grpc_core::OrphanablePtr<grpc_core::Resolver> create_resolver(
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.result_handler = std::move(result_handler);
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(args);
factory->CreateResolver(std::move(args));
grpc_uri_destroy(uri);
return resolver;
}
static void on_done(void* ev, grpc_error* error) {
gpr_event_set(static_cast<gpr_event*>(ev), (void*)1);
}
class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
struct ResolverOutput {
const grpc_channel_args* result = nullptr;
grpc_error* error = nullptr;
gpr_event ev;
ResolverOutput() { gpr_event_init(&ev); }
~ResolverOutput() {
grpc_channel_args_destroy(result);
GRPC_ERROR_UNREF(error);
}
};
void SetOutput(ResolverOutput* output) {
gpr_atm_rel_store(&output_, reinterpret_cast<gpr_atm>(output));
}
void ReturnResult(const grpc_channel_args* args) override {
ResolverOutput* output =
reinterpret_cast<ResolverOutput*>(gpr_atm_acq_load(&output_));
GPR_ASSERT(output != nullptr);
output->result = args;
output->error = GRPC_ERROR_NONE;
gpr_event_set(&output->ev, (void*)1);
}
void ReturnError(grpc_error* error) override {
ResolverOutput* output =
reinterpret_cast<ResolverOutput*>(gpr_atm_acq_load(&output_));
GPR_ASSERT(output != nullptr);
output->result = nullptr;
output->error = error;
gpr_event_set(&output->ev, (void*)1);
}
private:
gpr_atm output_ = 0; // ResolverOutput*
};
// interleave waiting for an event with a timer check
static bool wait_loop(int deadline_seconds, gpr_event* ev) {
@ -121,32 +159,6 @@ static bool wait_loop(int deadline_seconds, gpr_event* ev) {
return false;
}
typedef struct next_args {
grpc_core::Resolver* resolver;
grpc_channel_args** result;
grpc_closure* on_complete;
} next_args;
static void call_resolver_next_now_lock_taken(void* arg,
grpc_error* error_unused) {
next_args* a = static_cast<next_args*>(arg);
a->resolver->NextLocked(a->result, a->on_complete);
gpr_free(a);
}
static void call_resolver_next_after_locking(grpc_core::Resolver* resolver,
grpc_channel_args** result,
grpc_closure* on_complete,
grpc_combiner* combiner) {
next_args* a = static_cast<next_args*>(gpr_malloc(sizeof(*a)));
a->resolver = resolver;
a->result = result;
a->on_complete = on_complete;
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(call_resolver_next_now_lock_taken, a,
grpc_combiner_scheduler(combiner)),
GRPC_ERROR_NONE);
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
@ -156,33 +168,28 @@ int main(int argc, char** argv) {
grpc_set_resolver_impl(&test_resolver);
grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
grpc_cancel_ares_request_locked = my_cancel_ares_request_locked;
grpc_channel_args* result = (grpc_channel_args*)1;
{
grpc_core::ExecCtx exec_ctx;
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
create_resolver("dns:test");
gpr_event ev1;
gpr_event_init(&ev1);
call_resolver_next_after_locking(
resolver.get(), &result,
GRPC_CLOSURE_CREATE(on_done, &ev1, grpc_schedule_on_exec_ctx),
g_combiner);
ResultHandler* result_handler = grpc_core::New<ResultHandler>();
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver = create_resolver(
"dns:test", grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler>(
result_handler));
ResultHandler::ResolverOutput output1;
result_handler->SetOutput(&output1);
resolver->StartLocked();
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(wait_loop(5, &ev1));
GPR_ASSERT(result == nullptr);
gpr_event ev2;
gpr_event_init(&ev2);
call_resolver_next_after_locking(
resolver.get(), &result,
GRPC_CLOSURE_CREATE(on_done, &ev2, grpc_schedule_on_exec_ctx),
g_combiner);
GPR_ASSERT(wait_loop(5, &output1.ev));
GPR_ASSERT(output1.result == nullptr);
GPR_ASSERT(output1.error != GRPC_ERROR_NONE);
ResultHandler::ResolverOutput output2;
result_handler->SetOutput(&output2);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(wait_loop(30, &ev2));
GPR_ASSERT(result != nullptr);
GPR_ASSERT(wait_loop(30, &output2.ev));
GPR_ASSERT(output2.result != nullptr);
GPR_ASSERT(output2.error == GRPC_ERROR_NONE);
grpc_channel_args_destroy(result);
GRPC_COMBINER_UNREF(g_combiner, "test");
}

@ -170,19 +170,52 @@ static void poll_pollset_until_request_done(iomgr_args* args) {
gpr_event_set(&args->ev, (void*)1);
}
struct OnResolutionCallbackArg;
class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
using ResultCallback = void (*)(const grpc_channel_args* result,
OnResolutionCallbackArg* state);
void SetCallback(ResultCallback result_cb, OnResolutionCallbackArg* state) {
GPR_ASSERT(result_cb_ == nullptr);
result_cb_ = result_cb;
GPR_ASSERT(state_ == nullptr);
state_ = state;
}
void ReturnResult(const grpc_channel_args* args) override {
GPR_ASSERT(result_cb_ != nullptr);
GPR_ASSERT(state_ != nullptr);
ResultCallback cb = result_cb_;
OnResolutionCallbackArg* state = state_;
result_cb_ = nullptr;
state_ = nullptr;
cb(args, state);
}
void ReturnError(grpc_error* error) override {
gpr_log(GPR_ERROR, "resolver returned error: %s", grpc_error_string(error));
GPR_ASSERT(false);
}
private:
ResultCallback result_cb_ = nullptr;
OnResolutionCallbackArg* state_ = nullptr;
};
struct OnResolutionCallbackArg {
const char* uri_str = nullptr;
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
grpc_channel_args* result = nullptr;
ResultHandler* result_handler;
};
// Set to true by the last callback in the resolution chain.
static bool g_all_callbacks_invoked;
static void on_second_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
static void on_second_resolution(const grpc_channel_args* result,
OnResolutionCallbackArg* cb_arg) {
grpc_channel_args_destroy(result);
gpr_log(GPR_INFO, "2nd: g_resolution_count: %d", g_resolution_count);
// The resolution callback was not invoked until new data was
// available, which was delayed until after the cooldown period.
@ -197,18 +230,14 @@ static void on_second_resolution(void* arg, grpc_error* error) {
g_all_callbacks_invoked = true;
}
static void on_first_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
static void on_first_resolution(const grpc_channel_args* result,
OnResolutionCallbackArg* cb_arg) {
grpc_channel_args_destroy(result);
gpr_log(GPR_INFO, "1st: g_resolution_count: %d", g_resolution_count);
// There's one initial system-level resolution and one invocation of a
// notification callback (the current function).
GPR_ASSERT(g_resolution_count == 1);
cb_arg->resolver->NextLocked(
&cb_arg->result,
GRPC_CLOSURE_CREATE(on_second_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
cb_arg->result_handler->SetCallback(on_second_resolution, cb_arg);
cb_arg->resolver->RequestReresolutionLocked();
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
@ -220,6 +249,8 @@ static void start_test_under_combiner(void* arg, grpc_error* error) {
OnResolutionCallbackArg* res_cb_arg =
static_cast<OnResolutionCallbackArg*>(arg);
res_cb_arg->result_handler = grpc_core::New<ResultHandler>();
grpc_core::ResolverFactory* factory =
grpc_core::ResolverRegistry::LookupResolverFactory("dns");
grpc_uri* uri = grpc_uri_parse(res_cb_arg->uri_str, 0);
@ -229,25 +260,22 @@ static void start_test_under_combiner(void* arg, grpc_error* error) {
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.result_handler =
grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler>(
res_cb_arg->result_handler);
g_resolution_count = 0;
grpc_arg cooldown_arg;
cooldown_arg.key =
const_cast<char*>(GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS);
cooldown_arg.type = GRPC_ARG_INTEGER;
cooldown_arg.value.integer = kMinResolutionPeriodMs;
auto* cooldown_channel_args =
grpc_channel_args_copy_and_add(nullptr, &cooldown_arg, 1);
args.args = cooldown_channel_args;
res_cb_arg->resolver = factory->CreateResolver(args);
grpc_channel_args_destroy(cooldown_channel_args);
grpc_arg cooldown_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS),
kMinResolutionPeriodMs);
grpc_channel_args cooldown_args = {1, &cooldown_arg};
args.args = &cooldown_args;
res_cb_arg->resolver = factory->CreateResolver(std::move(args));
GPR_ASSERT(res_cb_arg->resolver != nullptr);
// First resolution, would incur in system-level resolution.
res_cb_arg->resolver->NextLocked(
&res_cb_arg->result,
GRPC_CLOSURE_CREATE(on_first_resolution, res_cb_arg,
grpc_combiner_scheduler(g_combiner)));
grpc_uri_destroy(uri);
// First resolution, would incur in system-level resolution.
res_cb_arg->result_handler->SetCallback(on_first_resolution, res_cb_arg);
res_cb_arg->resolver->StartLocked();
}
static void test_cooldown() {

@ -39,8 +39,10 @@ static void test_succeeds(grpc_core::ResolverFactory* factory,
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.result_handler =
grpc_core::MakeUnique<grpc_core::Resolver::ResultHandler>();
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(args);
factory->CreateResolver(std::move(args));
GPR_ASSERT(resolver != nullptr);
grpc_uri_destroy(uri);
}
@ -55,8 +57,10 @@ static void test_fails(grpc_core::ResolverFactory* factory,
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.result_handler =
grpc_core::MakeUnique<grpc_core::Resolver::ResultHandler>();
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(args);
factory->CreateResolver(std::move(args));
GPR_ASSERT(resolver == nullptr);
grpc_uri_destroy(uri);
}

@ -33,9 +33,49 @@
#include "test/core/util/test_config.h"
class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
~ResultHandler() override { grpc_channel_args_destroy(expected_); }
void SetExpectedAndEvent(grpc_channel_args* expected, gpr_event* ev) {
GPR_ASSERT(expected_ == nullptr);
GPR_ASSERT(ev_ == nullptr);
expected_ = grpc_channel_args_copy(expected);
ev_ = ev;
}
void ReturnResult(const grpc_channel_args* args) override {
GPR_ASSERT(expected_ != nullptr);
GPR_ASSERT(ev_ != nullptr);
// We only check the addresses channel arg because that's the only one
// explicitly set by the test via
// FakeResolverResponseGenerator::SetResponse().
const grpc_core::ServerAddressList* actual_addresses =
grpc_core::FindServerAddressListChannelArg(args);
const grpc_core::ServerAddressList* expected_addresses =
grpc_core::FindServerAddressListChannelArg(expected_);
GPR_ASSERT(actual_addresses->size() == expected_addresses->size());
for (size_t i = 0; i < expected_addresses->size(); ++i) {
GPR_ASSERT((*actual_addresses)[i] == (*expected_addresses)[i]);
}
grpc_channel_args_destroy(args);
grpc_channel_args_destroy(expected_);
expected_ = nullptr;
gpr_event_set(ev_, (void*)1);
ev_ = nullptr;
}
void ReturnError(grpc_error* error) override {}
private:
grpc_channel_args* expected_ = nullptr;
gpr_event* ev_ = nullptr;
};
static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver(
grpc_combiner* combiner,
grpc_core::FakeResolverResponseGenerator* response_generator) {
grpc_core::FakeResolverResponseGenerator* response_generator,
grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler> result_handler) {
grpc_core::ResolverFactory* factory =
grpc_core::ResolverRegistry::LookupResolverFactory("fake");
grpc_arg generator_arg =
@ -45,37 +85,12 @@ static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver(
grpc_core::ResolverArgs args;
args.args = &channel_args;
args.combiner = combiner;
args.result_handler = std::move(result_handler);
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(args);
factory->CreateResolver(std::move(args));
return resolver;
}
typedef struct on_resolution_arg {
grpc_channel_args* resolver_result;
grpc_channel_args* expected_resolver_result;
gpr_event ev;
} on_resolution_arg;
// Callback to check the resolution result is as expected.
void on_resolution_cb(void* arg, grpc_error* error) {
if (error != GRPC_ERROR_NONE) return;
on_resolution_arg* res = static_cast<on_resolution_arg*>(arg);
// We only check the addresses channel arg because that's the only one
// explicitly set by the test via
// FakeResolverResponseGenerator::SetResponse().
const grpc_core::ServerAddressList* actual_addresses =
grpc_core::FindServerAddressListChannelArg(res->resolver_result);
const grpc_core::ServerAddressList* expected_addresses =
grpc_core::FindServerAddressListChannelArg(res->expected_resolver_result);
GPR_ASSERT(actual_addresses->size() == expected_addresses->size());
for (size_t i = 0; i < expected_addresses->size(); ++i) {
GPR_ASSERT((*actual_addresses)[i] == (*expected_addresses)[i]);
}
grpc_channel_args_destroy(res->resolver_result);
grpc_channel_args_destroy(res->expected_resolver_result);
gpr_event_set(&res->ev, (void*)1);
}
// Create a new resolution containing 2 addresses.
static grpc_channel_args* create_new_resolver_result() {
static size_t test_counter = 0;
@ -115,110 +130,99 @@ static grpc_channel_args* create_new_resolver_result() {
return results;
}
static on_resolution_arg create_on_resolution_arg(grpc_channel_args* results) {
on_resolution_arg on_res_arg;
memset(&on_res_arg, 0, sizeof(on_res_arg));
on_res_arg.expected_resolver_result = results;
gpr_event_init(&on_res_arg.ev);
return on_res_arg;
}
static void test_fake_resolver() {
grpc_core::ExecCtx exec_ctx;
grpc_combiner* combiner = grpc_combiner_create();
// Create resolver.
ResultHandler* result_handler = grpc_core::New<ResultHandler>();
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
response_generator =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
build_fake_resolver(combiner, response_generator.get());
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver = build_fake_resolver(
combiner, response_generator.get(),
grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler>(result_handler));
GPR_ASSERT(resolver.get() != nullptr);
resolver->StartLocked();
// Test 1: normal resolution.
// next_results != NULL, reresolution_results == NULL.
// Expected response is next_results.
gpr_log(GPR_INFO, "TEST 1");
grpc_channel_args* results = create_new_resolver_result();
on_resolution_arg on_res_arg = create_on_resolution_arg(results);
grpc_closure* on_resolution = GRPC_CLOSURE_CREATE(
on_resolution_cb, &on_res_arg, grpc_combiner_scheduler(combiner));
// Resolution won't be triggered until next_results is set.
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
gpr_event ev1;
gpr_event_init(&ev1);
result_handler->SetExpectedAndEvent(results, &ev1);
response_generator->SetResponse(results);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&ev1, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
grpc_channel_args_destroy(results);
// Test 2: update resolution.
// next_results != NULL, reresolution_results == NULL.
// Expected response is next_results.
gpr_log(GPR_INFO, "TEST 2");
results = create_new_resolver_result();
on_res_arg = create_on_resolution_arg(results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
// Resolution won't be triggered until next_results is set.
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
gpr_event ev2;
gpr_event_init(&ev2);
result_handler->SetExpectedAndEvent(results, &ev2);
response_generator->SetResponse(results);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&ev2, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
grpc_channel_args_destroy(results);
// Test 3: normal re-resolution.
// next_results == NULL, reresolution_results != NULL.
// Expected response is reresolution_results.
gpr_log(GPR_INFO, "TEST 3");
grpc_channel_args* reresolution_results = create_new_resolver_result();
on_res_arg =
create_on_resolution_arg(grpc_channel_args_copy(reresolution_results));
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
gpr_event ev3;
gpr_event_init(&ev3);
result_handler->SetExpectedAndEvent(reresolution_results, &ev3);
// Set reresolution_results.
// No result will be returned until re-resolution is requested.
response_generator->SetReresolutionResponse(reresolution_results);
// Flush here to guarantee that the response has been set.
grpc_core::ExecCtx::Get()->Flush();
// Trigger a re-resolution.
resolver->RequestReresolutionLocked();
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&ev3, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
// Test 4: repeat re-resolution.
// next_results == NULL, reresolution_results != NULL.
// Expected response is reresolution_results.
on_res_arg = create_on_resolution_arg(reresolution_results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
gpr_log(GPR_INFO, "TEST 4");
gpr_event ev4;
gpr_event_init(&ev4);
result_handler->SetExpectedAndEvent(reresolution_results, &ev4);
// Trigger a re-resolution.
resolver->RequestReresolutionLocked();
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&ev4, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
grpc_channel_args_destroy(reresolution_results);
// Test 5: normal resolution.
// next_results != NULL, reresolution_results != NULL.
// Expected response is next_results.
gpr_log(GPR_INFO, "TEST 5");
results = create_new_resolver_result();
on_res_arg = create_on_resolution_arg(results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
// Resolution won't be triggered until next_results is set.
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
gpr_event ev5;
gpr_event_init(&ev5);
result_handler->SetExpectedAndEvent(results, &ev5);
response_generator->SetResponse(results);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&ev5, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
grpc_channel_args_destroy(results);
// Test 6: no-op.
// Requesting a new resolution without setting the response shouldn't trigger
// the resolution callback.
memset(&on_res_arg, 0, sizeof(on_res_arg));
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_milliseconds_to_deadline(100)) ==
gpr_log(GPR_INFO, "TEST 6");
gpr_event ev6;
gpr_event_init(&ev6);
result_handler->SetExpectedAndEvent(nullptr, &ev6);
GPR_ASSERT(gpr_event_wait(&ev6, grpc_timeout_milliseconds_to_deadline(100)) ==
nullptr);
// Clean up.
// Note: Need to explicitly unref the resolver and flush the exec_ctx
// to make sure that the final resolver callback (with error set to
// "Resolver Shutdown") is invoked before on_res_arg goes out of scope.
resolver.reset();
grpc_core::ExecCtx::Get()->Flush();
GRPC_COMBINER_UNREF(combiner, "test_fake_resolver");
}

@ -30,15 +30,14 @@
static grpc_combiner* g_combiner;
typedef struct on_resolution_arg {
char* expected_server_name;
grpc_channel_args* resolver_result;
} on_resolution_arg;
void on_resolution_cb(void* arg, grpc_error* error) {
on_resolution_arg* res = static_cast<on_resolution_arg*>(arg);
grpc_channel_args_destroy(res->resolver_result);
}
class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
void ReturnResult(const grpc_channel_args* args) override {
grpc_channel_args_destroy(args);
}
void ReturnError(grpc_error* error) override { GRPC_ERROR_UNREF(error); }
};
static void test_succeeds(grpc_core::ResolverFactory* factory,
const char* string) {
@ -50,18 +49,14 @@ static void test_succeeds(grpc_core::ResolverFactory* factory,
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.result_handler =
grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler>(
grpc_core::New<ResultHandler>());
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(args);
factory->CreateResolver(std::move(args));
GPR_ASSERT(resolver != nullptr);
on_resolution_arg on_res_arg;
memset(&on_res_arg, 0, sizeof(on_res_arg));
on_res_arg.expected_server_name = uri->path;
grpc_closure* on_resolution = GRPC_CLOSURE_CREATE(
on_resolution_cb, &on_res_arg, grpc_schedule_on_exec_ctx);
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
grpc_uri_destroy(uri);
resolver->StartLocked();
/* Flush ExecCtx to avoid stack-use-after-scope on on_res_arg which is
* accessed in the closure on_resolution_cb */
grpc_core::ExecCtx::Get()->Flush();
@ -77,8 +72,11 @@ static void test_fails(grpc_core::ResolverFactory* factory,
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.result_handler =
grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler>(
grpc_core::New<ResultHandler>());
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(args);
factory->CreateResolver(std::move(args));
GPR_ASSERT(resolver == nullptr);
grpc_uri_destroy(uri);
}

@ -160,14 +160,27 @@ void PollPollsetUntilRequestDone(ArgsStruct* args) {
}
}
void CheckResolverResultAssertFailureLocked(void* arg, grpc_error* error) {
EXPECT_NE(error, GRPC_ERROR_NONE);
ArgsStruct* args = static_cast<ArgsStruct*>(arg);
gpr_atm_rel_store(&args->done_atm, 1);
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
gpr_mu_unlock(args->mu);
}
class AssertFailureResultHandler : public grpc_core::Resolver::ResultHandler {
public:
explicit AssertFailureResultHandler(ArgsStruct* args) : args_(args) {}
~AssertFailureResultHandler() override {
gpr_atm_rel_store(&args_->done_atm, 1);
gpr_mu_lock(args_->mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(args_->pollset, nullptr));
gpr_mu_unlock(args_->mu);
}
void ReturnResult(const grpc_channel_args* args) override {
GPR_ASSERT(false);
}
void ReturnError(grpc_error* error) override { GPR_ASSERT(false); }
private:
ArgsStruct* args_;
};
void TestCancelActiveDNSQuery(ArgsStruct* args) {
int fake_dns_port = grpc_pick_unused_port_or_die();
@ -180,13 +193,11 @@ void TestCancelActiveDNSQuery(ArgsStruct* args) {
// create resolver and resolve
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
grpc_core::ResolverRegistry::CreateResolver(
client_target, nullptr, args->pollset_set, args->lock);
client_target, nullptr, args->pollset_set, args->lock,
grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler>(
grpc_core::New<AssertFailureResultHandler>(args)));
gpr_free(client_target);
grpc_closure on_resolver_result_changed;
GRPC_CLOSURE_INIT(&on_resolver_result_changed,
CheckResolverResultAssertFailureLocked, (void*)args,
grpc_combiner_scheduler(args->lock));
resolver->NextLocked(&args->channel_args, &on_resolver_result_changed);
resolver->StartLocked();
// Without resetting and causing resolver shutdown, the
// PollPollsetUntilRequestDone call should never finish.
resolver.reset();

@ -239,7 +239,7 @@ void PollPollsetUntilRequestDone(ArgsStruct* args) {
gpr_event_set(&args->ev, (void*)1);
}
void CheckServiceConfigResultLocked(grpc_channel_args* channel_args,
void CheckServiceConfigResultLocked(const grpc_channel_args* channel_args,
ArgsStruct* args) {
const grpc_arg* service_config_arg =
grpc_channel_args_find(channel_args, GRPC_ARG_SERVICE_CONFIG);
@ -253,7 +253,7 @@ void CheckServiceConfigResultLocked(grpc_channel_args* channel_args,
}
}
void CheckLBPolicyResultLocked(grpc_channel_args* channel_args,
void CheckLBPolicyResultLocked(const grpc_channel_args* channel_args,
ArgsStruct* args) {
const grpc_arg* lb_policy_arg =
grpc_channel_args_find(channel_args, GRPC_ARG_LB_POLICY_NAME);
@ -394,54 +394,86 @@ void OpenAndCloseSocketsStressLoop(int dummy_port, gpr_event* done_ev) {
}
#endif
void CheckResolverResultLocked(void* argsp, grpc_error* err) {
EXPECT_EQ(err, GRPC_ERROR_NONE);
ArgsStruct* args = (ArgsStruct*)argsp;
grpc_channel_args* channel_args = args->channel_args;
grpc_core::ServerAddressList* addresses =
grpc_core::FindServerAddressListChannelArg(channel_args);
gpr_log(GPR_INFO, "num addrs found: %" PRIdPTR ". expected %" PRIdPTR,
addresses->size(), args->expected_addrs.size());
GPR_ASSERT(addresses->size() == args->expected_addrs.size());
std::vector<GrpcLBAddress> found_lb_addrs;
for (size_t i = 0; i < addresses->size(); i++) {
grpc_core::ServerAddress& addr = (*addresses)[i];
char* str;
grpc_sockaddr_to_string(&str, &addr.address(), 1 /* normalize */);
gpr_log(GPR_INFO, "%s", str);
found_lb_addrs.emplace_back(
GrpcLBAddress(std::string(str), addr.IsBalancer()));
gpr_free(str);
class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
static grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler> Create(
ArgsStruct* args) {
return grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler>(
grpc_core::New<ResultHandler>(args));
}
if (args->expected_addrs.size() != found_lb_addrs.size()) {
gpr_log(GPR_DEBUG,
"found lb addrs size is: %" PRIdPTR
". expected addrs size is %" PRIdPTR,
found_lb_addrs.size(), args->expected_addrs.size());
abort();
explicit ResultHandler(ArgsStruct* args) : args_(args) {}
void ReturnResult(const grpc_channel_args* result) override {
CheckResult(result);
gpr_atm_rel_store(&args_->done_atm, 1);
gpr_mu_lock(args_->mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(args_->pollset, nullptr));
gpr_mu_unlock(args_->mu);
grpc_channel_args_destroy(result);
}
EXPECT_THAT(args->expected_addrs, UnorderedElementsAreArray(found_lb_addrs));
CheckServiceConfigResultLocked(channel_args, args);
if (args->expected_service_config_string == "") {
CheckLBPolicyResultLocked(channel_args, args);
void ReturnError(grpc_error* error) override {
gpr_log(GPR_ERROR, "resolver returned error: %s", grpc_error_string(error));
GPR_ASSERT(false);
}
gpr_atm_rel_store(&args->done_atm, 1);
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
gpr_mu_unlock(args->mu);
}
void CheckResolvedWithoutErrorLocked(void* argsp, grpc_error* err) {
EXPECT_EQ(err, GRPC_ERROR_NONE);
ArgsStruct* args = (ArgsStruct*)argsp;
gpr_atm_rel_store(&args->done_atm, 1);
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
gpr_mu_unlock(args->mu);
}
virtual void CheckResult(const grpc_channel_args* channel_args) {}
protected:
ArgsStruct* args_struct() const { return args_; }
private:
ArgsStruct* args_;
};
class CheckingResultHandler : public ResultHandler {
public:
static grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler> Create(
ArgsStruct* args) {
return grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler>(
grpc_core::New<CheckingResultHandler>(args));
}
explicit CheckingResultHandler(ArgsStruct* args) : ResultHandler(args) {}
void CheckResult(const grpc_channel_args* channel_args) override {
ArgsStruct* args = args_struct();
grpc_core::ServerAddressList* addresses =
grpc_core::FindServerAddressListChannelArg(channel_args);
gpr_log(GPR_INFO, "num addrs found: %" PRIdPTR ". expected %" PRIdPTR,
addresses->size(), args->expected_addrs.size());
GPR_ASSERT(addresses->size() == args->expected_addrs.size());
std::vector<GrpcLBAddress> found_lb_addrs;
for (size_t i = 0; i < addresses->size(); i++) {
grpc_core::ServerAddress& addr = (*addresses)[i];
char* str;
grpc_sockaddr_to_string(&str, &addr.address(), 1 /* normalize */);
gpr_log(GPR_INFO, "%s", str);
found_lb_addrs.emplace_back(
GrpcLBAddress(std::string(str), addr.IsBalancer()));
gpr_free(str);
}
if (args->expected_addrs.size() != found_lb_addrs.size()) {
gpr_log(GPR_DEBUG,
"found lb addrs size is: %" PRIdPTR
". expected addrs size is %" PRIdPTR,
found_lb_addrs.size(), args->expected_addrs.size());
abort();
}
EXPECT_THAT(args->expected_addrs,
UnorderedElementsAreArray(found_lb_addrs));
CheckServiceConfigResultLocked(channel_args, args);
if (args->expected_service_config_string == "") {
CheckLBPolicyResultLocked(channel_args, args);
}
}
};
void RunResolvesRelevantRecordsTest(void (*OnDoneLocked)(void* arg,
grpc_error* error)) {
void RunResolvesRelevantRecordsTest(
grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler> (
*CreateResultHandler)(ArgsStruct* args)) {
grpc_core::ExecCtx exec_ctx;
ArgsStruct args;
ArgsInit(&args);
@ -491,20 +523,18 @@ void RunResolvesRelevantRecordsTest(void (*OnDoneLocked)(void* arg,
// create resolver and resolve
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
grpc_core::ResolverRegistry::CreateResolver(whole_uri, resolver_args,
args.pollset_set, args.lock);
args.pollset_set, args.lock,
CreateResultHandler(&args));
grpc_channel_args_destroy(resolver_args);
gpr_free(whole_uri);
grpc_closure on_resolver_result_changed;
GRPC_CLOSURE_INIT(&on_resolver_result_changed, OnDoneLocked, (void*)&args,
grpc_combiner_scheduler(args.lock));
resolver->NextLocked(&args.channel_args, &on_resolver_result_changed);
resolver->StartLocked();
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone(&args);
ArgsFinish(&args);
}
TEST(ResolverComponentTest, TestResolvesRelevantRecords) {
RunResolvesRelevantRecordsTest(CheckResolverResultLocked);
RunResolvesRelevantRecordsTest(CheckingResultHandler::Create);
}
TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) {
@ -515,7 +545,7 @@ TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) {
std::thread socket_stress_thread(OpenAndCloseSocketsStressLoop, dummy_port,
&done_ev);
// Run the resolver test
RunResolvesRelevantRecordsTest(CheckResolvedWithoutErrorLocked);
RunResolvesRelevantRecordsTest(ResultHandler::Create);
// Shutdown and join stress thread
gpr_event_set(&done_ev, (void*)1);
socket_stress_thread.join();

Loading…
Cancel
Save