Merge Master

pull/37461/head
tanvi-jagtap 7 months ago
commit bf542fc40c
  1. 4
      BUILD
  2. 9
      src/core/handshaker/security/secure_endpoint.cc
  3. 6
      src/core/lib/debug/trace_impl.h
  4. 2
      src/core/lib/experiments/experiments.yaml
  5. 85
      src/core/lib/iomgr/ev_posix.cc
  6. 7
      src/core/lib/surface/call.cc
  7. 19
      src/core/lib/surface/call.h
  8. 105
      src/core/resolver/dns/c_ares/dns_resolver_ares.cc
  9. 196
      src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  10. 199
      src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc
  11. 7
      src/core/resolver/dns/c_ares/grpc_ares_wrapper.h
  12. 44
      src/core/util/http_client/httpcli.cc
  13. 19
      src/core/util/http_client/httpcli.h
  14. 1
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
  15. 6
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  16. 2
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  17. 2
      src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi
  18. 33
      src/python/grpcio/grpc/_observability.py
  19. 9
      src/python/grpcio_observability/grpc_observability/_cyobservability.pyx
  20. 5
      src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py
  21. 5
      src/python/grpcio_observability/grpc_observability/observability_util.cc

@ -3938,6 +3938,7 @@ grpc_cc_library(
deps = [ deps = [
"config", "config",
"debug_location", "debug_location",
"event_engine_base_hdrs",
"exec_ctx", "exec_ctx",
"gpr", "gpr",
"grpc_base", "grpc_base",
@ -3949,17 +3950,16 @@ grpc_cc_library(
"orphanable", "orphanable",
"ref_counted_ptr", "ref_counted_ptr",
"resource_quota_api", "resource_quota_api",
"sockaddr_utils",
"uri_parser", "uri_parser",
"//src/core:channel_args", "//src/core:channel_args",
"//src/core:channel_args_preconditioning", "//src/core:channel_args_preconditioning",
"//src/core:closure", "//src/core:closure",
"//src/core:error", "//src/core:error",
"//src/core:error_utils", "//src/core:error_utils",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:handshaker_registry", "//src/core:handshaker_registry",
"//src/core:iomgr_fwd", "//src/core:iomgr_fwd",
"//src/core:pollset_set", "//src/core:pollset_set",
"//src/core:resolved_address",
"//src/core:resource_quota", "//src/core:resource_quota",
"//src/core:slice", "//src/core:slice",
"//src/core:slice_refcount", "//src/core:slice_refcount",

@ -252,6 +252,13 @@ static void on_read(void* user_data, grpc_error_handle error) {
{ {
grpc_core::MutexLock l(&ep->read_mu); grpc_core::MutexLock l(&ep->read_mu);
// If we were shut down after this callback was scheduled with OK
// status but before it was invoked, we need to treat that as an error.
if (ep->wrapped_ep == nullptr && error.ok()) {
error = absl::CancelledError("secure endpoint shutdown");
}
uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer); uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer); uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
@ -505,8 +512,10 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
static void endpoint_destroy(grpc_endpoint* secure_ep) { static void endpoint_destroy(grpc_endpoint* secure_ep) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep); secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
ep->read_mu.Lock();
ep->wrapped_ep.reset(); ep->wrapped_ep.reset();
ep->memory_owner.Reset(); ep->memory_owner.Reset();
ep->read_mu.Unlock();
SECURE_ENDPOINT_UNREF(ep, "destroy"); SECURE_ENDPOINT_UNREF(ep, "destroy");
} }

@ -81,10 +81,16 @@ class TraceFlag {
}; };
#define GRPC_TRACE_FLAG_ENABLED_OBJ(obj) GPR_UNLIKELY((obj).enabled()) #define GRPC_TRACE_FLAG_ENABLED_OBJ(obj) GPR_UNLIKELY((obj).enabled())
#define GRPC_TRACE_FLAG_ENABLED(tracer) \ #define GRPC_TRACE_FLAG_ENABLED(tracer) \
GPR_UNLIKELY((grpc_core::tracer##_trace).enabled()) GPR_UNLIKELY((grpc_core::tracer##_trace).enabled())
#define GRPC_TRACE_LOG(tracer, level) \ #define GRPC_TRACE_LOG(tracer, level) \
LOG_IF(level, GRPC_TRACE_FLAG_ENABLED(tracer)) LOG_IF(level, GRPC_TRACE_FLAG_ENABLED(tracer))
#define GRPC_TRACE_DLOG(tracer, level) \
DLOG_IF(level, GRPC_TRACE_FLAG_ENABLED(tracer))
#define GRPC_TRACE_VLOG(tracer, level) \ #define GRPC_TRACE_VLOG(tracer, level) \
if (GRPC_TRACE_FLAG_ENABLED(tracer)) VLOG(level) if (GRPC_TRACE_FLAG_ENABLED(tracer)) VLOG(level)

@ -105,7 +105,7 @@
test_tags: [flow_control_test] test_tags: [flow_control_test]
- name: pick_first_new - name: pick_first_new
description: New pick_first impl with memory reduction. description: New pick_first impl with memory reduction.
expiry: 2024/07/30 expiry: 2024/10/30
owner: roth@google.com owner: roth@google.com
test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"] test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"]
- name: promise_based_inproc_transport - name: promise_based_inproc_transport

@ -42,18 +42,6 @@
#include "src/core/lib/iomgr/internal_errqueue.h" #include "src/core/lib/iomgr/internal_errqueue.h"
#include "src/core/util/useful.h" #include "src/core/util/useful.h"
// Traces fd create/close operations
// Polling API trace only enabled in debug builds
#ifndef NDEBUG
#define GRPC_POLLING_API_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(polling_api)) { \
LOG(INFO) << "(polling-api) " << absl::StrFormat(format, __VA_ARGS__); \
}
#else
#define GRPC_POLLING_API_TRACE(...)
#endif // NDEBUG
/// Default poll() function - a pointer so that it can be overridden by some /// Default poll() function - a pointer so that it can be overridden by some
/// tests /// tests
#ifndef GPR_AIX #ifndef GPR_AIX
@ -165,7 +153,8 @@ bool grpc_event_engine_run_in_background(void) {
} }
grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) { grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); GRPC_TRACE_DLOG(polling_api, INFO) << "(polling-api) fd_create(" << fd << ", "
<< name << ", " << track_err << ")";
GRPC_TRACE_LOG(fd_trace, INFO) << "(fd-trace) fd_create(" << fd << ", " GRPC_TRACE_LOG(fd_trace, INFO) << "(fd-trace) fd_create(" << fd << ", "
<< name << ", " << track_err << ")"; << name << ", " << track_err << ")";
return g_event_engine->fd_create( return g_event_engine->fd_create(
@ -178,8 +167,9 @@ int grpc_fd_wrapped_fd(grpc_fd* fd) {
void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
const char* reason) { const char* reason) {
GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %s)", grpc_fd_wrapped_fd(fd), GRPC_TRACE_DLOG(polling_api, INFO)
on_done, release_fd, reason); << "(polling-api) fd_orphan(" << grpc_fd_wrapped_fd(fd) << ", " << on_done
<< ", " << release_fd << ", " << reason << ")";
GRPC_TRACE_LOG(fd_trace, INFO) GRPC_TRACE_LOG(fd_trace, INFO)
<< "(fd-trace) grpc_fd_orphan, fd:" << grpc_fd_wrapped_fd(fd) << "(fd-trace) grpc_fd_orphan, fd:" << grpc_fd_wrapped_fd(fd)
<< " closed"; << " closed";
@ -188,14 +178,16 @@ void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
} }
void grpc_fd_set_pre_allocated(grpc_fd* fd) { void grpc_fd_set_pre_allocated(grpc_fd* fd) {
GRPC_POLLING_API_TRACE("fd_set_pre_allocated(%d)", grpc_fd_wrapped_fd(fd)); GRPC_TRACE_DLOG(polling_api, INFO)
<< "(polling-api) fd_set_pre_allocated(" << grpc_fd_wrapped_fd(fd) << ")";
GRPC_TRACE_LOG(fd_trace, INFO) GRPC_TRACE_LOG(fd_trace, INFO)
<< "(fd-trace) fd_set_pre_allocated(" << grpc_fd_wrapped_fd(fd) << ")"; << "(fd-trace) fd_set_pre_allocated(" << grpc_fd_wrapped_fd(fd) << ")";
g_event_engine->fd_set_pre_allocated(fd); g_event_engine->fd_set_pre_allocated(fd);
} }
void grpc_fd_shutdown(grpc_fd* fd, grpc_error_handle why) { void grpc_fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
GRPC_POLLING_API_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd)); GRPC_TRACE_LOG(polling_api, INFO)
<< "(polling-api) fd_shutdown(" << grpc_fd_wrapped_fd(fd) << ")";
GRPC_TRACE_LOG(fd_trace, INFO) GRPC_TRACE_LOG(fd_trace, INFO)
<< "(fd-trace) fd_shutdown(" << grpc_fd_wrapped_fd(fd) << ")"; << "(fd-trace) fd_shutdown(" << grpc_fd_wrapped_fd(fd) << ")";
g_event_engine->fd_shutdown(fd, why); g_event_engine->fd_shutdown(fd, why);
@ -226,41 +218,48 @@ void grpc_fd_set_error(grpc_fd* fd) { g_event_engine->fd_set_error(fd); }
static size_t pollset_size(void) { return g_event_engine->pollset_size; } static size_t pollset_size(void) { return g_event_engine->pollset_size; }
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
GRPC_POLLING_API_TRACE("pollset_init(%p)", pollset); GRPC_TRACE_DLOG(polling_api, INFO)
<< "(polling-api) pollset_init(" << pollset << ")";
g_event_engine->pollset_init(pollset, mu); g_event_engine->pollset_init(pollset, mu);
} }
static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
GRPC_POLLING_API_TRACE("pollset_shutdown(%p)", pollset); GRPC_TRACE_DLOG(polling_api, INFO)
<< "(polling-api) pollset_shutdown(" << pollset << ")";
g_event_engine->pollset_shutdown(pollset, closure); g_event_engine->pollset_shutdown(pollset, closure);
} }
static void pollset_destroy(grpc_pollset* pollset) { static void pollset_destroy(grpc_pollset* pollset) {
GRPC_POLLING_API_TRACE("pollset_destroy(%p)", pollset); GRPC_TRACE_DLOG(polling_api, INFO)
<< "(polling-api) pollset_destroy(" << pollset << ")";
g_event_engine->pollset_destroy(pollset); g_event_engine->pollset_destroy(pollset);
} }
static grpc_error_handle pollset_work(grpc_pollset* pollset, static grpc_error_handle pollset_work(grpc_pollset* pollset,
grpc_pollset_worker** worker, grpc_pollset_worker** worker,
grpc_core::Timestamp deadline) { grpc_core::Timestamp deadline) {
GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRId64 ") begin", pollset, GRPC_TRACE_DLOG(polling_api, INFO)
deadline.milliseconds_after_process_epoch()); << "(polling-api) pollset_work(" << pollset << ", "
<< deadline.milliseconds_after_process_epoch() << ") begin";
grpc_error_handle err = grpc_error_handle err =
g_event_engine->pollset_work(pollset, worker, deadline); g_event_engine->pollset_work(pollset, worker, deadline);
GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRId64 ") end", pollset, GRPC_TRACE_DLOG(polling_api, INFO)
deadline.milliseconds_after_process_epoch()); << "(polling-api) pollset_work(" << pollset << ", "
<< deadline.milliseconds_after_process_epoch() << ") end";
return err; return err;
} }
static grpc_error_handle pollset_kick(grpc_pollset* pollset, static grpc_error_handle pollset_kick(grpc_pollset* pollset,
grpc_pollset_worker* specific_worker) { grpc_pollset_worker* specific_worker) {
GRPC_POLLING_API_TRACE("pollset_kick(%p, %p)", pollset, specific_worker); GRPC_TRACE_DLOG(polling_api, INFO) << "(polling-api) pollset_kick(" << pollset
<< ", " << specific_worker << ")";
return g_event_engine->pollset_kick(pollset, specific_worker); return g_event_engine->pollset_kick(pollset, specific_worker);
} }
void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd) { void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd) {
GRPC_POLLING_API_TRACE("pollset_add_fd(%p, %d)", pollset, GRPC_TRACE_DLOG(polling_api, INFO)
grpc_fd_wrapped_fd(fd)); << "(polling-api) pollset_add_fd(" << pollset << ", "
<< grpc_fd_wrapped_fd(fd) << ")";
g_event_engine->pollset_add_fd(pollset, fd); g_event_engine->pollset_add_fd(pollset, fd);
} }
@ -275,38 +274,44 @@ grpc_pollset_vtable grpc_posix_pollset_vtable = {
static grpc_pollset_set* pollset_set_create(void) { static grpc_pollset_set* pollset_set_create(void) {
grpc_pollset_set* pss = g_event_engine->pollset_set_create(); grpc_pollset_set* pss = g_event_engine->pollset_set_create();
GRPC_POLLING_API_TRACE("pollset_set_create(%p)", pss); GRPC_TRACE_DLOG(polling_api, INFO)
<< "(polling-api) pollset_set_create(" << pss << ")";
return pss; return pss;
} }
static void pollset_set_destroy(grpc_pollset_set* pollset_set) { static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
GRPC_POLLING_API_TRACE("pollset_set_destroy(%p)", pollset_set); GRPC_TRACE_DLOG(polling_api, INFO)
<< "(polling-api) pollset_set_destroy(" << pollset_set << ")";
g_event_engine->pollset_set_destroy(pollset_set); g_event_engine->pollset_set_destroy(pollset_set);
} }
static void pollset_set_add_pollset(grpc_pollset_set* pollset_set, static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
grpc_pollset* pollset) { grpc_pollset* pollset) {
GRPC_POLLING_API_TRACE("pollset_set_add_pollset(%p, %p)", pollset_set, GRPC_TRACE_DLOG(polling_api, INFO) << "(polling-api) pollset_set_add_pollset("
pollset); << pollset_set << ", " << pollset << ")";
g_event_engine->pollset_set_add_pollset(pollset_set, pollset); g_event_engine->pollset_set_add_pollset(pollset_set, pollset);
} }
static void pollset_set_del_pollset(grpc_pollset_set* pollset_set, static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
grpc_pollset* pollset) { grpc_pollset* pollset) {
GRPC_POLLING_API_TRACE("pollset_set_del_pollset(%p, %p)", pollset_set, GRPC_TRACE_DLOG(polling_api, INFO) << "(polling-api) pollset_set_del_pollset("
pollset); << pollset_set << ", " << pollset << ")";
g_event_engine->pollset_set_del_pollset(pollset_set, pollset); g_event_engine->pollset_set_del_pollset(pollset_set, pollset);
} }
static void pollset_set_add_pollset_set(grpc_pollset_set* bag, static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
grpc_pollset_set* item) { grpc_pollset_set* item) {
GRPC_POLLING_API_TRACE("pollset_set_add_pollset_set(%p, %p)", bag, item); GRPC_TRACE_DLOG(polling_api, INFO)
<< "(polling-api) pollset_set_add_pollset_set(" << bag << ", " << item
<< ")";
g_event_engine->pollset_set_add_pollset_set(bag, item); g_event_engine->pollset_set_add_pollset_set(bag, item);
} }
static void pollset_set_del_pollset_set(grpc_pollset_set* bag, static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
grpc_pollset_set* item) { grpc_pollset_set* item) {
GRPC_POLLING_API_TRACE("pollset_set_del_pollset_set(%p, %p)", bag, item); GRPC_TRACE_DLOG(polling_api, INFO)
<< "(polling-api) pollset_set_del_pollset_set(" << bag << ", " << item
<< ")";
g_event_engine->pollset_set_del_pollset_set(bag, item); g_event_engine->pollset_set_del_pollset_set(bag, item);
} }
@ -316,14 +321,16 @@ grpc_pollset_set_vtable grpc_posix_pollset_set_vtable = {
pollset_set_add_pollset_set, pollset_set_del_pollset_set}; pollset_set_add_pollset_set, pollset_set_del_pollset_set};
void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
GRPC_POLLING_API_TRACE("pollset_set_add_fd(%p, %d)", pollset_set, GRPC_TRACE_DLOG(polling_api, INFO)
grpc_fd_wrapped_fd(fd)); << "(polling-api) pollset_set_add_fd(" << pollset_set << ", "
<< grpc_fd_wrapped_fd(fd) << ")";
g_event_engine->pollset_set_add_fd(pollset_set, fd); g_event_engine->pollset_set_add_fd(pollset_set, fd);
} }
void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
GRPC_POLLING_API_TRACE("pollset_set_del_fd(%p, %d)", pollset_set, GRPC_TRACE_DLOG(polling_api, INFO)
grpc_fd_wrapped_fd(fd)); << "(polling-api) pollset_set_del_fd(" << pollset_set << ", "
<< grpc_fd_wrapped_fd(fd) << ")";
g_event_engine->pollset_set_del_fd(pollset_set, fd); g_event_engine->pollset_set_del_fd(pollset_set, fd);
} }

@ -497,6 +497,13 @@ void grpc_call_tracer_set(grpc_call* call,
return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer); return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
} }
void grpc_call_tracer_set_and_manage(grpc_call* call,
grpc_core::ClientCallTracer* tracer) {
grpc_core::Arena* arena = grpc_call_get_arena(call);
arena->ManagedNew<ClientCallTracerWrapper>(tracer);
return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
}
void* grpc_call_tracer_get(grpc_call* call) { void* grpc_call_tracer_get(grpc_call* call) {
grpc_core::Arena* arena = grpc_call_get_arena(call); grpc_core::Arena* arena = grpc_call_get_arena(call);
auto* call_tracer = auto* call_tracer =

@ -265,6 +265,16 @@ void grpc_call_log_batch(const char* file, int line, const grpc_op* ops,
void grpc_call_tracer_set(grpc_call* call, grpc_core::ClientCallTracer* tracer); void grpc_call_tracer_set(grpc_call* call, grpc_core::ClientCallTracer* tracer);
// Sets call tracer on the call and manages its life by using the call's arena.
// When using this API, the tracer will be destroyed by grpc_call arena when
// grpc_call is about to be destroyed. The caller of this API SHOULD NOT
// manually destroy the tracer. This API is used by Python as a way of using
// Arena to manage the lifetime of the call tracer. Python needs this API
// because the tracer was created within a separate shared object library which
// doesn't have access to core functions like arena->ManagedNew<>.
void grpc_call_tracer_set_and_manage(grpc_call* call,
grpc_core::ClientCallTracer* tracer);
void* grpc_call_tracer_get(grpc_call* call); void* grpc_call_tracer_get(grpc_call* call);
#define GRPC_CALL_LOG_BATCH(ops, nops) \ #define GRPC_CALL_LOG_BATCH(ops, nops) \
@ -276,6 +286,15 @@ void* grpc_call_tracer_get(grpc_call* call);
uint8_t grpc_call_is_client(grpc_call* call); uint8_t grpc_call_is_client(grpc_call* call);
class ClientCallTracerWrapper {
public:
explicit ClientCallTracerWrapper(grpc_core::ClientCallTracer* tracer)
: tracer_(tracer) {}
private:
std::unique_ptr<grpc_core::ClientCallTracer> tracer_;
};
// Return an appropriate compression algorithm for the requested compression \a // Return an appropriate compression algorithm for the requested compression \a
// level in the context of \a call. // level in the context of \a call.
grpc_compression_algorithm grpc_call_compression_for_level( grpc_compression_algorithm grpc_call_compression_for_level(

@ -106,9 +106,10 @@ class AresClientChannelDNSResolver final : public PollingResolver {
resolver_->authority().c_str(), resolver_->name_to_resolve().c_str(), resolver_->authority().c_str(), resolver_->name_to_resolve().c_str(),
kDefaultSecurePort, resolver_->interested_parties(), kDefaultSecurePort, resolver_->interested_parties(),
&on_hostname_resolved_, &addresses_, resolver_->query_timeout_ms_)); &on_hostname_resolved_, &addresses_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"resolver:%p Started resolving hostnames. hostname_request_:%p", << "(c-ares resolver) resolver:" << resolver_.get()
resolver_.get(), hostname_request_.get()); << " Started resolving hostnames. hostname_request_:"
<< hostname_request_.get();
if (resolver_->enable_srv_queries_) { if (resolver_->enable_srv_queries_) {
Ref(DEBUG_LOCATION, "OnSRVResolved").release(); Ref(DEBUG_LOCATION, "OnSRVResolved").release();
GRPC_CLOSURE_INIT(&on_srv_resolved_, OnSRVResolved, this, nullptr); GRPC_CLOSURE_INIT(&on_srv_resolved_, OnSRVResolved, this, nullptr);
@ -117,9 +118,10 @@ class AresClientChannelDNSResolver final : public PollingResolver {
resolver_->name_to_resolve().c_str(), resolver_->name_to_resolve().c_str(),
resolver_->interested_parties(), &on_srv_resolved_, resolver_->interested_parties(), &on_srv_resolved_,
&balancer_addresses_, resolver_->query_timeout_ms_)); &balancer_addresses_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"resolver:%p Started resolving SRV records. srv_request_:%p", << "(c-ares resolver) resolver:" << resolver_.get()
resolver_.get(), srv_request_.get()); << " Started resolving SRV records. srv_request_:"
<< srv_request_.get();
} }
if (resolver_->request_service_config_) { if (resolver_->request_service_config_) {
Ref(DEBUG_LOCATION, "OnTXTResolved").release(); Ref(DEBUG_LOCATION, "OnTXTResolved").release();
@ -129,9 +131,10 @@ class AresClientChannelDNSResolver final : public PollingResolver {
resolver_->name_to_resolve().c_str(), resolver_->name_to_resolve().c_str(),
resolver_->interested_parties(), &on_txt_resolved_, resolver_->interested_parties(), &on_txt_resolved_,
&service_config_json_, resolver_->query_timeout_ms_)); &service_config_json_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"resolver:%p Started resolving TXT records. txt_request_:%p", << "(c-ares resolver) resolver:" << resolver_.get()
resolver_.get(), txt_request_.get()); << " Started resolving TXT records. txt_request_:"
<< txt_request_.get();
} }
} }
@ -219,8 +222,9 @@ AresClientChannelDNSResolver::AresClientChannelDNSResolver(
.value_or(GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS))) {} .value_or(GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS))) {}
AresClientChannelDNSResolver::~AresClientChannelDNSResolver() { AresClientChannelDNSResolver::~AresClientChannelDNSResolver() {
GRPC_CARES_TRACE_LOG("resolver:%p destroying AresClientChannelDNSResolver", GRPC_TRACE_VLOG(cares_resolver, 2)
this); << "(c-ares resolver) resolver:" << this
<< " destroying AresClientChannelDNSResolver";
} }
OrphanablePtr<Orphanable> AresClientChannelDNSResolver::StartRequest() { OrphanablePtr<Orphanable> AresClientChannelDNSResolver::StartRequest() {
@ -283,15 +287,16 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_) { grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_) {
if (hostname_request_ != nullptr || srv_request_ != nullptr || if (hostname_request_ != nullptr || srv_request_ != nullptr ||
txt_request_ != nullptr) { txt_request_ != nullptr) {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"resolver:%p OnResolved() waiting for results (hostname: %s, srv: %s, " << "(c-ares resolver) resolver:" << this
"txt: %s)", << " OnResolved() waiting for results (hostname: "
this, hostname_request_ != nullptr ? "waiting" : "done", << (hostname_request_ != nullptr ? "waiting" : "done")
srv_request_ != nullptr ? "waiting" : "done", << ", srv: " << (srv_request_ != nullptr ? "waiting" : "done")
txt_request_ != nullptr ? "waiting" : "done"); << ", txt: " << (txt_request_ != nullptr ? "waiting" : "done") << ")";
return absl::nullopt; return absl::nullopt;
} }
GRPC_CARES_TRACE_LOG("resolver:%p OnResolved() proceeding", this); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this << " OnResolved() proceeding";
Result result; Result result;
result.args = resolver_->channel_args(); result.args = resolver_->channel_args();
// TODO(roth): Change logic to be able to report failures for addresses // TODO(roth): Change logic to be able to report failures for addresses
@ -309,8 +314,9 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
absl::StrCat("failed to parse service config: ", absl::StrCat("failed to parse service config: ",
StatusToString(service_config_string.status()))); StatusToString(service_config_string.status())));
} else if (!service_config_string->empty()) { } else if (!service_config_string->empty()) {
GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s", GRPC_TRACE_VLOG(cares_resolver, 2)
this, service_config_string->c_str()); << "(c-ares resolver) resolver:" << this
<< " selected service config choice: " << *service_config_string;
result.service_config = ServiceConfigImpl::Create( result.service_config = ServiceConfigImpl::Create(
resolver_->channel_args(), *service_config_string); resolver_->channel_args(), *service_config_string);
if (!result.service_config.ok()) { if (!result.service_config.ok()) {
@ -325,8 +331,9 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
SetGrpcLbBalancerAddresses(result.args, *balancer_addresses_); SetGrpcLbBalancerAddresses(result.args, *balancer_addresses_);
} }
} else { } else {
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", this, GRPC_TRACE_VLOG(cares_resolver, 2)
StatusToString(error).c_str()); << "(c-ares resolver) resolver:" << this
<< " dns resolution failed: " << StatusToString(error);
std::string error_message; std::string error_message;
grpc_error_get_str(error, StatusStrProperty::kDescription, &error_message); grpc_error_get_str(error, StatusStrProperty::kDescription, &error_message);
absl::Status status = absl::UnavailableError( absl::Status status = absl::UnavailableError(
@ -375,8 +382,9 @@ class AresDNSResolver final : public DNSResolver {
class AresRequest { class AresRequest {
public: public:
virtual ~AresRequest() { virtual ~AresRequest() {
GRPC_CARES_TRACE_LOG("AresRequest:%p dtor ares_request_:%p", this, GRPC_TRACE_VLOG(cares_resolver, 2)
grpc_ares_request_.get()); << "(c-ares resolver) AresRequest:" << this
<< " dtor ares_request_:" << grpc_ares_request_.get();
resolver_->UnregisterRequest(task_handle()); resolver_->UnregisterRequest(task_handle());
grpc_pollset_set_destroy(pollset_set_); grpc_pollset_set_destroy(pollset_set_);
} }
@ -397,8 +405,9 @@ class AresDNSResolver final : public DNSResolver {
bool Cancel() { bool Cancel() {
MutexLock lock(&mu_); MutexLock lock(&mu_);
if (grpc_ares_request_ != nullptr) { if (grpc_ares_request_ != nullptr) {
GRPC_CARES_TRACE_LOG("AresRequest:%p Cancel ares_request_:%p", this, GRPC_TRACE_VLOG(cares_resolver, 2)
grpc_ares_request_.get()); << "(c-ares resolver) AresRequest:" << this
<< " Cancel ares_request_:" << grpc_ares_request_.get();
if (completed_) return false; if (completed_) return false;
// OnDnsLookupDone will still be run // OnDnsLookupDone will still be run
completed_ = true; completed_ = true;
@ -499,7 +508,8 @@ class AresDNSResolver final : public DNSResolver {
aba_token), aba_token),
default_port_(default_port), default_port_(default_port),
on_resolve_address_done_(std::move(on_resolve_address_done)) { on_resolve_address_done_(std::move(on_resolve_address_done)) {
GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p ctor", this); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresHostnameRequest:" << this << " ctor";
} }
std::unique_ptr<grpc_ares_request> MakeRequestLocked() override { std::unique_ptr<grpc_ares_request> MakeRequestLocked() override {
@ -508,13 +518,15 @@ class AresDNSResolver final : public DNSResolver {
name_server().c_str(), name().c_str(), default_port_.c_str(), name_server().c_str(), name().c_str(), default_port_.c_str(),
pollset_set(), on_dns_lookup_done(), &addresses_, pollset_set(), on_dns_lookup_done(), &addresses_,
timeout().millis())); timeout().millis()));
GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p Start ares_request_:%p", GRPC_TRACE_VLOG(cares_resolver, 2)
this, ares_request.get()); << "(c-ares resolver) AresHostnameRequest:" << this
<< " Start ares_request_:" << ares_request.get();
return ares_request; return ares_request;
} }
void OnComplete(grpc_error_handle error) override { void OnComplete(grpc_error_handle error) override {
GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p OnComplete", this); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresHostnameRequest:" << this << " OnComplete";
if (!error.ok()) { if (!error.ok()) {
on_resolve_address_done_(grpc_error_to_absl_status(error)); on_resolve_address_done_(grpc_error_to_absl_status(error));
return; return;
@ -550,7 +562,8 @@ class AresDNSResolver final : public DNSResolver {
: AresRequest(name, name_server, timeout, interested_parties, resolver, : AresRequest(name, name_server, timeout, interested_parties, resolver,
aba_token), aba_token),
on_resolve_address_done_(std::move(on_resolve_address_done)) { on_resolve_address_done_(std::move(on_resolve_address_done)) {
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p ctor", this); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this << " ctor";
} }
std::unique_ptr<grpc_ares_request> MakeRequestLocked() override { std::unique_ptr<grpc_ares_request> MakeRequestLocked() override {
@ -558,13 +571,15 @@ class AresDNSResolver final : public DNSResolver {
std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_srv_ares( std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_srv_ares(
name_server().c_str(), name().c_str(), pollset_set(), name_server().c_str(), name().c_str(), pollset_set(),
on_dns_lookup_done(), &balancer_addresses_, timeout().millis())); on_dns_lookup_done(), &balancer_addresses_, timeout().millis()));
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p Start ares_request_:%p", this, GRPC_TRACE_VLOG(cares_resolver, 2)
ares_request.get()); << "(c-ares resolver) AresSRVRequest:" << this
<< " Start ares_request_:" << ares_request.get();
return ares_request; return ares_request;
} }
void OnComplete(grpc_error_handle error) override { void OnComplete(grpc_error_handle error) override {
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p OnComplete", this); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this << " OnComplete";
if (!error.ok()) { if (!error.ok()) {
on_resolve_address_done_(grpc_error_to_absl_status(error)); on_resolve_address_done_(grpc_error_to_absl_status(error));
return; return;
@ -596,7 +611,8 @@ class AresDNSResolver final : public DNSResolver {
: AresRequest(name, name_server, timeout, interested_parties, resolver, : AresRequest(name, name_server, timeout, interested_parties, resolver,
aba_token), aba_token),
on_resolved_(std::move(on_resolved)) { on_resolved_(std::move(on_resolved)) {
GRPC_CARES_TRACE_LOG("AresTXTRequest:%p ctor", this); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresTXTRequest:" << this << " ctor";
} }
~AresTXTRequest() override { gpr_free(service_config_json_); } ~AresTXTRequest() override { gpr_free(service_config_json_); }
@ -606,13 +622,15 @@ class AresDNSResolver final : public DNSResolver {
std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_txt_ares( std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_txt_ares(
name_server().c_str(), name().c_str(), pollset_set(), name_server().c_str(), name().c_str(), pollset_set(),
on_dns_lookup_done(), &service_config_json_, timeout().millis())); on_dns_lookup_done(), &service_config_json_, timeout().millis()));
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p Start ares_request_:%p", this, GRPC_TRACE_VLOG(cares_resolver, 2)
ares_request.get()); << "(c-ares resolver) AresSRVRequest:" << this
<< " Start ares_request_:" << ares_request.get();
return ares_request; return ares_request;
} }
void OnComplete(grpc_error_handle error) override { void OnComplete(grpc_error_handle error) override {
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p OnComplete", this); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this << " OnComplete";
if (!error.ok()) { if (!error.ok()) {
on_resolved_(grpc_error_to_absl_status(error)); on_resolved_(grpc_error_to_absl_status(error));
return; return;
@ -684,14 +702,15 @@ class AresDNSResolver final : public DNSResolver {
MutexLock lock(&mu_); MutexLock lock(&mu_);
if (!open_requests_.contains(handle)) { if (!open_requests_.contains(handle)) {
// Unknown request, possibly completed already, or an invalid handle. // Unknown request, possibly completed already, or an invalid handle.
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"AresDNSResolver:%p attempt to cancel unknown TaskHandle:%s", this, << "(c-ares resolver) AresDNSResolver:" << this
HandleToString(handle).c_str()); << " attempt to cancel unknown TaskHandle:" << HandleToString(handle);
return false; return false;
} }
auto* request = reinterpret_cast<AresRequest*>(handle.keys[0]); auto* request = reinterpret_cast<AresRequest*>(handle.keys[0]);
GRPC_CARES_TRACE_LOG("AresDNSResolver:%p cancel ares_request:%p", this, GRPC_TRACE_VLOG(cares_resolver, 2)
request); << "(c-ares resolver) AresDNSResolver:" << this
<< " cancel ares_request:" << request;
return request->Cancel(); return request->Cancel();
} }

@ -133,8 +133,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
} }
~GrpcPolledFdWindows() override { ~GrpcPolledFdWindows() override {
GRPC_CARES_TRACE_LOG("fd:|%s| ~GrpcPolledFdWindows shutdown_called_: %d ", GRPC_TRACE_VLOG(cares_resolver, 2)
GetName(), shutdown_called_); << "(c-ares resolver) fd:|" << GetName()
<< "| ~GrpcPolledFdWindows shutdown_called_: " << shutdown_called_;
CSliceUnref(read_buf_); CSliceUnref(read_buf_);
CSliceUnref(write_buf_); CSliceUnref(write_buf_);
CHECK_EQ(read_closure_, nullptr); CHECK_EQ(read_closure_, nullptr);
@ -173,10 +174,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
} }
void ContinueRegisterForOnReadableLocked() { void ContinueRegisterForOnReadableLocked() {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| ContinueRegisterForOnReadableLocked " << "(c-ares resolver) fd:|" << GetName()
"wsa_connect_error_:%d", << "| ContinueRegisterForOnReadableLocked "
GetName(), wsa_connect_error_); << "wsa_connect_error_:" << wsa_connect_error_;
CHECK(connect_done_); CHECK(connect_done_);
if (wsa_connect_error_ != 0) { if (wsa_connect_error_ != 0) {
ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect")); ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
@ -194,10 +195,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
&winsocket_->read_info.overlapped, nullptr)) { &winsocket_->read_info.overlapped, nullptr)) {
int wsa_last_error = WSAGetLastError(); int wsa_last_error = WSAGetLastError();
char* msg = gpr_format_message(wsa_last_error); char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| " << "(c-ares resolver) fd:|" << GetName()
"msg:|%s|", << "| RegisterForOnReadableLocked WSARecvFrom error code:|"
GetName(), wsa_last_error, msg); << wsa_last_error << "| msg:|" << msg << "|";
gpr_free(msg); gpr_free(msg);
if (wsa_last_error != WSA_IO_PENDING) { if (wsa_last_error != WSA_IO_PENDING) {
ScheduleAndNullReadClosure( ScheduleAndNullReadClosure(
@ -210,14 +211,15 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
void RegisterForOnWriteableLocked(grpc_closure* write_closure) override { void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
if (socket_type_ == SOCK_DGRAM) { if (socket_type_ == SOCK_DGRAM) {
GRPC_CARES_TRACE_LOG("fd:|%s| RegisterForOnWriteableLocked called", GRPC_TRACE_VLOG(cares_resolver, 2)
GetName()); << "(c-ares resolver) fd:|" << GetName()
<< "| RegisterForOnWriteableLocked called";
} else { } else {
CHECK(socket_type_ == SOCK_STREAM); CHECK(socket_type_ == SOCK_STREAM);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d " << "(c-ares resolver) fd:|" << GetName()
"connect_done_: %d", << "| RegisterForOnWriteableLocked called tcp_write_state_: "
GetName(), tcp_write_state_, connect_done_); << tcp_write_state_ << " connect_done_: " << connect_done_;
} }
CHECK_EQ(write_closure_, nullptr); CHECK_EQ(write_closure_, nullptr);
write_closure_ = write_closure; write_closure_ = write_closure;
@ -234,10 +236,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
} }
void ContinueRegisterForOnWriteableLocked() { void ContinueRegisterForOnWriteableLocked() {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| ContinueRegisterForOnWriteableLocked " << "(c-ares resolver) fd:|" << GetName()
"wsa_connect_error_:%d", << "| ContinueRegisterForOnWriteableLocked "
GetName(), wsa_connect_error_); << "wsa_connect_error_:" << wsa_connect_error_;
CHECK(connect_done_); CHECK(connect_done_);
if (wsa_connect_error_ != 0) { if (wsa_connect_error_ != 0) {
ScheduleAndNullWriteClosure( ScheduleAndNullWriteClosure(
@ -288,10 +290,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data, ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
ares_socket_t data_len, int /* flags */, ares_socket_t data_len, int /* flags */,
struct sockaddr* from, ares_socklen_t* from_len) { struct sockaddr* from, ares_socklen_t* from_len) {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf " << "(c-ares resolver) fd:" << GetName()
"length:|%d|", << " RecvFrom called read_buf_has_data:" << read_buf_has_data_
GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_)); << " Current read buf length:" << GRPC_SLICE_LENGTH(read_buf_);
if (!read_buf_has_data_) { if (!read_buf_has_data_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1; return -1;
@ -340,20 +342,21 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1, int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1,
bytes_sent_ptr, flags, overlapped, nullptr); bytes_sent_ptr, flags, overlapped, nullptr);
*wsa_error_code = WSAGetLastError(); *wsa_error_code = WSAGetLastError();
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d " << "(c-ares resolver) fd:" << GetName()
"overlapped:%p " << " SendWriteBuf WSASend buf.len:" << buf.len << " *bytes_sent_ptr:"
"return:%d *wsa_error_code:%d", << (bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0)
GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0, << " overlapped:" << overlapped << " return:" << out
overlapped, out, *wsa_error_code); << " *wsa_error_code:" << *wsa_error_code;
return out; return out;
} }
ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov, ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
int iov_count) { int iov_count) {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d", << "(c-ares resolver) fd:" << GetName()
GetName(), connect_done_, wsa_connect_error_); << " SendV called connect_done_:" << connect_done_
<< " wsa_connect_error_:" << wsa_connect_error_;
if (!connect_done_) { if (!connect_done_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1; return -1;
@ -377,7 +380,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
// c-ares doesn't handle retryable errors on writes of UDP sockets. // c-ares doesn't handle retryable errors on writes of UDP sockets.
// Therefore, the sendv handler for UDP sockets must only attempt // Therefore, the sendv handler for UDP sockets must only attempt
// to write everything inline. // to write everything inline.
GRPC_CARES_TRACE_LOG("fd:|%s| SendVUDP called", GetName()); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " SendVUDP called";
CHECK_EQ(GRPC_SLICE_LENGTH(write_buf_), 0); CHECK_EQ(GRPC_SLICE_LENGTH(write_buf_), 0);
CSliceUnref(write_buf_); CSliceUnref(write_buf_);
write_buf_ = FlattenIovec(iov, iov_count); write_buf_ = FlattenIovec(iov, iov_count);
@ -388,9 +392,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
write_buf_ = grpc_empty_slice(); write_buf_ = grpc_empty_slice();
wsa_error_ctx->SetWSAError(wsa_error_code); wsa_error_ctx->SetWSAError(wsa_error_code);
char* msg = gpr_format_message(wsa_error_code); char* msg = gpr_format_message(wsa_error_code);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(), << "(c-ares resolver) fd:" << GetName()
wsa_error_code, msg); << " SendVUDP SendWriteBuf error code:" << wsa_error_code
<< " msg:" << msg;
gpr_free(msg); gpr_free(msg);
return -1; return -1;
} }
@ -406,8 +411,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
// out in the background, and making further send progress in general, will // out in the background, and making further send progress in general, will
// happen as long as c-ares continues to show interest in writeability on // happen as long as c-ares continues to show interest in writeability on
// this fd. // this fd.
GRPC_CARES_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d", GRPC_TRACE_VLOG(cares_resolver, 2)
GetName(), tcp_write_state_); << "(c-ares resolver) fd:" << GetName()
<< " SendVTCP called tcp_write_state_:" << tcp_write_state_;
switch (tcp_write_state_) { switch (tcp_write_state_) {
case WRITE_IDLE: case WRITE_IDLE:
tcp_write_state_ = WRITE_REQUESTED; tcp_write_state_ = WRITE_REQUESTED;
@ -450,13 +456,13 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
} }
void OnTcpConnectLocked(grpc_error_handle error) { void OnTcpConnectLocked(grpc_error_handle error) {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:%s InnerOnTcpConnectLocked error:|%s| " << "(c-ares resolver) fd:" << GetName()
"pending_register_for_readable:%d" << " InnerOnTcpConnectLocked error:" << StatusToString(error)
" pending_register_for_writeable:%d", << " pending_register_for_readable:"
GetName(), StatusToString(error).c_str(), << pending_continue_register_for_on_readable_locked_
pending_continue_register_for_on_readable_locked_, << " pending_register_for_writeable:"
pending_continue_register_for_on_writeable_locked_); << pending_continue_register_for_on_writeable_locked_;
CHECK(!connect_done_); CHECK(!connect_done_);
connect_done_ = true; connect_done_ = true;
CHECK_EQ(wsa_connect_error_, 0); CHECK_EQ(wsa_connect_error_, 0);
@ -473,10 +479,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
if (!wsa_success) { if (!wsa_success) {
wsa_connect_error_ = WSAGetLastError(); wsa_connect_error_ = WSAGetLastError();
char* msg = gpr_format_message(wsa_connect_error_); char* msg = gpr_format_message(wsa_connect_error_);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d " << "(c-ares resolver) fd:" << GetName()
"msg:|%s|", << " InnerOnTcpConnectLocked WSA overlapped result code:"
GetName(), wsa_connect_error_, msg); << wsa_connect_error_ << " msg:" << msg;
gpr_free(msg); gpr_free(msg);
} }
} }
@ -502,7 +508,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) { ares_socklen_t target_len) {
GRPC_CARES_TRACE_LOG("fd:%s ConnectUDP", GetName()); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " ConnectUDP";
CHECK(!connect_done_); CHECK(!connect_done_);
CHECK_EQ(wsa_connect_error_, 0); CHECK_EQ(wsa_connect_error_, 0);
SOCKET s = grpc_winsocket_wrapped_socket(winsocket_); SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
@ -512,8 +519,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
wsa_error_ctx->SetWSAError(wsa_connect_error_); wsa_error_ctx->SetWSAError(wsa_connect_error_);
connect_done_ = true; connect_done_ = true;
char* msg = gpr_format_message(wsa_connect_error_); char* msg = gpr_format_message(wsa_connect_error_);
GRPC_CARES_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|", GetName(), GRPC_TRACE_VLOG(cares_resolver, 2)
wsa_connect_error_, msg); << "(c-ares resolver) fd:" << GetName() << " WSAConnect error code:|"
<< wsa_connect_error_ << "| msg:|" << msg << "|";
gpr_free(msg); gpr_free(msg);
// c-ares expects a posix-style connect API // c-ares expects a posix-style connect API
return out == 0 ? 0 : -1; return out == 0 ? 0 : -1;
@ -521,7 +529,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) { ares_socklen_t target_len) {
GRPC_CARES_TRACE_LOG("fd:%s ConnectTCP", GetName()); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " ConnectTCP";
LPFN_CONNECTEX ConnectEx; LPFN_CONNECTEX ConnectEx;
GUID guid = WSAID_CONNECTEX; GUID guid = WSAID_CONNECTEX;
DWORD ioctl_num_bytes; DWORD ioctl_num_bytes;
@ -532,10 +541,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError(); int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error); wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error); char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d " << "(c-ares resolver) fd:" << GetName()
"msg:|%s|", << " WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:"
GetName(), wsa_last_error, msg); << wsa_last_error << " msg:|" << msg << "|";
gpr_free(msg); gpr_free(msg);
connect_done_ = true; connect_done_ = true;
wsa_connect_error_ = wsa_last_error; wsa_connect_error_ = wsa_last_error;
@ -555,8 +564,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError(); int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error); wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error); char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG("fd:%s bind error code:%d msg:|%s|", GetName(), GRPC_TRACE_VLOG(cares_resolver, 2)
wsa_last_error, msg); << "(c-ares resolver) fd:" << GetName()
<< " bind error code:" << wsa_last_error << " msg:|" << msg << "|";
gpr_free(msg); gpr_free(msg);
connect_done_ = true; connect_done_ = true;
wsa_connect_error_ = wsa_last_error; wsa_connect_error_ = wsa_last_error;
@ -569,8 +579,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError(); int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error); wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error); char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|", GetName(), GRPC_TRACE_VLOG(cares_resolver, 2)
wsa_last_error, msg); << "(c-ares resolver) fd:" << GetName()
<< " ConnectEx error code:" << wsa_last_error << " msg:|" << msg
<< "|";
gpr_free(msg); gpr_free(msg);
if (wsa_last_error == WSA_IO_PENDING) { if (wsa_last_error == WSA_IO_PENDING) {
// c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on // c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
@ -610,11 +622,12 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) { if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error, error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error,
"OnIocpReadableInner"); "OnIocpReadableInner");
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error " << "(c-ares resolver) fd:|" << GetName()
"code:|%d| msg:|%s|", << "| OnIocpReadableInner winsocket_->read_info.wsa_error "
GetName(), winsocket_->read_info.wsa_error, "code:|"
StatusToString(error).c_str()); << winsocket_->read_info.wsa_error << "| msg:|"
<< StatusToString(error) << "|";
} }
} }
} }
@ -626,9 +639,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
CSliceUnref(read_buf_); CSliceUnref(read_buf_);
read_buf_ = grpc_empty_slice(); read_buf_ = grpc_empty_slice();
} }
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(), << "(c-ares resolver) fd:|" << GetName()
GRPC_SLICE_LENGTH(read_buf_)); << "| OnIocpReadable finishing. read buf length now:|"
<< GRPC_SLICE_LENGTH(read_buf_) << "|";
ScheduleAndNullReadClosure(error); ScheduleAndNullReadClosure(error);
} }
@ -639,17 +653,19 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
} }
void OnIocpWriteableLocked(grpc_error_handle error) { void OnIocpWriteableLocked(grpc_error_handle error) {
GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName()); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) OnIocpWriteableInner. fd:|" << GetName() << "|";
CHECK(socket_type_ == SOCK_STREAM); CHECK(socket_type_ == SOCK_STREAM);
if (error.ok()) { if (error.ok()) {
if (winsocket_->write_info.wsa_error != 0) { if (winsocket_->write_info.wsa_error != 0) {
error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error, error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error,
"OnIocpWriteableInner"); "OnIocpWriteableInner");
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error " << "(c-ares resolver) fd:|" << GetName()
"code:|%d| msg:|%s|", << "| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
GetName(), winsocket_->write_info.wsa_error, "code:|"
StatusToString(error).c_str()); << winsocket_->write_info.wsa_error << "| msg:|"
<< StatusToString(error) << "|";
} }
} }
CHECK(tcp_write_state_ == WRITE_PENDING); CHECK(tcp_write_state_ == WRITE_PENDING);
@ -657,8 +673,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY; tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
write_buf_ = grpc_slice_sub_no_ref( write_buf_ = grpc_slice_sub_no_ref(
write_buf_, 0, winsocket_->write_info.bytes_transferred); write_buf_, 0, winsocket_->write_info.bytes_transferred);
GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d", GRPC_TRACE_VLOG(cares_resolver, 2)
GetName(), winsocket_->write_info.bytes_transferred); << "(c-ares resolver) fd:|" << GetName()
<< "| OnIocpWriteableInner. bytes transferred:"
<< winsocket_->write_info.bytes_transferred;
} else { } else {
CSliceUnref(write_buf_); CSliceUnref(write_buf_);
write_buf_ = grpc_empty_slice(); write_buf_ = grpc_empty_slice();
@ -728,7 +746,9 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
// //
static ares_socket_t Socket(int af, int type, int protocol, void* user_data) { static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
if (type != SOCK_DGRAM && type != SOCK_STREAM) { if (type != SOCK_DGRAM && type != SOCK_STREAM) {
GRPC_CARES_TRACE_LOG("Socket called with invalid socket type:%d", type); GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) Socket called with invalid socket type:"
<< type;
return INVALID_SOCKET; return INVALID_SOCKET;
} }
GrpcPolledFdFactoryWindows* self = GrpcPolledFdFactoryWindows* self =
@ -736,15 +756,16 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
SOCKET s = WSASocket(af, type, protocol, nullptr, 0, SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
grpc_get_default_wsa_socket_flags()); grpc_get_default_wsa_socket_flags());
if (s == INVALID_SOCKET) { if (s == INVALID_SOCKET) {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"WSASocket failed with params af:%d type:%d protocol:%d", af, type, << "(c-ares resolver) WSASocket failed with params af:" << af
protocol); << " type:" << type << " protocol:" << protocol;
return s; return s;
} }
grpc_error_handle error = grpc_tcp_set_non_block(s); grpc_error_handle error = grpc_tcp_set_non_block(s);
if (!error.ok()) { if (!error.ok()) {
GRPC_CARES_TRACE_LOG("WSAIoctl failed with error: %s", GRPC_TRACE_VLOG(cares_resolver, 2)
StatusToString(error).c_str()); << "(c-ares resolver) WSAIoctl failed with error: "
<< StatusToString(error);
return INVALID_SOCKET; return INVALID_SOCKET;
} }
auto on_shutdown_locked = [self, s]() { auto on_shutdown_locked = [self, s]() {
@ -755,9 +776,10 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
}; };
auto polled_fd = new GrpcPolledFdWindows(s, self->mu_, af, type, auto polled_fd = new GrpcPolledFdWindows(s, self->mu_, af, type,
std::move(on_shutdown_locked)); std::move(on_shutdown_locked));
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"fd:|%s| created with params af:%d type:%d protocol:%d", << "(c-ares resolver) fd:" << polled_fd->GetName()
polled_fd->GetName(), af, type, protocol); << " created with params af:" << af << " type:" << type
<< " protocol:" << protocol;
CHECK(self->sockets_.insert({s, polled_fd}).second); CHECK(self->sockets_.insert({s, polled_fd}).second);
return s; return s;
} }

@ -200,8 +200,9 @@ static absl::Status AresStatusToAbslStatus(int status,
static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( static grpc_ares_ev_driver* grpc_ares_ev_driver_ref(
grpc_ares_ev_driver* ev_driver) grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request, GRPC_TRACE_VLOG(cares_resolver, 2)
ev_driver); << "(c-ares resolver) request:" << ev_driver->request << " Ref ev_driver "
<< ev_driver;
gpr_ref(&ev_driver->refs); gpr_ref(&ev_driver->refs);
return ev_driver; return ev_driver;
} }
@ -211,11 +212,13 @@ static void grpc_ares_complete_request_locked(grpc_ares_request* r)
static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request, GRPC_TRACE_VLOG(cares_resolver, 2)
ev_driver); << "(c-ares resolver) request:" << ev_driver->request
<< " Unref ev_driver " << ev_driver;
if (gpr_unref(&ev_driver->refs)) { if (gpr_unref(&ev_driver->refs)) {
GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request, GRPC_TRACE_VLOG(cares_resolver, 2)
ev_driver); << "(c-ares resolver) request:" << ev_driver->request
<< " destroy ev_driver " << ev_driver;
CHECK_EQ(ev_driver->fds, nullptr); CHECK_EQ(ev_driver->fds, nullptr);
ares_destroy(ev_driver->channel); ares_destroy(ev_driver->channel);
grpc_ares_complete_request_locked(ev_driver->request); grpc_ares_complete_request_locked(ev_driver->request);
@ -225,8 +228,9 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver)
static void fd_node_destroy_locked(fd_node* fdn) static void fd_node_destroy_locked(fd_node* fdn)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request, GRPC_TRACE_VLOG(cares_resolver, 2)
fdn->grpc_polled_fd->GetName()); << "(c-ares resolver) request:" << fdn->ev_driver->request
<< " delete fd: " << fdn->grpc_polled_fd->GetName();
CHECK(!fdn->readable_registered); CHECK(!fdn->readable_registered);
CHECK(!fdn->writable_registered); CHECK(!fdn->writable_registered);
CHECK(fdn->already_shutdown); CHECK(fdn->already_shutdown);
@ -292,21 +296,21 @@ static grpc_core::Timestamp calculate_next_ares_backup_poll_alarm(
// by the c-ares code comments. // by the c-ares code comments.
grpc_core::Duration until_next_ares_backup_poll_alarm = grpc_core::Duration until_next_ares_backup_poll_alarm =
grpc_core::Duration::Seconds(1); grpc_core::Duration::Seconds(1);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p ev_driver=%p. next ares process poll time in " << "(c-ares resolver) request:" << driver->request
"%" PRId64 " ms", << " ev_driver=" << driver << ". next ares process poll time in "
driver->request, driver, until_next_ares_backup_poll_alarm.millis()); << until_next_ares_backup_poll_alarm.millis() << " ms";
return grpc_core::Timestamp::Now() + until_next_ares_backup_poll_alarm; return grpc_core::Timestamp::Now() + until_next_ares_backup_poll_alarm;
} }
static void on_timeout(void* arg, grpc_error_handle error) { static void on_timeout(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg); grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
grpc_core::MutexLock lock(&driver->request->mu); grpc_core::MutexLock lock(&driver->request->mu);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. " << "(c-ares resolver) request:" << driver->request
"err=%s", << " ev_driver=" << driver
driver->request, driver, driver->shutting_down, << " on_timeout_locked. driver->shutting_down=" << driver->shutting_down
grpc_core::StatusToString(error).c_str()); << ". err=" << grpc_core::StatusToString(error);
if (!driver->shutting_down && error.ok()) { if (!driver->shutting_down && error.ok()) {
grpc_ares_ev_driver_shutdown_locked(driver); grpc_ares_ev_driver_shutdown_locked(driver);
} }
@ -327,20 +331,20 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) { static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg); grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
grpc_core::MutexLock lock(&driver->request->mu); grpc_core::MutexLock lock(&driver->request->mu);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. " << "(c-ares resolver) request:" << driver->request
"driver->shutting_down=%d. " << " ev_driver=" << driver
"err=%s", << " on_ares_backup_poll_alarm_locked. driver->shutting_down="
driver->request, driver, driver->shutting_down, << driver->shutting_down << ". err=" << grpc_core::StatusToString(error);
grpc_core::StatusToString(error).c_str());
if (!driver->shutting_down && error.ok()) { if (!driver->shutting_down && error.ok()) {
fd_node* fdn = driver->fds; fd_node* fdn = driver->fds;
while (fdn != nullptr) { while (fdn != nullptr) {
if (!fdn->already_shutdown) { if (!fdn->already_shutdown) {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p ev_driver=%p on_ares_backup_poll_alarm_locked; " << "(c-ares resolver) request:" << driver->request
"ares_process_fd. fd=%s", << " ev_driver=" << driver
driver->request, driver, fdn->grpc_polled_fd->GetName()); << " on_ares_backup_poll_alarm_locked; ares_process_fd. fd="
<< fdn->grpc_polled_fd->GetName();
ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
ares_process_fd(driver->channel, as, as); ares_process_fd(driver->channel, as, as);
} }
@ -373,8 +377,9 @@ static void on_readable(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* ev_driver = fdn->ev_driver; grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
fdn->readable_registered = false; fdn->readable_registered = false;
GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request, GRPC_TRACE_VLOG(cares_resolver, 2)
fdn->grpc_polled_fd->GetName()); << "(c-ares resolver) request:" << fdn->ev_driver->request
<< " readable on " << fdn->grpc_polled_fd->GetName();
if (error.ok() && !ev_driver->shutting_down) { if (error.ok() && !ev_driver->shutting_down) {
ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD); ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD);
} else { } else {
@ -397,8 +402,9 @@ static void on_writable(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* ev_driver = fdn->ev_driver; grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
fdn->writable_registered = false; fdn->writable_registered = false;
GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request, GRPC_TRACE_VLOG(cares_resolver, 2)
fdn->grpc_polled_fd->GetName()); << "(c-ares resolver) request:" << ev_driver->request << " writable on "
<< fdn->grpc_polled_fd->GetName();
if (error.ok() && !ev_driver->shutting_down) { if (error.ok() && !ev_driver->shutting_down) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as); ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as);
} else { } else {
@ -433,8 +439,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
fdn->grpc_polled_fd = fdn->grpc_polled_fd =
ev_driver->polled_fd_factory->NewGrpcPolledFdLocked( ev_driver->polled_fd_factory->NewGrpcPolledFdLocked(
socks[i], ev_driver->pollset_set); socks[i], ev_driver->pollset_set);
GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request, GRPC_TRACE_VLOG(cares_resolver, 2)
fdn->grpc_polled_fd->GetName()); << "(c-ares resolver) request:" << ev_driver->request
<< " new fd: " << fdn->grpc_polled_fd->GetName();
fdn->readable_registered = false; fdn->readable_registered = false;
fdn->writable_registered = false; fdn->writable_registered = false;
fdn->already_shutdown = false; fdn->already_shutdown = false;
@ -449,15 +456,16 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable, fdn, GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable, fdn,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
if (fdn->grpc_polled_fd->IsFdStillReadableLocked()) { if (fdn->grpc_polled_fd->IsFdStillReadableLocked()) {
GRPC_CARES_TRACE_LOG("request:%p schedule direct read on: %s", GRPC_TRACE_VLOG(cares_resolver, 2)
ev_driver->request, << "(c-ares resolver) request:" << ev_driver->request
fdn->grpc_polled_fd->GetName()); << " schedule direct read on: "
<< fdn->grpc_polled_fd->GetName();
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &fdn->read_closure, grpc_core::ExecCtx::Run(DEBUG_LOCATION, &fdn->read_closure,
absl::OkStatus()); absl::OkStatus());
} else { } else {
GRPC_CARES_TRACE_LOG("request:%p notify read on: %s", GRPC_TRACE_VLOG(cares_resolver, 2)
ev_driver->request, << "(c-ares resolver) request:" << ev_driver->request
fdn->grpc_polled_fd->GetName()); << " notify read on: " << fdn->grpc_polled_fd->GetName();
fdn->grpc_polled_fd->RegisterForOnReadableLocked( fdn->grpc_polled_fd->RegisterForOnReadableLocked(
&fdn->read_closure); &fdn->read_closure);
} }
@ -467,9 +475,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
// has not been registered with this socket. // has not been registered with this socket.
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fdn->writable_registered) { !fdn->writable_registered) {
GRPC_CARES_TRACE_LOG("request:%p notify write on: %s", GRPC_TRACE_VLOG(cares_resolver, 2)
ev_driver->request, << "(c-ares resolver) request:" << ev_driver->request
fdn->grpc_polled_fd->GetName()); << " notify write on: " << fdn->grpc_polled_fd->GetName();
grpc_ares_ev_driver_ref(ev_driver); grpc_ares_ev_driver_ref(ev_driver);
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn, GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
@ -505,10 +513,11 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver)
ev_driver->query_timeout_ms == 0 ev_driver->query_timeout_ms == 0
? grpc_core::Duration::Infinity() ? grpc_core::Duration::Infinity()
: grpc_core::Duration::Milliseconds(ev_driver->query_timeout_ms); : grpc_core::Duration::Milliseconds(ev_driver->query_timeout_ms);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in " << "(c-ares resolver) request:" << ev_driver->request
"%" PRId64 " ms", << " ev_driver=" << ev_driver
ev_driver->request, ev_driver, timeout.millis()); << " grpc_ares_ev_driver_start_locked. timeout in " << timeout.millis()
<< " ms";
grpc_ares_ev_driver_ref(ev_driver); grpc_ares_ev_driver_ref(ev_driver);
GRPC_CLOSURE_INIT(&ev_driver->on_timeout_locked, on_timeout, ev_driver, GRPC_CLOSURE_INIT(&ev_driver->on_timeout_locked, on_timeout, ev_driver,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
@ -547,7 +556,8 @@ grpc_error_handle grpc_ares_ev_driver_create_locked(
} }
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
grpc_ares_test_only_inject_config(&(*ev_driver)->channel); grpc_ares_test_only_inject_config(&(*ev_driver)->channel);
GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request); GRPC_TRACE_VLOG(cares_resolver, 2) << "(c-ares resolver) request:" << request
<< " grpc_ares_ev_driver_create_locked";
if (status != ARES_SUCCESS) { if (status != ARES_SUCCESS) {
grpc_error_handle err = GRPC_ERROR_CREATE(absl::StrCat( grpc_error_handle err = GRPC_ERROR_CREATE(absl::StrCat(
"Failed to init ares channel. C-ares error: ", ares_strerror(status))); "Failed to init ares channel. C-ares error: ", ares_strerror(status)));
@ -645,10 +655,10 @@ static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
grpc_ares_request* parent_request, const char* host, uint16_t port, grpc_ares_request* parent_request, const char* host, uint16_t port,
bool is_balancer, const char* qtype) bool is_balancer, const char* qtype)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_request->mu) { ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_request->mu) {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p create_hostbyname_request_locked host:%s port:%d " << "(c-ares resolver) request:" << parent_request
"is_balancer:%d qtype:%s", << " create_hostbyname_request_locked host:" << host << " port:" << port
parent_request, host, port, is_balancer, qtype); << " is_balancer:" << is_balancer << " qtype:" << qtype;
grpc_ares_hostbyname_request* hr = new grpc_ares_hostbyname_request(); grpc_ares_hostbyname_request* hr = new grpc_ares_hostbyname_request();
hr->parent_request = parent_request; hr->parent_request = parent_request;
hr->host = gpr_strdup(host); hr->host = gpr_strdup(host);
@ -675,9 +685,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
static_cast<grpc_ares_hostbyname_request*>(arg); static_cast<grpc_ares_hostbyname_request*>(arg);
grpc_ares_request* r = hr->parent_request; grpc_ares_request* r = hr->parent_request;
if (status == ARES_SUCCESS) { if (status == ARES_SUCCESS) {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p on_hostbyname_done_locked qtype=%s host=%s ARES_SUCCESS", r, << "(c-ares resolver) request:" << r
hr->qtype, hr->host); << " on_hostbyname_done_locked qtype=" << hr->qtype
<< " host=" << hr->host << " ARES_SUCCESS";
std::unique_ptr<EndpointAddressesList>* address_list_ptr = std::unique_ptr<EndpointAddressesList>* address_list_ptr =
hr->is_balancer ? r->balancer_addresses_out : r->addresses_out; hr->is_balancer ? r->balancer_addresses_out : r->addresses_out;
if (*address_list_ptr == nullptr) { if (*address_list_ptr == nullptr) {
@ -701,10 +712,11 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
addr->sin6_port = hr->port; addr->sin6_port = hr->port;
char output[INET6_ADDRSTRLEN]; char output[INET6_ADDRSTRLEN];
ares_inet_ntop(AF_INET6, &addr->sin6_addr, output, INET6_ADDRSTRLEN); ares_inet_ntop(AF_INET6, &addr->sin6_addr, output, INET6_ADDRSTRLEN);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p c-ares resolver gets a AF_INET6 result: \n" << "(c-ares resolver) request:" << r
" addr: %s\n port: %d\n sin6_scope_id: %d\n", << " c-ares resolver gets a AF_INET6 result: \n"
r, output, ntohs(hr->port), addr->sin6_scope_id); << " addr: " << output << "\n port: " << ntohs(hr->port)
<< "\n sin6_scope_id: " << addr->sin6_scope_id << "\n";
break; break;
} }
case AF_INET: { case AF_INET: {
@ -716,10 +728,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
addr->sin_port = hr->port; addr->sin_port = hr->port;
char output[INET_ADDRSTRLEN]; char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr->sin_addr, output, INET_ADDRSTRLEN); ares_inet_ntop(AF_INET, &addr->sin_addr, output, INET_ADDRSTRLEN);
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p c-ares resolver gets a AF_INET result: \n" << "(c-ares resolver) request:" << r
" addr: %s\n port: %d\n", << " c-ares resolver gets a AF_INET result: \n addr: " << output
r, output, ntohs(hr->port)); << "\n port: " << ntohs(hr->port) << "\n";
break; break;
} }
} }
@ -729,8 +741,9 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
std::string error_msg = absl::StrFormat( std::string error_msg = absl::StrFormat(
"C-ares status is not ARES_SUCCESS qtype=%s name=%s is_balancer=%d: %s", "C-ares status is not ARES_SUCCESS qtype=%s name=%s is_balancer=%d: %s",
hr->qtype, hr->host, hr->is_balancer, ares_strerror(status)); hr->qtype, hr->host, hr->is_balancer, ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_hostbyname_done_locked: %s", r, GRPC_TRACE_VLOG(cares_resolver, 2)
error_msg.c_str()); << "(c-ares resolver) request:" << r
<< " on_hostbyname_done_locked: " << error_msg;
r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg),
r->error); r->error);
} }
@ -745,13 +758,14 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/,
GrpcAresQuery* q = static_cast<GrpcAresQuery*>(arg); GrpcAresQuery* q = static_cast<GrpcAresQuery*>(arg);
grpc_ares_request* r = q->parent_request(); grpc_ares_request* r = q->parent_request();
if (status == ARES_SUCCESS) { if (status == ARES_SUCCESS) {
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p on_srv_query_done_locked name=%s ARES_SUCCESS", r, << "(c-ares resolver) request:" << r
q->name().c_str()); << " on_srv_query_done_locked name=" << q->name() << " ARES_SUCCESS";
struct ares_srv_reply* reply; struct ares_srv_reply* reply;
const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); const int parse_status = ares_parse_srv_reply(abuf, alen, &reply);
GRPC_CARES_TRACE_LOG("request:%p ares_parse_srv_reply: %d", r, GRPC_TRACE_VLOG(cares_resolver, 2)
parse_status); << "(c-ares resolver) request:" << r
<< " ares_parse_srv_reply: " << parse_status;
if (parse_status == ARES_SUCCESS) { if (parse_status == ARES_SUCCESS) {
for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
srv_it = srv_it->next) { srv_it = srv_it->next) {
@ -775,8 +789,9 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/,
std::string error_msg = absl::StrFormat( std::string error_msg = absl::StrFormat(
"C-ares status is not ARES_SUCCESS qtype=SRV name=%s: %s", q->name(), "C-ares status is not ARES_SUCCESS qtype=SRV name=%s: %s", q->name(),
ares_strerror(status)); ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_srv_query_done_locked: %s", r, GRPC_TRACE_VLOG(cares_resolver, 2)
error_msg.c_str()); << "(c-ares resolver) request:" << r
<< " on_srv_query_done_locked: " << error_msg;
r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg),
r->error); r->error);
} }
@ -797,8 +812,9 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/,
struct ares_txt_ext* result = nullptr; struct ares_txt_ext* result = nullptr;
struct ares_txt_ext* reply = nullptr; struct ares_txt_ext* reply = nullptr;
if (status != ARES_SUCCESS) goto fail; if (status != ARES_SUCCESS) goto fail;
GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked name=%s ARES_SUCCESS", r, GRPC_TRACE_VLOG(cares_resolver, 2)
q->name().c_str()); << "(c-ares resolver) request:" << r
<< " on_txt_done_locked name=" << q->name() << " ARES_SUCCESS";
status = ares_parse_txt_reply_ext(buf, len, &reply); status = ares_parse_txt_reply_ext(buf, len, &reply);
if (status != ARES_SUCCESS) goto fail; if (status != ARES_SUCCESS) goto fail;
// Find service config in TXT record. // Find service config in TXT record.
@ -826,8 +842,9 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/,
service_config_len += result->length; service_config_len += result->length;
} }
(*r->service_config_json_out)[service_config_len] = '\0'; (*r->service_config_json_out)[service_config_len] = '\0';
GRPC_CARES_TRACE_LOG("request:%p found service config: %s", r, GRPC_TRACE_VLOG(cares_resolver, 2)
*r->service_config_json_out); << "(c-ares resolver) request:" << r
<< " found service config: " << *r->service_config_json_out;
} }
// Clean up. // Clean up.
ares_free_data(reply); ares_free_data(reply);
@ -837,8 +854,8 @@ fail:
std::string error_msg = std::string error_msg =
absl::StrFormat("C-ares status is not ARES_SUCCESS qtype=TXT name=%s: %s", absl::StrFormat("C-ares status is not ARES_SUCCESS qtype=TXT name=%s: %s",
q->name(), ares_strerror(status)); q->name(), ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked %s", r, GRPC_TRACE_VLOG(cares_resolver, 2) << "(c-ares resolver) request:" << r
error_msg.c_str()); << " on_txt_done_locked " << error_msg;
r->error = r->error =
grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error); grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error);
} }
@ -847,8 +864,9 @@ grpc_error_handle set_request_dns_server(grpc_ares_request* r,
absl::string_view dns_server) absl::string_view dns_server)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) {
if (!dns_server.empty()) { if (!dns_server.empty()) {
GRPC_CARES_TRACE_LOG("request:%p Using DNS server %s", r, GRPC_TRACE_VLOG(cares_resolver, 2)
dns_server.data()); << "(c-ares resolver) request:" << r << " Using DNS server "
<< dns_server.data();
grpc_resolved_address addr; grpc_resolved_address addr;
if (grpc_parse_ipv4_hostport(dns_server, &addr, /*log_errors=*/false)) { if (grpc_parse_ipv4_hostport(dns_server, &addr, /*log_errors=*/false)) {
r->dns_server_addr.family = AF_INET; r->dns_server_addr.family = AF_INET;
@ -1043,10 +1061,10 @@ static grpc_ares_request* grpc_dns_lookup_hostname_ares_impl(
r->ev_driver = nullptr; r->ev_driver = nullptr;
r->on_done = on_done; r->on_done = on_done;
r->addresses_out = addrs; r->addresses_out = addrs;
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p c-ares grpc_dns_lookup_hostname_ares_impl name=%s, " << "(c-ares resolver) request:" << r
"default_port=%s", << " c-ares grpc_dns_lookup_hostname_ares_impl name=" << name
r, name, default_port); << ", default_port=" << default_port;
// Early out if the target is an ipv4 or ipv6 literal. // Early out if the target is an ipv4 or ipv6 literal.
if (resolve_as_ip_literal_locked(name, default_port, addrs)) { if (resolve_as_ip_literal_locked(name, default_port, addrs)) {
grpc_ares_complete_request_locked(r); grpc_ares_complete_request_locked(r);
@ -1097,8 +1115,9 @@ grpc_ares_request* grpc_dns_lookup_srv_ares_impl(
r->ev_driver = nullptr; r->ev_driver = nullptr;
r->on_done = on_done; r->on_done = on_done;
r->balancer_addresses_out = balancer_addresses; r->balancer_addresses_out = balancer_addresses;
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p c-ares grpc_dns_lookup_srv_ares_impl name=%s", r, name); << "(c-ares resolver) request:" << r
<< " c-ares grpc_dns_lookup_srv_ares_impl name=" << name;
grpc_error_handle error; grpc_error_handle error;
// Don't query for SRV records if the target is "localhost" // Don't query for SRV records if the target is "localhost"
if (target_matches_localhost(name)) { if (target_matches_localhost(name)) {
@ -1135,8 +1154,9 @@ grpc_ares_request* grpc_dns_lookup_txt_ares_impl(
r->ev_driver = nullptr; r->ev_driver = nullptr;
r->on_done = on_done; r->on_done = on_done;
r->service_config_json_out = service_config_json; r->service_config_json_out = service_config_json;
GRPC_CARES_TRACE_LOG( GRPC_TRACE_VLOG(cares_resolver, 2)
"request:%p c-ares grpc_dns_lookup_txt_ares_impl name=%s", r, name); << "(c-ares resolver) request:" << r
<< " c-ares grpc_dns_lookup_txt_ares_impl name=" << name;
grpc_error_handle error; grpc_error_handle error;
// Don't query for TXT records if the target is "localhost" // Don't query for TXT records if the target is "localhost"
if (target_matches_localhost(name)) { if (target_matches_localhost(name)) {
@ -1185,8 +1205,9 @@ grpc_ares_request* (*grpc_dns_lookup_txt_ares)(
static void grpc_cancel_ares_request_impl(grpc_ares_request* r) { static void grpc_cancel_ares_request_impl(grpc_ares_request* r) {
CHECK_NE(r, nullptr); CHECK_NE(r, nullptr);
grpc_core::MutexLock lock(&r->mu); grpc_core::MutexLock lock(&r->mu);
GRPC_CARES_TRACE_LOG("request:%p grpc_cancel_ares_request ev_driver:%p", r, GRPC_TRACE_VLOG(cares_resolver, 2)
r->ev_driver); << "(c-ares resolver) request:" << r
<< " grpc_cancel_ares_request ev_driver:" << r->ev_driver;
if (r->ev_driver != nullptr) { if (r->ev_driver != nullptr) {
grpc_ares_ev_driver_shutdown_locked(r->ev_driver); grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
} }

@ -39,13 +39,6 @@
#define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 120000 #define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 120000
#define GRPC_CARES_TRACE_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(cares_resolver)) { \
VLOG(2) << "(c-ares resolver) " << absl::StrFormat(format, __VA_ARGS__); \
} \
} while (0)
typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
struct grpc_ares_request { struct grpc_ares_request {

@ -38,15 +38,14 @@
#include "src/core/handshaker/handshaker.h" #include "src/core/handshaker/handshaker.h"
#include "src/core/handshaker/handshaker_registry.h" #include "src/core/handshaker/handshaker_registry.h"
#include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h" #include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/resource_quota/api.h" #include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/security_connector/security_connector.h" #include "src/core/lib/security/security_connector/security_connector.h"
@ -59,6 +58,9 @@ namespace grpc_core {
namespace { namespace {
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::ResolvedAddressToURI;
grpc_httpcli_get_override g_get_override; grpc_httpcli_get_override g_get_override;
grpc_httpcli_post_override g_post_override; grpc_httpcli_post_override g_post_override;
grpc_httpcli_put_override g_put_override; grpc_httpcli_put_override g_put_override;
@ -173,7 +175,10 @@ HttpRequest::HttpRequest(
pollent_(pollent), pollent_(pollent),
pollset_set_(grpc_pollset_set_create()), pollset_set_(grpc_pollset_set_create()),
test_only_generate_response_(std::move(test_only_generate_response)), test_only_generate_response_(std::move(test_only_generate_response)),
resolver_(GetDNSResolver()) { resolver_(
ChannelArgs::FromC(channel_args_)
.GetObjectRef<EventEngine>()
->GetDNSResolver(EventEngine::DNSResolver::ResolverOptions())) {
grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response); grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
grpc_slice_buffer_init(&incoming_); grpc_slice_buffer_init(&incoming_);
grpc_slice_buffer_init(&outgoing_); grpc_slice_buffer_init(&outgoing_);
@ -207,11 +212,14 @@ void HttpRequest::Start() {
test_only_generate_response_.value()(); test_only_generate_response_.value()();
return; return;
} }
if (!resolver_.ok()) {
Finish(resolver_.status());
return;
}
Ref().release(); // ref held by pending DNS resolution Ref().release(); // ref held by pending DNS resolution
dns_request_handle_ = resolver_->LookupHostname( (*resolver_)
absl::bind_front(&HttpRequest::OnResolved, this), uri_.authority(), ->LookupHostname(absl::bind_front(&HttpRequest::OnResolved, this),
uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_, uri_.authority(), uri_.scheme());
/*name_server=*/"");
} }
void HttpRequest::Orphan() { void HttpRequest::Orphan() {
@ -220,10 +228,8 @@ void HttpRequest::Orphan() {
CHECK(!cancelled_); CHECK(!cancelled_);
cancelled_ = true; cancelled_ = true;
// cancel potentially pending DNS resolution. // cancel potentially pending DNS resolution.
if (dns_request_handle_.has_value() && if (*resolver_ != nullptr) {
resolver_->Cancel(dns_request_handle_.value())) { resolver_->reset();
Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
Unref();
} }
if (handshake_mgr_ != nullptr) { if (handshake_mgr_ != nullptr) {
// Shutdown will cancel any ongoing tcp connect. // Shutdown will cancel any ongoing tcp connect.
@ -239,8 +245,7 @@ void HttpRequest::AppendError(grpc_error_handle error) {
if (overall_error_.ok()) { if (overall_error_.ok()) {
overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request"); overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
} }
const grpc_resolved_address* addr = &addresses_[next_address_ - 1]; auto addr_text = ResolvedAddressToURI(addresses_[next_address_ - 1]);
auto addr_text = grpc_sockaddr_to_uri(addr);
if (addr_text.ok()) error = AddMessagePrefix(*addr_text, std::move(error)); if (addr_text.ok()) error = AddMessagePrefix(*addr_text, std::move(error));
overall_error_ = grpc_error_add_child(overall_error_, std::move(error)); overall_error_ = grpc_error_add_child(overall_error_, std::move(error));
} }
@ -310,7 +315,7 @@ void HttpRequest::OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result) {
StartWrite(); StartWrite();
} }
void HttpRequest::DoHandshake(const grpc_resolved_address* addr) { void HttpRequest::DoHandshake(const EventEngine::ResolvedAddress& addr) {
// Create the security connector using the credentials and target name. // Create the security connector using the credentials and target name.
ChannelArgs args = ChannelArgs::FromC(channel_args_); ChannelArgs args = ChannelArgs::FromC(channel_args_);
RefCountedPtr<grpc_channel_security_connector> sc = RefCountedPtr<grpc_channel_security_connector> sc =
@ -321,7 +326,7 @@ void HttpRequest::DoHandshake(const grpc_resolved_address* addr) {
&overall_error_, 1)); &overall_error_, 1));
return; return;
} }
absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(addr); absl::StatusOr<std::string> address = ResolvedAddressToURI(addr);
if (!address.ok()) { if (!address.ok()) {
Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address", Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address",
&overall_error_, 1)); &overall_error_, 1));
@ -354,15 +359,16 @@ void HttpRequest::NextAddress(grpc_error_handle error) {
&overall_error_, 1)); &overall_error_, 1));
return; return;
} }
const grpc_resolved_address* addr = &addresses_[next_address_++]; DoHandshake(addresses_[next_address_++]);
DoHandshake(addr);
} }
void HttpRequest::OnResolved( void HttpRequest::OnResolved(
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) { absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses_or) {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
RefCountedPtr<HttpRequest> unreffer(this); RefCountedPtr<HttpRequest> unreffer(this);
MutexLock lock(&mu_); MutexLock lock(&mu_);
dns_request_handle_.reset(); resolver_->reset();
if (cancelled_) { if (cancelled_) {
Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution")); Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
return; return;

@ -32,6 +32,7 @@
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/slice.h> #include <grpc/slice.h>
@ -48,8 +49,6 @@
#include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/uri/uri_parser.h" #include "src/core/lib/uri/uri_parser.h"
#include "src/core/util/http_client/parser.h" #include "src/core/util/http_client/parser.h"
@ -223,13 +222,16 @@ class HttpRequest : public InternallyRefCounted<HttpRequest> {
void OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result); void OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result);
void DoHandshake(const grpc_resolved_address* addr) void DoHandshake(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void NextAddress(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); void NextAddress(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void OnResolved( void OnResolved(
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or); absl::StatusOr<std::vector<
grpc_event_engine::experimental::EventEngine::ResolvedAddress>>
addresses_or);
const URI uri_; const URI uri_;
const grpc_slice request_text_; const grpc_slice request_text_;
@ -250,16 +252,17 @@ class HttpRequest : public InternallyRefCounted<HttpRequest> {
RefCountedPtr<HandshakeManager> handshake_mgr_ ABSL_GUARDED_BY(mu_); RefCountedPtr<HandshakeManager> handshake_mgr_ ABSL_GUARDED_BY(mu_);
bool cancelled_ ABSL_GUARDED_BY(mu_) = false; bool cancelled_ ABSL_GUARDED_BY(mu_) = false;
grpc_http_parser parser_ ABSL_GUARDED_BY(mu_); grpc_http_parser parser_ ABSL_GUARDED_BY(mu_);
std::vector<grpc_resolved_address> addresses_ ABSL_GUARDED_BY(mu_); std::vector<grpc_event_engine::experimental::EventEngine::ResolvedAddress>
addresses_ ABSL_GUARDED_BY(mu_);
size_t next_address_ ABSL_GUARDED_BY(mu_) = 0; size_t next_address_ ABSL_GUARDED_BY(mu_) = 0;
int have_read_byte_ ABSL_GUARDED_BY(mu_) = 0; int have_read_byte_ ABSL_GUARDED_BY(mu_) = 0;
grpc_iomgr_object iomgr_obj_ ABSL_GUARDED_BY(mu_); grpc_iomgr_object iomgr_obj_ ABSL_GUARDED_BY(mu_);
grpc_slice_buffer incoming_ ABSL_GUARDED_BY(mu_); grpc_slice_buffer incoming_ ABSL_GUARDED_BY(mu_);
grpc_slice_buffer outgoing_ ABSL_GUARDED_BY(mu_); grpc_slice_buffer outgoing_ ABSL_GUARDED_BY(mu_);
grpc_error_handle overall_error_ ABSL_GUARDED_BY(mu_) = absl::OkStatus(); grpc_error_handle overall_error_ ABSL_GUARDED_BY(mu_) = absl::OkStatus();
std::shared_ptr<DNSResolver> resolver_; absl::StatusOr<std::unique_ptr<
absl::optional<DNSResolver::TaskHandle> dns_request_handle_ grpc_event_engine::experimental::EventEngine::DNSResolver>>
ABSL_GUARDED_BY(mu_) = DNSResolver::kNullHandle; resolver_;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -30,7 +30,6 @@ cdef class _CallState:
cdef object call_tracer_capsule cdef object call_tracer_capsule
cdef void maybe_save_registered_method(self, bytes method_name) except * cdef void maybe_save_registered_method(self, bytes method_name) except *
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except * cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *
cdef void maybe_delete_call_tracer(self) except *
cdef void delete_call(self) except * cdef void delete_call(self) except *

@ -76,12 +76,6 @@ cdef class _CallState:
with nogil: with nogil:
grpc_call_unref(self.c_call) grpc_call_unref(self.c_call)
self.c_call = NULL self.c_call = NULL
self.maybe_delete_call_tracer()
cdef void maybe_delete_call_tracer(self) except *:
if not self.call_tracer_capsule:
return
_observability.delete_call_tracer(self.call_tracer_capsule)
cdef void maybe_save_registered_method(self, bytes method_name) except *: cdef void maybe_save_registered_method(self, bytes method_name) except *:
with _observability.get_plugin() as plugin: with _observability.get_plugin() as plugin:

@ -76,7 +76,7 @@ cdef extern from "src/core/telemetry/call_tracer.h" namespace "grpc_core":
void RegisterGlobal(ServerCallTracerFactory* factory) nogil void RegisterGlobal(ServerCallTracerFactory* factory) nogil
cdef extern from "src/core/lib/surface/call.h": cdef extern from "src/core/lib/surface/call.h":
void grpc_call_tracer_set(grpc_call* call, void* value) nogil void grpc_call_tracer_set_and_manage(grpc_call* call, void* value) nogil
void* grpc_call_tracer_get(grpc_call* call) nogil void* grpc_call_tracer_get(grpc_call* call) nogil

@ -50,7 +50,7 @@ def maybe_save_server_trace_context(RequestCallEvent event) -> None:
cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr): cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr):
cdef ClientCallTracer* call_tracer = <ClientCallTracer*>capsule_ptr cdef ClientCallTracer* call_tracer = <ClientCallTracer*>capsule_ptr
grpc_call_tracer_set(call, call_tracer) grpc_call_tracer_set_and_manage(call, call_tracer)
cdef void* _get_call_tracer(grpc_call* call): cdef void* _get_call_tracer(grpc_call* call):

@ -100,23 +100,6 @@ class ObservabilityPlugin(
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod
def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule
) -> None:
"""Deletes the ClientCallTracer stored in ClientCallTracerCapsule.
After register the plugin, if tracing or stats is enabled, this method
will be called at the end of the call to destroy the ClientCallTracer.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as name.
Args:
client_call_tracer: A PyCapsule which stores a ClientCallTracer object.
"""
raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def save_trace_context( def save_trace_context(
self, trace_id: str, span_id: str, is_sampled: bool self, trace_id: str, span_id: str, is_sampled: bool
@ -276,22 +259,6 @@ def observability_deinit() -> None:
_cygrpc.clear_server_call_tracer_factory() _cygrpc.clear_server_call_tracer_factory()
def delete_call_tracer(client_call_tracer_capsule: Any) -> None:
"""Deletes the ClientCallTracer stored in ClientCallTracerCapsule.
This method will be called at the end of the call to destroy the ClientCallTracer.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as the name.
Args:
client_call_tracer_capsule: A PyCapsule which stores a ClientCallTracer object.
"""
with get_plugin() as plugin:
if plugin and plugin.observability_enabled:
plugin.delete_client_call_tracer(client_call_tracer_capsule)
def maybe_record_rpc_latency(state: "_channel._RPCState") -> None: def maybe_record_rpc_latency(state: "_channel._RPCState") -> None:
"""Record the latency of the RPC, if the plugin is registered and stats is enabled. """Record the latency of the RPC, if the plugin is registered and stats is enabled.

@ -155,15 +155,6 @@ def create_server_call_tracer_factory_capsule(dict exchange_labels, str identifi
return capsule return capsule
def delete_client_call_tracer(object client_call_tracer) -> None:
client_call_tracer: grpc._observability.ClientCallTracerCapsule
if cpython.PyCapsule_IsValid(client_call_tracer, CLIENT_CALL_TRACER):
capsule_ptr = cpython.PyCapsule_GetPointer(client_call_tracer, CLIENT_CALL_TRACER)
call_tracer_ptr = <ClientCallTracer*>capsule_ptr
del call_tracer_ptr
def _c_label_to_labels(vector[Label] c_labels) -> Dict[str, AnyStr]: def _c_label_to_labels(vector[Label] c_labels) -> Dict[str, AnyStr]:
py_labels = {} py_labels = {}
for label in c_labels: for label in c_labels:

@ -438,11 +438,6 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
) )
return capsule return capsule
def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule
) -> None:
_cyobservability.delete_client_call_tracer(client_call_tracer)
def save_trace_context( def save_trace_context(
self, trace_id: str, span_id: str, is_sampled: bool self, trace_id: str, span_id: str, is_sampled: bool
) -> None: ) -> None:

@ -127,9 +127,8 @@ void AwaitNextBatchLocked(std::unique_lock<std::mutex>& lock, int timeout_ms) {
void AddCensusDataToBuffer(const CensusData& data) { void AddCensusDataToBuffer(const CensusData& data) {
std::unique_lock<std::mutex> lk(g_census_data_buffer_mutex); std::unique_lock<std::mutex> lk(g_census_data_buffer_mutex);
if (g_census_data_buffer->size() >= GetMaxExportBufferSize()) { if (g_census_data_buffer->size() >= GetMaxExportBufferSize()) {
gpr_log(GPR_DEBUG, VLOG(2) << "Reached maximum census data buffer size, discarding this "
"Reached maximum census data buffer size, discarding this " "CensusData entry";
"CensusData entry");
} else { } else {
g_census_data_buffer->push(data); g_census_data_buffer->push(data);
} }

Loading…
Cancel
Save