diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index c04c68fa231..96d327d6a17 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -111,41 +111,49 @@ class AresClientChannelDNSResolver : public PollingResolver { : resolver_(std::move(resolver)) { // TODO(hork): replace this callback bookkeeping with promises. // Locking to prevent completion before all records are queried - MutexLock lock(&on_resolved_mu_); Ref(DEBUG_LOCATION, "OnHostnameResolved").release(); - GRPC_CLOSURE_INIT(&on_hostname_resolved_, OnHostnameResolved, this, - nullptr); - hostname_request_.reset(grpc_dns_lookup_hostname_ares( - resolver_->authority().c_str(), resolver_->name_to_resolve().c_str(), - kDefaultSecurePort, resolver_->interested_parties(), - &on_hostname_resolved_, &addresses_, resolver_->query_timeout_ms_)); - GRPC_CARES_TRACE_LOG( - "resolver:%p Started resolving hostnames. hostname_request_:%p", - resolver_.get(), hostname_request_.get()); - if (resolver_->enable_srv_queries_) { - Ref(DEBUG_LOCATION, "OnSRVResolved").release(); - GRPC_CLOSURE_INIT(&on_srv_resolved_, OnSRVResolved, this, nullptr); - srv_request_.reset(grpc_dns_lookup_srv_ares( - resolver_->authority().c_str(), - resolver_->name_to_resolve().c_str(), - resolver_->interested_parties(), &on_srv_resolved_, - &balancer_addresses_, resolver_->query_timeout_ms_)); - GRPC_CARES_TRACE_LOG( - "resolver:%p Started resolving SRV records. srv_request_:%p", - resolver_.get(), srv_request_.get()); - } - if (resolver_->request_service_config_) { - Ref(DEBUG_LOCATION, "OnTXTResolved").release(); - GRPC_CLOSURE_INIT(&on_txt_resolved_, OnTXTResolved, this, nullptr); - txt_request_.reset(grpc_dns_lookup_txt_ares( - resolver_->authority().c_str(), - resolver_->name_to_resolve().c_str(), - resolver_->interested_parties(), &on_txt_resolved_, - &service_config_json_, resolver_->query_timeout_ms_)); - GRPC_CARES_TRACE_LOG( - "resolver:%p Started resolving TXT records. txt_request_:%p", - resolver_.get(), srv_request_.get()); - } + resolver_->work_serializer()->Run( + [this] { + // Run in the work serializer to ensure all requests are started + // before any results are processed. + GRPC_CLOSURE_INIT(&on_hostname_resolved_, OnHostnameResolved, this, + nullptr); + hostname_request_.reset(grpc_dns_lookup_hostname_ares( + resolver_->authority().c_str(), + resolver_->name_to_resolve().c_str(), kDefaultSecurePort, + resolver_->interested_parties(), &on_hostname_resolved_, + &addresses_, resolver_->query_timeout_ms_)); + GRPC_CARES_TRACE_LOG( + "resolver:%p Started resolving hostnames. hostname_request_:%p", + resolver_.get(), hostname_request_.get()); + if (resolver_->enable_srv_queries_) { + Ref(DEBUG_LOCATION, "OnSRVResolved").release(); + GRPC_CLOSURE_INIT(&on_srv_resolved_, OnSRVResolved, this, + nullptr); + srv_request_.reset(grpc_dns_lookup_srv_ares( + resolver_->authority().c_str(), + resolver_->name_to_resolve().c_str(), + resolver_->interested_parties(), &on_srv_resolved_, + &balancer_addresses_, resolver_->query_timeout_ms_)); + GRPC_CARES_TRACE_LOG( + "resolver:%p Started resolving SRV records. srv_request_:%p", + resolver_.get(), srv_request_.get()); + } + if (resolver_->request_service_config_) { + Ref(DEBUG_LOCATION, "OnTXTResolved").release(); + GRPC_CLOSURE_INIT(&on_txt_resolved_, OnTXTResolved, this, + nullptr); + txt_request_.reset(grpc_dns_lookup_txt_ares( + resolver_->authority().c_str(), + resolver_->name_to_resolve().c_str(), + resolver_->interested_parties(), &on_txt_resolved_, + &service_config_json_, resolver_->query_timeout_ms_)); + GRPC_CARES_TRACE_LOG( + "resolver:%p Started resolving TXT records. txt_request_:%p", + resolver_.get(), srv_request_.get()); + } + }, + DEBUG_LOCATION); } ~AresRequestWrapper() override { @@ -157,7 +165,6 @@ class AresClientChannelDNSResolver : public PollingResolver { // OrphanablePtr<>, and there's no way to pass the lock annotation through // there. void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS { - MutexLock lock(&on_resolved_mu_); if (hostname_request_ != nullptr) { grpc_cancel_ares_request(hostname_request_.get()); } @@ -174,26 +181,19 @@ class AresClientChannelDNSResolver : public PollingResolver { static void OnHostnameResolved(void* arg, grpc_error_handle error); static void OnSRVResolved(void* arg, grpc_error_handle error); static void OnTXTResolved(void* arg, grpc_error_handle error); - absl::StatusOr OnResolvedLocked(grpc_error_handle error) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_); + void OnResolvedLocked(grpc_error_handle error); - Mutex on_resolved_mu_; RefCountedPtr resolver_; grpc_closure on_hostname_resolved_; - std::unique_ptr hostname_request_ - ABSL_GUARDED_BY(on_resolved_mu_); + std::unique_ptr hostname_request_; grpc_closure on_srv_resolved_; - std::unique_ptr srv_request_ - ABSL_GUARDED_BY(on_resolved_mu_); + std::unique_ptr srv_request_; grpc_closure on_txt_resolved_; - std::unique_ptr txt_request_ - ABSL_GUARDED_BY(on_resolved_mu_); + std::unique_ptr txt_request_; // Output fields from ares request. - std::unique_ptr addresses_ - ABSL_GUARDED_BY(on_resolved_mu_); - std::unique_ptr balancer_addresses_ - ABSL_GUARDED_BY(on_resolved_mu_); - char* service_config_json_ ABSL_GUARDED_BY(on_resolved_mu_) = nullptr; + std::unique_ptr addresses_; + std::unique_ptr balancer_addresses_; + char* service_config_json_ = nullptr; }; ~AresClientChannelDNSResolver() override; @@ -330,55 +330,45 @@ std::string ChooseServiceConfig(char* service_config_choice_json, void AresClientChannelDNSResolver::AresRequestWrapper::OnHostnameResolved( void* arg, grpc_error_handle error) { auto* self = static_cast(arg); - absl::StatusOr result; - { - MutexLock lock(&self->on_resolved_mu_); - self->hostname_request_.reset(); - result = self->OnResolvedLocked(error); - } - if (result.ok()) { - self->resolver_->OnRequestComplete(std::move(*result)); - } - self->Unref(DEBUG_LOCATION, "OnHostnameResolved"); + self->resolver_->work_serializer()->Run( + [self, error] { + self->hostname_request_.reset(); + self->OnResolvedLocked(error); + self->Unref(DEBUG_LOCATION, "OnHostnameResolved"); + }, + DEBUG_LOCATION); } void AresClientChannelDNSResolver::AresRequestWrapper::OnSRVResolved( void* arg, grpc_error_handle error) { auto* self = static_cast(arg); - absl::StatusOr result; - { - MutexLock lock(&self->on_resolved_mu_); - self->srv_request_.reset(); - result = self->OnResolvedLocked(error); - } - if (result.ok()) { - self->resolver_->OnRequestComplete(std::move(*result)); - } - self->Unref(DEBUG_LOCATION, "OnSRVResolved"); + self->resolver_->work_serializer()->Run( + [self, error] { + self->srv_request_.reset(); + self->OnResolvedLocked(error); + self->Unref(DEBUG_LOCATION, "OnSRVResolved"); + }, + DEBUG_LOCATION); } void AresClientChannelDNSResolver::AresRequestWrapper::OnTXTResolved( void* arg, grpc_error_handle error) { auto* self = static_cast(arg); - absl::StatusOr result; - { - MutexLock lock(&self->on_resolved_mu_); - self->txt_request_.reset(); - result = self->OnResolvedLocked(error); - } - if (result.ok()) { - self->resolver_->OnRequestComplete(std::move(*result)); - } - self->Unref(DEBUG_LOCATION, "OnTXTResolved"); + self->resolver_->work_serializer()->Run( + [self, error] { + self->txt_request_.reset(); + self->OnResolvedLocked(error); + self->Unref(DEBUG_LOCATION, "OnTXTResolved"); + }, + DEBUG_LOCATION); } // Returns a Result if resolution is complete, and a non-OK status otherwise; // callers must release the lock and call OnRequestComplete if a Result is // returned. This is because OnRequestComplete may Orphan the resolver, which // requires taking the lock. -absl::StatusOr -AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( - grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_) { +void AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( + grpc_error_handle error) { if (hostname_request_ != nullptr || srv_request_ != nullptr || txt_request_ != nullptr) { GRPC_CARES_TRACE_LOG( @@ -387,7 +377,7 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( this, hostname_request_ != nullptr ? "waiting" : "done", srv_request_ != nullptr ? "waiting" : "done", txt_request_ != nullptr ? "waiting" : "done"); - return absl::FailedPreconditionError("Waiting for results."); + return; } GRPC_CARES_TRACE_LOG("resolver:%p OnResolved() proceeding", this); Result result; @@ -439,7 +429,7 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( } result.args = grpc_channel_args_copy_and_add( resolver_->channel_args(), new_args.data(), new_args.size()); - return std::move(result); + resolver_->OnRequestComplete(std::move(result)); } // diff --git a/src/core/ext/filters/client_channel/resolver/polling_resolver.h b/src/core/ext/filters/client_channel/resolver/polling_resolver.h index 224f8dc0815..ae5ff46386f 100644 --- a/src/core/ext/filters/client_channel/resolver/polling_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/polling_resolver.h @@ -71,6 +71,7 @@ class PollingResolver : public Resolver { const std::string& name_to_resolve() const { return name_to_resolve_; } grpc_pollset_set* interested_parties() const { return interested_parties_; } const grpc_channel_args* channel_args() const { return channel_args_; } + WorkSerializer* work_serializer() { return work_serializer_.get(); } private: void MaybeStartResolvingLocked();