[EventEngine] Migrate httpcli to use EventEngine DNSResolver (#37442)

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #37442

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37442 from yijiem:migrate-iomgr-getdnsresolver 7b3ed7d980
PiperOrigin-RevId: 662957279
pull/37480/head
Yijie Ma 7 months ago committed by Copybara-Service
parent 1c94bf556c
commit d351628769
  1. 4
      BUILD
  2. 44
      src/core/util/http_client/httpcli.cc
  3. 19
      src/core/util/http_client/httpcli.h

@ -3938,6 +3938,7 @@ grpc_cc_library(
deps = [ deps = [
"config", "config",
"debug_location", "debug_location",
"event_engine_base_hdrs",
"exec_ctx", "exec_ctx",
"gpr", "gpr",
"grpc_base", "grpc_base",
@ -3949,17 +3950,16 @@ grpc_cc_library(
"orphanable", "orphanable",
"ref_counted_ptr", "ref_counted_ptr",
"resource_quota_api", "resource_quota_api",
"sockaddr_utils",
"uri_parser", "uri_parser",
"//src/core:channel_args", "//src/core:channel_args",
"//src/core:channel_args_preconditioning", "//src/core:channel_args_preconditioning",
"//src/core:closure", "//src/core:closure",
"//src/core:error", "//src/core:error",
"//src/core:error_utils", "//src/core:error_utils",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:handshaker_registry", "//src/core:handshaker_registry",
"//src/core:iomgr_fwd", "//src/core:iomgr_fwd",
"//src/core:pollset_set", "//src/core:pollset_set",
"//src/core:resolved_address",
"//src/core:resource_quota", "//src/core:resource_quota",
"//src/core:slice", "//src/core:slice",
"//src/core:slice_refcount", "//src/core:slice_refcount",

@ -38,15 +38,14 @@
#include "src/core/handshaker/handshaker.h" #include "src/core/handshaker/handshaker.h"
#include "src/core/handshaker/handshaker_registry.h" #include "src/core/handshaker/handshaker_registry.h"
#include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h" #include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/resource_quota/api.h" #include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/security_connector/security_connector.h" #include "src/core/lib/security/security_connector/security_connector.h"
@ -59,6 +58,9 @@ namespace grpc_core {
namespace { namespace {
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::ResolvedAddressToURI;
grpc_httpcli_get_override g_get_override; grpc_httpcli_get_override g_get_override;
grpc_httpcli_post_override g_post_override; grpc_httpcli_post_override g_post_override;
grpc_httpcli_put_override g_put_override; grpc_httpcli_put_override g_put_override;
@ -173,7 +175,10 @@ HttpRequest::HttpRequest(
pollent_(pollent), pollent_(pollent),
pollset_set_(grpc_pollset_set_create()), pollset_set_(grpc_pollset_set_create()),
test_only_generate_response_(std::move(test_only_generate_response)), test_only_generate_response_(std::move(test_only_generate_response)),
resolver_(GetDNSResolver()) { resolver_(
ChannelArgs::FromC(channel_args_)
.GetObjectRef<EventEngine>()
->GetDNSResolver(EventEngine::DNSResolver::ResolverOptions())) {
grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response); grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
grpc_slice_buffer_init(&incoming_); grpc_slice_buffer_init(&incoming_);
grpc_slice_buffer_init(&outgoing_); grpc_slice_buffer_init(&outgoing_);
@ -207,11 +212,14 @@ void HttpRequest::Start() {
test_only_generate_response_.value()(); test_only_generate_response_.value()();
return; return;
} }
if (!resolver_.ok()) {
Finish(resolver_.status());
return;
}
Ref().release(); // ref held by pending DNS resolution Ref().release(); // ref held by pending DNS resolution
dns_request_handle_ = resolver_->LookupHostname( (*resolver_)
absl::bind_front(&HttpRequest::OnResolved, this), uri_.authority(), ->LookupHostname(absl::bind_front(&HttpRequest::OnResolved, this),
uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_, uri_.authority(), uri_.scheme());
/*name_server=*/"");
} }
void HttpRequest::Orphan() { void HttpRequest::Orphan() {
@ -220,10 +228,8 @@ void HttpRequest::Orphan() {
CHECK(!cancelled_); CHECK(!cancelled_);
cancelled_ = true; cancelled_ = true;
// cancel potentially pending DNS resolution. // cancel potentially pending DNS resolution.
if (dns_request_handle_.has_value() && if (*resolver_ != nullptr) {
resolver_->Cancel(dns_request_handle_.value())) { resolver_->reset();
Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
Unref();
} }
if (handshake_mgr_ != nullptr) { if (handshake_mgr_ != nullptr) {
// Shutdown will cancel any ongoing tcp connect. // Shutdown will cancel any ongoing tcp connect.
@ -239,8 +245,7 @@ void HttpRequest::AppendError(grpc_error_handle error) {
if (overall_error_.ok()) { if (overall_error_.ok()) {
overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request"); overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
} }
const grpc_resolved_address* addr = &addresses_[next_address_ - 1]; auto addr_text = ResolvedAddressToURI(addresses_[next_address_ - 1]);
auto addr_text = grpc_sockaddr_to_uri(addr);
if (addr_text.ok()) error = AddMessagePrefix(*addr_text, std::move(error)); if (addr_text.ok()) error = AddMessagePrefix(*addr_text, std::move(error));
overall_error_ = grpc_error_add_child(overall_error_, std::move(error)); overall_error_ = grpc_error_add_child(overall_error_, std::move(error));
} }
@ -310,7 +315,7 @@ void HttpRequest::OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result) {
StartWrite(); StartWrite();
} }
void HttpRequest::DoHandshake(const grpc_resolved_address* addr) { void HttpRequest::DoHandshake(const EventEngine::ResolvedAddress& addr) {
// Create the security connector using the credentials and target name. // Create the security connector using the credentials and target name.
ChannelArgs args = ChannelArgs::FromC(channel_args_); ChannelArgs args = ChannelArgs::FromC(channel_args_);
RefCountedPtr<grpc_channel_security_connector> sc = RefCountedPtr<grpc_channel_security_connector> sc =
@ -321,7 +326,7 @@ void HttpRequest::DoHandshake(const grpc_resolved_address* addr) {
&overall_error_, 1)); &overall_error_, 1));
return; return;
} }
absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(addr); absl::StatusOr<std::string> address = ResolvedAddressToURI(addr);
if (!address.ok()) { if (!address.ok()) {
Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address", Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address",
&overall_error_, 1)); &overall_error_, 1));
@ -354,15 +359,16 @@ void HttpRequest::NextAddress(grpc_error_handle error) {
&overall_error_, 1)); &overall_error_, 1));
return; return;
} }
const grpc_resolved_address* addr = &addresses_[next_address_++]; DoHandshake(addresses_[next_address_++]);
DoHandshake(addr);
} }
void HttpRequest::OnResolved( void HttpRequest::OnResolved(
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) { absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses_or) {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
RefCountedPtr<HttpRequest> unreffer(this); RefCountedPtr<HttpRequest> unreffer(this);
MutexLock lock(&mu_); MutexLock lock(&mu_);
dns_request_handle_.reset(); resolver_->reset();
if (cancelled_) { if (cancelled_) {
Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution")); Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
return; return;

@ -32,6 +32,7 @@
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/slice.h> #include <grpc/slice.h>
@ -48,8 +49,6 @@
#include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/uri/uri_parser.h" #include "src/core/lib/uri/uri_parser.h"
#include "src/core/util/http_client/parser.h" #include "src/core/util/http_client/parser.h"
@ -223,13 +222,16 @@ class HttpRequest : public InternallyRefCounted<HttpRequest> {
void OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result); void OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result);
void DoHandshake(const grpc_resolved_address* addr) void DoHandshake(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void NextAddress(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); void NextAddress(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void OnResolved( void OnResolved(
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or); absl::StatusOr<std::vector<
grpc_event_engine::experimental::EventEngine::ResolvedAddress>>
addresses_or);
const URI uri_; const URI uri_;
const grpc_slice request_text_; const grpc_slice request_text_;
@ -250,16 +252,17 @@ class HttpRequest : public InternallyRefCounted<HttpRequest> {
RefCountedPtr<HandshakeManager> handshake_mgr_ ABSL_GUARDED_BY(mu_); RefCountedPtr<HandshakeManager> handshake_mgr_ ABSL_GUARDED_BY(mu_);
bool cancelled_ ABSL_GUARDED_BY(mu_) = false; bool cancelled_ ABSL_GUARDED_BY(mu_) = false;
grpc_http_parser parser_ ABSL_GUARDED_BY(mu_); grpc_http_parser parser_ ABSL_GUARDED_BY(mu_);
std::vector<grpc_resolved_address> addresses_ ABSL_GUARDED_BY(mu_); std::vector<grpc_event_engine::experimental::EventEngine::ResolvedAddress>
addresses_ ABSL_GUARDED_BY(mu_);
size_t next_address_ ABSL_GUARDED_BY(mu_) = 0; size_t next_address_ ABSL_GUARDED_BY(mu_) = 0;
int have_read_byte_ ABSL_GUARDED_BY(mu_) = 0; int have_read_byte_ ABSL_GUARDED_BY(mu_) = 0;
grpc_iomgr_object iomgr_obj_ ABSL_GUARDED_BY(mu_); grpc_iomgr_object iomgr_obj_ ABSL_GUARDED_BY(mu_);
grpc_slice_buffer incoming_ ABSL_GUARDED_BY(mu_); grpc_slice_buffer incoming_ ABSL_GUARDED_BY(mu_);
grpc_slice_buffer outgoing_ ABSL_GUARDED_BY(mu_); grpc_slice_buffer outgoing_ ABSL_GUARDED_BY(mu_);
grpc_error_handle overall_error_ ABSL_GUARDED_BY(mu_) = absl::OkStatus(); grpc_error_handle overall_error_ ABSL_GUARDED_BY(mu_) = absl::OkStatus();
std::shared_ptr<DNSResolver> resolver_; absl::StatusOr<std::unique_ptr<
absl::optional<DNSResolver::TaskHandle> dns_request_handle_ grpc_event_engine::experimental::EventEngine::DNSResolver>>
ABSL_GUARDED_BY(mu_) = DNSResolver::kNullHandle; resolver_;
}; };
} // namespace grpc_core } // namespace grpc_core

Loading…
Cancel
Save