From d35162876948427b8ffdaece624a4819da668da5 Mon Sep 17 00:00:00 2001 From: Yijie Ma Date: Wed, 14 Aug 2024 09:56:17 -0700 Subject: [PATCH] [EventEngine] Migrate httpcli to use EventEngine DNSResolver (#37442) Closes #37442 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37442 from yijiem:migrate-iomgr-getdnsresolver 7b3ed7d9803a6a8b5fe5e1899c6bdfbc9bdcd02e PiperOrigin-RevId: 662957279 --- BUILD | 4 +-- src/core/util/http_client/httpcli.cc | 44 ++++++++++++++++------------ src/core/util/http_client/httpcli.h | 19 +++++++----- 3 files changed, 38 insertions(+), 29 deletions(-) diff --git a/BUILD b/BUILD index a7517c25e23..eb4bc77f081 100644 --- a/BUILD +++ b/BUILD @@ -3938,6 +3938,7 @@ grpc_cc_library( deps = [ "config", "debug_location", + "event_engine_base_hdrs", "exec_ctx", "gpr", "grpc_base", @@ -3949,17 +3950,16 @@ grpc_cc_library( "orphanable", "ref_counted_ptr", "resource_quota_api", - "sockaddr_utils", "uri_parser", "//src/core:channel_args", "//src/core:channel_args_preconditioning", "//src/core:closure", "//src/core:error", "//src/core:error_utils", + "//src/core:event_engine_tcp_socket_utils", "//src/core:handshaker_registry", "//src/core:iomgr_fwd", "//src/core:pollset_set", - "//src/core:resolved_address", "//src/core:resource_quota", "//src/core:slice", "//src/core:slice_refcount", diff --git a/src/core/util/http_client/httpcli.cc b/src/core/util/http_client/httpcli.cc index 12f13347470..d60b1cd68b1 100644 --- a/src/core/util/http_client/httpcli.cc +++ b/src/core/util/http_client/httpcli.cc @@ -38,15 +38,14 @@ #include "src/core/handshaker/handshaker.h" #include "src/core/handshaker/handshaker_registry.h" #include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h" -#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/pollset_set.h" -#include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/resource_quota/api.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/security_connector/security_connector.h" @@ -59,6 +58,9 @@ namespace grpc_core { namespace { +using grpc_event_engine::experimental::EventEngine; +using grpc_event_engine::experimental::ResolvedAddressToURI; + grpc_httpcli_get_override g_get_override; grpc_httpcli_post_override g_post_override; grpc_httpcli_put_override g_put_override; @@ -173,7 +175,10 @@ HttpRequest::HttpRequest( pollent_(pollent), pollset_set_(grpc_pollset_set_create()), test_only_generate_response_(std::move(test_only_generate_response)), - resolver_(GetDNSResolver()) { + resolver_( + ChannelArgs::FromC(channel_args_) + .GetObjectRef() + ->GetDNSResolver(EventEngine::DNSResolver::ResolverOptions())) { grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response); grpc_slice_buffer_init(&incoming_); grpc_slice_buffer_init(&outgoing_); @@ -207,11 +212,14 @@ void HttpRequest::Start() { test_only_generate_response_.value()(); return; } + if (!resolver_.ok()) { + Finish(resolver_.status()); + return; + } Ref().release(); // ref held by pending DNS resolution - dns_request_handle_ = resolver_->LookupHostname( - absl::bind_front(&HttpRequest::OnResolved, this), uri_.authority(), - uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_, - /*name_server=*/""); + (*resolver_) + ->LookupHostname(absl::bind_front(&HttpRequest::OnResolved, this), + uri_.authority(), uri_.scheme()); } void HttpRequest::Orphan() { @@ -220,10 +228,8 @@ void HttpRequest::Orphan() { CHECK(!cancelled_); cancelled_ = true; // cancel potentially pending DNS resolution. - if (dns_request_handle_.has_value() && - resolver_->Cancel(dns_request_handle_.value())) { - Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution")); - Unref(); + if (*resolver_ != nullptr) { + resolver_->reset(); } if (handshake_mgr_ != nullptr) { // Shutdown will cancel any ongoing tcp connect. @@ -239,8 +245,7 @@ void HttpRequest::AppendError(grpc_error_handle error) { if (overall_error_.ok()) { overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request"); } - const grpc_resolved_address* addr = &addresses_[next_address_ - 1]; - auto addr_text = grpc_sockaddr_to_uri(addr); + auto addr_text = ResolvedAddressToURI(addresses_[next_address_ - 1]); if (addr_text.ok()) error = AddMessagePrefix(*addr_text, std::move(error)); overall_error_ = grpc_error_add_child(overall_error_, std::move(error)); } @@ -310,7 +315,7 @@ void HttpRequest::OnHandshakeDone(absl::StatusOr result) { StartWrite(); } -void HttpRequest::DoHandshake(const grpc_resolved_address* addr) { +void HttpRequest::DoHandshake(const EventEngine::ResolvedAddress& addr) { // Create the security connector using the credentials and target name. ChannelArgs args = ChannelArgs::FromC(channel_args_); RefCountedPtr sc = @@ -321,7 +326,7 @@ void HttpRequest::DoHandshake(const grpc_resolved_address* addr) { &overall_error_, 1)); return; } - absl::StatusOr address = grpc_sockaddr_to_uri(addr); + absl::StatusOr address = ResolvedAddressToURI(addr); if (!address.ok()) { Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address", &overall_error_, 1)); @@ -354,15 +359,16 @@ void HttpRequest::NextAddress(grpc_error_handle error) { &overall_error_, 1)); return; } - const grpc_resolved_address* addr = &addresses_[next_address_++]; - DoHandshake(addr); + DoHandshake(addresses_[next_address_++]); } void HttpRequest::OnResolved( - absl::StatusOr> addresses_or) { + absl::StatusOr> addresses_or) { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; RefCountedPtr unreffer(this); MutexLock lock(&mu_); - dns_request_handle_.reset(); + resolver_->reset(); if (cancelled_) { Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution")); return; diff --git a/src/core/util/http_client/httpcli.h b/src/core/util/http_client/httpcli.h index 7101ebf7ed8..94822e40fa6 100644 --- a/src/core/util/http_client/httpcli.h +++ b/src/core/util/http_client/httpcli.h @@ -32,6 +32,7 @@ #include "absl/status/statusor.h" #include "absl/types/optional.h" +#include #include #include @@ -48,8 +49,6 @@ #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/polling_entity.h" -#include "src/core/lib/iomgr/resolve_address.h" -#include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/uri/uri_parser.h" #include "src/core/util/http_client/parser.h" @@ -223,13 +222,16 @@ class HttpRequest : public InternallyRefCounted { void OnHandshakeDone(absl::StatusOr result); - void DoHandshake(const grpc_resolved_address* addr) + void DoHandshake( + const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); void NextAddress(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); void OnResolved( - absl::StatusOr> addresses_or); + absl::StatusOr> + addresses_or); const URI uri_; const grpc_slice request_text_; @@ -250,16 +252,17 @@ class HttpRequest : public InternallyRefCounted { RefCountedPtr handshake_mgr_ ABSL_GUARDED_BY(mu_); bool cancelled_ ABSL_GUARDED_BY(mu_) = false; grpc_http_parser parser_ ABSL_GUARDED_BY(mu_); - std::vector addresses_ ABSL_GUARDED_BY(mu_); + std::vector + addresses_ ABSL_GUARDED_BY(mu_); size_t next_address_ ABSL_GUARDED_BY(mu_) = 0; int have_read_byte_ ABSL_GUARDED_BY(mu_) = 0; grpc_iomgr_object iomgr_obj_ ABSL_GUARDED_BY(mu_); grpc_slice_buffer incoming_ ABSL_GUARDED_BY(mu_); grpc_slice_buffer outgoing_ ABSL_GUARDED_BY(mu_); grpc_error_handle overall_error_ ABSL_GUARDED_BY(mu_) = absl::OkStatus(); - std::shared_ptr resolver_; - absl::optional dns_request_handle_ - ABSL_GUARDED_BY(mu_) = DNSResolver::kNullHandle; + absl::StatusOr> + resolver_; }; } // namespace grpc_core