[EventEngine] Implement the EventEngine-driven client channel resolver. (#32632)
This PR also centralizes the client channel resolver selection. Resolver selection is still done using the plugin system, but when the Ares and native client channel resolvers go away, we can consider bootstrapping this differently.pull/32766/head
parent
b94d55bd20
commit
7dec55de5a
34 changed files with 1069 additions and 148 deletions
@ -0,0 +1,30 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_DNS_RESOLVER_ARES_H |
||||
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_DNS_RESOLVER_ARES_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include "src/core/lib/config/core_configuration.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
bool ShouldUseAresDnsResolver(absl::string_view resolver_env); |
||||
void RegisterAresDnsResolver(CoreConfiguration::Builder*); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_DNS_RESOLVER_ARES_H
|
@ -0,0 +1,60 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/resolver/dns/dns_resolver_plugin.h" |
||||
|
||||
#include <memory> |
||||
|
||||
#include "absl/strings/match.h" |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.h" |
||||
#include "src/core/ext/filters/client_channel/resolver/dns/event_engine/event_engine_client_channel_resolver.h" |
||||
#include "src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.h" |
||||
#include "src/core/lib/config/config_vars.h" |
||||
#include "src/core/lib/experiments/experiments.h" |
||||
#include "src/core/lib/gprpp/crash.h" |
||||
#include "src/core/lib/resolver/resolver_factory.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
void RegisterDnsResolver(CoreConfiguration::Builder* builder) { |
||||
if (IsEventEngineDnsEnabled()) { |
||||
gpr_log(GPR_DEBUG, "Using EventEngine dns resolver"); |
||||
builder->resolver_registry()->RegisterResolverFactory( |
||||
std::make_unique<EventEngineClientChannelDNSResolverFactory>()); |
||||
return; |
||||
} |
||||
auto resolver = ConfigVars::Get().DnsResolver(); |
||||
// ---- Ares resolver ----
|
||||
if (ShouldUseAresDnsResolver(resolver)) { |
||||
gpr_log(GPR_DEBUG, "Using ares dns resolver"); |
||||
RegisterAresDnsResolver(builder); |
||||
return; |
||||
} |
||||
// ---- Native resolver ----
|
||||
if (absl::EqualsIgnoreCase(resolver, "native") || |
||||
!builder->resolver_registry()->HasResolverFactory("dns")) { |
||||
gpr_log(GPR_DEBUG, "Using native dns resolver"); |
||||
RegisterNativeDnsResolver(builder); |
||||
return; |
||||
} |
||||
Crash( |
||||
"Unable to set DNS resolver! Likely a logic error in gRPC-core, " |
||||
"please file a bug."); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,27 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_DNS_RESOLVER_PLUGIN_H |
||||
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_DNS_RESOLVER_PLUGIN_H |
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/config/core_configuration.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Centralized decision logic about which client channel DNS resolver to enable.
|
||||
void RegisterDnsResolver(CoreConfiguration::Builder* builder); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_DNS_RESOLVER_PLUGIN_H
|
@ -0,0 +1,524 @@ |
||||
// Copyright 2023 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/resolver/dns/event_engine/event_engine_client_channel_resolver.h" |
||||
|
||||
#include <inttypes.h> |
||||
#include <stddef.h> |
||||
|
||||
#include <algorithm> |
||||
#include <chrono> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <type_traits> |
||||
#include <utility> |
||||
#include <vector> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/cleanup/cleanup.h" |
||||
#include "absl/container/flat_hash_set.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/strings/strip.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h" |
||||
#include "src/core/ext/filters/client_channel/resolver/dns/event_engine/service_config_helper.h" |
||||
#include "src/core/ext/filters/client_channel/resolver/polling_resolver.h" |
||||
#include "src/core/lib/backoff/backoff.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/event_engine/handle_containers.h" |
||||
#include "src/core/lib/event_engine/resolved_address_internal.h" |
||||
#include "src/core/lib/event_engine/utils.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/gprpp/validation_errors.h" |
||||
#include "src/core/lib/iomgr/resolve_address.h" |
||||
#include "src/core/lib/resolver/resolver.h" |
||||
#include "src/core/lib/resolver/resolver_factory.h" |
||||
#include "src/core/lib/resolver/server_address.h" |
||||
#include "src/core/lib/service_config/service_config.h" |
||||
#include "src/core/lib/service_config/service_config_impl.h" |
||||
|
||||
// IWYU pragma: no_include <ratio>
|
||||
|
||||
namespace grpc_core { |
||||
namespace { |
||||
|
||||
#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 |
||||
#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 |
||||
#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 |
||||
#define GRPC_DNS_RECONNECT_JITTER 0.2 |
||||
#define GRPC_DNS_DEFAULT_QUERY_TIMEOUT_MS 120000 |
||||
|
||||
using grpc_event_engine::experimental::EventEngine; |
||||
using grpc_event_engine::experimental::HandleToString; |
||||
using grpc_event_engine::experimental::LookupTaskHandleSet; |
||||
|
||||
// TODO(hork): Investigate adding a resolver test scenario where the first
|
||||
// balancer hostname lookup result is an error, and the second contains valid
|
||||
// addresses.
|
||||
// TODO(hork): Add a test that checks for proper authority from balancer
|
||||
// addresses.
|
||||
|
||||
// TODO(hork): replace this with `dns_resolver` when all other resolver
|
||||
// implementations are removed.
|
||||
TraceFlag grpc_event_engine_client_channel_resolver_trace( |
||||
false, "event_engine_client_channel_resolver"); |
||||
|
||||
#define GRPC_EVENT_ENGINE_RESOLVER_TRACE(format, ...) \ |
||||
if (GRPC_TRACE_FLAG_ENABLED( \
|
||||
grpc_event_engine_client_channel_resolver_trace)) { \
|
||||
gpr_log(GPR_DEBUG, "(event_engine client channel resolver) " format, \
|
||||
__VA_ARGS__); \
|
||||
} |
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// EventEngineClientChannelDNSResolver
|
||||
// ----------------------------------------------------------------------------
|
||||
class EventEngineClientChannelDNSResolver : public PollingResolver { |
||||
public: |
||||
EventEngineClientChannelDNSResolver(ResolverArgs args, |
||||
Duration min_time_between_resolutions); |
||||
OrphanablePtr<Orphanable> StartRequest() override; |
||||
|
||||
private: |
||||
// ----------------------------------------------------------------------------
|
||||
// EventEngineDNSRequestWrapper declaration
|
||||
// ----------------------------------------------------------------------------
|
||||
class EventEngineDNSRequestWrapper |
||||
: public InternallyRefCounted<EventEngineDNSRequestWrapper> { |
||||
public: |
||||
EventEngineDNSRequestWrapper( |
||||
RefCountedPtr<EventEngineClientChannelDNSResolver> resolver, |
||||
std::unique_ptr<EventEngine::DNSResolver> event_engine_resolver); |
||||
~EventEngineDNSRequestWrapper() override; |
||||
|
||||
// Note that thread safety cannot be analyzed due to this being invoked from
|
||||
// OrphanablePtr<>, and there's no way to pass the lock annotation through
|
||||
// there.
|
||||
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS; |
||||
|
||||
private: |
||||
void OnHostnameResolved( |
||||
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses); |
||||
void OnSRVResolved( |
||||
absl::StatusOr<std::vector<EventEngine::DNSResolver::SRVRecord>> |
||||
srv_records); |
||||
void OnBalancerHostnamesResolved( |
||||
std::string authority, |
||||
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses); |
||||
void OnTXTResolved(absl::StatusOr<std::string> service_config); |
||||
// Returns a Result if resolution is complete.
|
||||
// callers must release the lock and call OnRequestComplete if a Result is
|
||||
// returned. This is because OnRequestComplete may Orphan the resolver,
|
||||
// which requires taking the lock.
|
||||
absl::optional<Resolver::Result> OnResolvedLocked() |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_); |
||||
// Helper method to populate server addresses on resolver result.
|
||||
void MaybePopulateAddressesLocked(Resolver::Result* result) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_); |
||||
// Helper method to populate balancer addresses on resolver result.
|
||||
void MaybePopulateBalancerAddressesLocked(Resolver::Result* result) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_); |
||||
// Helper method to populate service config on resolver result.
|
||||
void MaybePopulateServiceConfigLocked(Resolver::Result* result) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_); |
||||
|
||||
RefCountedPtr<EventEngineClientChannelDNSResolver> resolver_; |
||||
Mutex on_resolved_mu_; |
||||
// Lookup callbacks
|
||||
absl::optional<EventEngine::DNSResolver::LookupTaskHandle> hostname_handle_ |
||||
ABSL_GUARDED_BY(on_resolved_mu_); |
||||
absl::optional<EventEngine::DNSResolver::LookupTaskHandle> srv_handle_ |
||||
ABSL_GUARDED_BY(on_resolved_mu_); |
||||
absl::optional<EventEngine::DNSResolver::LookupTaskHandle> txt_handle_ |
||||
ABSL_GUARDED_BY(on_resolved_mu_); |
||||
LookupTaskHandleSet balancer_hostname_handles_ |
||||
ABSL_GUARDED_BY(on_resolved_mu_); |
||||
// Output fields from requests.
|
||||
ServerAddressList addresses_ ABSL_GUARDED_BY(on_resolved_mu_); |
||||
ServerAddressList balancer_addresses_ ABSL_GUARDED_BY(on_resolved_mu_); |
||||
ValidationErrors errors_ ABSL_GUARDED_BY(on_resolved_mu_); |
||||
absl::StatusOr<std::string> service_config_json_ |
||||
ABSL_GUARDED_BY(on_resolved_mu_); |
||||
// Other internal state
|
||||
size_t number_of_balancer_hostnames_resolved_ |
||||
ABSL_GUARDED_BY(on_resolved_mu_) = 0; |
||||
bool orphaned_ ABSL_GUARDED_BY(on_resolved_mu_) = false; |
||||
std::unique_ptr<EventEngine::DNSResolver> event_engine_resolver_; |
||||
}; |
||||
|
||||
/// whether to request the service config
|
||||
const bool request_service_config_; |
||||
// whether or not to enable SRV DNS queries
|
||||
const bool enable_srv_queries_; |
||||
// timeout in milliseconds for active DNS queries
|
||||
EventEngine::Duration query_timeout_ms_; |
||||
std::shared_ptr<EventEngine> event_engine_; |
||||
}; |
||||
|
||||
EventEngineClientChannelDNSResolver::EventEngineClientChannelDNSResolver( |
||||
ResolverArgs args, Duration min_time_between_resolutions) |
||||
: PollingResolver(std::move(args), min_time_between_resolutions, |
||||
BackOff::Options() |
||||
.set_initial_backoff(Duration::Milliseconds( |
||||
GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)) |
||||
.set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) |
||||
.set_jitter(GRPC_DNS_RECONNECT_JITTER) |
||||
.set_max_backoff(Duration::Milliseconds( |
||||
GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)), |
||||
&grpc_event_engine_client_channel_resolver_trace), |
||||
request_service_config_( |
||||
!channel_args() |
||||
.GetBool(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION) |
||||
.value_or(true)), |
||||
enable_srv_queries_(channel_args() |
||||
.GetBool(GRPC_ARG_DNS_ENABLE_SRV_QUERIES) |
||||
.value_or(false)), |
||||
// TODO(yijiem): decide if the ares channel arg timeout should be reused.
|
||||
query_timeout_ms_(std::chrono::milliseconds( |
||||
std::max(0, channel_args() |
||||
.GetInt(GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS) |
||||
.value_or(GRPC_DNS_DEFAULT_QUERY_TIMEOUT_MS)))), |
||||
event_engine_(channel_args().GetObjectRef<EventEngine>()) {} |
||||
|
||||
OrphanablePtr<Orphanable> EventEngineClientChannelDNSResolver::StartRequest() { |
||||
return MakeOrphanable<EventEngineDNSRequestWrapper>( |
||||
Ref(DEBUG_LOCATION, "dns-resolving"), |
||||
event_engine_->GetDNSResolver({/*dns_server=*/authority()})); |
||||
} |
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// EventEngineDNSRequestWrapper definition
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper:: |
||||
EventEngineDNSRequestWrapper( |
||||
RefCountedPtr<EventEngineClientChannelDNSResolver> resolver, |
||||
std::unique_ptr<EventEngine::DNSResolver> event_engine_resolver) |
||||
: resolver_(std::move(resolver)), |
||||
event_engine_resolver_(std::move(event_engine_resolver)) { |
||||
// Locking to prevent completion before all records are queried
|
||||
MutexLock lock(&on_resolved_mu_); |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE( |
||||
"DNSResolver::%p Starting hostname resolution for %s", resolver_.get(), |
||||
resolver_->name_to_resolve().c_str()); |
||||
hostname_handle_ = event_engine_resolver_->LookupHostname( |
||||
[self = Ref(DEBUG_LOCATION, "OnHostnameResolved")]( |
||||
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses) { |
||||
self->OnHostnameResolved(std::move(addresses)); |
||||
}, |
||||
resolver_->name_to_resolve(), kDefaultSecurePort, |
||||
resolver_->query_timeout_ms_); |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE("hostname lookup handle: %s", |
||||
HandleToString(*hostname_handle_).c_str()); |
||||
if (resolver_->enable_srv_queries_) { |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE( |
||||
"DNSResolver::%p Starting SRV record resolution for %s", |
||||
resolver_.get(), resolver_->name_to_resolve().c_str()); |
||||
srv_handle_ = event_engine_resolver_->LookupSRV( |
||||
[self = Ref(DEBUG_LOCATION, "OnSRVResolved")]( |
||||
absl::StatusOr<std::vector<EventEngine::DNSResolver::SRVRecord>> |
||||
srv_records) { self->OnSRVResolved(std::move(srv_records)); }, |
||||
resolver_->name_to_resolve(), resolver_->query_timeout_ms_); |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE("srv lookup handle: %s", |
||||
HandleToString(*srv_handle_).c_str()); |
||||
} |
||||
if (resolver_->request_service_config_) { |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE( |
||||
"DNSResolver::%p Starting TXT record resolution for %s", |
||||
resolver_.get(), resolver_->name_to_resolve().c_str()); |
||||
txt_handle_ = event_engine_resolver_->LookupTXT( |
||||
[self = Ref(DEBUG_LOCATION, "OnTXTResolved")]( |
||||
absl::StatusOr<std::string> service_config) { |
||||
self->OnTXTResolved(std::move(service_config)); |
||||
}, |
||||
absl::StrCat("_grpc_config.", resolver_->name_to_resolve()), |
||||
resolver_->query_timeout_ms_); |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE("txt lookup handle: %s", |
||||
HandleToString(*txt_handle_).c_str()); |
||||
} |
||||
} |
||||
|
||||
EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper:: |
||||
~EventEngineDNSRequestWrapper() { |
||||
resolver_.reset(DEBUG_LOCATION, "dns-resolving"); |
||||
} |
||||
|
||||
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper:: |
||||
Orphan() { |
||||
{ |
||||
MutexLock lock(&on_resolved_mu_); |
||||
orphaned_ = true; |
||||
// Event if cancellation fails here, OnResolvedLocked will return early, and
|
||||
// the resolver will never see a completed request.
|
||||
if (hostname_handle_.has_value()) { |
||||
event_engine_resolver_->CancelLookup(*hostname_handle_); |
||||
} |
||||
if (srv_handle_.has_value()) { |
||||
event_engine_resolver_->CancelLookup(*srv_handle_); |
||||
} |
||||
for (const auto& handle : balancer_hostname_handles_) { |
||||
event_engine_resolver_->CancelLookup(handle); |
||||
} |
||||
if (txt_handle_.has_value()) { |
||||
event_engine_resolver_->CancelLookup(*txt_handle_); |
||||
} |
||||
} |
||||
Unref(DEBUG_LOCATION, "Orphan"); |
||||
} |
||||
|
||||
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper:: |
||||
OnHostnameResolved(absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> |
||||
new_addresses) { |
||||
ValidationErrors::ScopedField field(&errors_, "hostname lookup"); |
||||
absl::optional<Resolver::Result> result; |
||||
{ |
||||
MutexLock lock(&on_resolved_mu_); |
||||
if (orphaned_) return; |
||||
hostname_handle_.reset(); |
||||
if (!new_addresses.ok()) { |
||||
errors_.AddError(new_addresses.status().message()); |
||||
} else { |
||||
addresses_.reserve(addresses_.size() + new_addresses->size()); |
||||
for (const auto& addr : *new_addresses) { |
||||
addresses_.emplace_back(CreateGRPCResolvedAddress(addr), ChannelArgs()); |
||||
} |
||||
} |
||||
result = OnResolvedLocked(); |
||||
} |
||||
if (result.has_value()) { |
||||
resolver_->OnRequestComplete(std::move(*result)); |
||||
} |
||||
} |
||||
|
||||
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper:: |
||||
OnSRVResolved( |
||||
absl::StatusOr<std::vector<EventEngine::DNSResolver::SRVRecord>> |
||||
srv_records) { |
||||
ValidationErrors::ScopedField field(&errors_, "srv lookup"); |
||||
absl::optional<Resolver::Result> result; |
||||
auto cleanup = absl::MakeCleanup([&]() { |
||||
if (result.has_value()) { |
||||
resolver_->OnRequestComplete(std::move(*result)); |
||||
} |
||||
}); |
||||
MutexLock lock(&on_resolved_mu_); |
||||
if (orphaned_) return; |
||||
srv_handle_.reset(); |
||||
if (!srv_records.ok()) { |
||||
// An error has occurred, finish resolving.
|
||||
errors_.AddError(srv_records.status().message()); |
||||
result = OnResolvedLocked(); |
||||
return; |
||||
} |
||||
// Do a subsequent hostname query since SRV records were returned
|
||||
for (auto& srv_record : *srv_records) { |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE( |
||||
"DNSResolver::%p Starting balancer hostname resolution for %s:%d", |
||||
resolver_.get(), srv_record.host.c_str(), srv_record.port); |
||||
auto handle = event_engine_resolver_->LookupHostname( |
||||
[host = std::move(srv_record.host), |
||||
self = Ref(DEBUG_LOCATION, "OnBalancerHostnamesResolved")]( |
||||
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> |
||||
new_balancer_addresses) mutable { |
||||
self->OnBalancerHostnamesResolved(std::move(host), |
||||
std::move(new_balancer_addresses)); |
||||
}, |
||||
srv_record.host, std::to_string(srv_record.port), |
||||
resolver_->query_timeout_ms_); |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE("balancer hostname lookup handle: %s", |
||||
HandleToString(handle).c_str()); |
||||
balancer_hostname_handles_.insert(handle); |
||||
} |
||||
} |
||||
|
||||
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper:: |
||||
OnBalancerHostnamesResolved( |
||||
std::string authority, |
||||
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> |
||||
new_balancer_addresses) { |
||||
ValidationErrors::ScopedField field( |
||||
&errors_, absl::StrCat("balancer lookup for ", authority)); |
||||
absl::optional<Resolver::Result> result; |
||||
auto cleanup = absl::MakeCleanup([&]() { |
||||
if (result.has_value()) { |
||||
resolver_->OnRequestComplete(std::move(*result)); |
||||
} |
||||
}); |
||||
MutexLock lock(&on_resolved_mu_); |
||||
if (orphaned_) return; |
||||
++number_of_balancer_hostnames_resolved_; |
||||
if (!new_balancer_addresses.ok()) { |
||||
// An error has occurred, finish resolving.
|
||||
errors_.AddError(new_balancer_addresses.status().message()); |
||||
} else { |
||||
// Capture the addresses and finish resolving.
|
||||
balancer_addresses_.reserve(balancer_addresses_.size() + |
||||
new_balancer_addresses->size()); |
||||
auto srv_channel_args = |
||||
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, authority); |
||||
for (const auto& addr : *new_balancer_addresses) { |
||||
balancer_addresses_.emplace_back(CreateGRPCResolvedAddress(addr), |
||||
srv_channel_args); |
||||
} |
||||
} |
||||
result = OnResolvedLocked(); |
||||
} |
||||
|
||||
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper:: |
||||
OnTXTResolved(absl::StatusOr<std::string> service_config) { |
||||
ValidationErrors::ScopedField field(&errors_, "txt lookup"); |
||||
absl::optional<Resolver::Result> result; |
||||
{ |
||||
MutexLock lock(&on_resolved_mu_); |
||||
if (orphaned_) return; |
||||
GPR_ASSERT(txt_handle_.has_value()); |
||||
txt_handle_.reset(); |
||||
if (!service_config.ok()) { |
||||
errors_.AddError(service_config.status().message()); |
||||
service_config_json_ = service_config.status(); |
||||
} else { |
||||
service_config_json_ = absl::StrCat("grpc_config=", *service_config); |
||||
} |
||||
result = OnResolvedLocked(); |
||||
} |
||||
if (result.has_value()) { |
||||
resolver_->OnRequestComplete(std::move(*result)); |
||||
} |
||||
} |
||||
|
||||
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper:: |
||||
MaybePopulateAddressesLocked(Resolver::Result* result) { |
||||
if (addresses_.empty()) return; |
||||
result->addresses = std::move(addresses_); |
||||
} |
||||
|
||||
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper:: |
||||
MaybePopulateBalancerAddressesLocked(Resolver::Result* result) { |
||||
if (!balancer_addresses_.empty()) { |
||||
result->args = |
||||
SetGrpcLbBalancerAddresses(result->args, balancer_addresses_); |
||||
} |
||||
} |
||||
|
||||
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper:: |
||||
MaybePopulateServiceConfigLocked(Resolver::Result* result) { |
||||
// This function is called only if we are returning addresses. In that case,
|
||||
// we currently ignore TXT lookup failures.
|
||||
// TODO(roth): Consider differentiating between NXDOMAIN and other failures,
|
||||
// so that we can return an error in the non-NXDOMAIN case.
|
||||
if (!service_config_json_.ok()) return; |
||||
// TXT lookup succeeded, so parse the config.
|
||||
auto service_config = ChooseServiceConfig(*service_config_json_); |
||||
if (!service_config.ok()) { |
||||
result->service_config = absl::UnavailableError(absl::StrCat( |
||||
"failed to parse service config: ", service_config.status().message())); |
||||
return; |
||||
} |
||||
if (service_config->empty()) return; |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE( |
||||
"DNSResolver::%p selected service config choice: %s", |
||||
event_engine_resolver_.get(), service_config->c_str()); |
||||
result->service_config = |
||||
ServiceConfigImpl::Create(resolver_->channel_args(), *service_config); |
||||
if (!result->service_config.ok()) { |
||||
result->service_config = absl::UnavailableError( |
||||
absl::StrCat("failed to parse service config: ", |
||||
result->service_config.status().message())); |
||||
} |
||||
} |
||||
|
||||
absl::optional<Resolver::Result> EventEngineClientChannelDNSResolver:: |
||||
EventEngineDNSRequestWrapper::OnResolvedLocked() { |
||||
if (orphaned_) return absl::nullopt; |
||||
// Wait for all requested queries to return.
|
||||
if (hostname_handle_.has_value() || srv_handle_.has_value() || |
||||
txt_handle_.has_value() || |
||||
number_of_balancer_hostnames_resolved_ != |
||||
balancer_hostname_handles_.size()) { |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE( |
||||
"DNSResolver::%p OnResolved() waiting for results (hostname: %s, " |
||||
"srv: %s, " |
||||
"txt: %s, " |
||||
"balancer addresses: %" PRIuPTR "/%" PRIuPTR " complete", |
||||
this, hostname_handle_.has_value() ? "waiting" : "done", |
||||
srv_handle_.has_value() ? "waiting" : "done", |
||||
txt_handle_.has_value() ? "waiting" : "done", |
||||
number_of_balancer_hostnames_resolved_, |
||||
balancer_hostname_handles_.size()); |
||||
return absl::nullopt; |
||||
} |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE( |
||||
"DNSResolver::%p OnResolvedLocked() proceeding", this); |
||||
Resolver::Result result; |
||||
result.args = resolver_->channel_args(); |
||||
// If both addresses and balancer addresses failed, return an error for both
|
||||
// addresses and service config.
|
||||
if (addresses_.empty() && balancer_addresses_.empty()) { |
||||
absl::Status status = errors_.status( |
||||
absl::StatusCode::kUnavailable, |
||||
absl::StrCat("errors resolving ", resolver_->name_to_resolve())); |
||||
GRPC_EVENT_ENGINE_RESOLVER_TRACE("%s", status.message().data()); |
||||
result.addresses = status; |
||||
result.service_config = status; |
||||
return std::move(result); |
||||
} |
||||
if (!errors_.ok()) { |
||||
result.resolution_note = errors_.message( |
||||
absl::StrCat("errors resolving ", resolver_->name_to_resolve())); |
||||
} |
||||
// We have at least one of addresses or balancer addresses, so we're going to
|
||||
// return a non-error for addresses.
|
||||
result.addresses.emplace(); |
||||
MaybePopulateAddressesLocked(&result); |
||||
MaybePopulateServiceConfigLocked(&result); |
||||
MaybePopulateBalancerAddressesLocked(&result); |
||||
return std::move(result); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
bool EventEngineClientChannelDNSResolverFactory::IsValidUri( |
||||
const URI& uri) const { |
||||
if (absl::StripPrefix(uri.path(), "/").empty()) { |
||||
gpr_log(GPR_ERROR, "no server name supplied in dns URI"); |
||||
return false; |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
OrphanablePtr<Resolver> |
||||
EventEngineClientChannelDNSResolverFactory::CreateResolver( |
||||
ResolverArgs args) const { |
||||
Duration min_time_between_resolutions = std::max( |
||||
Duration::Zero(), args.args |
||||
.GetDurationFromIntMillis( |
||||
GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS) |
||||
.value_or(Duration::Seconds(30))); |
||||
return MakeOrphanable<EventEngineClientChannelDNSResolver>( |
||||
std::move(args), min_time_between_resolutions); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,35 @@ |
||||
// Copyright 2023 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_EVENT_ENGINE_EVENT_ENGINE_CLIENT_CHANNEL_RESOLVER_H |
||||
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_EVENT_ENGINE_EVENT_ENGINE_CLIENT_CHANNEL_RESOLVER_H |
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/resolver/resolver.h" |
||||
#include "src/core/lib/resolver/resolver_factory.h" |
||||
#include "src/core/lib/uri/uri_parser.h" |
||||
|
||||
namespace grpc_core { |
||||
class EventEngineClientChannelDNSResolverFactory : public ResolverFactory { |
||||
public: |
||||
absl::string_view scheme() const override { return "dns"; } |
||||
bool IsValidUri(const URI& uri) const override; |
||||
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_EVENT_ENGINE_EVENT_ENGINE_CLIENT_CHANNEL_RESOLVER_H
|
@ -0,0 +1,97 @@ |
||||
// Copyright 2023 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/resolver/dns/event_engine/service_config_helper.h" |
||||
|
||||
#include <stdlib.h> |
||||
|
||||
#include <algorithm> |
||||
#include <vector> |
||||
|
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include "src/core/lib/gprpp/status_helper.h" |
||||
#include "src/core/lib/iomgr/gethostname.h" |
||||
#include "src/core/lib/json/json.h" |
||||
#include "src/core/lib/json/json_args.h" |
||||
#include "src/core/lib/json/json_object_loader.h" |
||||
#include "src/core/lib/json/json_reader.h" |
||||
#include "src/core/lib/json/json_writer.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
struct ServiceConfigChoice { |
||||
std::vector<std::string> client_language; |
||||
int percentage = -1; |
||||
std::vector<std::string> client_hostname; |
||||
Json::Object service_config; |
||||
|
||||
static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { |
||||
static const auto* loader = |
||||
JsonObjectLoader<ServiceConfigChoice>() |
||||
.OptionalField("clientLanguage", |
||||
&ServiceConfigChoice::client_language) |
||||
.OptionalField("percentage", &ServiceConfigChoice::percentage) |
||||
.OptionalField("clientHostname", |
||||
&ServiceConfigChoice::client_hostname) |
||||
.Field("serviceConfig", &ServiceConfigChoice::service_config) |
||||
.Finish(); |
||||
return loader; |
||||
} |
||||
}; |
||||
|
||||
bool vector_contains(const std::vector<std::string>& v, |
||||
const std::string& value) { |
||||
return std::find(v.begin(), v.end(), value) != v.end(); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
absl::StatusOr<std::string> ChooseServiceConfig( |
||||
absl::string_view service_config_json) { |
||||
auto json = JsonParse(service_config_json); |
||||
GRPC_RETURN_IF_ERROR(json.status()); |
||||
auto choices = LoadFromJson<std::vector<ServiceConfigChoice>>(*json); |
||||
GRPC_RETURN_IF_ERROR(choices.status()); |
||||
for (const ServiceConfigChoice& choice : *choices) { |
||||
// Check client language, if specified.
|
||||
if (!choice.client_language.empty() && |
||||
!vector_contains(choice.client_language, "c++")) { |
||||
continue; |
||||
} |
||||
// Check client hostname, if specified.
|
||||
if (!choice.client_hostname.empty()) { |
||||
const char* hostname = grpc_gethostname(); |
||||
if (!vector_contains(choice.client_hostname, hostname)) { |
||||
continue; |
||||
} |
||||
} |
||||
// Check percentage, if specified.
|
||||
if (choice.percentage != -1) { |
||||
int random_pct = rand() % 100; |
||||
if (random_pct > choice.percentage || choice.percentage == 0) { |
||||
continue; |
||||
} |
||||
} |
||||
return JsonDump(choice.service_config); |
||||
} |
||||
// No matching service config was found
|
||||
return ""; |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,32 @@ |
||||
// Copyright 2023 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_EVENT_ENGINE_SERVICE_CONFIG_HELPER_H |
||||
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_EVENT_ENGINE_SERVICE_CONFIG_HELPER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <string> |
||||
|
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
absl::StatusOr<std::string> ChooseServiceConfig( |
||||
absl::string_view service_config_json); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_EVENT_ENGINE_SERVICE_CONFIG_HELPER_H
|
@ -0,0 +1,24 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_NATIVE_DNS_RESOLVER_H |
||||
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_NATIVE_DNS_RESOLVER_H |
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/config/core_configuration.h" |
||||
|
||||
namespace grpc_core { |
||||
void RegisterNativeDnsResolver(CoreConfiguration::Builder* builder); |
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_NATIVE_DNS_RESOLVER_H
|
Loading…
Reference in new issue