diff --git a/src/core/ext/filters/client_channel/resolver.cc b/src/core/ext/filters/client_channel/resolver.cc index 601b08be246..5d14d51d011 100644 --- a/src/core/ext/filters/client_channel/resolver.cc +++ b/src/core/ext/filters/client_channel/resolver.cc @@ -26,8 +26,10 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_resolver_refcount(false, namespace grpc_core { -Resolver::Resolver(grpc_combiner* combiner) +Resolver::Resolver(grpc_combiner* combiner, + UniquePtr result_handler) : InternallyRefCounted(&grpc_trace_resolver_refcount), + result_handler_(std::move(result_handler)), combiner_(GRPC_COMBINER_REF(combiner, "resolver")) {} Resolver::~Resolver() { GRPC_COMBINER_UNREF(combiner_, "resolver"); } diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index 9da849a1017..790779cfe75 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -46,27 +46,34 @@ namespace grpc_core { /// combiner passed to the constructor. class Resolver : public InternallyRefCounted { public: + /// A proxy object used by the resolver to return results to the + /// client channel. + class ResultHandler { + public: + virtual ~ResultHandler() {} + + /// Returns a result to the channel. + /// The list of addresses will be in GRPC_ARG_SERVER_ADDRESS_LIST. + /// The service config (if any) will be in GRPC_ARG_SERVICE_CONFIG. + /// Takes ownership of \a result. + // TODO(roth): Change this API so that addresses and service config are + // passed explicitly instead of being in channel args. + virtual void ReturnResult(const grpc_channel_args* result) GRPC_ABSTRACT; + + /// Returns a transient error to the channel. + /// If the resolver does not set the GRPC_ERROR_INT_GRPC_STATUS + /// attribute on the error, calls will be failed with status UNKNOWN. + virtual void ReturnError(grpc_error* error) GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS + }; + // Not copyable nor movable. Resolver(const Resolver&) = delete; Resolver& operator=(const Resolver&) = delete; - /// Requests a callback when a new result becomes available. - /// When the new result is available, sets \a *result to the new result - /// and schedules \a on_complete for execution. - /// Upon transient failure, sets \a *result to nullptr and schedules - /// \a on_complete with no error. - /// If resolution is fatally broken, sets \a *result to nullptr and - /// schedules \a on_complete with an error. - /// TODO(roth): When we have time, improve the way this API represents - /// transient failure vs. shutdown. - /// - /// Note that the client channel will almost always have a request - /// to \a NextLocked() pending. When it gets the callback, it will - /// process the new result and then immediately make another call to - /// \a NextLocked(). This allows push-based resolvers to provide new - /// data as soon as it becomes available. - virtual void NextLocked(grpc_channel_args** result, - grpc_closure* on_complete) GRPC_ABSTRACT; + /// Starts resolving. + virtual void StartLocked() GRPC_ABSTRACT; /// Asks the resolver to obtain an updated resolver result, if /// applicable. @@ -79,8 +86,8 @@ class Resolver : public InternallyRefCounted { /// /// For push-based implementations, this may be a no-op. /// - /// If this causes new data to become available, then the currently - /// pending call to \a NextLocked() will return the new result. + /// Note: Implementations must not invoke any method on the + /// ResultHandler from within this call. virtual void RequestReresolutionLocked() {} /// Resets the re-resolution backoff, if any. @@ -108,16 +115,18 @@ class Resolver : public InternallyRefCounted { // TODO(roth): Once we have a C++-like interface for combiners, this // API should change to take a RefCountedPtr<>, so that we always take // ownership of a new ref. - explicit Resolver(grpc_combiner* combiner); + explicit Resolver(grpc_combiner* combiner, + UniquePtr result_handler); virtual ~Resolver(); - /// Shuts down the resolver. If there is a pending call to - /// NextLocked(), the callback will be scheduled with an error. + /// Shuts down the resolver. virtual void ShutdownLocked() GRPC_ABSTRACT; grpc_combiner* combiner() const { return combiner_; } + ResultHandler* result_handler() const { return result_handler_.get(); } + private: static void ShutdownAndUnrefLocked(void* arg, grpc_error* ignored) { Resolver* resolver = static_cast(arg); @@ -125,6 +134,7 @@ class Resolver : public InternallyRefCounted { resolver->Unref(); } + UniquePtr result_handler_; grpc_combiner* combiner_; }; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index c99943ab2f1..249b9e3958c 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -60,10 +60,9 @@ const char kDefaultPort[] = "https"; class AresDnsResolver : public Resolver { public: - explicit AresDnsResolver(const ResolverArgs& args); + explicit AresDnsResolver(ResolverArgs args); - void NextLocked(grpc_channel_args** result, - grpc_closure* on_complete) override; + void StartLocked() override; void RequestReresolutionLocked() override; @@ -76,7 +75,6 @@ class AresDnsResolver : public Resolver { void MaybeStartResolvingLocked(); void StartResolvingLocked(); - void MaybeFinishNextLocked(); static void OnNextResolutionLocked(void* arg, grpc_error* error); static void OnResolvedLocked(void* arg, grpc_error* error); @@ -98,16 +96,6 @@ class AresDnsResolver : public Resolver { bool resolving_ = false; /// the pending resolving request grpc_ares_request* pending_request_ = nullptr; - /// which version of the result have we published? - int published_version_ = 0; - /// which version of the result is current? - int resolved_version_ = 0; - /// pending next completion, or NULL - grpc_closure* next_completion_ = nullptr; - /// target result address for next completion - grpc_channel_args** target_result_ = nullptr; - /// current (fully resolved) result - grpc_channel_args* resolved_result_ = nullptr; /// next resolution timer bool have_next_resolution_timer_ = false; grpc_timer next_resolution_timer_; @@ -129,8 +117,8 @@ class AresDnsResolver : public Resolver { bool enable_srv_queries_; }; -AresDnsResolver::AresDnsResolver(const ResolverArgs& args) - : Resolver(args.combiner), +AresDnsResolver::AresDnsResolver(ResolverArgs args) + : Resolver(args.combiner, std::move(args.result_handler)), backoff_( BackOff::Options() .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * @@ -177,27 +165,16 @@ AresDnsResolver::AresDnsResolver(const ResolverArgs& args) AresDnsResolver::~AresDnsResolver() { GRPC_CARES_TRACE_LOG("resolver:%p destroying AresDnsResolver", this); - if (resolved_result_ != nullptr) { - grpc_channel_args_destroy(resolved_result_); - } grpc_pollset_set_destroy(interested_parties_); gpr_free(dns_server_); gpr_free(name_to_resolve_); grpc_channel_args_destroy(channel_args_); } -void AresDnsResolver::NextLocked(grpc_channel_args** target_result, - grpc_closure* on_complete) { - GRPC_CARES_TRACE_LOG("resolver:%p AresDnsResolver::NextLocked() is called.", +void AresDnsResolver::StartLocked() { + GRPC_CARES_TRACE_LOG("resolver:%p AresDnsResolver::StartLocked() is called.", this); - GPR_ASSERT(next_completion_ == nullptr); - next_completion_ = on_complete; - target_result_ = target_result; - if (resolved_version_ == 0 && !resolving_) { - MaybeStartResolvingLocked(); - } else { - MaybeFinishNextLocked(); - } + MaybeStartResolvingLocked(); } void AresDnsResolver::RequestReresolutionLocked() { @@ -221,12 +198,6 @@ void AresDnsResolver::ShutdownLocked() { if (pending_request_ != nullptr) { grpc_cancel_ares_request_locked(pending_request_); } - if (next_completion_ != nullptr) { - *target_result_ = nullptr; - GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Resolver Shutdown")); - next_completion_ = nullptr; - } } void AresDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) { @@ -319,11 +290,14 @@ char* ChooseServiceConfig(char* service_config_choice_json) { void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { AresDnsResolver* r = static_cast(arg); - grpc_channel_args* result = nullptr; GPR_ASSERT(r->resolving_); r->resolving_ = false; gpr_free(r->pending_request_); r->pending_request_ = nullptr; + if (r->shutdown_initiated_) { + r->Unref(DEBUG_LOCATION, "OnResolvedLocked() shutdown"); + return; + } if (r->addresses_ != nullptr) { static const char* args_to_remove[1]; size_t num_args_to_remove = 0; @@ -343,17 +317,22 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { (char*)GRPC_ARG_SERVICE_CONFIG, service_config_string); } } - result = grpc_channel_args_copy_and_add_and_remove( + r->result_handler()->ReturnResult(grpc_channel_args_copy_and_add_and_remove( r->channel_args_, args_to_remove, num_args_to_remove, args_to_add, - num_args_to_add); + num_args_to_add)); gpr_free(service_config_string); r->addresses_.reset(); // Reset backoff state so that we start from the beginning when the // next request gets triggered. r->backoff_.Reset(); - } else if (!r->shutdown_initiated_) { - const char* msg = grpc_error_string(error); - GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", r, msg); + } else { + GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", r, + grpc_error_string(error)); + r->result_handler()->ReturnError(grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "DNS resolution failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + // Set retry timer. grpc_millis next_try = r->backoff_.NextAttemptTime(); grpc_millis timeout = next_try - ExecCtx::Get()->Now(); GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed (will retry): %s", @@ -363,8 +342,7 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. - RefCountedPtr self = r->Ref(DEBUG_LOCATION, "retry-timer"); - self.release(); + r->Ref(DEBUG_LOCATION, "retry-timer").release(); if (timeout > 0) { GRPC_CARES_TRACE_LOG("resolver:%p retrying in %" PRId64 " milliseconds", r, timeout); @@ -374,12 +352,6 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { grpc_timer_init(&r->next_resolution_timer_, next_try, &r->on_next_resolution_); } - if (r->resolved_result_ != nullptr) { - grpc_channel_args_destroy(r->resolved_result_); - } - r->resolved_result_ = result; - ++r->resolved_version_; - r->MaybeFinishNextLocked(); r->Unref(DEBUG_LOCATION, "dns-resolving"); } @@ -403,9 +375,7 @@ void AresDnsResolver::MaybeStartResolvingLocked() { // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. - RefCountedPtr self = - Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown"); - self.release(); + Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release(); grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution, &on_next_resolution_); return; @@ -418,8 +388,7 @@ void AresDnsResolver::StartResolvingLocked() { // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. - RefCountedPtr self = Ref(DEBUG_LOCATION, "dns-resolving"); - self.release(); + Ref(DEBUG_LOCATION, "dns-resolving").release(); GPR_ASSERT(!resolving_); resolving_ = true; service_config_json_ = nullptr; @@ -433,28 +402,14 @@ void AresDnsResolver::StartResolvingLocked() { this, pending_request_); } -void AresDnsResolver::MaybeFinishNextLocked() { - if (next_completion_ != nullptr && resolved_version_ != published_version_) { - *target_result_ = resolved_result_ == nullptr - ? nullptr - : grpc_channel_args_copy(resolved_result_); - GRPC_CARES_TRACE_LOG("resolver:%p AresDnsResolver::MaybeFinishNextLocked()", - this); - GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE); - next_completion_ = nullptr; - published_version_ = resolved_version_; - } -} - // // Factory // class AresDnsResolverFactory : public ResolverFactory { public: - OrphanablePtr CreateResolver( - const ResolverArgs& args) const override { - return OrphanablePtr(New(args)); + OrphanablePtr CreateResolver(ResolverArgs args) const override { + return OrphanablePtr(New(std::move(args))); } const char* scheme() const override { return "dns"; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index c365f1abfd8..1c0fe1c6717 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -51,10 +51,9 @@ const char kDefaultPort[] = "https"; class NativeDnsResolver : public Resolver { public: - explicit NativeDnsResolver(const ResolverArgs& args); + explicit NativeDnsResolver(ResolverArgs args); - void NextLocked(grpc_channel_args** result, - grpc_closure* on_complete) override; + void StartLocked() override; void RequestReresolutionLocked() override; @@ -67,7 +66,6 @@ class NativeDnsResolver : public Resolver { void MaybeStartResolvingLocked(); void StartResolvingLocked(); - void MaybeFinishNextLocked(); static void OnNextResolutionLocked(void* arg, grpc_error* error); static void OnResolvedLocked(void* arg, grpc_error* error); @@ -78,19 +76,11 @@ class NativeDnsResolver : public Resolver { grpc_channel_args* channel_args_ = nullptr; /// pollset_set to drive the name resolution process grpc_pollset_set* interested_parties_ = nullptr; + /// are we shutting down? + bool shutdown_ = false; /// are we currently resolving? bool resolving_ = false; grpc_closure on_resolved_; - /// which version of the result have we published? - int published_version_ = 0; - /// which version of the result is current? - int resolved_version_ = 0; - /// pending next completion, or nullptr - grpc_closure* next_completion_ = nullptr; - /// target result address for next completion - grpc_channel_args** target_result_ = nullptr; - /// current (fully resolved) result - grpc_channel_args* resolved_result_ = nullptr; /// next resolution timer bool have_next_resolution_timer_ = false; grpc_timer next_resolution_timer_; @@ -105,8 +95,8 @@ class NativeDnsResolver : public Resolver { grpc_resolved_addresses* addresses_ = nullptr; }; -NativeDnsResolver::NativeDnsResolver(const ResolverArgs& args) - : Resolver(args.combiner), +NativeDnsResolver::NativeDnsResolver(ResolverArgs args) + : Resolver(args.combiner, std::move(args.result_handler)), backoff_( BackOff::Options() .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * @@ -134,25 +124,12 @@ NativeDnsResolver::NativeDnsResolver(const ResolverArgs& args) } NativeDnsResolver::~NativeDnsResolver() { - if (resolved_result_ != nullptr) { - grpc_channel_args_destroy(resolved_result_); - } + grpc_channel_args_destroy(channel_args_); grpc_pollset_set_destroy(interested_parties_); gpr_free(name_to_resolve_); - grpc_channel_args_destroy(channel_args_); } -void NativeDnsResolver::NextLocked(grpc_channel_args** result, - grpc_closure* on_complete) { - GPR_ASSERT(next_completion_ == nullptr); - next_completion_ = on_complete; - target_result_ = result; - if (resolved_version_ == 0 && !resolving_) { - MaybeStartResolvingLocked(); - } else { - MaybeFinishNextLocked(); - } -} +void NativeDnsResolver::StartLocked() { MaybeStartResolvingLocked(); } void NativeDnsResolver::RequestReresolutionLocked() { if (!resolving_) { @@ -168,15 +145,10 @@ void NativeDnsResolver::ResetBackoffLocked() { } void NativeDnsResolver::ShutdownLocked() { + shutdown_ = true; if (have_next_resolution_timer_) { grpc_timer_cancel(&next_resolution_timer_); } - if (next_completion_ != nullptr) { - *target_result_ = nullptr; - GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Resolver Shutdown")); - next_completion_ = nullptr; - } } void NativeDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) { @@ -190,38 +162,42 @@ void NativeDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) { void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { NativeDnsResolver* r = static_cast(arg); - grpc_channel_args* result = nullptr; GPR_ASSERT(r->resolving_); r->resolving_ = false; - GRPC_ERROR_REF(error); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, - grpc_slice_from_copied_string(r->name_to_resolve_)); + if (r->shutdown_) { + r->Unref(DEBUG_LOCATION, "dns-resolving"); + return; + } if (r->addresses_ != nullptr) { ServerAddressList addresses; for (size_t i = 0; i < r->addresses_->naddrs; ++i) { addresses.emplace_back(&r->addresses_->addrs[i].addr, r->addresses_->addrs[i].len, nullptr /* args */); } - grpc_arg new_arg = CreateServerAddressListChannelArg(&addresses); - result = grpc_channel_args_copy_and_add(r->channel_args_, &new_arg, 1); grpc_resolved_addresses_destroy(r->addresses_); + grpc_arg new_arg = CreateServerAddressListChannelArg(&addresses); + r->result_handler()->ReturnResult( + grpc_channel_args_copy_and_add(r->channel_args_, &new_arg, 1)); // Reset backoff state so that we start from the beginning when the // next request gets triggered. r->backoff_.Reset(); } else { - grpc_millis next_try = r->backoff_.NextAttemptTime(); - grpc_millis timeout = next_try - ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); + // Return transient error. + r->result_handler()->ReturnError(grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "DNS resolution failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + // Set up for retry. + grpc_millis next_try = r->backoff_.NextAttemptTime(); + grpc_millis timeout = next_try - ExecCtx::Get()->Now(); GPR_ASSERT(!r->have_next_resolution_timer_); r->have_next_resolution_timer_ = true; // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. - RefCountedPtr self = - r->Ref(DEBUG_LOCATION, "next_resolution_timer"); - self.release(); + r->Ref(DEBUG_LOCATION, "next_resolution_timer").release(); if (timeout > 0) { gpr_log(GPR_DEBUG, "retrying in %" PRId64 " milliseconds", timeout); } else { @@ -230,13 +206,6 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { grpc_timer_init(&r->next_resolution_timer_, next_try, &r->on_next_resolution_); } - if (r->resolved_result_ != nullptr) { - grpc_channel_args_destroy(r->resolved_result_); - } - r->resolved_result_ = result; - ++r->resolved_version_; - r->MaybeFinishNextLocked(); - GRPC_ERROR_UNREF(error); r->Unref(DEBUG_LOCATION, "dns-resolving"); } @@ -260,9 +229,7 @@ void NativeDnsResolver::MaybeStartResolvingLocked() { // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. - RefCountedPtr self = - Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown"); - self.release(); + Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release(); grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution, &on_next_resolution_); return; @@ -276,8 +243,7 @@ void NativeDnsResolver::StartResolvingLocked() { // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. - RefCountedPtr self = Ref(DEBUG_LOCATION, "dns-resolving"); - self.release(); + Ref(DEBUG_LOCATION, "dns-resolving").release(); GPR_ASSERT(!resolving_); resolving_ = true; addresses_ = nullptr; @@ -286,30 +252,18 @@ void NativeDnsResolver::StartResolvingLocked() { last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); } -void NativeDnsResolver::MaybeFinishNextLocked() { - if (next_completion_ != nullptr && resolved_version_ != published_version_) { - *target_result_ = resolved_result_ == nullptr - ? nullptr - : grpc_channel_args_copy(resolved_result_); - GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE); - next_completion_ = nullptr; - published_version_ = resolved_version_; - } -} - // // Factory // class NativeDnsResolverFactory : public ResolverFactory { public: - OrphanablePtr CreateResolver( - const ResolverArgs& args) const override { + OrphanablePtr CreateResolver(ResolverArgs args) const override { if (GPR_UNLIKELY(0 != strcmp(args.uri->authority, ""))) { gpr_log(GPR_ERROR, "authority based dns uri's not supported"); return OrphanablePtr(nullptr); } - return OrphanablePtr(New(args)); + return OrphanablePtr(New(std::move(args))); } const char* scheme() const override { return "dns"; } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 3489f3d491b..153279e323e 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -50,10 +50,9 @@ namespace grpc_core { // FakeResolverResponseGenerator. class FakeResolver : public Resolver { public: - explicit FakeResolver(const ResolverArgs& args); + explicit FakeResolver(ResolverArgs args); - void NextLocked(grpc_channel_args** result, - grpc_closure* on_complete) override; + void StartLocked() override; void RequestReresolutionLocked() override; @@ -62,27 +61,32 @@ class FakeResolver : public Resolver { virtual ~FakeResolver(); - void MaybeFinishNextLocked(); + void ShutdownLocked() override { active_ = false; } - void ShutdownLocked() override; + void MaybeSendResultLocked(); + + static void ReturnReresolutionResult(void* arg, grpc_error* error); // passed-in parameters grpc_channel_args* channel_args_ = nullptr; - // If not NULL, the next set of resolution results to be returned to - // NextLocked()'s closure. + // If not NULL, the next set of resolution results to be returned. grpc_channel_args* next_results_ = nullptr; // Results to use for the pretended re-resolution in // RequestReresolutionLocked(). grpc_channel_args* reresolution_results_ = nullptr; - // pending next completion, or NULL - grpc_closure* next_completion_ = nullptr; - // target result address for next completion - grpc_channel_args** target_result_ = nullptr; + // True between the calls to StartLocked() ShutdownLocked(). + bool active_ = false; // if true, return failure bool return_failure_ = false; + // pending re-resolution + grpc_closure reresolution_closure_; + bool reresolution_closure_pending_ = false; }; -FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) { +FakeResolver::FakeResolver(ResolverArgs args) + : Resolver(args.combiner, std::move(args.result_handler)) { + GRPC_CLOSURE_INIT(&reresolution_closure_, ReturnReresolutionResult, this, + grpc_combiner_scheduler(combiner())); channel_args_ = grpc_channel_args_copy(args.args); FakeResolverResponseGenerator* response_generator = FakeResolverResponseGenerator::GetFromArgs(args.args); @@ -102,46 +106,51 @@ FakeResolver::~FakeResolver() { grpc_channel_args_destroy(channel_args_); } -void FakeResolver::NextLocked(grpc_channel_args** target_result, - grpc_closure* on_complete) { - GPR_ASSERT(next_completion_ == nullptr); - next_completion_ = on_complete; - target_result_ = target_result; - MaybeFinishNextLocked(); +void FakeResolver::StartLocked() { + active_ = true; + MaybeSendResultLocked(); } void FakeResolver::RequestReresolutionLocked() { if (reresolution_results_ != nullptr || return_failure_) { grpc_channel_args_destroy(next_results_); next_results_ = grpc_channel_args_copy(reresolution_results_); - MaybeFinishNextLocked(); + // Return the result in a different closure, so that we don't call + // back into the LB policy while it's still processing the previous + // update. + if (!reresolution_closure_pending_) { + reresolution_closure_pending_ = true; + Ref().release(); // ref held by closure + GRPC_CLOSURE_SCHED(&reresolution_closure_, GRPC_ERROR_NONE); + } } } -void FakeResolver::MaybeFinishNextLocked() { - if (next_completion_ != nullptr && - (next_results_ != nullptr || return_failure_)) { +void FakeResolver::MaybeSendResultLocked() { + if (!active_) return; + if (return_failure_) { + // TODO(roth): Change resolver result generator to be able to inject + // the error to be returned. + result_handler()->ReturnError(grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver transient failure"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + return_failure_ = false; + } else if (next_results_ != nullptr) { // When both next_results_ and channel_args_ contain an arg with the same // name, only the one in next_results_ will be kept since next_results_ is // before channel_args_. - *target_result_ = - return_failure_ ? nullptr - : grpc_channel_args_union(next_results_, channel_args_); + result_handler()->ReturnResult( + grpc_channel_args_union(next_results_, channel_args_)); grpc_channel_args_destroy(next_results_); next_results_ = nullptr; - GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE); - next_completion_ = nullptr; - return_failure_ = false; } } -void FakeResolver::ShutdownLocked() { - if (next_completion_ != nullptr) { - *target_result_ = nullptr; - GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Resolver Shutdown")); - next_completion_ = nullptr; - } +void FakeResolver::ReturnReresolutionResult(void* arg, grpc_error* error) { + FakeResolver* self = static_cast(arg); + self->reresolution_closure_pending_ = false; + self->MaybeSendResultLocked(); + self->Unref(); } // @@ -161,7 +170,7 @@ void FakeResolverResponseGenerator::SetResponseLocked(void* arg, FakeResolver* resolver = closure_arg->generator->resolver_; grpc_channel_args_destroy(resolver->next_results_); resolver->next_results_ = closure_arg->response; - resolver->MaybeFinishNextLocked(); + resolver->MaybeSendResultLocked(); Delete(closure_arg); } @@ -210,7 +219,7 @@ void FakeResolverResponseGenerator::SetFailureLocked(void* arg, SetResponseClosureArg* closure_arg = static_cast(arg); FakeResolver* resolver = closure_arg->generator->resolver_; resolver->return_failure_ = true; - if (closure_arg->immediate) resolver->MaybeFinishNextLocked(); + if (closure_arg->immediate) resolver->MaybeSendResultLocked(); Delete(closure_arg); } @@ -290,9 +299,8 @@ namespace { class FakeResolverFactory : public ResolverFactory { public: - OrphanablePtr CreateResolver( - const ResolverArgs& args) const override { - return OrphanablePtr(New(args)); + OrphanablePtr CreateResolver(ResolverArgs args) const override { + return OrphanablePtr(New(std::move(args))); } const char* scheme() const override { return "fake"; } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index f423e6d46db..9e3ec1fb7cb 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -58,8 +58,7 @@ class FakeResolverResponseGenerator // is called. void SetReresolutionResponse(grpc_channel_args* response); - // Tells the resolver to return a transient failure (signalled by - // returning a null result with no error). + // Tells the resolver to return a transient failure. void SetFailure(); // Same as SetFailure(), but instead of returning the error diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc index 1654747a79f..df93c76399d 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc @@ -45,66 +45,29 @@ namespace { class SockaddrResolver : public Resolver { public: /// Takes ownership of \a addresses. - SockaddrResolver(const ResolverArgs& args, - UniquePtr addresses); + explicit SockaddrResolver(ResolverArgs args); + ~SockaddrResolver() override; - void NextLocked(grpc_channel_args** result, - grpc_closure* on_complete) override; + void StartLocked() override; - void ShutdownLocked() override; + void ShutdownLocked() override {} private: - virtual ~SockaddrResolver(); - - void MaybeFinishNextLocked(); - - /// the addresses that we've "resolved" - UniquePtr addresses_; /// channel args - grpc_channel_args* channel_args_ = nullptr; - /// have we published? - bool published_ = false; - /// pending next completion, or NULL - grpc_closure* next_completion_ = nullptr; - /// target result address for next completion - grpc_channel_args** target_result_ = nullptr; + const grpc_channel_args* channel_args_ = nullptr; }; -SockaddrResolver::SockaddrResolver(const ResolverArgs& args, - UniquePtr addresses) - : Resolver(args.combiner), - addresses_(std::move(addresses)), - channel_args_(grpc_channel_args_copy(args.args)) {} +SockaddrResolver::SockaddrResolver(ResolverArgs args) + : Resolver(args.combiner, std::move(args.result_handler)), + channel_args_(args.args) {} SockaddrResolver::~SockaddrResolver() { grpc_channel_args_destroy(channel_args_); } -void SockaddrResolver::NextLocked(grpc_channel_args** target_result, - grpc_closure* on_complete) { - GPR_ASSERT(!next_completion_); - next_completion_ = on_complete; - target_result_ = target_result; - MaybeFinishNextLocked(); -} - -void SockaddrResolver::ShutdownLocked() { - if (next_completion_ != nullptr) { - *target_result_ = nullptr; - GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Resolver Shutdown")); - next_completion_ = nullptr; - } -} - -void SockaddrResolver::MaybeFinishNextLocked() { - if (next_completion_ != nullptr && !published_) { - published_ = true; - grpc_arg arg = CreateServerAddressListChannelArg(addresses_.get()); - *target_result_ = grpc_channel_args_copy_and_add(channel_args_, &arg, 1); - GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE); - next_completion_ = nullptr; - } +void SockaddrResolver::StartLocked() { + result_handler()->ReturnResult(channel_args_); + channel_args_ = nullptr; } // @@ -114,7 +77,7 @@ void SockaddrResolver::MaybeFinishNextLocked() { void DoNothing(void* ignored) {} OrphanablePtr CreateSockaddrResolver( - const ResolverArgs& args, + ResolverArgs args, bool parse(const grpc_uri* uri, grpc_resolved_address* dst)) { if (0 != strcmp(args.uri->authority, "")) { gpr_log(GPR_ERROR, "authority-based URIs not supported by the %s scheme", @@ -127,7 +90,7 @@ OrphanablePtr CreateSockaddrResolver( grpc_slice_buffer path_parts; grpc_slice_buffer_init(&path_parts); grpc_slice_split(path_slice, ",", &path_parts); - auto addresses = MakeUnique(); + ServerAddressList addresses; bool errors_found = false; for (size_t i = 0; i < path_parts.count; i++) { grpc_uri ith_uri = *args.uri; @@ -135,26 +98,28 @@ OrphanablePtr CreateSockaddrResolver( ith_uri.path = part_str.get(); grpc_resolved_address addr; if (!parse(&ith_uri, &addr)) { - errors_found = true; /* GPR_TRUE */ + errors_found = true; break; } - addresses->emplace_back(addr, nullptr /* args */); + addresses.emplace_back(addr, nullptr /* args */); } grpc_slice_buffer_destroy_internal(&path_parts); grpc_slice_unref_internal(path_slice); if (errors_found) { return OrphanablePtr(nullptr); } + // Add addresses to channel args. + // Note: SockaddrResolver takes ownership of channel args. + grpc_arg arg = CreateServerAddressListChannelArg(&addresses); + args.args = grpc_channel_args_copy_and_add(args.args, &arg, 1); // Instantiate resolver. - return OrphanablePtr( - New(args, std::move(addresses))); + return OrphanablePtr(New(std::move(args))); } class IPv4ResolverFactory : public ResolverFactory { public: - OrphanablePtr CreateResolver( - const ResolverArgs& args) const override { - return CreateSockaddrResolver(args, grpc_parse_ipv4); + OrphanablePtr CreateResolver(ResolverArgs args) const override { + return CreateSockaddrResolver(std::move(args), grpc_parse_ipv4); } const char* scheme() const override { return "ipv4"; } @@ -162,9 +127,8 @@ class IPv4ResolverFactory : public ResolverFactory { class IPv6ResolverFactory : public ResolverFactory { public: - OrphanablePtr CreateResolver( - const ResolverArgs& args) const override { - return CreateSockaddrResolver(args, grpc_parse_ipv6); + OrphanablePtr CreateResolver(ResolverArgs args) const override { + return CreateSockaddrResolver(std::move(args), grpc_parse_ipv6); } const char* scheme() const override { return "ipv6"; } @@ -173,9 +137,8 @@ class IPv6ResolverFactory : public ResolverFactory { #ifdef GRPC_HAVE_UNIX_SOCKET class UnixResolverFactory : public ResolverFactory { public: - OrphanablePtr CreateResolver( - const ResolverArgs& args) const override { - return CreateSockaddrResolver(args, grpc_parse_unix); + OrphanablePtr CreateResolver(ResolverArgs args) const override { + return CreateSockaddrResolver(std::move(args), grpc_parse_unix); } UniquePtr GetDefaultAuthority(grpc_uri* uri) const override { diff --git a/src/core/ext/filters/client_channel/resolver_factory.h b/src/core/ext/filters/client_channel/resolver_factory.h index d891ef62e1d..273fd8d24f0 100644 --- a/src/core/ext/filters/client_channel/resolver_factory.h +++ b/src/core/ext/filters/client_channel/resolver_factory.h @@ -41,12 +41,14 @@ struct ResolverArgs { grpc_pollset_set* pollset_set = nullptr; /// The combiner under which all resolver calls will be run. grpc_combiner* combiner = nullptr; + /// The result handler to be used by the resolver. + UniquePtr result_handler; }; class ResolverFactory { public: /// Returns a new resolver instance. - virtual OrphanablePtr CreateResolver(const ResolverArgs& args) const + virtual OrphanablePtr CreateResolver(ResolverArgs args) const GRPC_ABSTRACT; /// Returns a string representing the default authority to use for this diff --git a/src/core/ext/filters/client_channel/resolver_registry.cc b/src/core/ext/filters/client_channel/resolver_registry.cc index 91c0267f95e..5b00eab341e 100644 --- a/src/core/ext/filters/client_channel/resolver_registry.cc +++ b/src/core/ext/filters/client_channel/resolver_registry.cc @@ -134,7 +134,8 @@ ResolverFactory* ResolverRegistry::LookupResolverFactory(const char* scheme) { OrphanablePtr ResolverRegistry::CreateResolver( const char* target, const grpc_channel_args* args, - grpc_pollset_set* pollset_set, grpc_combiner* combiner) { + grpc_pollset_set* pollset_set, grpc_combiner* combiner, + UniquePtr result_handler) { GPR_ASSERT(g_state != nullptr); grpc_uri* uri = nullptr; char* canonical_target = nullptr; @@ -145,8 +146,10 @@ OrphanablePtr ResolverRegistry::CreateResolver( resolver_args.args = args; resolver_args.pollset_set = pollset_set; resolver_args.combiner = combiner; + resolver_args.result_handler = std::move(result_handler); OrphanablePtr resolver = - factory == nullptr ? nullptr : factory->CreateResolver(resolver_args); + factory == nullptr ? nullptr + : factory->CreateResolver(std::move(resolver_args)); grpc_uri_destroy(uri); gpr_free(canonical_target); return resolver; diff --git a/src/core/ext/filters/client_channel/resolver_registry.h b/src/core/ext/filters/client_channel/resolver_registry.h index d6ec6811bd7..1fbe01aabc2 100644 --- a/src/core/ext/filters/client_channel/resolver_registry.h +++ b/src/core/ext/filters/client_channel/resolver_registry.h @@ -62,10 +62,10 @@ class ResolverRegistry { /// \a args are the channel args to be included in resolver results. /// \a pollset_set is used to drive I/O in the name resolution process. /// \a combiner is the combiner under which all resolver calls will be run. - static OrphanablePtr CreateResolver(const char* target, - const grpc_channel_args* args, - grpc_pollset_set* pollset_set, - grpc_combiner* combiner); + static OrphanablePtr CreateResolver( + const char* target, const grpc_channel_args* args, + grpc_pollset_set* pollset_set, grpc_combiner* combiner, + UniquePtr result_handler); /// Returns the default authority to pass from a client for \a target. static UniquePtr GetDefaultAuthority(const char* target); diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index 52b14dcc7de..63cf56b1a44 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -65,6 +65,36 @@ namespace grpc_core { +// +// ResolvingLoadBalancingPolicy::ResolverResultHandler +// + +class ResolvingLoadBalancingPolicy::ResolverResultHandler + : public Resolver::ResultHandler { + public: + explicit ResolverResultHandler( + RefCountedPtr parent) + : parent_(std::move(parent)) {} + + ~ResolverResultHandler() { + if (parent_->tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete", + parent_.get()); + } + } + + void ReturnResult(const grpc_channel_args* result) override { + parent_->OnResolverResultChangedLocked(result); + } + + void ReturnError(grpc_error* error) override { + parent_->OnResolverError(error); + } + + private: + RefCountedPtr parent_; +}; + // // ResolvingLoadBalancingPolicy::ResolvingControlHelper // @@ -196,12 +226,9 @@ ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy( } grpc_error* ResolvingLoadBalancingPolicy::Init(const grpc_channel_args& args) { - GRPC_CLOSURE_INIT( - &on_resolver_result_changed_, - &ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked, this, - grpc_combiner_scheduler(combiner())); resolver_ = ResolverRegistry::CreateResolver( - target_uri_.get(), &args, interested_parties(), combiner()); + target_uri_.get(), &args, interested_parties(), combiner(), + UniquePtr(New(Ref()))); if (resolver_ == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed"); } @@ -288,62 +315,34 @@ void ResolvingLoadBalancingPolicy::StartResolvingLocked() { channel_control_helper()->UpdateState( GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, UniquePtr(New(Ref()))); - Ref().release(); - resolver_->NextLocked(&resolver_result_, &on_resolver_result_changed_); + resolver_->StartLocked(); } -// Invoked from the resolver NextLocked() callback when the resolver -// is shutting down. -void ResolvingLoadBalancingPolicy::OnResolverShutdownLocked(grpc_error* error) { - if (tracer_->enabled()) { - gpr_log(GPR_INFO, "resolving_lb=%p: shutting down", this); +void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) { + if (resolver_ == nullptr) { + GRPC_ERROR_UNREF(error); + return; } - { - MutexLock lock(&lb_policy_mu_); - if (lb_policy_ != nullptr) { - if (tracer_->enabled()) { - gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this, - lb_policy_.get()); - } - grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), - interested_parties()); - lb_policy_.reset(); - } - if (pending_lb_policy_ != nullptr) { - if (tracer_->enabled()) { - gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p", - this, pending_lb_policy_.get()); - } - grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(), - interested_parties()); - pending_lb_policy_.reset(); - } + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this, + grpc_error_string(error)); } - if (resolver_ != nullptr) { - // This should never happen; it can only be triggered by a resolver - // implementation spotaneously deciding to report shutdown without - // being orphaned. This code is included just to be defensive. - if (tracer_->enabled()) { - gpr_log(GPR_INFO, - "resolving_lb=%p: spontaneous shutdown from resolver %p", this, - resolver_.get()); - } - resolver_.reset(); - grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Resolver spontaneous shutdown", &error, 1); + // If we already have an LB policy from a previous resolution + // result, then we continue to let it set the connectivity state. + // Otherwise, we go into TRANSIENT_FAILURE. + if (lb_policy_ == nullptr) { + grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Resolver transient failure", &error, 1); channel_control_helper()->UpdateState( - GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), - UniquePtr(New(error))); + GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(state_error), + UniquePtr(New(state_error))); } - grpc_channel_args_destroy(resolver_result_); - resolver_result_ = nullptr; GRPC_ERROR_UNREF(error); - Unref(); } void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked( const char* lb_policy_name, RefCountedPtr lb_policy_config, - TraceStringVector* trace_strings) { + const grpc_channel_args& args, TraceStringVector* trace_strings) { // If the child policy name changes, we need to create a new child // policy. When this happens, we leave child_policy_ as-is and store // the new child policy in pending_child_policy_. Once the new child @@ -411,7 +410,7 @@ void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked( gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this, lb_policy_ == nullptr ? "" : "pending ", lb_policy_name); } - auto new_policy = CreateLbPolicyLocked(lb_policy_name, trace_strings); + auto new_policy = CreateLbPolicyLocked(lb_policy_name, args, trace_strings); auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_; { MutexLock lock(&lb_policy_mu_); @@ -432,21 +431,21 @@ void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked( policy_to_update == pending_lb_policy_.get() ? "pending " : "", policy_to_update); } - policy_to_update->UpdateLocked(*resolver_result_, - std::move(lb_policy_config)); + policy_to_update->UpdateLocked(args, std::move(lb_policy_config)); } // Creates a new LB policy. // Updates trace_strings to indicate what was done. OrphanablePtr ResolvingLoadBalancingPolicy::CreateLbPolicyLocked( - const char* lb_policy_name, TraceStringVector* trace_strings) { + const char* lb_policy_name, const grpc_channel_args& args, + TraceStringVector* trace_strings) { ResolvingControlHelper* helper = New(Ref()); LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); lb_policy_args.channel_control_helper = UniquePtr(helper); - lb_policy_args.args = resolver_result_; + lb_policy_args.args = &args; OrphanablePtr lb_policy = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( lb_policy_name, std::move(lb_policy_args)); @@ -480,9 +479,10 @@ ResolvingLoadBalancingPolicy::CreateLbPolicyLocked( } void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked( + const grpc_channel_args& resolver_result, TraceStringVector* trace_strings) { const ServerAddressList* addresses = - FindServerAddressListChannelArg(resolver_result_); + FindServerAddressListChannelArg(&resolver_result); const bool resolution_contains_addresses = addresses != nullptr && addresses->size() > 0; if (!resolution_contains_addresses && @@ -516,27 +516,16 @@ void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked( } } -// Callback invoked when a resolver result is available. void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( - void* arg, grpc_error* error) { - auto* self = static_cast(arg); - if (self->tracer_->enabled()) { - const char* disposition = - self->resolver_result_ != nullptr - ? "" - : (error == GRPC_ERROR_NONE ? " (transient error)" - : " (resolver shutdown)"); - gpr_log(GPR_INFO, - "resolving_lb=%p: got resolver result: resolver_result=%p " - "error=%s%s", - self, self->resolver_result_, grpc_error_string(error), - disposition); - } - // Handle shutdown. - if (error != GRPC_ERROR_NONE || self->resolver_ == nullptr) { - self->OnResolverShutdownLocked(GRPC_ERROR_REF(error)); + const grpc_channel_args* result) { + // Handle race conditions. + if (resolver_ == nullptr) { + grpc_channel_args_destroy(result); return; } + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result %p", this, result); + } // We only want to trace the address resolution in the follow cases: // (a) Address resolution resulted in service config change. // (b) Address resolution that causes number of backends to go from @@ -547,63 +536,34 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( // // we track a list of strings to eventually be concatenated and traced. TraceStringVector trace_strings; - // resolver_result_ will be null in the case of a transient - // resolution error. In that case, we don't have any new result to - // process, which means that we keep using the previous result (if any). - if (self->resolver_result_ == nullptr) { - if (self->tracer_->enabled()) { - gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure", self); - } - // If we already have an LB policy from a previous resolution - // result, then we continue to let it set the connectivity state. - // Otherwise, we go into TRANSIENT_FAILURE. - if (self->lb_policy_ == nullptr) { - // TODO(roth): When we change the resolver API to be able to - // return transient errors in a cleaner way, we should make it the - // resolver's responsibility to attach a status to the error, - // rather than doing it centrally here. - grpc_error* state_error = grpc_error_set_int( - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Resolver transient failure", &error, 1), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - self->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(state_error), - UniquePtr( - New(state_error))); - } + // Parse the resolver result. + const char* lb_policy_name = nullptr; + RefCountedPtr lb_policy_config; + bool service_config_changed = false; + if (process_resolver_result_ != nullptr) { + service_config_changed = + process_resolver_result_(process_resolver_result_user_data_, *result, + &lb_policy_name, &lb_policy_config); } else { - // Parse the resolver result. - const char* lb_policy_name = nullptr; - RefCountedPtr lb_policy_config; - bool service_config_changed = false; - if (self->process_resolver_result_ != nullptr) { - service_config_changed = self->process_resolver_result_( - self->process_resolver_result_user_data_, *self->resolver_result_, - &lb_policy_name, &lb_policy_config); - } else { - lb_policy_name = self->child_policy_name_.get(); - lb_policy_config = self->child_lb_config_; - } - GPR_ASSERT(lb_policy_name != nullptr); - self->CreateOrUpdateLbPolicyLocked( - lb_policy_name, std::move(lb_policy_config), &trace_strings); - // Add channel trace event. - if (self->channelz_node() != nullptr) { - if (service_config_changed) { - // TODO(ncteisen): might be worth somehow including a snippet of the - // config in the trace, at the risk of bloating the trace logs. - trace_strings.push_back(gpr_strdup("Service config changed")); - } - self->MaybeAddTraceMessagesForAddressChangesLocked(&trace_strings); - self->ConcatenateAndAddChannelTraceLocked(&trace_strings); + lb_policy_name = child_policy_name_.get(); + lb_policy_config = child_lb_config_; + } + GPR_ASSERT(lb_policy_name != nullptr); + // Create or update LB policy, as needed. + CreateOrUpdateLbPolicyLocked(lb_policy_name, std::move(lb_policy_config), + *result, &trace_strings); + // Add channel trace event. + if (channelz_node() != nullptr) { + if (service_config_changed) { + // TODO(ncteisen): might be worth somehow including a snippet of the + // config in the trace, at the risk of bloating the trace logs. + trace_strings.push_back(gpr_strdup("Service config changed")); } - // Clean up. - grpc_channel_args_destroy(self->resolver_result_); - self->resolver_result_ = nullptr; + MaybeAddTraceMessagesForAddressChangesLocked(*result, &trace_strings); + ConcatenateAndAddChannelTraceLocked(&trace_strings); } - // Renew resolver callback. - self->resolver_->NextLocked(&self->resolver_result_, - &self->on_resolver_result_changed_); + // Clean up. + grpc_channel_args_destroy(result); } } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.h b/src/core/ext/filters/client_channel/resolving_lb_policy.h index b8f406da1b6..fa34611c979 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.h +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.h @@ -93,6 +93,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { private: using TraceStringVector = InlinedVector; + class ResolverResultHandler; class ResolvingControlHelper; ~ResolvingLoadBalancingPolicy(); @@ -101,17 +102,20 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { void ShutdownLocked() override; void StartResolvingLocked(); - void OnResolverShutdownLocked(grpc_error* error); + void OnResolverError(grpc_error* error); void CreateOrUpdateLbPolicyLocked(const char* lb_policy_name, - RefCountedPtr, + RefCountedPtr lb_policy_config, + const grpc_channel_args& args, TraceStringVector* trace_strings); OrphanablePtr CreateLbPolicyLocked( - const char* lb_policy_name, TraceStringVector* trace_strings); + const char* lb_policy_name, const grpc_channel_args& args, + TraceStringVector* trace_strings); void MaybeAddTraceMessagesForAddressChangesLocked( + const grpc_channel_args& resolver_result, TraceStringVector* trace_strings); void ConcatenateAndAddChannelTraceLocked( TraceStringVector* trace_strings) const; - static void OnResolverResultChangedLocked(void* arg, grpc_error* error); + void OnResolverResultChangedLocked(const grpc_channel_args* result); // Passed in from caller at construction time. TraceFlag* tracer_; @@ -124,9 +128,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { // Resolver and associated state. OrphanablePtr resolver_; bool started_resolving_ = false; - grpc_channel_args* resolver_result_ = nullptr; bool previous_resolution_contained_addresses_ = false; - grpc_closure on_resolver_result_changed_; // Child LB policy. OrphanablePtr lb_policy_; diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 5ff303a9dc6..c47c027b379 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -56,6 +56,9 @@ grpc_channel_args* grpc_channel_args_union(const grpc_channel_args* a, /** Destroy arguments created by \a grpc_channel_args_copy */ void grpc_channel_args_destroy(grpc_channel_args* a); +inline void grpc_channel_args_destroy(const grpc_channel_args* a) { + grpc_channel_args_destroy(const_cast(a)); +} /** Returns the compression algorithm set in \a a. */ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc index 0cf549d01da..76ac585fb4c 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc @@ -90,7 +90,8 @@ static void my_cancel_ares_request_locked(grpc_ares_request* request) { } static grpc_core::OrphanablePtr create_resolver( - const char* name) { + const char* name, + grpc_core::UniquePtr result_handler) { grpc_core::ResolverFactory* factory = grpc_core::ResolverRegistry::LookupResolverFactory("dns"); grpc_uri* uri = grpc_uri_parse(name, 0); @@ -98,15 +99,52 @@ static grpc_core::OrphanablePtr create_resolver( grpc_core::ResolverArgs args; args.uri = uri; args.combiner = g_combiner; + args.result_handler = std::move(result_handler); grpc_core::OrphanablePtr resolver = - factory->CreateResolver(args); + factory->CreateResolver(std::move(args)); grpc_uri_destroy(uri); return resolver; } -static void on_done(void* ev, grpc_error* error) { - gpr_event_set(static_cast(ev), (void*)1); -} +class ResultHandler : public grpc_core::Resolver::ResultHandler { + public: + struct ResolverOutput { + const grpc_channel_args* result = nullptr; + grpc_error* error = nullptr; + gpr_event ev; + + ResolverOutput() { gpr_event_init(&ev); } + ~ResolverOutput() { + grpc_channel_args_destroy(result); + GRPC_ERROR_UNREF(error); + } + }; + + void SetOutput(ResolverOutput* output) { + gpr_atm_rel_store(&output_, reinterpret_cast(output)); + } + + void ReturnResult(const grpc_channel_args* args) override { + ResolverOutput* output = + reinterpret_cast(gpr_atm_acq_load(&output_)); + GPR_ASSERT(output != nullptr); + output->result = args; + output->error = GRPC_ERROR_NONE; + gpr_event_set(&output->ev, (void*)1); + } + + void ReturnError(grpc_error* error) override { + ResolverOutput* output = + reinterpret_cast(gpr_atm_acq_load(&output_)); + GPR_ASSERT(output != nullptr); + output->result = nullptr; + output->error = error; + gpr_event_set(&output->ev, (void*)1); + } + + private: + gpr_atm output_ = 0; // ResolverOutput* +}; // interleave waiting for an event with a timer check static bool wait_loop(int deadline_seconds, gpr_event* ev) { @@ -121,32 +159,6 @@ static bool wait_loop(int deadline_seconds, gpr_event* ev) { return false; } -typedef struct next_args { - grpc_core::Resolver* resolver; - grpc_channel_args** result; - grpc_closure* on_complete; -} next_args; - -static void call_resolver_next_now_lock_taken(void* arg, - grpc_error* error_unused) { - next_args* a = static_cast(arg); - a->resolver->NextLocked(a->result, a->on_complete); - gpr_free(a); -} - -static void call_resolver_next_after_locking(grpc_core::Resolver* resolver, - grpc_channel_args** result, - grpc_closure* on_complete, - grpc_combiner* combiner) { - next_args* a = static_cast(gpr_malloc(sizeof(*a))); - a->resolver = resolver; - a->result = result; - a->on_complete = on_complete; - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(call_resolver_next_now_lock_taken, a, - grpc_combiner_scheduler(combiner)), - GRPC_ERROR_NONE); -} - int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); @@ -156,33 +168,28 @@ int main(int argc, char** argv) { grpc_set_resolver_impl(&test_resolver); grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; - grpc_channel_args* result = (grpc_channel_args*)1; { grpc_core::ExecCtx exec_ctx; - grpc_core::OrphanablePtr resolver = - create_resolver("dns:test"); - gpr_event ev1; - gpr_event_init(&ev1); - call_resolver_next_after_locking( - resolver.get(), &result, - GRPC_CLOSURE_CREATE(on_done, &ev1, grpc_schedule_on_exec_ctx), - g_combiner); + ResultHandler* result_handler = grpc_core::New(); + grpc_core::OrphanablePtr resolver = create_resolver( + "dns:test", grpc_core::UniquePtr( + result_handler)); + ResultHandler::ResolverOutput output1; + result_handler->SetOutput(&output1); + resolver->StartLocked(); grpc_core::ExecCtx::Get()->Flush(); - GPR_ASSERT(wait_loop(5, &ev1)); - GPR_ASSERT(result == nullptr); - - gpr_event ev2; - gpr_event_init(&ev2); - call_resolver_next_after_locking( - resolver.get(), &result, - GRPC_CLOSURE_CREATE(on_done, &ev2, grpc_schedule_on_exec_ctx), - g_combiner); + GPR_ASSERT(wait_loop(5, &output1.ev)); + GPR_ASSERT(output1.result == nullptr); + GPR_ASSERT(output1.error != GRPC_ERROR_NONE); + + ResultHandler::ResolverOutput output2; + result_handler->SetOutput(&output2); grpc_core::ExecCtx::Get()->Flush(); - GPR_ASSERT(wait_loop(30, &ev2)); - GPR_ASSERT(result != nullptr); + GPR_ASSERT(wait_loop(30, &output2.ev)); + GPR_ASSERT(output2.result != nullptr); + GPR_ASSERT(output2.error == GRPC_ERROR_NONE); - grpc_channel_args_destroy(result); GRPC_COMBINER_UNREF(g_combiner, "test"); } diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index 3157d6019f3..82ff5b04fe0 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -170,19 +170,52 @@ static void poll_pollset_until_request_done(iomgr_args* args) { gpr_event_set(&args->ev, (void*)1); } +struct OnResolutionCallbackArg; + +class ResultHandler : public grpc_core::Resolver::ResultHandler { + public: + using ResultCallback = void (*)(const grpc_channel_args* result, + OnResolutionCallbackArg* state); + + void SetCallback(ResultCallback result_cb, OnResolutionCallbackArg* state) { + GPR_ASSERT(result_cb_ == nullptr); + result_cb_ = result_cb; + GPR_ASSERT(state_ == nullptr); + state_ = state; + } + + void ReturnResult(const grpc_channel_args* args) override { + GPR_ASSERT(result_cb_ != nullptr); + GPR_ASSERT(state_ != nullptr); + ResultCallback cb = result_cb_; + OnResolutionCallbackArg* state = state_; + result_cb_ = nullptr; + state_ = nullptr; + cb(args, state); + } + + void ReturnError(grpc_error* error) override { + gpr_log(GPR_ERROR, "resolver returned error: %s", grpc_error_string(error)); + GPR_ASSERT(false); + } + + private: + ResultCallback result_cb_ = nullptr; + OnResolutionCallbackArg* state_ = nullptr; +}; + struct OnResolutionCallbackArg { const char* uri_str = nullptr; grpc_core::OrphanablePtr resolver; - grpc_channel_args* result = nullptr; + ResultHandler* result_handler; }; // Set to true by the last callback in the resolution chain. static bool g_all_callbacks_invoked; -static void on_second_resolution(void* arg, grpc_error* error) { - OnResolutionCallbackArg* cb_arg = static_cast(arg); - grpc_channel_args_destroy(cb_arg->result); - GPR_ASSERT(error == GRPC_ERROR_NONE); +static void on_second_resolution(const grpc_channel_args* result, + OnResolutionCallbackArg* cb_arg) { + grpc_channel_args_destroy(result); gpr_log(GPR_INFO, "2nd: g_resolution_count: %d", g_resolution_count); // The resolution callback was not invoked until new data was // available, which was delayed until after the cooldown period. @@ -197,18 +230,14 @@ static void on_second_resolution(void* arg, grpc_error* error) { g_all_callbacks_invoked = true; } -static void on_first_resolution(void* arg, grpc_error* error) { - OnResolutionCallbackArg* cb_arg = static_cast(arg); - grpc_channel_args_destroy(cb_arg->result); - GPR_ASSERT(error == GRPC_ERROR_NONE); +static void on_first_resolution(const grpc_channel_args* result, + OnResolutionCallbackArg* cb_arg) { + grpc_channel_args_destroy(result); gpr_log(GPR_INFO, "1st: g_resolution_count: %d", g_resolution_count); // There's one initial system-level resolution and one invocation of a // notification callback (the current function). GPR_ASSERT(g_resolution_count == 1); - cb_arg->resolver->NextLocked( - &cb_arg->result, - GRPC_CLOSURE_CREATE(on_second_resolution, arg, - grpc_combiner_scheduler(g_combiner))); + cb_arg->result_handler->SetCallback(on_second_resolution, cb_arg); cb_arg->resolver->RequestReresolutionLocked(); gpr_mu_lock(g_iomgr_args.mu); GRPC_LOG_IF_ERROR("pollset_kick", @@ -220,6 +249,8 @@ static void start_test_under_combiner(void* arg, grpc_error* error) { OnResolutionCallbackArg* res_cb_arg = static_cast(arg); + res_cb_arg->result_handler = grpc_core::New(); + grpc_core::ResolverFactory* factory = grpc_core::ResolverRegistry::LookupResolverFactory("dns"); grpc_uri* uri = grpc_uri_parse(res_cb_arg->uri_str, 0); @@ -229,25 +260,22 @@ static void start_test_under_combiner(void* arg, grpc_error* error) { grpc_core::ResolverArgs args; args.uri = uri; args.combiner = g_combiner; + args.result_handler = + grpc_core::UniquePtr( + res_cb_arg->result_handler); g_resolution_count = 0; - grpc_arg cooldown_arg; - cooldown_arg.key = - const_cast(GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS); - cooldown_arg.type = GRPC_ARG_INTEGER; - cooldown_arg.value.integer = kMinResolutionPeriodMs; - auto* cooldown_channel_args = - grpc_channel_args_copy_and_add(nullptr, &cooldown_arg, 1); - args.args = cooldown_channel_args; - res_cb_arg->resolver = factory->CreateResolver(args); - grpc_channel_args_destroy(cooldown_channel_args); + grpc_arg cooldown_arg = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS), + kMinResolutionPeriodMs); + grpc_channel_args cooldown_args = {1, &cooldown_arg}; + args.args = &cooldown_args; + res_cb_arg->resolver = factory->CreateResolver(std::move(args)); GPR_ASSERT(res_cb_arg->resolver != nullptr); - // First resolution, would incur in system-level resolution. - res_cb_arg->resolver->NextLocked( - &res_cb_arg->result, - GRPC_CLOSURE_CREATE(on_first_resolution, res_cb_arg, - grpc_combiner_scheduler(g_combiner))); grpc_uri_destroy(uri); + // First resolution, would incur in system-level resolution. + res_cb_arg->result_handler->SetCallback(on_first_resolution, res_cb_arg); + res_cb_arg->resolver->StartLocked(); } static void test_cooldown() { diff --git a/test/core/client_channel/resolvers/dns_resolver_test.cc b/test/core/client_channel/resolvers/dns_resolver_test.cc index 6f153cc9bf6..ed3b4e66472 100644 --- a/test/core/client_channel/resolvers/dns_resolver_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_test.cc @@ -39,8 +39,10 @@ static void test_succeeds(grpc_core::ResolverFactory* factory, grpc_core::ResolverArgs args; args.uri = uri; args.combiner = g_combiner; + args.result_handler = + grpc_core::MakeUnique(); grpc_core::OrphanablePtr resolver = - factory->CreateResolver(args); + factory->CreateResolver(std::move(args)); GPR_ASSERT(resolver != nullptr); grpc_uri_destroy(uri); } @@ -55,8 +57,10 @@ static void test_fails(grpc_core::ResolverFactory* factory, grpc_core::ResolverArgs args; args.uri = uri; args.combiner = g_combiner; + args.result_handler = + grpc_core::MakeUnique(); grpc_core::OrphanablePtr resolver = - factory->CreateResolver(args); + factory->CreateResolver(std::move(args)); GPR_ASSERT(resolver == nullptr); grpc_uri_destroy(uri); } diff --git a/test/core/client_channel/resolvers/fake_resolver_test.cc b/test/core/client_channel/resolvers/fake_resolver_test.cc index 3b06fe063ae..9927404fc10 100644 --- a/test/core/client_channel/resolvers/fake_resolver_test.cc +++ b/test/core/client_channel/resolvers/fake_resolver_test.cc @@ -33,9 +33,49 @@ #include "test/core/util/test_config.h" +class ResultHandler : public grpc_core::Resolver::ResultHandler { + public: + ~ResultHandler() override { grpc_channel_args_destroy(expected_); } + + void SetExpectedAndEvent(grpc_channel_args* expected, gpr_event* ev) { + GPR_ASSERT(expected_ == nullptr); + GPR_ASSERT(ev_ == nullptr); + expected_ = grpc_channel_args_copy(expected); + ev_ = ev; + } + + void ReturnResult(const grpc_channel_args* args) override { + GPR_ASSERT(expected_ != nullptr); + GPR_ASSERT(ev_ != nullptr); + // We only check the addresses channel arg because that's the only one + // explicitly set by the test via + // FakeResolverResponseGenerator::SetResponse(). + const grpc_core::ServerAddressList* actual_addresses = + grpc_core::FindServerAddressListChannelArg(args); + const grpc_core::ServerAddressList* expected_addresses = + grpc_core::FindServerAddressListChannelArg(expected_); + GPR_ASSERT(actual_addresses->size() == expected_addresses->size()); + for (size_t i = 0; i < expected_addresses->size(); ++i) { + GPR_ASSERT((*actual_addresses)[i] == (*expected_addresses)[i]); + } + grpc_channel_args_destroy(args); + grpc_channel_args_destroy(expected_); + expected_ = nullptr; + gpr_event_set(ev_, (void*)1); + ev_ = nullptr; + } + + void ReturnError(grpc_error* error) override {} + + private: + grpc_channel_args* expected_ = nullptr; + gpr_event* ev_ = nullptr; +}; + static grpc_core::OrphanablePtr build_fake_resolver( grpc_combiner* combiner, - grpc_core::FakeResolverResponseGenerator* response_generator) { + grpc_core::FakeResolverResponseGenerator* response_generator, + grpc_core::UniquePtr result_handler) { grpc_core::ResolverFactory* factory = grpc_core::ResolverRegistry::LookupResolverFactory("fake"); grpc_arg generator_arg = @@ -45,37 +85,12 @@ static grpc_core::OrphanablePtr build_fake_resolver( grpc_core::ResolverArgs args; args.args = &channel_args; args.combiner = combiner; + args.result_handler = std::move(result_handler); grpc_core::OrphanablePtr resolver = - factory->CreateResolver(args); + factory->CreateResolver(std::move(args)); return resolver; } -typedef struct on_resolution_arg { - grpc_channel_args* resolver_result; - grpc_channel_args* expected_resolver_result; - gpr_event ev; -} on_resolution_arg; - -// Callback to check the resolution result is as expected. -void on_resolution_cb(void* arg, grpc_error* error) { - if (error != GRPC_ERROR_NONE) return; - on_resolution_arg* res = static_cast(arg); - // We only check the addresses channel arg because that's the only one - // explicitly set by the test via - // FakeResolverResponseGenerator::SetResponse(). - const grpc_core::ServerAddressList* actual_addresses = - grpc_core::FindServerAddressListChannelArg(res->resolver_result); - const grpc_core::ServerAddressList* expected_addresses = - grpc_core::FindServerAddressListChannelArg(res->expected_resolver_result); - GPR_ASSERT(actual_addresses->size() == expected_addresses->size()); - for (size_t i = 0; i < expected_addresses->size(); ++i) { - GPR_ASSERT((*actual_addresses)[i] == (*expected_addresses)[i]); - } - grpc_channel_args_destroy(res->resolver_result); - grpc_channel_args_destroy(res->expected_resolver_result); - gpr_event_set(&res->ev, (void*)1); -} - // Create a new resolution containing 2 addresses. static grpc_channel_args* create_new_resolver_result() { static size_t test_counter = 0; @@ -115,110 +130,99 @@ static grpc_channel_args* create_new_resolver_result() { return results; } -static on_resolution_arg create_on_resolution_arg(grpc_channel_args* results) { - on_resolution_arg on_res_arg; - memset(&on_res_arg, 0, sizeof(on_res_arg)); - on_res_arg.expected_resolver_result = results; - gpr_event_init(&on_res_arg.ev); - return on_res_arg; -} - static void test_fake_resolver() { grpc_core::ExecCtx exec_ctx; grpc_combiner* combiner = grpc_combiner_create(); // Create resolver. + ResultHandler* result_handler = grpc_core::New(); grpc_core::RefCountedPtr response_generator = grpc_core::MakeRefCounted(); - grpc_core::OrphanablePtr resolver = - build_fake_resolver(combiner, response_generator.get()); + grpc_core::OrphanablePtr resolver = build_fake_resolver( + combiner, response_generator.get(), + grpc_core::UniquePtr(result_handler)); GPR_ASSERT(resolver.get() != nullptr); + resolver->StartLocked(); // Test 1: normal resolution. // next_results != NULL, reresolution_results == NULL. // Expected response is next_results. + gpr_log(GPR_INFO, "TEST 1"); grpc_channel_args* results = create_new_resolver_result(); - on_resolution_arg on_res_arg = create_on_resolution_arg(results); - grpc_closure* on_resolution = GRPC_CLOSURE_CREATE( - on_resolution_cb, &on_res_arg, grpc_combiner_scheduler(combiner)); - // Resolution won't be triggered until next_results is set. - resolver->NextLocked(&on_res_arg.resolver_result, on_resolution); + gpr_event ev1; + gpr_event_init(&ev1); + result_handler->SetExpectedAndEvent(results, &ev1); response_generator->SetResponse(results); grpc_core::ExecCtx::Get()->Flush(); - GPR_ASSERT(gpr_event_wait(&on_res_arg.ev, - grpc_timeout_seconds_to_deadline(5)) != nullptr); + GPR_ASSERT(gpr_event_wait(&ev1, grpc_timeout_seconds_to_deadline(5)) != + nullptr); + grpc_channel_args_destroy(results); // Test 2: update resolution. // next_results != NULL, reresolution_results == NULL. // Expected response is next_results. + gpr_log(GPR_INFO, "TEST 2"); results = create_new_resolver_result(); - on_res_arg = create_on_resolution_arg(results); - on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg, - grpc_combiner_scheduler(combiner)); - // Resolution won't be triggered until next_results is set. - resolver->NextLocked(&on_res_arg.resolver_result, on_resolution); + gpr_event ev2; + gpr_event_init(&ev2); + result_handler->SetExpectedAndEvent(results, &ev2); response_generator->SetResponse(results); grpc_core::ExecCtx::Get()->Flush(); - GPR_ASSERT(gpr_event_wait(&on_res_arg.ev, - grpc_timeout_seconds_to_deadline(5)) != nullptr); + GPR_ASSERT(gpr_event_wait(&ev2, grpc_timeout_seconds_to_deadline(5)) != + nullptr); + grpc_channel_args_destroy(results); // Test 3: normal re-resolution. // next_results == NULL, reresolution_results != NULL. // Expected response is reresolution_results. + gpr_log(GPR_INFO, "TEST 3"); grpc_channel_args* reresolution_results = create_new_resolver_result(); - on_res_arg = - create_on_resolution_arg(grpc_channel_args_copy(reresolution_results)); - on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg, - grpc_combiner_scheduler(combiner)); - resolver->NextLocked(&on_res_arg.resolver_result, on_resolution); + gpr_event ev3; + gpr_event_init(&ev3); + result_handler->SetExpectedAndEvent(reresolution_results, &ev3); // Set reresolution_results. + // No result will be returned until re-resolution is requested. response_generator->SetReresolutionResponse(reresolution_results); - // Flush here to guarantee that the response has been set. grpc_core::ExecCtx::Get()->Flush(); // Trigger a re-resolution. resolver->RequestReresolutionLocked(); grpc_core::ExecCtx::Get()->Flush(); - GPR_ASSERT(gpr_event_wait(&on_res_arg.ev, - grpc_timeout_seconds_to_deadline(5)) != nullptr); + GPR_ASSERT(gpr_event_wait(&ev3, grpc_timeout_seconds_to_deadline(5)) != + nullptr); // Test 4: repeat re-resolution. // next_results == NULL, reresolution_results != NULL. // Expected response is reresolution_results. - on_res_arg = create_on_resolution_arg(reresolution_results); - on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg, - grpc_combiner_scheduler(combiner)); - resolver->NextLocked(&on_res_arg.resolver_result, on_resolution); + gpr_log(GPR_INFO, "TEST 4"); + gpr_event ev4; + gpr_event_init(&ev4); + result_handler->SetExpectedAndEvent(reresolution_results, &ev4); // Trigger a re-resolution. resolver->RequestReresolutionLocked(); grpc_core::ExecCtx::Get()->Flush(); - GPR_ASSERT(gpr_event_wait(&on_res_arg.ev, - grpc_timeout_seconds_to_deadline(5)) != nullptr); + GPR_ASSERT(gpr_event_wait(&ev4, grpc_timeout_seconds_to_deadline(5)) != + nullptr); + grpc_channel_args_destroy(reresolution_results); // Test 5: normal resolution. // next_results != NULL, reresolution_results != NULL. // Expected response is next_results. + gpr_log(GPR_INFO, "TEST 5"); results = create_new_resolver_result(); - on_res_arg = create_on_resolution_arg(results); - on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg, - grpc_combiner_scheduler(combiner)); - // Resolution won't be triggered until next_results is set. - resolver->NextLocked(&on_res_arg.resolver_result, on_resolution); + gpr_event ev5; + gpr_event_init(&ev5); + result_handler->SetExpectedAndEvent(results, &ev5); response_generator->SetResponse(results); grpc_core::ExecCtx::Get()->Flush(); - GPR_ASSERT(gpr_event_wait(&on_res_arg.ev, - grpc_timeout_seconds_to_deadline(5)) != nullptr); + GPR_ASSERT(gpr_event_wait(&ev5, grpc_timeout_seconds_to_deadline(5)) != + nullptr); + grpc_channel_args_destroy(results); // Test 6: no-op. // Requesting a new resolution without setting the response shouldn't trigger // the resolution callback. - memset(&on_res_arg, 0, sizeof(on_res_arg)); - on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg, - grpc_combiner_scheduler(combiner)); - resolver->NextLocked(&on_res_arg.resolver_result, on_resolution); - grpc_core::ExecCtx::Get()->Flush(); - GPR_ASSERT(gpr_event_wait(&on_res_arg.ev, - grpc_timeout_milliseconds_to_deadline(100)) == + gpr_log(GPR_INFO, "TEST 6"); + gpr_event ev6; + gpr_event_init(&ev6); + result_handler->SetExpectedAndEvent(nullptr, &ev6); + GPR_ASSERT(gpr_event_wait(&ev6, grpc_timeout_milliseconds_to_deadline(100)) == nullptr); // Clean up. - // Note: Need to explicitly unref the resolver and flush the exec_ctx - // to make sure that the final resolver callback (with error set to - // "Resolver Shutdown") is invoked before on_res_arg goes out of scope. resolver.reset(); - grpc_core::ExecCtx::Get()->Flush(); GRPC_COMBINER_UNREF(combiner, "test_fake_resolver"); } diff --git a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc index ff7db6046d7..37abe20fe8d 100644 --- a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc +++ b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc @@ -30,15 +30,14 @@ static grpc_combiner* g_combiner; -typedef struct on_resolution_arg { - char* expected_server_name; - grpc_channel_args* resolver_result; -} on_resolution_arg; - -void on_resolution_cb(void* arg, grpc_error* error) { - on_resolution_arg* res = static_cast(arg); - grpc_channel_args_destroy(res->resolver_result); -} +class ResultHandler : public grpc_core::Resolver::ResultHandler { + public: + void ReturnResult(const grpc_channel_args* args) override { + grpc_channel_args_destroy(args); + } + + void ReturnError(grpc_error* error) override { GRPC_ERROR_UNREF(error); } +}; static void test_succeeds(grpc_core::ResolverFactory* factory, const char* string) { @@ -50,18 +49,14 @@ static void test_succeeds(grpc_core::ResolverFactory* factory, grpc_core::ResolverArgs args; args.uri = uri; args.combiner = g_combiner; + args.result_handler = + grpc_core::UniquePtr( + grpc_core::New()); grpc_core::OrphanablePtr resolver = - factory->CreateResolver(args); + factory->CreateResolver(std::move(args)); GPR_ASSERT(resolver != nullptr); - - on_resolution_arg on_res_arg; - memset(&on_res_arg, 0, sizeof(on_res_arg)); - on_res_arg.expected_server_name = uri->path; - grpc_closure* on_resolution = GRPC_CLOSURE_CREATE( - on_resolution_cb, &on_res_arg, grpc_schedule_on_exec_ctx); - - resolver->NextLocked(&on_res_arg.resolver_result, on_resolution); grpc_uri_destroy(uri); + resolver->StartLocked(); /* Flush ExecCtx to avoid stack-use-after-scope on on_res_arg which is * accessed in the closure on_resolution_cb */ grpc_core::ExecCtx::Get()->Flush(); @@ -77,8 +72,11 @@ static void test_fails(grpc_core::ResolverFactory* factory, grpc_core::ResolverArgs args; args.uri = uri; args.combiner = g_combiner; + args.result_handler = + grpc_core::UniquePtr( + grpc_core::New()); grpc_core::OrphanablePtr resolver = - factory->CreateResolver(args); + factory->CreateResolver(std::move(args)); GPR_ASSERT(resolver == nullptr); grpc_uri_destroy(uri); } diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index 3e789f0b149..74da4380be5 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -160,14 +160,27 @@ void PollPollsetUntilRequestDone(ArgsStruct* args) { } } -void CheckResolverResultAssertFailureLocked(void* arg, grpc_error* error) { - EXPECT_NE(error, GRPC_ERROR_NONE); - ArgsStruct* args = static_cast(arg); - gpr_atm_rel_store(&args->done_atm, 1); - gpr_mu_lock(args->mu); - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr)); - gpr_mu_unlock(args->mu); -} +class AssertFailureResultHandler : public grpc_core::Resolver::ResultHandler { + public: + explicit AssertFailureResultHandler(ArgsStruct* args) : args_(args) {} + + ~AssertFailureResultHandler() override { + gpr_atm_rel_store(&args_->done_atm, 1); + gpr_mu_lock(args_->mu); + GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(args_->pollset, nullptr)); + gpr_mu_unlock(args_->mu); + } + + void ReturnResult(const grpc_channel_args* args) override { + GPR_ASSERT(false); + } + + void ReturnError(grpc_error* error) override { GPR_ASSERT(false); } + + private: + ArgsStruct* args_; +}; void TestCancelActiveDNSQuery(ArgsStruct* args) { int fake_dns_port = grpc_pick_unused_port_or_die(); @@ -180,13 +193,11 @@ void TestCancelActiveDNSQuery(ArgsStruct* args) { // create resolver and resolve grpc_core::OrphanablePtr resolver = grpc_core::ResolverRegistry::CreateResolver( - client_target, nullptr, args->pollset_set, args->lock); + client_target, nullptr, args->pollset_set, args->lock, + grpc_core::UniquePtr( + grpc_core::New(args))); gpr_free(client_target); - grpc_closure on_resolver_result_changed; - GRPC_CLOSURE_INIT(&on_resolver_result_changed, - CheckResolverResultAssertFailureLocked, (void*)args, - grpc_combiner_scheduler(args->lock)); - resolver->NextLocked(&args->channel_args, &on_resolver_result_changed); + resolver->StartLocked(); // Without resetting and causing resolver shutdown, the // PollPollsetUntilRequestDone call should never finish. resolver.reset(); diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index 9532529e45d..abf27cdd058 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -239,7 +239,7 @@ void PollPollsetUntilRequestDone(ArgsStruct* args) { gpr_event_set(&args->ev, (void*)1); } -void CheckServiceConfigResultLocked(grpc_channel_args* channel_args, +void CheckServiceConfigResultLocked(const grpc_channel_args* channel_args, ArgsStruct* args) { const grpc_arg* service_config_arg = grpc_channel_args_find(channel_args, GRPC_ARG_SERVICE_CONFIG); @@ -253,7 +253,7 @@ void CheckServiceConfigResultLocked(grpc_channel_args* channel_args, } } -void CheckLBPolicyResultLocked(grpc_channel_args* channel_args, +void CheckLBPolicyResultLocked(const grpc_channel_args* channel_args, ArgsStruct* args) { const grpc_arg* lb_policy_arg = grpc_channel_args_find(channel_args, GRPC_ARG_LB_POLICY_NAME); @@ -394,54 +394,86 @@ void OpenAndCloseSocketsStressLoop(int dummy_port, gpr_event* done_ev) { } #endif -void CheckResolverResultLocked(void* argsp, grpc_error* err) { - EXPECT_EQ(err, GRPC_ERROR_NONE); - ArgsStruct* args = (ArgsStruct*)argsp; - grpc_channel_args* channel_args = args->channel_args; - grpc_core::ServerAddressList* addresses = - grpc_core::FindServerAddressListChannelArg(channel_args); - gpr_log(GPR_INFO, "num addrs found: %" PRIdPTR ". expected %" PRIdPTR, - addresses->size(), args->expected_addrs.size()); - GPR_ASSERT(addresses->size() == args->expected_addrs.size()); - std::vector found_lb_addrs; - for (size_t i = 0; i < addresses->size(); i++) { - grpc_core::ServerAddress& addr = (*addresses)[i]; - char* str; - grpc_sockaddr_to_string(&str, &addr.address(), 1 /* normalize */); - gpr_log(GPR_INFO, "%s", str); - found_lb_addrs.emplace_back( - GrpcLBAddress(std::string(str), addr.IsBalancer())); - gpr_free(str); +class ResultHandler : public grpc_core::Resolver::ResultHandler { + public: + static grpc_core::UniquePtr Create( + ArgsStruct* args) { + return grpc_core::UniquePtr( + grpc_core::New(args)); } - if (args->expected_addrs.size() != found_lb_addrs.size()) { - gpr_log(GPR_DEBUG, - "found lb addrs size is: %" PRIdPTR - ". expected addrs size is %" PRIdPTR, - found_lb_addrs.size(), args->expected_addrs.size()); - abort(); + + explicit ResultHandler(ArgsStruct* args) : args_(args) {} + + void ReturnResult(const grpc_channel_args* result) override { + CheckResult(result); + gpr_atm_rel_store(&args_->done_atm, 1); + gpr_mu_lock(args_->mu); + GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(args_->pollset, nullptr)); + gpr_mu_unlock(args_->mu); + grpc_channel_args_destroy(result); } - EXPECT_THAT(args->expected_addrs, UnorderedElementsAreArray(found_lb_addrs)); - CheckServiceConfigResultLocked(channel_args, args); - if (args->expected_service_config_string == "") { - CheckLBPolicyResultLocked(channel_args, args); + + void ReturnError(grpc_error* error) override { + gpr_log(GPR_ERROR, "resolver returned error: %s", grpc_error_string(error)); + GPR_ASSERT(false); } - gpr_atm_rel_store(&args->done_atm, 1); - gpr_mu_lock(args->mu); - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr)); - gpr_mu_unlock(args->mu); -} -void CheckResolvedWithoutErrorLocked(void* argsp, grpc_error* err) { - EXPECT_EQ(err, GRPC_ERROR_NONE); - ArgsStruct* args = (ArgsStruct*)argsp; - gpr_atm_rel_store(&args->done_atm, 1); - gpr_mu_lock(args->mu); - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr)); - gpr_mu_unlock(args->mu); -} + virtual void CheckResult(const grpc_channel_args* channel_args) {} + + protected: + ArgsStruct* args_struct() const { return args_; } + + private: + ArgsStruct* args_; +}; + +class CheckingResultHandler : public ResultHandler { + public: + static grpc_core::UniquePtr Create( + ArgsStruct* args) { + return grpc_core::UniquePtr( + grpc_core::New(args)); + } + + explicit CheckingResultHandler(ArgsStruct* args) : ResultHandler(args) {} + + void CheckResult(const grpc_channel_args* channel_args) override { + ArgsStruct* args = args_struct(); + grpc_core::ServerAddressList* addresses = + grpc_core::FindServerAddressListChannelArg(channel_args); + gpr_log(GPR_INFO, "num addrs found: %" PRIdPTR ". expected %" PRIdPTR, + addresses->size(), args->expected_addrs.size()); + GPR_ASSERT(addresses->size() == args->expected_addrs.size()); + std::vector found_lb_addrs; + for (size_t i = 0; i < addresses->size(); i++) { + grpc_core::ServerAddress& addr = (*addresses)[i]; + char* str; + grpc_sockaddr_to_string(&str, &addr.address(), 1 /* normalize */); + gpr_log(GPR_INFO, "%s", str); + found_lb_addrs.emplace_back( + GrpcLBAddress(std::string(str), addr.IsBalancer())); + gpr_free(str); + } + if (args->expected_addrs.size() != found_lb_addrs.size()) { + gpr_log(GPR_DEBUG, + "found lb addrs size is: %" PRIdPTR + ". expected addrs size is %" PRIdPTR, + found_lb_addrs.size(), args->expected_addrs.size()); + abort(); + } + EXPECT_THAT(args->expected_addrs, + UnorderedElementsAreArray(found_lb_addrs)); + CheckServiceConfigResultLocked(channel_args, args); + if (args->expected_service_config_string == "") { + CheckLBPolicyResultLocked(channel_args, args); + } + } +}; -void RunResolvesRelevantRecordsTest(void (*OnDoneLocked)(void* arg, - grpc_error* error)) { +void RunResolvesRelevantRecordsTest( + grpc_core::UniquePtr ( + *CreateResultHandler)(ArgsStruct* args)) { grpc_core::ExecCtx exec_ctx; ArgsStruct args; ArgsInit(&args); @@ -491,20 +523,18 @@ void RunResolvesRelevantRecordsTest(void (*OnDoneLocked)(void* arg, // create resolver and resolve grpc_core::OrphanablePtr resolver = grpc_core::ResolverRegistry::CreateResolver(whole_uri, resolver_args, - args.pollset_set, args.lock); + args.pollset_set, args.lock, + CreateResultHandler(&args)); grpc_channel_args_destroy(resolver_args); gpr_free(whole_uri); - grpc_closure on_resolver_result_changed; - GRPC_CLOSURE_INIT(&on_resolver_result_changed, OnDoneLocked, (void*)&args, - grpc_combiner_scheduler(args.lock)); - resolver->NextLocked(&args.channel_args, &on_resolver_result_changed); + resolver->StartLocked(); grpc_core::ExecCtx::Get()->Flush(); PollPollsetUntilRequestDone(&args); ArgsFinish(&args); } TEST(ResolverComponentTest, TestResolvesRelevantRecords) { - RunResolvesRelevantRecordsTest(CheckResolverResultLocked); + RunResolvesRelevantRecordsTest(CheckingResultHandler::Create); } TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) { @@ -515,7 +545,7 @@ TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) { std::thread socket_stress_thread(OpenAndCloseSocketsStressLoop, dummy_port, &done_ev); // Run the resolver test - RunResolvesRelevantRecordsTest(CheckResolvedWithoutErrorLocked); + RunResolvesRelevantRecordsTest(ResultHandler::Create); // Shutdown and join stress thread gpr_event_set(&done_ev, (void*)1); socket_stress_thread.join();