replace mu with parent's work serializer

pull/30206/head
AJ Heller 3 years ago
parent 66f73c19ea
commit 0fce0ae150
  1. 158
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  2. 1
      src/core/ext/filters/client_channel/resolver/polling_resolver.h

@ -111,41 +111,49 @@ class AresClientChannelDNSResolver : public PollingResolver {
: resolver_(std::move(resolver)) { : resolver_(std::move(resolver)) {
// TODO(hork): replace this callback bookkeeping with promises. // TODO(hork): replace this callback bookkeeping with promises.
// Locking to prevent completion before all records are queried // Locking to prevent completion before all records are queried
MutexLock lock(&on_resolved_mu_);
Ref(DEBUG_LOCATION, "OnHostnameResolved").release(); Ref(DEBUG_LOCATION, "OnHostnameResolved").release();
GRPC_CLOSURE_INIT(&on_hostname_resolved_, OnHostnameResolved, this, resolver_->work_serializer()->Run(
nullptr); [this] {
hostname_request_.reset(grpc_dns_lookup_hostname_ares( // Run in the work serializer to ensure all requests are started
resolver_->authority().c_str(), resolver_->name_to_resolve().c_str(), // before any results are processed.
kDefaultSecurePort, resolver_->interested_parties(), GRPC_CLOSURE_INIT(&on_hostname_resolved_, OnHostnameResolved, this,
&on_hostname_resolved_, &addresses_, resolver_->query_timeout_ms_)); nullptr);
GRPC_CARES_TRACE_LOG( hostname_request_.reset(grpc_dns_lookup_hostname_ares(
"resolver:%p Started resolving hostnames. hostname_request_:%p", resolver_->authority().c_str(),
resolver_.get(), hostname_request_.get()); resolver_->name_to_resolve().c_str(), kDefaultSecurePort,
if (resolver_->enable_srv_queries_) { resolver_->interested_parties(), &on_hostname_resolved_,
Ref(DEBUG_LOCATION, "OnSRVResolved").release(); &addresses_, resolver_->query_timeout_ms_));
GRPC_CLOSURE_INIT(&on_srv_resolved_, OnSRVResolved, this, nullptr); GRPC_CARES_TRACE_LOG(
srv_request_.reset(grpc_dns_lookup_srv_ares( "resolver:%p Started resolving hostnames. hostname_request_:%p",
resolver_->authority().c_str(), resolver_.get(), hostname_request_.get());
resolver_->name_to_resolve().c_str(), if (resolver_->enable_srv_queries_) {
resolver_->interested_parties(), &on_srv_resolved_, Ref(DEBUG_LOCATION, "OnSRVResolved").release();
&balancer_addresses_, resolver_->query_timeout_ms_)); GRPC_CLOSURE_INIT(&on_srv_resolved_, OnSRVResolved, this,
GRPC_CARES_TRACE_LOG( nullptr);
"resolver:%p Started resolving SRV records. srv_request_:%p", srv_request_.reset(grpc_dns_lookup_srv_ares(
resolver_.get(), srv_request_.get()); resolver_->authority().c_str(),
} resolver_->name_to_resolve().c_str(),
if (resolver_->request_service_config_) { resolver_->interested_parties(), &on_srv_resolved_,
Ref(DEBUG_LOCATION, "OnTXTResolved").release(); &balancer_addresses_, resolver_->query_timeout_ms_));
GRPC_CLOSURE_INIT(&on_txt_resolved_, OnTXTResolved, this, nullptr); GRPC_CARES_TRACE_LOG(
txt_request_.reset(grpc_dns_lookup_txt_ares( "resolver:%p Started resolving SRV records. srv_request_:%p",
resolver_->authority().c_str(), resolver_.get(), srv_request_.get());
resolver_->name_to_resolve().c_str(), }
resolver_->interested_parties(), &on_txt_resolved_, if (resolver_->request_service_config_) {
&service_config_json_, resolver_->query_timeout_ms_)); Ref(DEBUG_LOCATION, "OnTXTResolved").release();
GRPC_CARES_TRACE_LOG( GRPC_CLOSURE_INIT(&on_txt_resolved_, OnTXTResolved, this,
"resolver:%p Started resolving TXT records. txt_request_:%p", nullptr);
resolver_.get(), srv_request_.get()); txt_request_.reset(grpc_dns_lookup_txt_ares(
} resolver_->authority().c_str(),
resolver_->name_to_resolve().c_str(),
resolver_->interested_parties(), &on_txt_resolved_,
&service_config_json_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG(
"resolver:%p Started resolving TXT records. txt_request_:%p",
resolver_.get(), srv_request_.get());
}
},
DEBUG_LOCATION);
} }
~AresRequestWrapper() override { ~AresRequestWrapper() override {
@ -157,7 +165,6 @@ class AresClientChannelDNSResolver : public PollingResolver {
// OrphanablePtr<>, and there's no way to pass the lock annotation through // OrphanablePtr<>, and there's no way to pass the lock annotation through
// there. // there.
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS { void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS {
MutexLock lock(&on_resolved_mu_);
if (hostname_request_ != nullptr) { if (hostname_request_ != nullptr) {
grpc_cancel_ares_request(hostname_request_.get()); grpc_cancel_ares_request(hostname_request_.get());
} }
@ -174,26 +181,19 @@ class AresClientChannelDNSResolver : public PollingResolver {
static void OnHostnameResolved(void* arg, grpc_error_handle error); static void OnHostnameResolved(void* arg, grpc_error_handle error);
static void OnSRVResolved(void* arg, grpc_error_handle error); static void OnSRVResolved(void* arg, grpc_error_handle error);
static void OnTXTResolved(void* arg, grpc_error_handle error); static void OnTXTResolved(void* arg, grpc_error_handle error);
absl::StatusOr<Result> OnResolvedLocked(grpc_error_handle error) void OnResolvedLocked(grpc_error_handle error);
ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_);
Mutex on_resolved_mu_;
RefCountedPtr<AresClientChannelDNSResolver> resolver_; RefCountedPtr<AresClientChannelDNSResolver> resolver_;
grpc_closure on_hostname_resolved_; grpc_closure on_hostname_resolved_;
std::unique_ptr<grpc_ares_request> hostname_request_ std::unique_ptr<grpc_ares_request> hostname_request_;
ABSL_GUARDED_BY(on_resolved_mu_);
grpc_closure on_srv_resolved_; grpc_closure on_srv_resolved_;
std::unique_ptr<grpc_ares_request> srv_request_ std::unique_ptr<grpc_ares_request> srv_request_;
ABSL_GUARDED_BY(on_resolved_mu_);
grpc_closure on_txt_resolved_; grpc_closure on_txt_resolved_;
std::unique_ptr<grpc_ares_request> txt_request_ std::unique_ptr<grpc_ares_request> txt_request_;
ABSL_GUARDED_BY(on_resolved_mu_);
// Output fields from ares request. // Output fields from ares request.
std::unique_ptr<ServerAddressList> addresses_ std::unique_ptr<ServerAddressList> addresses_;
ABSL_GUARDED_BY(on_resolved_mu_); std::unique_ptr<ServerAddressList> balancer_addresses_;
std::unique_ptr<ServerAddressList> balancer_addresses_ char* service_config_json_ = nullptr;
ABSL_GUARDED_BY(on_resolved_mu_);
char* service_config_json_ ABSL_GUARDED_BY(on_resolved_mu_) = nullptr;
}; };
~AresClientChannelDNSResolver() override; ~AresClientChannelDNSResolver() override;
@ -330,55 +330,45 @@ std::string ChooseServiceConfig(char* service_config_choice_json,
void AresClientChannelDNSResolver::AresRequestWrapper::OnHostnameResolved( void AresClientChannelDNSResolver::AresRequestWrapper::OnHostnameResolved(
void* arg, grpc_error_handle error) { void* arg, grpc_error_handle error) {
auto* self = static_cast<AresRequestWrapper*>(arg); auto* self = static_cast<AresRequestWrapper*>(arg);
absl::StatusOr<Result> result; self->resolver_->work_serializer()->Run(
{ [self, error] {
MutexLock lock(&self->on_resolved_mu_); self->hostname_request_.reset();
self->hostname_request_.reset(); self->OnResolvedLocked(error);
result = self->OnResolvedLocked(error); self->Unref(DEBUG_LOCATION, "OnHostnameResolved");
} },
if (result.ok()) { DEBUG_LOCATION);
self->resolver_->OnRequestComplete(std::move(*result));
}
self->Unref(DEBUG_LOCATION, "OnHostnameResolved");
} }
void AresClientChannelDNSResolver::AresRequestWrapper::OnSRVResolved( void AresClientChannelDNSResolver::AresRequestWrapper::OnSRVResolved(
void* arg, grpc_error_handle error) { void* arg, grpc_error_handle error) {
auto* self = static_cast<AresRequestWrapper*>(arg); auto* self = static_cast<AresRequestWrapper*>(arg);
absl::StatusOr<Result> result; self->resolver_->work_serializer()->Run(
{ [self, error] {
MutexLock lock(&self->on_resolved_mu_); self->srv_request_.reset();
self->srv_request_.reset(); self->OnResolvedLocked(error);
result = self->OnResolvedLocked(error); self->Unref(DEBUG_LOCATION, "OnSRVResolved");
} },
if (result.ok()) { DEBUG_LOCATION);
self->resolver_->OnRequestComplete(std::move(*result));
}
self->Unref(DEBUG_LOCATION, "OnSRVResolved");
} }
void AresClientChannelDNSResolver::AresRequestWrapper::OnTXTResolved( void AresClientChannelDNSResolver::AresRequestWrapper::OnTXTResolved(
void* arg, grpc_error_handle error) { void* arg, grpc_error_handle error) {
auto* self = static_cast<AresRequestWrapper*>(arg); auto* self = static_cast<AresRequestWrapper*>(arg);
absl::StatusOr<Result> result; self->resolver_->work_serializer()->Run(
{ [self, error] {
MutexLock lock(&self->on_resolved_mu_); self->txt_request_.reset();
self->txt_request_.reset(); self->OnResolvedLocked(error);
result = self->OnResolvedLocked(error); self->Unref(DEBUG_LOCATION, "OnTXTResolved");
} },
if (result.ok()) { DEBUG_LOCATION);
self->resolver_->OnRequestComplete(std::move(*result));
}
self->Unref(DEBUG_LOCATION, "OnTXTResolved");
} }
// Returns a Result if resolution is complete, and a non-OK status otherwise; // Returns a Result if resolution is complete, and a non-OK status otherwise;
// callers must release the lock and call OnRequestComplete if a Result is // callers must release the lock and call OnRequestComplete if a Result is
// returned. This is because OnRequestComplete may Orphan the resolver, which // returned. This is because OnRequestComplete may Orphan the resolver, which
// requires taking the lock. // requires taking the lock.
absl::StatusOr<AresClientChannelDNSResolver::Result> void AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( grpc_error_handle error) {
grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_) {
if (hostname_request_ != nullptr || srv_request_ != nullptr || if (hostname_request_ != nullptr || srv_request_ != nullptr ||
txt_request_ != nullptr) { txt_request_ != nullptr) {
GRPC_CARES_TRACE_LOG( GRPC_CARES_TRACE_LOG(
@ -387,7 +377,7 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
this, hostname_request_ != nullptr ? "waiting" : "done", this, hostname_request_ != nullptr ? "waiting" : "done",
srv_request_ != nullptr ? "waiting" : "done", srv_request_ != nullptr ? "waiting" : "done",
txt_request_ != nullptr ? "waiting" : "done"); txt_request_ != nullptr ? "waiting" : "done");
return absl::FailedPreconditionError("Waiting for results."); return;
} }
GRPC_CARES_TRACE_LOG("resolver:%p OnResolved() proceeding", this); GRPC_CARES_TRACE_LOG("resolver:%p OnResolved() proceeding", this);
Result result; Result result;
@ -439,7 +429,7 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
} }
result.args = grpc_channel_args_copy_and_add( result.args = grpc_channel_args_copy_and_add(
resolver_->channel_args(), new_args.data(), new_args.size()); resolver_->channel_args(), new_args.data(), new_args.size());
return std::move(result); resolver_->OnRequestComplete(std::move(result));
} }
// //

@ -71,6 +71,7 @@ class PollingResolver : public Resolver {
const std::string& name_to_resolve() const { return name_to_resolve_; } const std::string& name_to_resolve() const { return name_to_resolve_; }
grpc_pollset_set* interested_parties() const { return interested_parties_; } grpc_pollset_set* interested_parties() const { return interested_parties_; }
const grpc_channel_args* channel_args() const { return channel_args_; } const grpc_channel_args* channel_args() const { return channel_args_; }
WorkSerializer* work_serializer() { return work_serializer_.get(); }
private: private:
void MaybeStartResolvingLocked(); void MaybeStartResolvingLocked();

Loading…
Cancel
Save