Merge Master

pull/37476/head
tanvi-jagtap 3 months ago
commit c05fd6445e
  1. 4
      BUILD
  2. 9
      src/core/handshaker/security/secure_endpoint.cc
  3. 2
      src/core/lib/experiments/experiments.yaml
  4. 7
      src/core/lib/surface/call.cc
  5. 19
      src/core/lib/surface/call.h
  6. 105
      src/core/resolver/dns/c_ares/dns_resolver_ares.cc
  7. 196
      src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  8. 199
      src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc
  9. 7
      src/core/resolver/dns/c_ares/grpc_ares_wrapper.h
  10. 44
      src/core/util/http_client/httpcli.cc
  11. 19
      src/core/util/http_client/httpcli.h
  12. 1
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
  13. 6
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  14. 2
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  15. 2
      src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi
  16. 33
      src/python/grpcio/grpc/_observability.py
  17. 9
      src/python/grpcio_observability/grpc_observability/_cyobservability.pyx
  18. 5
      src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py
  19. 5
      src/python/grpcio_observability/grpc_observability/observability_util.cc

@ -3938,6 +3938,7 @@ grpc_cc_library(
deps = [
"config",
"debug_location",
"event_engine_base_hdrs",
"exec_ctx",
"gpr",
"grpc_base",
@ -3949,17 +3950,16 @@ grpc_cc_library(
"orphanable",
"ref_counted_ptr",
"resource_quota_api",
"sockaddr_utils",
"uri_parser",
"//src/core:channel_args",
"//src/core:channel_args_preconditioning",
"//src/core:closure",
"//src/core:error",
"//src/core:error_utils",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:handshaker_registry",
"//src/core:iomgr_fwd",
"//src/core:pollset_set",
"//src/core:resolved_address",
"//src/core:resource_quota",
"//src/core:slice",
"//src/core:slice_refcount",

@ -252,6 +252,13 @@ static void on_read(void* user_data, grpc_error_handle error) {
{
grpc_core::MutexLock l(&ep->read_mu);
// If we were shut down after this callback was scheduled with OK
// status but before it was invoked, we need to treat that as an error.
if (ep->wrapped_ep == nullptr && error.ok()) {
error = absl::CancelledError("secure endpoint shutdown");
}
uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
@ -505,8 +512,10 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
static void endpoint_destroy(grpc_endpoint* secure_ep) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
ep->read_mu.Lock();
ep->wrapped_ep.reset();
ep->memory_owner.Reset();
ep->read_mu.Unlock();
SECURE_ENDPOINT_UNREF(ep, "destroy");
}

@ -105,7 +105,7 @@
test_tags: [flow_control_test]
- name: pick_first_new
description: New pick_first impl with memory reduction.
expiry: 2024/07/30
expiry: 2024/10/30
owner: roth@google.com
test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"]
- name: promise_based_inproc_transport

@ -497,6 +497,13 @@ void grpc_call_tracer_set(grpc_call* call,
return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
}
void grpc_call_tracer_set_and_manage(grpc_call* call,
grpc_core::ClientCallTracer* tracer) {
grpc_core::Arena* arena = grpc_call_get_arena(call);
arena->ManagedNew<ClientCallTracerWrapper>(tracer);
return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
}
void* grpc_call_tracer_get(grpc_call* call) {
grpc_core::Arena* arena = grpc_call_get_arena(call);
auto* call_tracer =

@ -265,6 +265,16 @@ void grpc_call_log_batch(const char* file, int line, const grpc_op* ops,
void grpc_call_tracer_set(grpc_call* call, grpc_core::ClientCallTracer* tracer);
// Sets call tracer on the call and manages its life by using the call's arena.
// When using this API, the tracer will be destroyed by grpc_call arena when
// grpc_call is about to be destroyed. The caller of this API SHOULD NOT
// manually destroy the tracer. This API is used by Python as a way of using
// Arena to manage the lifetime of the call tracer. Python needs this API
// because the tracer was created within a separate shared object library which
// doesn't have access to core functions like arena->ManagedNew<>.
void grpc_call_tracer_set_and_manage(grpc_call* call,
grpc_core::ClientCallTracer* tracer);
void* grpc_call_tracer_get(grpc_call* call);
#define GRPC_CALL_LOG_BATCH(ops, nops) \
@ -276,6 +286,15 @@ void* grpc_call_tracer_get(grpc_call* call);
uint8_t grpc_call_is_client(grpc_call* call);
class ClientCallTracerWrapper {
public:
explicit ClientCallTracerWrapper(grpc_core::ClientCallTracer* tracer)
: tracer_(tracer) {}
private:
std::unique_ptr<grpc_core::ClientCallTracer> tracer_;
};
// Return an appropriate compression algorithm for the requested compression \a
// level in the context of \a call.
grpc_compression_algorithm grpc_call_compression_for_level(

@ -106,9 +106,10 @@ class AresClientChannelDNSResolver final : public PollingResolver {
resolver_->authority().c_str(), resolver_->name_to_resolve().c_str(),
kDefaultSecurePort, resolver_->interested_parties(),
&on_hostname_resolved_, &addresses_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG(
"resolver:%p Started resolving hostnames. hostname_request_:%p",
resolver_.get(), hostname_request_.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << resolver_.get()
<< " Started resolving hostnames. hostname_request_:"
<< hostname_request_.get();
if (resolver_->enable_srv_queries_) {
Ref(DEBUG_LOCATION, "OnSRVResolved").release();
GRPC_CLOSURE_INIT(&on_srv_resolved_, OnSRVResolved, this, nullptr);
@ -117,9 +118,10 @@ class AresClientChannelDNSResolver final : public PollingResolver {
resolver_->name_to_resolve().c_str(),
resolver_->interested_parties(), &on_srv_resolved_,
&balancer_addresses_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG(
"resolver:%p Started resolving SRV records. srv_request_:%p",
resolver_.get(), srv_request_.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << resolver_.get()
<< " Started resolving SRV records. srv_request_:"
<< srv_request_.get();
}
if (resolver_->request_service_config_) {
Ref(DEBUG_LOCATION, "OnTXTResolved").release();
@ -129,9 +131,10 @@ class AresClientChannelDNSResolver final : public PollingResolver {
resolver_->name_to_resolve().c_str(),
resolver_->interested_parties(), &on_txt_resolved_,
&service_config_json_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG(
"resolver:%p Started resolving TXT records. txt_request_:%p",
resolver_.get(), txt_request_.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << resolver_.get()
<< " Started resolving TXT records. txt_request_:"
<< txt_request_.get();
}
}
@ -219,8 +222,9 @@ AresClientChannelDNSResolver::AresClientChannelDNSResolver(
.value_or(GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS))) {}
AresClientChannelDNSResolver::~AresClientChannelDNSResolver() {
GRPC_CARES_TRACE_LOG("resolver:%p destroying AresClientChannelDNSResolver",
this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this
<< " destroying AresClientChannelDNSResolver";
}
OrphanablePtr<Orphanable> AresClientChannelDNSResolver::StartRequest() {
@ -283,15 +287,16 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_) {
if (hostname_request_ != nullptr || srv_request_ != nullptr ||
txt_request_ != nullptr) {
GRPC_CARES_TRACE_LOG(
"resolver:%p OnResolved() waiting for results (hostname: %s, srv: %s, "
"txt: %s)",
this, hostname_request_ != nullptr ? "waiting" : "done",
srv_request_ != nullptr ? "waiting" : "done",
txt_request_ != nullptr ? "waiting" : "done");
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this
<< " OnResolved() waiting for results (hostname: "
<< (hostname_request_ != nullptr ? "waiting" : "done")
<< ", srv: " << (srv_request_ != nullptr ? "waiting" : "done")
<< ", txt: " << (txt_request_ != nullptr ? "waiting" : "done") << ")";
return absl::nullopt;
}
GRPC_CARES_TRACE_LOG("resolver:%p OnResolved() proceeding", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this << " OnResolved() proceeding";
Result result;
result.args = resolver_->channel_args();
// TODO(roth): Change logic to be able to report failures for addresses
@ -309,8 +314,9 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
absl::StrCat("failed to parse service config: ",
StatusToString(service_config_string.status())));
} else if (!service_config_string->empty()) {
GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s",
this, service_config_string->c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this
<< " selected service config choice: " << *service_config_string;
result.service_config = ServiceConfigImpl::Create(
resolver_->channel_args(), *service_config_string);
if (!result.service_config.ok()) {
@ -325,8 +331,9 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
SetGrpcLbBalancerAddresses(result.args, *balancer_addresses_);
}
} else {
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", this,
StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this
<< " dns resolution failed: " << StatusToString(error);
std::string error_message;
grpc_error_get_str(error, StatusStrProperty::kDescription, &error_message);
absl::Status status = absl::UnavailableError(
@ -375,8 +382,9 @@ class AresDNSResolver final : public DNSResolver {
class AresRequest {
public:
virtual ~AresRequest() {
GRPC_CARES_TRACE_LOG("AresRequest:%p dtor ares_request_:%p", this,
grpc_ares_request_.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresRequest:" << this
<< " dtor ares_request_:" << grpc_ares_request_.get();
resolver_->UnregisterRequest(task_handle());
grpc_pollset_set_destroy(pollset_set_);
}
@ -397,8 +405,9 @@ class AresDNSResolver final : public DNSResolver {
bool Cancel() {
MutexLock lock(&mu_);
if (grpc_ares_request_ != nullptr) {
GRPC_CARES_TRACE_LOG("AresRequest:%p Cancel ares_request_:%p", this,
grpc_ares_request_.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresRequest:" << this
<< " Cancel ares_request_:" << grpc_ares_request_.get();
if (completed_) return false;
// OnDnsLookupDone will still be run
completed_ = true;
@ -499,7 +508,8 @@ class AresDNSResolver final : public DNSResolver {
aba_token),
default_port_(default_port),
on_resolve_address_done_(std::move(on_resolve_address_done)) {
GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p ctor", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresHostnameRequest:" << this << " ctor";
}
std::unique_ptr<grpc_ares_request> MakeRequestLocked() override {
@ -508,13 +518,15 @@ class AresDNSResolver final : public DNSResolver {
name_server().c_str(), name().c_str(), default_port_.c_str(),
pollset_set(), on_dns_lookup_done(), &addresses_,
timeout().millis()));
GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p Start ares_request_:%p",
this, ares_request.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresHostnameRequest:" << this
<< " Start ares_request_:" << ares_request.get();
return ares_request;
}
void OnComplete(grpc_error_handle error) override {
GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p OnComplete", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresHostnameRequest:" << this << " OnComplete";
if (!error.ok()) {
on_resolve_address_done_(grpc_error_to_absl_status(error));
return;
@ -550,7 +562,8 @@ class AresDNSResolver final : public DNSResolver {
: AresRequest(name, name_server, timeout, interested_parties, resolver,
aba_token),
on_resolve_address_done_(std::move(on_resolve_address_done)) {
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p ctor", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this << " ctor";
}
std::unique_ptr<grpc_ares_request> MakeRequestLocked() override {
@ -558,13 +571,15 @@ class AresDNSResolver final : public DNSResolver {
std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_srv_ares(
name_server().c_str(), name().c_str(), pollset_set(),
on_dns_lookup_done(), &balancer_addresses_, timeout().millis()));
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p Start ares_request_:%p", this,
ares_request.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this
<< " Start ares_request_:" << ares_request.get();
return ares_request;
}
void OnComplete(grpc_error_handle error) override {
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p OnComplete", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this << " OnComplete";
if (!error.ok()) {
on_resolve_address_done_(grpc_error_to_absl_status(error));
return;
@ -596,7 +611,8 @@ class AresDNSResolver final : public DNSResolver {
: AresRequest(name, name_server, timeout, interested_parties, resolver,
aba_token),
on_resolved_(std::move(on_resolved)) {
GRPC_CARES_TRACE_LOG("AresTXTRequest:%p ctor", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresTXTRequest:" << this << " ctor";
}
~AresTXTRequest() override { gpr_free(service_config_json_); }
@ -606,13 +622,15 @@ class AresDNSResolver final : public DNSResolver {
std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_txt_ares(
name_server().c_str(), name().c_str(), pollset_set(),
on_dns_lookup_done(), &service_config_json_, timeout().millis()));
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p Start ares_request_:%p", this,
ares_request.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this
<< " Start ares_request_:" << ares_request.get();
return ares_request;
}
void OnComplete(grpc_error_handle error) override {
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p OnComplete", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this << " OnComplete";
if (!error.ok()) {
on_resolved_(grpc_error_to_absl_status(error));
return;
@ -684,14 +702,15 @@ class AresDNSResolver final : public DNSResolver {
MutexLock lock(&mu_);
if (!open_requests_.contains(handle)) {
// Unknown request, possibly completed already, or an invalid handle.
GRPC_CARES_TRACE_LOG(
"AresDNSResolver:%p attempt to cancel unknown TaskHandle:%s", this,
HandleToString(handle).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresDNSResolver:" << this
<< " attempt to cancel unknown TaskHandle:" << HandleToString(handle);
return false;
}
auto* request = reinterpret_cast<AresRequest*>(handle.keys[0]);
GRPC_CARES_TRACE_LOG("AresDNSResolver:%p cancel ares_request:%p", this,
request);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresDNSResolver:" << this
<< " cancel ares_request:" << request;
return request->Cancel();
}

@ -133,8 +133,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
~GrpcPolledFdWindows() override {
GRPC_CARES_TRACE_LOG("fd:|%s| ~GrpcPolledFdWindows shutdown_called_: %d ",
GetName(), shutdown_called_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| ~GrpcPolledFdWindows shutdown_called_: " << shutdown_called_;
CSliceUnref(read_buf_);
CSliceUnref(write_buf_);
CHECK_EQ(read_closure_, nullptr);
@ -173,10 +174,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
void ContinueRegisterForOnReadableLocked() {
GRPC_CARES_TRACE_LOG(
"fd:|%s| ContinueRegisterForOnReadableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| ContinueRegisterForOnReadableLocked "
<< "wsa_connect_error_:" << wsa_connect_error_;
CHECK(connect_done_);
if (wsa_connect_error_ != 0) {
ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
@ -194,10 +195,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
&winsocket_->read_info.overlapped, nullptr)) {
int wsa_last_error = WSAGetLastError();
char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG(
"fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| "
"msg:|%s|",
GetName(), wsa_last_error, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| RegisterForOnReadableLocked WSARecvFrom error code:|"
<< wsa_last_error << "| msg:|" << msg << "|";
gpr_free(msg);
if (wsa_last_error != WSA_IO_PENDING) {
ScheduleAndNullReadClosure(
@ -210,14 +211,15 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
if (socket_type_ == SOCK_DGRAM) {
GRPC_CARES_TRACE_LOG("fd:|%s| RegisterForOnWriteableLocked called",
GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| RegisterForOnWriteableLocked called";
} else {
CHECK(socket_type_ == SOCK_STREAM);
GRPC_CARES_TRACE_LOG(
"fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d "
"connect_done_: %d",
GetName(), tcp_write_state_, connect_done_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| RegisterForOnWriteableLocked called tcp_write_state_: "
<< tcp_write_state_ << " connect_done_: " << connect_done_;
}
CHECK_EQ(write_closure_, nullptr);
write_closure_ = write_closure;
@ -234,10 +236,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
void ContinueRegisterForOnWriteableLocked() {
GRPC_CARES_TRACE_LOG(
"fd:|%s| ContinueRegisterForOnWriteableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| ContinueRegisterForOnWriteableLocked "
<< "wsa_connect_error_:" << wsa_connect_error_;
CHECK(connect_done_);
if (wsa_connect_error_ != 0) {
ScheduleAndNullWriteClosure(
@ -288,10 +290,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
ares_socket_t data_len, int /* flags */,
struct sockaddr* from, ares_socklen_t* from_len) {
GRPC_CARES_TRACE_LOG(
"fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
"length:|%d|",
GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_));
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " RecvFrom called read_buf_has_data:" << read_buf_has_data_
<< " Current read buf length:" << GRPC_SLICE_LENGTH(read_buf_);
if (!read_buf_has_data_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
@ -340,20 +342,21 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1,
bytes_sent_ptr, flags, overlapped, nullptr);
*wsa_error_code = WSAGetLastError();
GRPC_CARES_TRACE_LOG(
"fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
"overlapped:%p "
"return:%d *wsa_error_code:%d",
GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0,
overlapped, out, *wsa_error_code);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " SendWriteBuf WSASend buf.len:" << buf.len << " *bytes_sent_ptr:"
<< (bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0)
<< " overlapped:" << overlapped << " return:" << out
<< " *wsa_error_code:" << *wsa_error_code;
return out;
}
ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
int iov_count) {
GRPC_CARES_TRACE_LOG(
"fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d",
GetName(), connect_done_, wsa_connect_error_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " SendV called connect_done_:" << connect_done_
<< " wsa_connect_error_:" << wsa_connect_error_;
if (!connect_done_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
@ -377,7 +380,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
// c-ares doesn't handle retryable errors on writes of UDP sockets.
// Therefore, the sendv handler for UDP sockets must only attempt
// to write everything inline.
GRPC_CARES_TRACE_LOG("fd:|%s| SendVUDP called", GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " SendVUDP called";
CHECK_EQ(GRPC_SLICE_LENGTH(write_buf_), 0);
CSliceUnref(write_buf_);
write_buf_ = FlattenIovec(iov, iov_count);
@ -388,9 +392,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
write_buf_ = grpc_empty_slice();
wsa_error_ctx->SetWSAError(wsa_error_code);
char* msg = gpr_format_message(wsa_error_code);
GRPC_CARES_TRACE_LOG(
"fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(),
wsa_error_code, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " SendVUDP SendWriteBuf error code:" << wsa_error_code
<< " msg:" << msg;
gpr_free(msg);
return -1;
}
@ -406,8 +411,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
// out in the background, and making further send progress in general, will
// happen as long as c-ares continues to show interest in writeability on
// this fd.
GRPC_CARES_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d",
GetName(), tcp_write_state_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " SendVTCP called tcp_write_state_:" << tcp_write_state_;
switch (tcp_write_state_) {
case WRITE_IDLE:
tcp_write_state_ = WRITE_REQUESTED;
@ -450,13 +456,13 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
void OnTcpConnectLocked(grpc_error_handle error) {
GRPC_CARES_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked error:|%s| "
"pending_register_for_readable:%d"
" pending_register_for_writeable:%d",
GetName(), StatusToString(error).c_str(),
pending_continue_register_for_on_readable_locked_,
pending_continue_register_for_on_writeable_locked_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " InnerOnTcpConnectLocked error:" << StatusToString(error)
<< " pending_register_for_readable:"
<< pending_continue_register_for_on_readable_locked_
<< " pending_register_for_writeable:"
<< pending_continue_register_for_on_writeable_locked_;
CHECK(!connect_done_);
connect_done_ = true;
CHECK_EQ(wsa_connect_error_, 0);
@ -473,10 +479,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
if (!wsa_success) {
wsa_connect_error_ = WSAGetLastError();
char* msg = gpr_format_message(wsa_connect_error_);
GRPC_CARES_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
"msg:|%s|",
GetName(), wsa_connect_error_, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " InnerOnTcpConnectLocked WSA overlapped result code:"
<< wsa_connect_error_ << " msg:" << msg;
gpr_free(msg);
}
}
@ -502,7 +508,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
GRPC_CARES_TRACE_LOG("fd:%s ConnectUDP", GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " ConnectUDP";
CHECK(!connect_done_);
CHECK_EQ(wsa_connect_error_, 0);
SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
@ -512,8 +519,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
wsa_error_ctx->SetWSAError(wsa_connect_error_);
connect_done_ = true;
char* msg = gpr_format_message(wsa_connect_error_);
GRPC_CARES_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|", GetName(),
wsa_connect_error_, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " WSAConnect error code:|"
<< wsa_connect_error_ << "| msg:|" << msg << "|";
gpr_free(msg);
// c-ares expects a posix-style connect API
return out == 0 ? 0 : -1;
@ -521,7 +529,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
GRPC_CARES_TRACE_LOG("fd:%s ConnectTCP", GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " ConnectTCP";
LPFN_CONNECTEX ConnectEx;
GUID guid = WSAID_CONNECTEX;
DWORD ioctl_num_bytes;
@ -532,10 +541,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG(
"fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
"msg:|%s|",
GetName(), wsa_last_error, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:"
<< wsa_last_error << " msg:|" << msg << "|";
gpr_free(msg);
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
@ -555,8 +564,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG("fd:%s bind error code:%d msg:|%s|", GetName(),
wsa_last_error, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " bind error code:" << wsa_last_error << " msg:|" << msg << "|";
gpr_free(msg);
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
@ -569,8 +579,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|", GetName(),
wsa_last_error, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " ConnectEx error code:" << wsa_last_error << " msg:|" << msg
<< "|";
gpr_free(msg);
if (wsa_last_error == WSA_IO_PENDING) {
// c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
@ -610,11 +622,12 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error,
"OnIocpReadableInner");
GRPC_CARES_TRACE_LOG(
"fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
"code:|%d| msg:|%s|",
GetName(), winsocket_->read_info.wsa_error,
StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| OnIocpReadableInner winsocket_->read_info.wsa_error "
"code:|"
<< winsocket_->read_info.wsa_error << "| msg:|"
<< StatusToString(error) << "|";
}
}
}
@ -626,9 +639,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
CSliceUnref(read_buf_);
read_buf_ = grpc_empty_slice();
}
GRPC_CARES_TRACE_LOG(
"fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(),
GRPC_SLICE_LENGTH(read_buf_));
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| OnIocpReadable finishing. read buf length now:|"
<< GRPC_SLICE_LENGTH(read_buf_) << "|";
ScheduleAndNullReadClosure(error);
}
@ -639,17 +653,19 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
void OnIocpWriteableLocked(grpc_error_handle error) {
GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) OnIocpWriteableInner. fd:|" << GetName() << "|";
CHECK(socket_type_ == SOCK_STREAM);
if (error.ok()) {
if (winsocket_->write_info.wsa_error != 0) {
error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error,
"OnIocpWriteableInner");
GRPC_CARES_TRACE_LOG(
"fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
"code:|%d| msg:|%s|",
GetName(), winsocket_->write_info.wsa_error,
StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
"code:|"
<< winsocket_->write_info.wsa_error << "| msg:|"
<< StatusToString(error) << "|";
}
}
CHECK(tcp_write_state_ == WRITE_PENDING);
@ -657,8 +673,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
write_buf_ = grpc_slice_sub_no_ref(
write_buf_, 0, winsocket_->write_info.bytes_transferred);
GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d",
GetName(), winsocket_->write_info.bytes_transferred);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| OnIocpWriteableInner. bytes transferred:"
<< winsocket_->write_info.bytes_transferred;
} else {
CSliceUnref(write_buf_);
write_buf_ = grpc_empty_slice();
@ -728,7 +746,9 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
//
static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
if (type != SOCK_DGRAM && type != SOCK_STREAM) {
GRPC_CARES_TRACE_LOG("Socket called with invalid socket type:%d", type);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) Socket called with invalid socket type:"
<< type;
return INVALID_SOCKET;
}
GrpcPolledFdFactoryWindows* self =
@ -736,15 +756,16 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
grpc_get_default_wsa_socket_flags());
if (s == INVALID_SOCKET) {
GRPC_CARES_TRACE_LOG(
"WSASocket failed with params af:%d type:%d protocol:%d", af, type,
protocol);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) WSASocket failed with params af:" << af
<< " type:" << type << " protocol:" << protocol;
return s;
}
grpc_error_handle error = grpc_tcp_set_non_block(s);
if (!error.ok()) {
GRPC_CARES_TRACE_LOG("WSAIoctl failed with error: %s",
StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) WSAIoctl failed with error: "
<< StatusToString(error);
return INVALID_SOCKET;
}
auto on_shutdown_locked = [self, s]() {
@ -755,9 +776,10 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
};
auto polled_fd = new GrpcPolledFdWindows(s, self->mu_, af, type,
std::move(on_shutdown_locked));
GRPC_CARES_TRACE_LOG(
"fd:|%s| created with params af:%d type:%d protocol:%d",
polled_fd->GetName(), af, type, protocol);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << polled_fd->GetName()
<< " created with params af:" << af << " type:" << type
<< " protocol:" << protocol;
CHECK(self->sockets_.insert({s, polled_fd}).second);
return s;
}

@ -200,8 +200,9 @@ static absl::Status AresStatusToAbslStatus(int status,
static grpc_ares_ev_driver* grpc_ares_ev_driver_ref(
grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request,
ev_driver);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request << " Ref ev_driver "
<< ev_driver;
gpr_ref(&ev_driver->refs);
return ev_driver;
}
@ -211,11 +212,13 @@ static void grpc_ares_complete_request_locked(grpc_ares_request* r)
static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request,
ev_driver);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " Unref ev_driver " << ev_driver;
if (gpr_unref(&ev_driver->refs)) {
GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request,
ev_driver);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " destroy ev_driver " << ev_driver;
CHECK_EQ(ev_driver->fds, nullptr);
ares_destroy(ev_driver->channel);
grpc_ares_complete_request_locked(ev_driver->request);
@ -225,8 +228,9 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver)
static void fd_node_destroy_locked(fd_node* fdn)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << fdn->ev_driver->request
<< " delete fd: " << fdn->grpc_polled_fd->GetName();
CHECK(!fdn->readable_registered);
CHECK(!fdn->writable_registered);
CHECK(fdn->already_shutdown);
@ -292,21 +296,21 @@ static grpc_core::Timestamp calculate_next_ares_backup_poll_alarm(
// by the c-ares code comments.
grpc_core::Duration until_next_ares_backup_poll_alarm =
grpc_core::Duration::Seconds(1);
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p. next ares process poll time in "
"%" PRId64 " ms",
driver->request, driver, until_next_ares_backup_poll_alarm.millis());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << driver->request
<< " ev_driver=" << driver << ". next ares process poll time in "
<< until_next_ares_backup_poll_alarm.millis() << " ms";
return grpc_core::Timestamp::Now() + until_next_ares_backup_poll_alarm;
}
static void on_timeout(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
grpc_core::MutexLock lock(&driver->request->mu);
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. "
"err=%s",
driver->request, driver, driver->shutting_down,
grpc_core::StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << driver->request
<< " ev_driver=" << driver
<< " on_timeout_locked. driver->shutting_down=" << driver->shutting_down
<< ". err=" << grpc_core::StatusToString(error);
if (!driver->shutting_down && error.ok()) {
grpc_ares_ev_driver_shutdown_locked(driver);
}
@ -327,20 +331,20 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
grpc_core::MutexLock lock(&driver->request->mu);
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. "
"driver->shutting_down=%d. "
"err=%s",
driver->request, driver, driver->shutting_down,
grpc_core::StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << driver->request
<< " ev_driver=" << driver
<< " on_ares_backup_poll_alarm_locked. driver->shutting_down="
<< driver->shutting_down << ". err=" << grpc_core::StatusToString(error);
if (!driver->shutting_down && error.ok()) {
fd_node* fdn = driver->fds;
while (fdn != nullptr) {
if (!fdn->already_shutdown) {
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p on_ares_backup_poll_alarm_locked; "
"ares_process_fd. fd=%s",
driver->request, driver, fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << driver->request
<< " ev_driver=" << driver
<< " on_ares_backup_poll_alarm_locked; ares_process_fd. fd="
<< fdn->grpc_polled_fd->GetName();
ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
ares_process_fd(driver->channel, as, as);
}
@ -373,8 +377,9 @@ static void on_readable(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
fdn->readable_registered = false;
GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << fdn->ev_driver->request
<< " readable on " << fdn->grpc_polled_fd->GetName();
if (error.ok() && !ev_driver->shutting_down) {
ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD);
} else {
@ -397,8 +402,9 @@ static void on_writable(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
fdn->writable_registered = false;
GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request << " writable on "
<< fdn->grpc_polled_fd->GetName();
if (error.ok() && !ev_driver->shutting_down) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as);
} else {
@ -433,8 +439,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
fdn->grpc_polled_fd =
ev_driver->polled_fd_factory->NewGrpcPolledFdLocked(
socks[i], ev_driver->pollset_set);
GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " new fd: " << fdn->grpc_polled_fd->GetName();
fdn->readable_registered = false;
fdn->writable_registered = false;
fdn->already_shutdown = false;
@ -449,15 +456,16 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable, fdn,
grpc_schedule_on_exec_ctx);
if (fdn->grpc_polled_fd->IsFdStillReadableLocked()) {
GRPC_CARES_TRACE_LOG("request:%p schedule direct read on: %s",
ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " schedule direct read on: "
<< fdn->grpc_polled_fd->GetName();
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &fdn->read_closure,
absl::OkStatus());
} else {
GRPC_CARES_TRACE_LOG("request:%p notify read on: %s",
ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " notify read on: " << fdn->grpc_polled_fd->GetName();
fdn->grpc_polled_fd->RegisterForOnReadableLocked(
&fdn->read_closure);
}
@ -467,9 +475,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
// has not been registered with this socket.
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fdn->writable_registered) {
GRPC_CARES_TRACE_LOG("request:%p notify write on: %s",
ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " notify write on: " << fdn->grpc_polled_fd->GetName();
grpc_ares_ev_driver_ref(ev_driver);
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn,
grpc_schedule_on_exec_ctx);
@ -505,10 +513,11 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver)
ev_driver->query_timeout_ms == 0
? grpc_core::Duration::Infinity()
: grpc_core::Duration::Milliseconds(ev_driver->query_timeout_ms);
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in "
"%" PRId64 " ms",
ev_driver->request, ev_driver, timeout.millis());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " ev_driver=" << ev_driver
<< " grpc_ares_ev_driver_start_locked. timeout in " << timeout.millis()
<< " ms";
grpc_ares_ev_driver_ref(ev_driver);
GRPC_CLOSURE_INIT(&ev_driver->on_timeout_locked, on_timeout, ev_driver,
grpc_schedule_on_exec_ctx);
@ -547,7 +556,8 @@ grpc_error_handle grpc_ares_ev_driver_create_locked(
}
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
grpc_ares_test_only_inject_config(&(*ev_driver)->channel);
GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request);
GRPC_TRACE_VLOG(cares_resolver, 2) << "(c-ares resolver) request:" << request
<< " grpc_ares_ev_driver_create_locked";
if (status != ARES_SUCCESS) {
grpc_error_handle err = GRPC_ERROR_CREATE(absl::StrCat(
"Failed to init ares channel. C-ares error: ", ares_strerror(status)));
@ -645,10 +655,10 @@ static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
grpc_ares_request* parent_request, const char* host, uint16_t port,
bool is_balancer, const char* qtype)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_request->mu) {
GRPC_CARES_TRACE_LOG(
"request:%p create_hostbyname_request_locked host:%s port:%d "
"is_balancer:%d qtype:%s",
parent_request, host, port, is_balancer, qtype);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << parent_request
<< " create_hostbyname_request_locked host:" << host << " port:" << port
<< " is_balancer:" << is_balancer << " qtype:" << qtype;
grpc_ares_hostbyname_request* hr = new grpc_ares_hostbyname_request();
hr->parent_request = parent_request;
hr->host = gpr_strdup(host);
@ -675,9 +685,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
static_cast<grpc_ares_hostbyname_request*>(arg);
grpc_ares_request* r = hr->parent_request;
if (status == ARES_SUCCESS) {
GRPC_CARES_TRACE_LOG(
"request:%p on_hostbyname_done_locked qtype=%s host=%s ARES_SUCCESS", r,
hr->qtype, hr->host);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " on_hostbyname_done_locked qtype=" << hr->qtype
<< " host=" << hr->host << " ARES_SUCCESS";
std::unique_ptr<EndpointAddressesList>* address_list_ptr =
hr->is_balancer ? r->balancer_addresses_out : r->addresses_out;
if (*address_list_ptr == nullptr) {
@ -701,10 +712,11 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
addr->sin6_port = hr->port;
char output[INET6_ADDRSTRLEN];
ares_inet_ntop(AF_INET6, &addr->sin6_addr, output, INET6_ADDRSTRLEN);
GRPC_CARES_TRACE_LOG(
"request:%p c-ares resolver gets a AF_INET6 result: \n"
" addr: %s\n port: %d\n sin6_scope_id: %d\n",
r, output, ntohs(hr->port), addr->sin6_scope_id);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " c-ares resolver gets a AF_INET6 result: \n"
<< " addr: " << output << "\n port: " << ntohs(hr->port)
<< "\n sin6_scope_id: " << addr->sin6_scope_id << "\n";
break;
}
case AF_INET: {
@ -716,10 +728,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
addr->sin_port = hr->port;
char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr->sin_addr, output, INET_ADDRSTRLEN);
GRPC_CARES_TRACE_LOG(
"request:%p c-ares resolver gets a AF_INET result: \n"
" addr: %s\n port: %d\n",
r, output, ntohs(hr->port));
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " c-ares resolver gets a AF_INET result: \n addr: " << output
<< "\n port: " << ntohs(hr->port) << "\n";
break;
}
}
@ -729,8 +741,9 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
std::string error_msg = absl::StrFormat(
"C-ares status is not ARES_SUCCESS qtype=%s name=%s is_balancer=%d: %s",
hr->qtype, hr->host, hr->is_balancer, ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_hostbyname_done_locked: %s", r,
error_msg.c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " on_hostbyname_done_locked: " << error_msg;
r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg),
r->error);
}
@ -745,13 +758,14 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/,
GrpcAresQuery* q = static_cast<GrpcAresQuery*>(arg);
grpc_ares_request* r = q->parent_request();
if (status == ARES_SUCCESS) {
GRPC_CARES_TRACE_LOG(
"request:%p on_srv_query_done_locked name=%s ARES_SUCCESS", r,
q->name().c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " on_srv_query_done_locked name=" << q->name() << " ARES_SUCCESS";
struct ares_srv_reply* reply;
const int parse_status = ares_parse_srv_reply(abuf, alen, &reply);
GRPC_CARES_TRACE_LOG("request:%p ares_parse_srv_reply: %d", r,
parse_status);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " ares_parse_srv_reply: " << parse_status;
if (parse_status == ARES_SUCCESS) {
for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
srv_it = srv_it->next) {
@ -775,8 +789,9 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/,
std::string error_msg = absl::StrFormat(
"C-ares status is not ARES_SUCCESS qtype=SRV name=%s: %s", q->name(),
ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_srv_query_done_locked: %s", r,
error_msg.c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " on_srv_query_done_locked: " << error_msg;
r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg),
r->error);
}
@ -797,8 +812,9 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/,
struct ares_txt_ext* result = nullptr;
struct ares_txt_ext* reply = nullptr;
if (status != ARES_SUCCESS) goto fail;
GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked name=%s ARES_SUCCESS", r,
q->name().c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " on_txt_done_locked name=" << q->name() << " ARES_SUCCESS";
status = ares_parse_txt_reply_ext(buf, len, &reply);
if (status != ARES_SUCCESS) goto fail;
// Find service config in TXT record.
@ -826,8 +842,9 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/,
service_config_len += result->length;
}
(*r->service_config_json_out)[service_config_len] = '\0';
GRPC_CARES_TRACE_LOG("request:%p found service config: %s", r,
*r->service_config_json_out);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " found service config: " << *r->service_config_json_out;
}
// Clean up.
ares_free_data(reply);
@ -837,8 +854,8 @@ fail:
std::string error_msg =
absl::StrFormat("C-ares status is not ARES_SUCCESS qtype=TXT name=%s: %s",
q->name(), ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked %s", r,
error_msg.c_str());
GRPC_TRACE_VLOG(cares_resolver, 2) << "(c-ares resolver) request:" << r
<< " on_txt_done_locked " << error_msg;
r->error =
grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error);
}
@ -847,8 +864,9 @@ grpc_error_handle set_request_dns_server(grpc_ares_request* r,
absl::string_view dns_server)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) {
if (!dns_server.empty()) {
GRPC_CARES_TRACE_LOG("request:%p Using DNS server %s", r,
dns_server.data());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r << " Using DNS server "
<< dns_server.data();
grpc_resolved_address addr;
if (grpc_parse_ipv4_hostport(dns_server, &addr, /*log_errors=*/false)) {
r->dns_server_addr.family = AF_INET;
@ -1043,10 +1061,10 @@ static grpc_ares_request* grpc_dns_lookup_hostname_ares_impl(
r->ev_driver = nullptr;
r->on_done = on_done;
r->addresses_out = addrs;
GRPC_CARES_TRACE_LOG(
"request:%p c-ares grpc_dns_lookup_hostname_ares_impl name=%s, "
"default_port=%s",
r, name, default_port);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " c-ares grpc_dns_lookup_hostname_ares_impl name=" << name
<< ", default_port=" << default_port;
// Early out if the target is an ipv4 or ipv6 literal.
if (resolve_as_ip_literal_locked(name, default_port, addrs)) {
grpc_ares_complete_request_locked(r);
@ -1097,8 +1115,9 @@ grpc_ares_request* grpc_dns_lookup_srv_ares_impl(
r->ev_driver = nullptr;
r->on_done = on_done;
r->balancer_addresses_out = balancer_addresses;
GRPC_CARES_TRACE_LOG(
"request:%p c-ares grpc_dns_lookup_srv_ares_impl name=%s", r, name);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " c-ares grpc_dns_lookup_srv_ares_impl name=" << name;
grpc_error_handle error;
// Don't query for SRV records if the target is "localhost"
if (target_matches_localhost(name)) {
@ -1135,8 +1154,9 @@ grpc_ares_request* grpc_dns_lookup_txt_ares_impl(
r->ev_driver = nullptr;
r->on_done = on_done;
r->service_config_json_out = service_config_json;
GRPC_CARES_TRACE_LOG(
"request:%p c-ares grpc_dns_lookup_txt_ares_impl name=%s", r, name);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " c-ares grpc_dns_lookup_txt_ares_impl name=" << name;
grpc_error_handle error;
// Don't query for TXT records if the target is "localhost"
if (target_matches_localhost(name)) {
@ -1185,8 +1205,9 @@ grpc_ares_request* (*grpc_dns_lookup_txt_ares)(
static void grpc_cancel_ares_request_impl(grpc_ares_request* r) {
CHECK_NE(r, nullptr);
grpc_core::MutexLock lock(&r->mu);
GRPC_CARES_TRACE_LOG("request:%p grpc_cancel_ares_request ev_driver:%p", r,
r->ev_driver);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " grpc_cancel_ares_request ev_driver:" << r->ev_driver;
if (r->ev_driver != nullptr) {
grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
}

@ -39,13 +39,6 @@
#define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 120000
#define GRPC_CARES_TRACE_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(cares_resolver)) { \
VLOG(2) << "(c-ares resolver) " << absl::StrFormat(format, __VA_ARGS__); \
} \
} while (0)
typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
struct grpc_ares_request {

@ -38,15 +38,14 @@
#include "src/core/handshaker/handshaker.h"
#include "src/core/handshaker/handshaker_registry.h"
#include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/security_connector/security_connector.h"
@ -59,6 +58,9 @@ namespace grpc_core {
namespace {
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::ResolvedAddressToURI;
grpc_httpcli_get_override g_get_override;
grpc_httpcli_post_override g_post_override;
grpc_httpcli_put_override g_put_override;
@ -173,7 +175,10 @@ HttpRequest::HttpRequest(
pollent_(pollent),
pollset_set_(grpc_pollset_set_create()),
test_only_generate_response_(std::move(test_only_generate_response)),
resolver_(GetDNSResolver()) {
resolver_(
ChannelArgs::FromC(channel_args_)
.GetObjectRef<EventEngine>()
->GetDNSResolver(EventEngine::DNSResolver::ResolverOptions())) {
grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
grpc_slice_buffer_init(&incoming_);
grpc_slice_buffer_init(&outgoing_);
@ -207,11 +212,14 @@ void HttpRequest::Start() {
test_only_generate_response_.value()();
return;
}
if (!resolver_.ok()) {
Finish(resolver_.status());
return;
}
Ref().release(); // ref held by pending DNS resolution
dns_request_handle_ = resolver_->LookupHostname(
absl::bind_front(&HttpRequest::OnResolved, this), uri_.authority(),
uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_,
/*name_server=*/"");
(*resolver_)
->LookupHostname(absl::bind_front(&HttpRequest::OnResolved, this),
uri_.authority(), uri_.scheme());
}
void HttpRequest::Orphan() {
@ -220,10 +228,8 @@ void HttpRequest::Orphan() {
CHECK(!cancelled_);
cancelled_ = true;
// cancel potentially pending DNS resolution.
if (dns_request_handle_.has_value() &&
resolver_->Cancel(dns_request_handle_.value())) {
Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
Unref();
if (*resolver_ != nullptr) {
resolver_->reset();
}
if (handshake_mgr_ != nullptr) {
// Shutdown will cancel any ongoing tcp connect.
@ -239,8 +245,7 @@ void HttpRequest::AppendError(grpc_error_handle error) {
if (overall_error_.ok()) {
overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
}
const grpc_resolved_address* addr = &addresses_[next_address_ - 1];
auto addr_text = grpc_sockaddr_to_uri(addr);
auto addr_text = ResolvedAddressToURI(addresses_[next_address_ - 1]);
if (addr_text.ok()) error = AddMessagePrefix(*addr_text, std::move(error));
overall_error_ = grpc_error_add_child(overall_error_, std::move(error));
}
@ -310,7 +315,7 @@ void HttpRequest::OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result) {
StartWrite();
}
void HttpRequest::DoHandshake(const grpc_resolved_address* addr) {
void HttpRequest::DoHandshake(const EventEngine::ResolvedAddress& addr) {
// Create the security connector using the credentials and target name.
ChannelArgs args = ChannelArgs::FromC(channel_args_);
RefCountedPtr<grpc_channel_security_connector> sc =
@ -321,7 +326,7 @@ void HttpRequest::DoHandshake(const grpc_resolved_address* addr) {
&overall_error_, 1));
return;
}
absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(addr);
absl::StatusOr<std::string> address = ResolvedAddressToURI(addr);
if (!address.ok()) {
Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address",
&overall_error_, 1));
@ -354,15 +359,16 @@ void HttpRequest::NextAddress(grpc_error_handle error) {
&overall_error_, 1));
return;
}
const grpc_resolved_address* addr = &addresses_[next_address_++];
DoHandshake(addr);
DoHandshake(addresses_[next_address_++]);
}
void HttpRequest::OnResolved(
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) {
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses_or) {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
RefCountedPtr<HttpRequest> unreffer(this);
MutexLock lock(&mu_);
dns_request_handle_.reset();
resolver_->reset();
if (cancelled_) {
Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
return;

@ -32,6 +32,7 @@
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
@ -48,8 +49,6 @@
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/uri/uri_parser.h"
#include "src/core/util/http_client/parser.h"
@ -223,13 +222,16 @@ class HttpRequest : public InternallyRefCounted<HttpRequest> {
void OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result);
void DoHandshake(const grpc_resolved_address* addr)
void DoHandshake(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void NextAddress(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void OnResolved(
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or);
absl::StatusOr<std::vector<
grpc_event_engine::experimental::EventEngine::ResolvedAddress>>
addresses_or);
const URI uri_;
const grpc_slice request_text_;
@ -250,16 +252,17 @@ class HttpRequest : public InternallyRefCounted<HttpRequest> {
RefCountedPtr<HandshakeManager> handshake_mgr_ ABSL_GUARDED_BY(mu_);
bool cancelled_ ABSL_GUARDED_BY(mu_) = false;
grpc_http_parser parser_ ABSL_GUARDED_BY(mu_);
std::vector<grpc_resolved_address> addresses_ ABSL_GUARDED_BY(mu_);
std::vector<grpc_event_engine::experimental::EventEngine::ResolvedAddress>
addresses_ ABSL_GUARDED_BY(mu_);
size_t next_address_ ABSL_GUARDED_BY(mu_) = 0;
int have_read_byte_ ABSL_GUARDED_BY(mu_) = 0;
grpc_iomgr_object iomgr_obj_ ABSL_GUARDED_BY(mu_);
grpc_slice_buffer incoming_ ABSL_GUARDED_BY(mu_);
grpc_slice_buffer outgoing_ ABSL_GUARDED_BY(mu_);
grpc_error_handle overall_error_ ABSL_GUARDED_BY(mu_) = absl::OkStatus();
std::shared_ptr<DNSResolver> resolver_;
absl::optional<DNSResolver::TaskHandle> dns_request_handle_
ABSL_GUARDED_BY(mu_) = DNSResolver::kNullHandle;
absl::StatusOr<std::unique_ptr<
grpc_event_engine::experimental::EventEngine::DNSResolver>>
resolver_;
};
} // namespace grpc_core

@ -30,7 +30,6 @@ cdef class _CallState:
cdef object call_tracer_capsule
cdef void maybe_save_registered_method(self, bytes method_name) except *
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *
cdef void maybe_delete_call_tracer(self) except *
cdef void delete_call(self) except *

@ -76,12 +76,6 @@ cdef class _CallState:
with nogil:
grpc_call_unref(self.c_call)
self.c_call = NULL
self.maybe_delete_call_tracer()
cdef void maybe_delete_call_tracer(self) except *:
if not self.call_tracer_capsule:
return
_observability.delete_call_tracer(self.call_tracer_capsule)
cdef void maybe_save_registered_method(self, bytes method_name) except *:
with _observability.get_plugin() as plugin:

@ -76,7 +76,7 @@ cdef extern from "src/core/telemetry/call_tracer.h" namespace "grpc_core":
void RegisterGlobal(ServerCallTracerFactory* factory) nogil
cdef extern from "src/core/lib/surface/call.h":
void grpc_call_tracer_set(grpc_call* call, void* value) nogil
void grpc_call_tracer_set_and_manage(grpc_call* call, void* value) nogil
void* grpc_call_tracer_get(grpc_call* call) nogil

@ -50,7 +50,7 @@ def maybe_save_server_trace_context(RequestCallEvent event) -> None:
cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr):
cdef ClientCallTracer* call_tracer = <ClientCallTracer*>capsule_ptr
grpc_call_tracer_set(call, call_tracer)
grpc_call_tracer_set_and_manage(call, call_tracer)
cdef void* _get_call_tracer(grpc_call* call):

@ -100,23 +100,6 @@ class ObservabilityPlugin(
"""
raise NotImplementedError()
@abc.abstractmethod
def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule
) -> None:
"""Deletes the ClientCallTracer stored in ClientCallTracerCapsule.
After register the plugin, if tracing or stats is enabled, this method
will be called at the end of the call to destroy the ClientCallTracer.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as name.
Args:
client_call_tracer: A PyCapsule which stores a ClientCallTracer object.
"""
raise NotImplementedError()
@abc.abstractmethod
def save_trace_context(
self, trace_id: str, span_id: str, is_sampled: bool
@ -276,22 +259,6 @@ def observability_deinit() -> None:
_cygrpc.clear_server_call_tracer_factory()
def delete_call_tracer(client_call_tracer_capsule: Any) -> None:
"""Deletes the ClientCallTracer stored in ClientCallTracerCapsule.
This method will be called at the end of the call to destroy the ClientCallTracer.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as the name.
Args:
client_call_tracer_capsule: A PyCapsule which stores a ClientCallTracer object.
"""
with get_plugin() as plugin:
if plugin and plugin.observability_enabled:
plugin.delete_client_call_tracer(client_call_tracer_capsule)
def maybe_record_rpc_latency(state: "_channel._RPCState") -> None:
"""Record the latency of the RPC, if the plugin is registered and stats is enabled.

@ -155,15 +155,6 @@ def create_server_call_tracer_factory_capsule(dict exchange_labels, str identifi
return capsule
def delete_client_call_tracer(object client_call_tracer) -> None:
client_call_tracer: grpc._observability.ClientCallTracerCapsule
if cpython.PyCapsule_IsValid(client_call_tracer, CLIENT_CALL_TRACER):
capsule_ptr = cpython.PyCapsule_GetPointer(client_call_tracer, CLIENT_CALL_TRACER)
call_tracer_ptr = <ClientCallTracer*>capsule_ptr
del call_tracer_ptr
def _c_label_to_labels(vector[Label] c_labels) -> Dict[str, AnyStr]:
py_labels = {}
for label in c_labels:

@ -438,11 +438,6 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
)
return capsule
def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule
) -> None:
_cyobservability.delete_client_call_tracer(client_call_tracer)
def save_trace_context(
self, trace_id: str, span_id: str, is_sampled: bool
) -> None:

@ -127,9 +127,8 @@ void AwaitNextBatchLocked(std::unique_lock<std::mutex>& lock, int timeout_ms) {
void AddCensusDataToBuffer(const CensusData& data) {
std::unique_lock<std::mutex> lk(g_census_data_buffer_mutex);
if (g_census_data_buffer->size() >= GetMaxExportBufferSize()) {
gpr_log(GPR_DEBUG,
"Reached maximum census data buffer size, discarding this "
"CensusData entry");
VLOG(2) << "Reached maximum census data buffer size, discarding this "
"CensusData entry";
} else {
g_census_data_buffer->push(data);
}

Loading…
Cancel
Save