From 888d9666fec396d30760751798947b25cf0adc30 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 14 Aug 2024 12:16:49 -0700 Subject: [PATCH 01/12] [experiments] extend expiration of pick_first_new (#37480) Closes #37480 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37480 from markdroth:pf_experiment_expiry b9a663fc91e529d0d62879c7bc696bbf2ad9e4d4 PiperOrigin-RevId: 663011420 --- src/core/lib/experiments/experiments.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index a244a416e98..13c466a7a34 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -105,7 +105,7 @@ test_tags: [flow_control_test] - name: pick_first_new description: New pick_first impl with memory reduction. - expiry: 2024/07/30 + expiry: 2024/10/30 owner: roth@google.com test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"] - name: promise_based_inproc_transport From 2eca2927f31e4f46ca5a793858c994974f88fa6b Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 14 Aug 2024 16:41:12 -0700 Subject: [PATCH 02/12] [secure endpoint] fix race condition from #37358 (#37482) Closes #37482 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37482 from markdroth:secure_endpoint_fix 57bf3d97a722a11c42eab766e5315a8bf4dd14f9 PiperOrigin-RevId: 663097744 --- src/core/handshaker/security/secure_endpoint.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/core/handshaker/security/secure_endpoint.cc b/src/core/handshaker/security/secure_endpoint.cc index 490110b78b5..cf720f19d21 100644 --- a/src/core/handshaker/security/secure_endpoint.cc +++ b/src/core/handshaker/security/secure_endpoint.cc @@ -252,6 +252,13 @@ static void on_read(void* user_data, grpc_error_handle error) { { grpc_core::MutexLock l(&ep->read_mu); + + // If we were shut down after this callback was scheduled with OK + // status but before it was invoked, we need to treat that as an error. + if (ep->wrapped_ep == nullptr && error.ok()) { + error = absl::CancelledError("secure endpoint shutdown"); + } + uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer); uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer); @@ -505,8 +512,10 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, static void endpoint_destroy(grpc_endpoint* secure_ep) { secure_endpoint* ep = reinterpret_cast(secure_ep); + ep->read_mu.Lock(); ep->wrapped_ep.reset(); ep->memory_owner.Reset(); + ep->read_mu.Unlock(); SECURE_ENDPOINT_UNREF(ep, "destroy"); } From 65628a4b64eacf4146d344d0abb3d3298d88a964 Mon Sep 17 00:00:00 2001 From: Tanvi Jagtap <139093547+tanvi-jagtap@users.noreply.github.com> Date: Wed, 14 Aug 2024 20:37:57 -0700 Subject: [PATCH 03/12] [Gpr_To_Absl_Logging] Replace GRPC_CARES_TRACE_LOG with GRPC_TRACE_LOG (#37463) [Gpr_To_Absl_Logging] Replace GRPC_CARES_TRACE_LOG with GRPC_TRACE_LOG Closes #37463 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37463 from tanvi-jagtap:remove_GRPC_CARES_TRACE_LOG 471e1e9ddf87d67fcd6c18c7fc02d0497139750a PiperOrigin-RevId: 663156488 --- .../resolver/dns/c_ares/dns_resolver_ares.cc | 105 +++++---- .../dns/c_ares/grpc_ares_ev_driver_windows.cc | 196 +++++++++-------- .../resolver/dns/c_ares/grpc_ares_wrapper.cc | 199 ++++++++++-------- .../resolver/dns/c_ares/grpc_ares_wrapper.h | 7 - 4 files changed, 281 insertions(+), 226 deletions(-) diff --git a/src/core/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/resolver/dns/c_ares/dns_resolver_ares.cc index 87f4d7f3f24..67993abc5b7 100644 --- a/src/core/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/resolver/dns/c_ares/dns_resolver_ares.cc @@ -106,9 +106,10 @@ class AresClientChannelDNSResolver final : public PollingResolver { resolver_->authority().c_str(), resolver_->name_to_resolve().c_str(), kDefaultSecurePort, resolver_->interested_parties(), &on_hostname_resolved_, &addresses_, resolver_->query_timeout_ms_)); - GRPC_CARES_TRACE_LOG( - "resolver:%p Started resolving hostnames. hostname_request_:%p", - resolver_.get(), hostname_request_.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << resolver_.get() + << " Started resolving hostnames. hostname_request_:" + << hostname_request_.get(); if (resolver_->enable_srv_queries_) { Ref(DEBUG_LOCATION, "OnSRVResolved").release(); GRPC_CLOSURE_INIT(&on_srv_resolved_, OnSRVResolved, this, nullptr); @@ -117,9 +118,10 @@ class AresClientChannelDNSResolver final : public PollingResolver { resolver_->name_to_resolve().c_str(), resolver_->interested_parties(), &on_srv_resolved_, &balancer_addresses_, resolver_->query_timeout_ms_)); - GRPC_CARES_TRACE_LOG( - "resolver:%p Started resolving SRV records. srv_request_:%p", - resolver_.get(), srv_request_.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << resolver_.get() + << " Started resolving SRV records. srv_request_:" + << srv_request_.get(); } if (resolver_->request_service_config_) { Ref(DEBUG_LOCATION, "OnTXTResolved").release(); @@ -129,9 +131,10 @@ class AresClientChannelDNSResolver final : public PollingResolver { resolver_->name_to_resolve().c_str(), resolver_->interested_parties(), &on_txt_resolved_, &service_config_json_, resolver_->query_timeout_ms_)); - GRPC_CARES_TRACE_LOG( - "resolver:%p Started resolving TXT records. txt_request_:%p", - resolver_.get(), txt_request_.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << resolver_.get() + << " Started resolving TXT records. txt_request_:" + << txt_request_.get(); } } @@ -219,8 +222,9 @@ AresClientChannelDNSResolver::AresClientChannelDNSResolver( .value_or(GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS))) {} AresClientChannelDNSResolver::~AresClientChannelDNSResolver() { - GRPC_CARES_TRACE_LOG("resolver:%p destroying AresClientChannelDNSResolver", - this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << this + << " destroying AresClientChannelDNSResolver"; } OrphanablePtr AresClientChannelDNSResolver::StartRequest() { @@ -283,15 +287,16 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_) { if (hostname_request_ != nullptr || srv_request_ != nullptr || txt_request_ != nullptr) { - GRPC_CARES_TRACE_LOG( - "resolver:%p OnResolved() waiting for results (hostname: %s, srv: %s, " - "txt: %s)", - this, hostname_request_ != nullptr ? "waiting" : "done", - srv_request_ != nullptr ? "waiting" : "done", - txt_request_ != nullptr ? "waiting" : "done"); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << this + << " OnResolved() waiting for results (hostname: " + << (hostname_request_ != nullptr ? "waiting" : "done") + << ", srv: " << (srv_request_ != nullptr ? "waiting" : "done") + << ", txt: " << (txt_request_ != nullptr ? "waiting" : "done") << ")"; return absl::nullopt; } - GRPC_CARES_TRACE_LOG("resolver:%p OnResolved() proceeding", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << this << " OnResolved() proceeding"; Result result; result.args = resolver_->channel_args(); // TODO(roth): Change logic to be able to report failures for addresses @@ -309,8 +314,9 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( absl::StrCat("failed to parse service config: ", StatusToString(service_config_string.status()))); } else if (!service_config_string->empty()) { - GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s", - this, service_config_string->c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << this + << " selected service config choice: " << *service_config_string; result.service_config = ServiceConfigImpl::Create( resolver_->channel_args(), *service_config_string); if (!result.service_config.ok()) { @@ -325,8 +331,9 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( SetGrpcLbBalancerAddresses(result.args, *balancer_addresses_); } } else { - GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", this, - StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << this + << " dns resolution failed: " << StatusToString(error); std::string error_message; grpc_error_get_str(error, StatusStrProperty::kDescription, &error_message); absl::Status status = absl::UnavailableError( @@ -375,8 +382,9 @@ class AresDNSResolver final : public DNSResolver { class AresRequest { public: virtual ~AresRequest() { - GRPC_CARES_TRACE_LOG("AresRequest:%p dtor ares_request_:%p", this, - grpc_ares_request_.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresRequest:" << this + << " dtor ares_request_:" << grpc_ares_request_.get(); resolver_->UnregisterRequest(task_handle()); grpc_pollset_set_destroy(pollset_set_); } @@ -397,8 +405,9 @@ class AresDNSResolver final : public DNSResolver { bool Cancel() { MutexLock lock(&mu_); if (grpc_ares_request_ != nullptr) { - GRPC_CARES_TRACE_LOG("AresRequest:%p Cancel ares_request_:%p", this, - grpc_ares_request_.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresRequest:" << this + << " Cancel ares_request_:" << grpc_ares_request_.get(); if (completed_) return false; // OnDnsLookupDone will still be run completed_ = true; @@ -499,7 +508,8 @@ class AresDNSResolver final : public DNSResolver { aba_token), default_port_(default_port), on_resolve_address_done_(std::move(on_resolve_address_done)) { - GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p ctor", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresHostnameRequest:" << this << " ctor"; } std::unique_ptr MakeRequestLocked() override { @@ -508,13 +518,15 @@ class AresDNSResolver final : public DNSResolver { name_server().c_str(), name().c_str(), default_port_.c_str(), pollset_set(), on_dns_lookup_done(), &addresses_, timeout().millis())); - GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p Start ares_request_:%p", - this, ares_request.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresHostnameRequest:" << this + << " Start ares_request_:" << ares_request.get(); return ares_request; } void OnComplete(grpc_error_handle error) override { - GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p OnComplete", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresHostnameRequest:" << this << " OnComplete"; if (!error.ok()) { on_resolve_address_done_(grpc_error_to_absl_status(error)); return; @@ -550,7 +562,8 @@ class AresDNSResolver final : public DNSResolver { : AresRequest(name, name_server, timeout, interested_parties, resolver, aba_token), on_resolve_address_done_(std::move(on_resolve_address_done)) { - GRPC_CARES_TRACE_LOG("AresSRVRequest:%p ctor", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresSRVRequest:" << this << " ctor"; } std::unique_ptr MakeRequestLocked() override { @@ -558,13 +571,15 @@ class AresDNSResolver final : public DNSResolver { std::unique_ptr(grpc_dns_lookup_srv_ares( name_server().c_str(), name().c_str(), pollset_set(), on_dns_lookup_done(), &balancer_addresses_, timeout().millis())); - GRPC_CARES_TRACE_LOG("AresSRVRequest:%p Start ares_request_:%p", this, - ares_request.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresSRVRequest:" << this + << " Start ares_request_:" << ares_request.get(); return ares_request; } void OnComplete(grpc_error_handle error) override { - GRPC_CARES_TRACE_LOG("AresSRVRequest:%p OnComplete", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresSRVRequest:" << this << " OnComplete"; if (!error.ok()) { on_resolve_address_done_(grpc_error_to_absl_status(error)); return; @@ -596,7 +611,8 @@ class AresDNSResolver final : public DNSResolver { : AresRequest(name, name_server, timeout, interested_parties, resolver, aba_token), on_resolved_(std::move(on_resolved)) { - GRPC_CARES_TRACE_LOG("AresTXTRequest:%p ctor", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresTXTRequest:" << this << " ctor"; } ~AresTXTRequest() override { gpr_free(service_config_json_); } @@ -606,13 +622,15 @@ class AresDNSResolver final : public DNSResolver { std::unique_ptr(grpc_dns_lookup_txt_ares( name_server().c_str(), name().c_str(), pollset_set(), on_dns_lookup_done(), &service_config_json_, timeout().millis())); - GRPC_CARES_TRACE_LOG("AresSRVRequest:%p Start ares_request_:%p", this, - ares_request.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresSRVRequest:" << this + << " Start ares_request_:" << ares_request.get(); return ares_request; } void OnComplete(grpc_error_handle error) override { - GRPC_CARES_TRACE_LOG("AresSRVRequest:%p OnComplete", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresSRVRequest:" << this << " OnComplete"; if (!error.ok()) { on_resolved_(grpc_error_to_absl_status(error)); return; @@ -684,14 +702,15 @@ class AresDNSResolver final : public DNSResolver { MutexLock lock(&mu_); if (!open_requests_.contains(handle)) { // Unknown request, possibly completed already, or an invalid handle. - GRPC_CARES_TRACE_LOG( - "AresDNSResolver:%p attempt to cancel unknown TaskHandle:%s", this, - HandleToString(handle).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresDNSResolver:" << this + << " attempt to cancel unknown TaskHandle:" << HandleToString(handle); return false; } auto* request = reinterpret_cast(handle.keys[0]); - GRPC_CARES_TRACE_LOG("AresDNSResolver:%p cancel ares_request:%p", this, - request); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresDNSResolver:" << this + << " cancel ares_request:" << request; return request->Cancel(); } diff --git a/src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index ef70bfc5b67..142958f104f 100644 --- a/src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -133,8 +133,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { } ~GrpcPolledFdWindows() override { - GRPC_CARES_TRACE_LOG("fd:|%s| ~GrpcPolledFdWindows shutdown_called_: %d ", - GetName(), shutdown_called_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| ~GrpcPolledFdWindows shutdown_called_: " << shutdown_called_; CSliceUnref(read_buf_); CSliceUnref(write_buf_); CHECK_EQ(read_closure_, nullptr); @@ -173,10 +174,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { } void ContinueRegisterForOnReadableLocked() { - GRPC_CARES_TRACE_LOG( - "fd:|%s| ContinueRegisterForOnReadableLocked " - "wsa_connect_error_:%d", - GetName(), wsa_connect_error_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| ContinueRegisterForOnReadableLocked " + << "wsa_connect_error_:" << wsa_connect_error_; CHECK(connect_done_); if (wsa_connect_error_ != 0) { ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect")); @@ -194,10 +195,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { &winsocket_->read_info.overlapped, nullptr)) { int wsa_last_error = WSAGetLastError(); char* msg = gpr_format_message(wsa_last_error); - GRPC_CARES_TRACE_LOG( - "fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| " - "msg:|%s|", - GetName(), wsa_last_error, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| RegisterForOnReadableLocked WSARecvFrom error code:|" + << wsa_last_error << "| msg:|" << msg << "|"; gpr_free(msg); if (wsa_last_error != WSA_IO_PENDING) { ScheduleAndNullReadClosure( @@ -210,14 +211,15 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { void RegisterForOnWriteableLocked(grpc_closure* write_closure) override { if (socket_type_ == SOCK_DGRAM) { - GRPC_CARES_TRACE_LOG("fd:|%s| RegisterForOnWriteableLocked called", - GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| RegisterForOnWriteableLocked called"; } else { CHECK(socket_type_ == SOCK_STREAM); - GRPC_CARES_TRACE_LOG( - "fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d " - "connect_done_: %d", - GetName(), tcp_write_state_, connect_done_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| RegisterForOnWriteableLocked called tcp_write_state_: " + << tcp_write_state_ << " connect_done_: " << connect_done_; } CHECK_EQ(write_closure_, nullptr); write_closure_ = write_closure; @@ -234,10 +236,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { } void ContinueRegisterForOnWriteableLocked() { - GRPC_CARES_TRACE_LOG( - "fd:|%s| ContinueRegisterForOnWriteableLocked " - "wsa_connect_error_:%d", - GetName(), wsa_connect_error_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| ContinueRegisterForOnWriteableLocked " + << "wsa_connect_error_:" << wsa_connect_error_; CHECK(connect_done_); if (wsa_connect_error_ != 0) { ScheduleAndNullWriteClosure( @@ -288,10 +290,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data, ares_socket_t data_len, int /* flags */, struct sockaddr* from, ares_socklen_t* from_len) { - GRPC_CARES_TRACE_LOG( - "fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf " - "length:|%d|", - GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_)); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " RecvFrom called read_buf_has_data:" << read_buf_has_data_ + << " Current read buf length:" << GRPC_SLICE_LENGTH(read_buf_); if (!read_buf_has_data_) { wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); return -1; @@ -340,20 +342,21 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1, bytes_sent_ptr, flags, overlapped, nullptr); *wsa_error_code = WSAGetLastError(); - GRPC_CARES_TRACE_LOG( - "fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d " - "overlapped:%p " - "return:%d *wsa_error_code:%d", - GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0, - overlapped, out, *wsa_error_code); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " SendWriteBuf WSASend buf.len:" << buf.len << " *bytes_sent_ptr:" + << (bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0) + << " overlapped:" << overlapped << " return:" << out + << " *wsa_error_code:" << *wsa_error_code; return out; } ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov, int iov_count) { - GRPC_CARES_TRACE_LOG( - "fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d", - GetName(), connect_done_, wsa_connect_error_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " SendV called connect_done_:" << connect_done_ + << " wsa_connect_error_:" << wsa_connect_error_; if (!connect_done_) { wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); return -1; @@ -377,7 +380,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { // c-ares doesn't handle retryable errors on writes of UDP sockets. // Therefore, the sendv handler for UDP sockets must only attempt // to write everything inline. - GRPC_CARES_TRACE_LOG("fd:|%s| SendVUDP called", GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() << " SendVUDP called"; CHECK_EQ(GRPC_SLICE_LENGTH(write_buf_), 0); CSliceUnref(write_buf_); write_buf_ = FlattenIovec(iov, iov_count); @@ -388,9 +392,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { write_buf_ = grpc_empty_slice(); wsa_error_ctx->SetWSAError(wsa_error_code); char* msg = gpr_format_message(wsa_error_code); - GRPC_CARES_TRACE_LOG( - "fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(), - wsa_error_code, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " SendVUDP SendWriteBuf error code:" << wsa_error_code + << " msg:" << msg; gpr_free(msg); return -1; } @@ -406,8 +411,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { // out in the background, and making further send progress in general, will // happen as long as c-ares continues to show interest in writeability on // this fd. - GRPC_CARES_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d", - GetName(), tcp_write_state_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " SendVTCP called tcp_write_state_:" << tcp_write_state_; switch (tcp_write_state_) { case WRITE_IDLE: tcp_write_state_ = WRITE_REQUESTED; @@ -450,13 +456,13 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { } void OnTcpConnectLocked(grpc_error_handle error) { - GRPC_CARES_TRACE_LOG( - "fd:%s InnerOnTcpConnectLocked error:|%s| " - "pending_register_for_readable:%d" - " pending_register_for_writeable:%d", - GetName(), StatusToString(error).c_str(), - pending_continue_register_for_on_readable_locked_, - pending_continue_register_for_on_writeable_locked_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " InnerOnTcpConnectLocked error:" << StatusToString(error) + << " pending_register_for_readable:" + << pending_continue_register_for_on_readable_locked_ + << " pending_register_for_writeable:" + << pending_continue_register_for_on_writeable_locked_; CHECK(!connect_done_); connect_done_ = true; CHECK_EQ(wsa_connect_error_, 0); @@ -473,10 +479,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { if (!wsa_success) { wsa_connect_error_ = WSAGetLastError(); char* msg = gpr_format_message(wsa_connect_error_); - GRPC_CARES_TRACE_LOG( - "fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d " - "msg:|%s|", - GetName(), wsa_connect_error_, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " InnerOnTcpConnectLocked WSA overlapped result code:" + << wsa_connect_error_ << " msg:" << msg; gpr_free(msg); } } @@ -502,7 +508,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, ares_socklen_t target_len) { - GRPC_CARES_TRACE_LOG("fd:%s ConnectUDP", GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() << " ConnectUDP"; CHECK(!connect_done_); CHECK_EQ(wsa_connect_error_, 0); SOCKET s = grpc_winsocket_wrapped_socket(winsocket_); @@ -512,8 +519,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { wsa_error_ctx->SetWSAError(wsa_connect_error_); connect_done_ = true; char* msg = gpr_format_message(wsa_connect_error_); - GRPC_CARES_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|", GetName(), - wsa_connect_error_, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() << " WSAConnect error code:|" + << wsa_connect_error_ << "| msg:|" << msg << "|"; gpr_free(msg); // c-ares expects a posix-style connect API return out == 0 ? 0 : -1; @@ -521,7 +529,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, ares_socklen_t target_len) { - GRPC_CARES_TRACE_LOG("fd:%s ConnectTCP", GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() << " ConnectTCP"; LPFN_CONNECTEX ConnectEx; GUID guid = WSAID_CONNECTEX; DWORD ioctl_num_bytes; @@ -532,10 +541,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int wsa_last_error = WSAGetLastError(); wsa_error_ctx->SetWSAError(wsa_last_error); char* msg = gpr_format_message(wsa_last_error); - GRPC_CARES_TRACE_LOG( - "fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d " - "msg:|%s|", - GetName(), wsa_last_error, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:" + << wsa_last_error << " msg:|" << msg << "|"; gpr_free(msg); connect_done_ = true; wsa_connect_error_ = wsa_last_error; @@ -555,8 +564,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int wsa_last_error = WSAGetLastError(); wsa_error_ctx->SetWSAError(wsa_last_error); char* msg = gpr_format_message(wsa_last_error); - GRPC_CARES_TRACE_LOG("fd:%s bind error code:%d msg:|%s|", GetName(), - wsa_last_error, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " bind error code:" << wsa_last_error << " msg:|" << msg << "|"; gpr_free(msg); connect_done_ = true; wsa_connect_error_ = wsa_last_error; @@ -569,8 +579,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int wsa_last_error = WSAGetLastError(); wsa_error_ctx->SetWSAError(wsa_last_error); char* msg = gpr_format_message(wsa_last_error); - GRPC_CARES_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|", GetName(), - wsa_last_error, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " ConnectEx error code:" << wsa_last_error << " msg:|" << msg + << "|"; gpr_free(msg); if (wsa_last_error == WSA_IO_PENDING) { // c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on @@ -610,11 +622,12 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) { error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error, "OnIocpReadableInner"); - GRPC_CARES_TRACE_LOG( - "fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error " - "code:|%d| msg:|%s|", - GetName(), winsocket_->read_info.wsa_error, - StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| OnIocpReadableInner winsocket_->read_info.wsa_error " + "code:|" + << winsocket_->read_info.wsa_error << "| msg:|" + << StatusToString(error) << "|"; } } } @@ -626,9 +639,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { CSliceUnref(read_buf_); read_buf_ = grpc_empty_slice(); } - GRPC_CARES_TRACE_LOG( - "fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(), - GRPC_SLICE_LENGTH(read_buf_)); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| OnIocpReadable finishing. read buf length now:|" + << GRPC_SLICE_LENGTH(read_buf_) << "|"; ScheduleAndNullReadClosure(error); } @@ -639,17 +653,19 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { } void OnIocpWriteableLocked(grpc_error_handle error) { - GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) OnIocpWriteableInner. fd:|" << GetName() << "|"; CHECK(socket_type_ == SOCK_STREAM); if (error.ok()) { if (winsocket_->write_info.wsa_error != 0) { error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error, "OnIocpWriteableInner"); - GRPC_CARES_TRACE_LOG( - "fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error " - "code:|%d| msg:|%s|", - GetName(), winsocket_->write_info.wsa_error, - StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| OnIocpWriteableInner. winsocket_->write_info.wsa_error " + "code:|" + << winsocket_->write_info.wsa_error << "| msg:|" + << StatusToString(error) << "|"; } } CHECK(tcp_write_state_ == WRITE_PENDING); @@ -657,8 +673,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY; write_buf_ = grpc_slice_sub_no_ref( write_buf_, 0, winsocket_->write_info.bytes_transferred); - GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d", - GetName(), winsocket_->write_info.bytes_transferred); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| OnIocpWriteableInner. bytes transferred:" + << winsocket_->write_info.bytes_transferred; } else { CSliceUnref(write_buf_); write_buf_ = grpc_empty_slice(); @@ -728,7 +746,9 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory { // static ares_socket_t Socket(int af, int type, int protocol, void* user_data) { if (type != SOCK_DGRAM && type != SOCK_STREAM) { - GRPC_CARES_TRACE_LOG("Socket called with invalid socket type:%d", type); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) Socket called with invalid socket type:" + << type; return INVALID_SOCKET; } GrpcPolledFdFactoryWindows* self = @@ -736,15 +756,16 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory { SOCKET s = WSASocket(af, type, protocol, nullptr, 0, grpc_get_default_wsa_socket_flags()); if (s == INVALID_SOCKET) { - GRPC_CARES_TRACE_LOG( - "WSASocket failed with params af:%d type:%d protocol:%d", af, type, - protocol); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) WSASocket failed with params af:" << af + << " type:" << type << " protocol:" << protocol; return s; } grpc_error_handle error = grpc_tcp_set_non_block(s); if (!error.ok()) { - GRPC_CARES_TRACE_LOG("WSAIoctl failed with error: %s", - StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) WSAIoctl failed with error: " + << StatusToString(error); return INVALID_SOCKET; } auto on_shutdown_locked = [self, s]() { @@ -755,9 +776,10 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory { }; auto polled_fd = new GrpcPolledFdWindows(s, self->mu_, af, type, std::move(on_shutdown_locked)); - GRPC_CARES_TRACE_LOG( - "fd:|%s| created with params af:%d type:%d protocol:%d", - polled_fd->GetName(), af, type, protocol); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << polled_fd->GetName() + << " created with params af:" << af << " type:" << type + << " protocol:" << protocol; CHECK(self->sockets_.insert({s, polled_fd}).second); return s; } diff --git a/src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc index 0dcd1667d1e..50161653ea4 100644 --- a/src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -200,8 +200,9 @@ static absl::Status AresStatusToAbslStatus(int status, static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( grpc_ares_ev_driver* ev_driver) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { - GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request, - ev_driver); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request << " Ref ev_driver " + << ev_driver; gpr_ref(&ev_driver->refs); return ev_driver; } @@ -211,11 +212,13 @@ static void grpc_ares_complete_request_locked(grpc_ares_request* r) static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { - GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request, - ev_driver); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " Unref ev_driver " << ev_driver; if (gpr_unref(&ev_driver->refs)) { - GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request, - ev_driver); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " destroy ev_driver " << ev_driver; CHECK_EQ(ev_driver->fds, nullptr); ares_destroy(ev_driver->channel); grpc_ares_complete_request_locked(ev_driver->request); @@ -225,8 +228,9 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) static void fd_node_destroy_locked(fd_node* fdn) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { - GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << fdn->ev_driver->request + << " delete fd: " << fdn->grpc_polled_fd->GetName(); CHECK(!fdn->readable_registered); CHECK(!fdn->writable_registered); CHECK(fdn->already_shutdown); @@ -292,21 +296,21 @@ static grpc_core::Timestamp calculate_next_ares_backup_poll_alarm( // by the c-ares code comments. grpc_core::Duration until_next_ares_backup_poll_alarm = grpc_core::Duration::Seconds(1); - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p. next ares process poll time in " - "%" PRId64 " ms", - driver->request, driver, until_next_ares_backup_poll_alarm.millis()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << driver->request + << " ev_driver=" << driver << ". next ares process poll time in " + << until_next_ares_backup_poll_alarm.millis() << " ms"; return grpc_core::Timestamp::Now() + until_next_ares_backup_poll_alarm; } static void on_timeout(void* arg, grpc_error_handle error) { grpc_ares_ev_driver* driver = static_cast(arg); grpc_core::MutexLock lock(&driver->request->mu); - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. " - "err=%s", - driver->request, driver, driver->shutting_down, - grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << driver->request + << " ev_driver=" << driver + << " on_timeout_locked. driver->shutting_down=" << driver->shutting_down + << ". err=" << grpc_core::StatusToString(error); if (!driver->shutting_down && error.ok()) { grpc_ares_ev_driver_shutdown_locked(driver); } @@ -327,20 +331,20 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) { grpc_ares_ev_driver* driver = static_cast(arg); grpc_core::MutexLock lock(&driver->request->mu); - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. " - "driver->shutting_down=%d. " - "err=%s", - driver->request, driver, driver->shutting_down, - grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << driver->request + << " ev_driver=" << driver + << " on_ares_backup_poll_alarm_locked. driver->shutting_down=" + << driver->shutting_down << ". err=" << grpc_core::StatusToString(error); if (!driver->shutting_down && error.ok()) { fd_node* fdn = driver->fds; while (fdn != nullptr) { if (!fdn->already_shutdown) { - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked; " - "ares_process_fd. fd=%s", - driver->request, driver, fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << driver->request + << " ev_driver=" << driver + << " on_ares_backup_poll_alarm_locked; ares_process_fd. fd=" + << fdn->grpc_polled_fd->GetName(); ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); ares_process_fd(driver->channel, as, as); } @@ -373,8 +377,9 @@ static void on_readable(void* arg, grpc_error_handle error) { grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); fdn->readable_registered = false; - GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << fdn->ev_driver->request + << " readable on " << fdn->grpc_polled_fd->GetName(); if (error.ok() && !ev_driver->shutting_down) { ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD); } else { @@ -397,8 +402,9 @@ static void on_writable(void* arg, grpc_error_handle error) { grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); fdn->writable_registered = false; - GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request << " writable on " + << fdn->grpc_polled_fd->GetName(); if (error.ok() && !ev_driver->shutting_down) { ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as); } else { @@ -433,8 +439,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) fdn->grpc_polled_fd = ev_driver->polled_fd_factory->NewGrpcPolledFdLocked( socks[i], ev_driver->pollset_set); - GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " new fd: " << fdn->grpc_polled_fd->GetName(); fdn->readable_registered = false; fdn->writable_registered = false; fdn->already_shutdown = false; @@ -449,15 +456,16 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable, fdn, grpc_schedule_on_exec_ctx); if (fdn->grpc_polled_fd->IsFdStillReadableLocked()) { - GRPC_CARES_TRACE_LOG("request:%p schedule direct read on: %s", - ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " schedule direct read on: " + << fdn->grpc_polled_fd->GetName(); grpc_core::ExecCtx::Run(DEBUG_LOCATION, &fdn->read_closure, absl::OkStatus()); } else { - GRPC_CARES_TRACE_LOG("request:%p notify read on: %s", - ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " notify read on: " << fdn->grpc_polled_fd->GetName(); fdn->grpc_polled_fd->RegisterForOnReadableLocked( &fdn->read_closure); } @@ -467,9 +475,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) // has not been registered with this socket. if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && !fdn->writable_registered) { - GRPC_CARES_TRACE_LOG("request:%p notify write on: %s", - ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " notify write on: " << fdn->grpc_polled_fd->GetName(); grpc_ares_ev_driver_ref(ev_driver); GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn, grpc_schedule_on_exec_ctx); @@ -505,10 +513,11 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) ev_driver->query_timeout_ms == 0 ? grpc_core::Duration::Infinity() : grpc_core::Duration::Milliseconds(ev_driver->query_timeout_ms); - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in " - "%" PRId64 " ms", - ev_driver->request, ev_driver, timeout.millis()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " ev_driver=" << ev_driver + << " grpc_ares_ev_driver_start_locked. timeout in " << timeout.millis() + << " ms"; grpc_ares_ev_driver_ref(ev_driver); GRPC_CLOSURE_INIT(&ev_driver->on_timeout_locked, on_timeout, ev_driver, grpc_schedule_on_exec_ctx); @@ -547,7 +556,8 @@ grpc_error_handle grpc_ares_ev_driver_create_locked( } int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); grpc_ares_test_only_inject_config(&(*ev_driver)->channel); - GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request); + GRPC_TRACE_VLOG(cares_resolver, 2) << "(c-ares resolver) request:" << request + << " grpc_ares_ev_driver_create_locked"; if (status != ARES_SUCCESS) { grpc_error_handle err = GRPC_ERROR_CREATE(absl::StrCat( "Failed to init ares channel. C-ares error: ", ares_strerror(status))); @@ -645,10 +655,10 @@ static grpc_ares_hostbyname_request* create_hostbyname_request_locked( grpc_ares_request* parent_request, const char* host, uint16_t port, bool is_balancer, const char* qtype) ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_request->mu) { - GRPC_CARES_TRACE_LOG( - "request:%p create_hostbyname_request_locked host:%s port:%d " - "is_balancer:%d qtype:%s", - parent_request, host, port, is_balancer, qtype); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << parent_request + << " create_hostbyname_request_locked host:" << host << " port:" << port + << " is_balancer:" << is_balancer << " qtype:" << qtype; grpc_ares_hostbyname_request* hr = new grpc_ares_hostbyname_request(); hr->parent_request = parent_request; hr->host = gpr_strdup(host); @@ -675,9 +685,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, static_cast(arg); grpc_ares_request* r = hr->parent_request; if (status == ARES_SUCCESS) { - GRPC_CARES_TRACE_LOG( - "request:%p on_hostbyname_done_locked qtype=%s host=%s ARES_SUCCESS", r, - hr->qtype, hr->host); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " on_hostbyname_done_locked qtype=" << hr->qtype + << " host=" << hr->host << " ARES_SUCCESS"; std::unique_ptr* address_list_ptr = hr->is_balancer ? r->balancer_addresses_out : r->addresses_out; if (*address_list_ptr == nullptr) { @@ -701,10 +712,11 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, addr->sin6_port = hr->port; char output[INET6_ADDRSTRLEN]; ares_inet_ntop(AF_INET6, &addr->sin6_addr, output, INET6_ADDRSTRLEN); - GRPC_CARES_TRACE_LOG( - "request:%p c-ares resolver gets a AF_INET6 result: \n" - " addr: %s\n port: %d\n sin6_scope_id: %d\n", - r, output, ntohs(hr->port), addr->sin6_scope_id); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " c-ares resolver gets a AF_INET6 result: \n" + << " addr: " << output << "\n port: " << ntohs(hr->port) + << "\n sin6_scope_id: " << addr->sin6_scope_id << "\n"; break; } case AF_INET: { @@ -716,10 +728,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, addr->sin_port = hr->port; char output[INET_ADDRSTRLEN]; ares_inet_ntop(AF_INET, &addr->sin_addr, output, INET_ADDRSTRLEN); - GRPC_CARES_TRACE_LOG( - "request:%p c-ares resolver gets a AF_INET result: \n" - " addr: %s\n port: %d\n", - r, output, ntohs(hr->port)); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " c-ares resolver gets a AF_INET result: \n addr: " << output + << "\n port: " << ntohs(hr->port) << "\n"; break; } } @@ -729,8 +741,9 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, std::string error_msg = absl::StrFormat( "C-ares status is not ARES_SUCCESS qtype=%s name=%s is_balancer=%d: %s", hr->qtype, hr->host, hr->is_balancer, ares_strerror(status)); - GRPC_CARES_TRACE_LOG("request:%p on_hostbyname_done_locked: %s", r, - error_msg.c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " on_hostbyname_done_locked: " << error_msg; r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error); } @@ -745,13 +758,14 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, GrpcAresQuery* q = static_cast(arg); grpc_ares_request* r = q->parent_request(); if (status == ARES_SUCCESS) { - GRPC_CARES_TRACE_LOG( - "request:%p on_srv_query_done_locked name=%s ARES_SUCCESS", r, - q->name().c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " on_srv_query_done_locked name=" << q->name() << " ARES_SUCCESS"; struct ares_srv_reply* reply; const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); - GRPC_CARES_TRACE_LOG("request:%p ares_parse_srv_reply: %d", r, - parse_status); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " ares_parse_srv_reply: " << parse_status; if (parse_status == ARES_SUCCESS) { for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; srv_it = srv_it->next) { @@ -775,8 +789,9 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, std::string error_msg = absl::StrFormat( "C-ares status is not ARES_SUCCESS qtype=SRV name=%s: %s", q->name(), ares_strerror(status)); - GRPC_CARES_TRACE_LOG("request:%p on_srv_query_done_locked: %s", r, - error_msg.c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " on_srv_query_done_locked: " << error_msg; r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error); } @@ -797,8 +812,9 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/, struct ares_txt_ext* result = nullptr; struct ares_txt_ext* reply = nullptr; if (status != ARES_SUCCESS) goto fail; - GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked name=%s ARES_SUCCESS", r, - q->name().c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " on_txt_done_locked name=" << q->name() << " ARES_SUCCESS"; status = ares_parse_txt_reply_ext(buf, len, &reply); if (status != ARES_SUCCESS) goto fail; // Find service config in TXT record. @@ -826,8 +842,9 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/, service_config_len += result->length; } (*r->service_config_json_out)[service_config_len] = '\0'; - GRPC_CARES_TRACE_LOG("request:%p found service config: %s", r, - *r->service_config_json_out); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " found service config: " << *r->service_config_json_out; } // Clean up. ares_free_data(reply); @@ -837,8 +854,8 @@ fail: std::string error_msg = absl::StrFormat("C-ares status is not ARES_SUCCESS qtype=TXT name=%s: %s", q->name(), ares_strerror(status)); - GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked %s", r, - error_msg.c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) << "(c-ares resolver) request:" << r + << " on_txt_done_locked " << error_msg; r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error); } @@ -847,8 +864,9 @@ grpc_error_handle set_request_dns_server(grpc_ares_request* r, absl::string_view dns_server) ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { if (!dns_server.empty()) { - GRPC_CARES_TRACE_LOG("request:%p Using DNS server %s", r, - dns_server.data()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r << " Using DNS server " + << dns_server.data(); grpc_resolved_address addr; if (grpc_parse_ipv4_hostport(dns_server, &addr, /*log_errors=*/false)) { r->dns_server_addr.family = AF_INET; @@ -1043,10 +1061,10 @@ static grpc_ares_request* grpc_dns_lookup_hostname_ares_impl( r->ev_driver = nullptr; r->on_done = on_done; r->addresses_out = addrs; - GRPC_CARES_TRACE_LOG( - "request:%p c-ares grpc_dns_lookup_hostname_ares_impl name=%s, " - "default_port=%s", - r, name, default_port); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " c-ares grpc_dns_lookup_hostname_ares_impl name=" << name + << ", default_port=" << default_port; // Early out if the target is an ipv4 or ipv6 literal. if (resolve_as_ip_literal_locked(name, default_port, addrs)) { grpc_ares_complete_request_locked(r); @@ -1097,8 +1115,9 @@ grpc_ares_request* grpc_dns_lookup_srv_ares_impl( r->ev_driver = nullptr; r->on_done = on_done; r->balancer_addresses_out = balancer_addresses; - GRPC_CARES_TRACE_LOG( - "request:%p c-ares grpc_dns_lookup_srv_ares_impl name=%s", r, name); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " c-ares grpc_dns_lookup_srv_ares_impl name=" << name; grpc_error_handle error; // Don't query for SRV records if the target is "localhost" if (target_matches_localhost(name)) { @@ -1135,8 +1154,9 @@ grpc_ares_request* grpc_dns_lookup_txt_ares_impl( r->ev_driver = nullptr; r->on_done = on_done; r->service_config_json_out = service_config_json; - GRPC_CARES_TRACE_LOG( - "request:%p c-ares grpc_dns_lookup_txt_ares_impl name=%s", r, name); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " c-ares grpc_dns_lookup_txt_ares_impl name=" << name; grpc_error_handle error; // Don't query for TXT records if the target is "localhost" if (target_matches_localhost(name)) { @@ -1185,8 +1205,9 @@ grpc_ares_request* (*grpc_dns_lookup_txt_ares)( static void grpc_cancel_ares_request_impl(grpc_ares_request* r) { CHECK_NE(r, nullptr); grpc_core::MutexLock lock(&r->mu); - GRPC_CARES_TRACE_LOG("request:%p grpc_cancel_ares_request ev_driver:%p", r, - r->ev_driver); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " grpc_cancel_ares_request ev_driver:" << r->ev_driver; if (r->ev_driver != nullptr) { grpc_ares_ev_driver_shutdown_locked(r->ev_driver); } diff --git a/src/core/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/resolver/dns/c_ares/grpc_ares_wrapper.h index 9fdb94c0da8..bd4fbe2720f 100644 --- a/src/core/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -39,13 +39,6 @@ #define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 120000 -#define GRPC_CARES_TRACE_LOG(format, ...) \ - do { \ - if (GRPC_TRACE_FLAG_ENABLED(cares_resolver)) { \ - VLOG(2) << "(c-ares resolver) " << absl::StrFormat(format, __VA_ARGS__); \ - } \ - } while (0) - typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; struct grpc_ares_request { From 246e7d6436c9d983c6d2880de2602378088becf1 Mon Sep 17 00:00:00 2001 From: Tanvi Jagtap Date: Wed, 14 Aug 2024 22:31:11 -0700 Subject: [PATCH 04/12] Remove the comment in gpr_log_verbosity_init PiperOrigin-RevId: 663179481 --- src/core/util/log.cc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/core/util/log.cc b/src/core/util/log.cc index c34c1503d3c..8e104f6bd65 100644 --- a/src/core/util/log.cc +++ b/src/core/util/log.cc @@ -85,10 +85,6 @@ void gpr_log_message(const char* file, int line, gpr_log_severity severity, } void gpr_log_verbosity_init(void) { -// This is enabled in Github only. -// This ifndef is converted to ifdef internally by copybara. -// Internally grpc verbosity is managed using absl settings. -// So internally we avoid setting it like this. #ifndef GRPC_VERBOSITY_MACRO // SetMinLogLevel sets the value for the entire binary, not just gRPC. // This setting will change things for other libraries/code that is unrelated From d427a5c3b4465bc908dec5159efa2d791937ae2d Mon Sep 17 00:00:00 2001 From: Tanvi Jagtap <139093547+tanvi-jagtap@users.noreply.github.com> Date: Wed, 14 Aug 2024 23:41:57 -0700 Subject: [PATCH 05/12] [Gpr_To_Absl_Logging] Removing absl_vlog2_enabled (#37476) [Gpr_To_Absl_Logging] Removing absl_vlog2_enabled . @apolcyn : Please review the Ruby code. @yashykt : Please review the C++ code. And the python sanity test. Closes #37476 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37476 from tanvi-jagtap:remove_should_fn c05fd6445e60f02d9b1863329d4d5f7d57190456 PiperOrigin-RevId: 663193927 --- grpc.def | 1 - include/grpc/support/log.h | 3 -- src/core/util/log.cc | 2 - src/ruby/ext/grpc/rb_call_credentials.c | 46 +++++++++---------- src/ruby/ext/grpc/rb_grpc_imports.generated.c | 2 - src/ruby/ext/grpc/rb_grpc_imports.generated.h | 3 -- tools/run_tests/sanity/banned_functions.py | 5 -- 7 files changed, 23 insertions(+), 39 deletions(-) diff --git a/grpc.def b/grpc.def index 4820cf14213..59bc8f2aa78 100644 --- a/grpc.def +++ b/grpc.def @@ -231,7 +231,6 @@ EXPORTS gpr_cpu_num_cores gpr_cpu_current_cpu gpr_log - absl_vlog2_enabled gpr_log_verbosity_init gpr_format_message gpr_strdup diff --git a/include/grpc/support/log.h b/include/grpc/support/log.h index 82e64a8c56f..d536bc85922 100644 --- a/include/grpc/support/log.h +++ b/include/grpc/support/log.h @@ -51,9 +51,6 @@ typedef enum gpr_log_severity { GPRAPI void gpr_log(const char* file, int line, gpr_log_severity severity, const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5); -/** Deprecated. **/ -GPRAPI int absl_vlog2_enabled(); - GPRAPI void gpr_log_verbosity_init(void); #ifdef __cplusplus diff --git a/src/core/util/log.cc b/src/core/util/log.cc index 8e104f6bd65..d32c8373787 100644 --- a/src/core/util/log.cc +++ b/src/core/util/log.cc @@ -40,8 +40,6 @@ void gpr_unreachable_code(const char* reason, const char* file, int line) { grpc_core::SourceLocation(file, line)); } -int absl_vlog2_enabled() { return ABSL_VLOG_IS_ON(2); } - int gpr_should_log(gpr_log_severity severity) { switch (severity) { case GPR_LOG_SEVERITY_ERROR: diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index 8dd2530cbc8..14bce49d37f 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -60,30 +60,30 @@ static VALUE grpc_rb_call_credentials_callback(VALUE args) { VALUE callback_func = rb_ary_entry(args, 0); VALUE callback_args = rb_ary_entry(args, 1); VALUE md_ary_obj = rb_ary_entry(args, 2); - if (absl_vlog2_enabled()) { - VALUE callback_func_str = rb_funcall(callback_func, rb_intern("to_s"), 0); - VALUE callback_args_str = rb_funcall(callback_args, rb_intern("to_s"), 0); - VALUE callback_source_info = - rb_funcall(callback_func, rb_intern("source_location"), 0); - if (callback_source_info != Qnil) { - VALUE source_filename = rb_ary_entry(callback_source_info, 0); - VALUE source_line_number = rb_funcall( - rb_ary_entry(callback_source_info, 1), rb_intern("to_s"), 0); - gpr_log(GPR_DEBUG, - "GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| " - "source_filename:%s line_number:%s with arguments:|%s|", - StringValueCStr(callback_func_str), - StringValueCStr(source_filename), - StringValueCStr(source_line_number), - StringValueCStr(callback_args_str)); - } else { - gpr_log(GPR_DEBUG, - "GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| " - "(failed to get source filename and line) with arguments:|%s|", - StringValueCStr(callback_func_str), - StringValueCStr(callback_args_str)); - } + + VALUE callback_func_str = rb_funcall(callback_func, rb_intern("to_s"), 0); + VALUE callback_args_str = rb_funcall(callback_args, rb_intern("to_s"), 0); + VALUE callback_source_info = + rb_funcall(callback_func, rb_intern("source_location"), 0); + if (callback_source_info != Qnil) { + VALUE source_filename = rb_ary_entry(callback_source_info, 0); + VALUE source_line_number = + rb_funcall(rb_ary_entry(callback_source_info, 1), rb_intern("to_s"), 0); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| " + "source_filename:%s line_number:%s with arguments:|%s|", + StringValueCStr(callback_func_str), + StringValueCStr(source_filename), + StringValueCStr(source_line_number), + StringValueCStr(callback_args_str)); + } else { + gpr_log(GPR_DEBUG, + "GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| " + "(failed to get source filename and line) with arguments:|%s|", + StringValueCStr(callback_func_str), + StringValueCStr(callback_args_str)); } + VALUE metadata = rb_funcall(callback_func, rb_intern("call"), 1, callback_args); grpc_metadata_array* md_ary = NULL; diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 36e66f82b86..31cb6523097 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -254,7 +254,6 @@ gpr_free_aligned_type gpr_free_aligned_import; gpr_cpu_num_cores_type gpr_cpu_num_cores_import; gpr_cpu_current_cpu_type gpr_cpu_current_cpu_import; gpr_log_type gpr_log_import; -absl_vlog2_enabled_type absl_vlog2_enabled_import; gpr_log_verbosity_init_type gpr_log_verbosity_init_import; gpr_format_message_type gpr_format_message_import; gpr_strdup_type gpr_strdup_import; @@ -539,7 +538,6 @@ void grpc_rb_load_imports(HMODULE library) { gpr_cpu_num_cores_import = (gpr_cpu_num_cores_type) GetProcAddress(library, "gpr_cpu_num_cores"); gpr_cpu_current_cpu_import = (gpr_cpu_current_cpu_type) GetProcAddress(library, "gpr_cpu_current_cpu"); gpr_log_import = (gpr_log_type) GetProcAddress(library, "gpr_log"); - absl_vlog2_enabled_import = (absl_vlog2_enabled_type) GetProcAddress(library, "absl_vlog2_enabled"); gpr_log_verbosity_init_import = (gpr_log_verbosity_init_type) GetProcAddress(library, "gpr_log_verbosity_init"); gpr_format_message_import = (gpr_format_message_type) GetProcAddress(library, "gpr_format_message"); gpr_strdup_import = (gpr_strdup_type) GetProcAddress(library, "gpr_strdup"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 8ddd62a7ce9..02aee5008a7 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -738,9 +738,6 @@ extern gpr_cpu_current_cpu_type gpr_cpu_current_cpu_import; typedef void(*gpr_log_type)(const char* file, int line, gpr_log_severity severity, const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5); extern gpr_log_type gpr_log_import; #define gpr_log gpr_log_import -typedef int(*absl_vlog2_enabled_type)(); -extern absl_vlog2_enabled_type absl_vlog2_enabled_import; -#define absl_vlog2_enabled absl_vlog2_enabled_import typedef void(*gpr_log_verbosity_init_type)(void); extern gpr_log_verbosity_init_type gpr_log_verbosity_init_import; #define gpr_log_verbosity_init gpr_log_verbosity_init_import diff --git a/tools/run_tests/sanity/banned_functions.py b/tools/run_tests/sanity/banned_functions.py index ec8e8b2ec14..9db09d82699 100755 --- a/tools/run_tests/sanity/banned_functions.py +++ b/tools/run_tests/sanity/banned_functions.py @@ -37,11 +37,6 @@ os.chdir(os.path.join(os.path.dirname(sys.argv[0]), "../../..")) # Map of deprecated functions to allowlist files DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = { - "absl_vlog2_enabled(": [ - "./include/grpc/support/log.h", - "./src/core/util/log.cc", - "./src/ruby/ext/grpc/rb_call_credentials.c", - ], "gpr_log_severity": [ "./include/grpc/support/log.h", "./src/core/util/android/log.cc", From 394ce688cb3abc60f5f423574a2f4666b245ba02 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 15 Aug 2024 08:45:44 -0700 Subject: [PATCH 06/12] [RLS] don't register metric callback until we receive initial update (#37481) Closes #37481 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37481 from markdroth:rls_metric_fix bcbc1d2a4e6b1cb113049db67ff9e6f009fcf689 PiperOrigin-RevId: 663320344 --- src/core/load_balancing/rls/rls.cc | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/core/load_balancing/rls/rls.cc b/src/core/load_balancing/rls/rls.cc index ef2d44c4d28..d2cdca5c14c 100644 --- a/src/core/load_balancing/rls/rls.cc +++ b/src/core/load_balancing/rls/rls.cc @@ -1919,14 +1919,7 @@ RlsLb::RlsLb(Args args) instance_uuid_(channel_args() .GetOwnedString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID) .value_or(GenerateUUID())), - cache_(this), - registered_metric_callback_( - channel_control_helper()->GetStatsPluginGroup().RegisterCallback( - [this](CallbackMetricReporter& reporter) { - MutexLock lock(&mu_); - cache_.ReportMetricsLocked(reporter); - }, - Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries)) { + cache_(this) { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy created"; } @@ -2054,6 +2047,20 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { } } update_in_progress_ = false; + // On the initial update only, we set the gauge metric callback. We + // can't do this before the initial update, because otherwise the + // callback could be invoked before we've set state that we need for + // the label values (e.g., we'd add metrics with empty string for the + // RLS server name). + if (registered_metric_callback_ == nullptr) { + registered_metric_callback_ = + channel_control_helper()->GetStatsPluginGroup().RegisterCallback( + [this](CallbackMetricReporter& reporter) { + MutexLock lock(&mu_); + cache_.ReportMetricsLocked(reporter); + }, + Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries); + } // In principle, we need to update the picker here only if the config // fields used by the picker have changed. However, it seems fragile // to check individual fields, since the picker logic could change in From e5f5f6e039f5b312d184ff824cf17268c708a956 Mon Sep 17 00:00:00 2001 From: Xuan Wang Date: Thu, 15 Aug 2024 09:04:48 -0700 Subject: [PATCH 07/12] [Release] Add 1.65.0 to interop matrix (#37097) Closes #37097 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37097 from XuanWang-Amos:add_1_65_to_interop 870966224493edbd87ebd44cd1982824ec5a98ea PiperOrigin-RevId: 663325868 --- tools/interop_matrix/client_matrix.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py index ab43a6c0868..4363c0482dc 100644 --- a/tools/interop_matrix/client_matrix.py +++ b/tools/interop_matrix/client_matrix.py @@ -134,6 +134,7 @@ LANG_RELEASE_MATRIX = { ("v1.62.0", ReleaseInfo()), ("v1.63.1", ReleaseInfo()), ("v1.64.1", ReleaseInfo()), + ("v1.65.0", ReleaseInfo()), ] ), "go": OrderedDict( @@ -799,6 +800,12 @@ LANG_RELEASE_MATRIX = { runtimes=["python"], testcases_file="python__master" ), ), + ( + "v1.65.0", + ReleaseInfo( + runtimes=["python"], testcases_file="python__master" + ), + ), ] ), "node": OrderedDict( @@ -897,6 +904,7 @@ LANG_RELEASE_MATRIX = { ("v1.62.0", ReleaseInfo()), ("v1.63.0", ReleaseInfo()), ("v1.64.0", ReleaseInfo()), + ("v1.65.0", ReleaseInfo()), ] ), "php": OrderedDict( @@ -959,6 +967,7 @@ LANG_RELEASE_MATRIX = { ("v1.62.0", ReleaseInfo()), ("v1.63.0", ReleaseInfo()), ("v1.64.0", ReleaseInfo()), + ("v1.65.0", ReleaseInfo()), ] ), "csharp": OrderedDict( From a09aaf06ecd52ed4c2d623dc9a617d9411a1f3c3 Mon Sep 17 00:00:00 2001 From: Hannah Shi Date: Thu, 15 Aug 2024 09:16:58 -0700 Subject: [PATCH 08/12] [ObjC] fix error handling in CFWriteStreamWrite (#37344) follow up of #37255 Closes #37344 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37344 from HannahShiSFB:cfstream-write-error-handling c9a751d9d73debb53c1d2f8b81808ca874e76836 PiperOrigin-RevId: 663329438 --- .../lib/event_engine/cf_engine/cfstream_endpoint.cc | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc index 47e67d8e4a4..4721437860b 100644 --- a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc +++ b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc @@ -335,9 +335,18 @@ void CFStreamEndpointImpl::DoWrite( continue; } - size_t written_size = + CFIndex written_size = CFWriteStreamWrite(cf_write_stream_, slice.begin(), slice.size()); + if (written_size < 0) { + auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_)); + GRPC_TRACE_LOG(event_engine_endpoint, INFO) + << "CFStream write error: " << status + << ", written_size: " << written_size; + on_writable(status); + return; + } + total_written_size += written_size; if (written_size < slice.size()) { SliceBuffer written; From 7407dbf21dbab125ab5eb778daab6182c009b069 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 15 Aug 2024 10:37:25 -0700 Subject: [PATCH 09/12] [RlsLB] Fix Deadlock (#37459) Internal bug: b/357864682 A lock ordering inversion was noticed with the following stacks - ``` [mutex.cc : 1418] RAW: Potential Mutex deadlock: @ 0x564f4ce62fe5 absl::lts_20240116::DebugOnlyDeadlockCheck() @ 0x564f4ce632dc absl::lts_20240116::Mutex::Lock() @ 0x564f4be5886c absl::lts_20240116::MutexLock::MutexLock() @ 0x564f4be968c5 grpc::internal::OpenTelemetryPluginImpl::RemoveCallback() @ 0x564f4cd097b8 grpc_core::RegisteredMetricCallback::~RegisteredMetricCallback() @ 0x564f4c1f1216 std::default_delete<>::operator()() @ 0x564f4c1f157f std::__uniq_ptr_impl<>::reset() @ 0x564f4c1ee967 std::unique_ptr<>::reset() @ 0x564f4c352f44 grpc_core::GrpcXdsClient::Orphaned() @ 0x564f4c25dad1 grpc_core::DualRefCounted<>::Unref() @ 0x564f4c4653ed grpc_core::RefCountedPtr<>::reset() @ 0x564f4c463c73 grpc_core::XdsClusterDropStats::~XdsClusterDropStats() @ 0x564f4c463d02 grpc_core::XdsClusterDropStats::~XdsClusterDropStats() @ 0x564f4c25efa9 grpc_core::UnrefDelete::operator()<>() @ 0x564f4c25d5f0 grpc_core::RefCounted<>::Unref() @ 0x564f4c25c2d9 grpc_core::RefCountedPtr<>::~RefCountedPtr() @ 0x564f4c25b1d8 grpc_core::(anonymous namespace)::XdsClusterImplLb::Picker::~Picker() @ 0x564f4c25b240 grpc_core::(anonymous namespace)::XdsClusterImplLb::Picker::~Picker() @ 0x564f4c12c71a grpc_core::UnrefDelete::operator()<>() @ 0x564f4c1292ac grpc_core::DualRefCounted<>::WeakUnref() @ 0x564f4c124fb8 grpc_core::DualRefCounted<>::Unref() @ 0x564f4c11f029 grpc_core::RefCountedPtr<>::~RefCountedPtr() @ 0x564f4c14e958 grpc_core::(anonymous namespace)::OutlierDetectionLb::Picker::~Picker() @ 0x564f4c14e980 grpc_core::(anonymous namespace)::OutlierDetectionLb::Picker::~Picker() @ 0x564f4c12c71a grpc_core::UnrefDelete::operator()<>() @ 0x564f4c1292ac grpc_core::DualRefCounted<>::WeakUnref() @ 0x564f4c124fb8 grpc_core::DualRefCounted<>::Unref() @ 0x564f4c11f029 grpc_core::RefCountedPtr<>::~RefCountedPtr() @ 0x564f4c26bafc std::pair<>::~pair() @ 0x564f4c26bb28 __gnu_cxx::new_allocator<>::destroy<>() @ 0x564f4c26b88f std::allocator_traits<>::destroy<>() @ 0x564f4c26b297 std::_Rb_tree<>::_M_destroy_node() @ 0x564f4c26abfb std::_Rb_tree<>::_M_drop_node() @ 0x564f4c26a926 std::_Rb_tree<>::_M_erase() @ 0x564f4c26a6f0 std::_Rb_tree<>::~_Rb_tree() @ 0x564f4c26a62a std::map<>::~map() @ 0x564f4c2691a4 grpc_core::(anonymous namespace)::XdsClusterManagerLb::ClusterPicker::~ClusterPicker() @ 0x564f4c2691cc grpc_core::(anonymous namespace)::XdsClusterManagerLb::ClusterPicker::~ClusterPicker() @ 0x564f4c12c71a grpc_core::UnrefDelete::operator()<>() @ 0x564f4c1292ac grpc_core::DualRefCounted<>::WeakUnref() [mutex.cc : 1428] RAW: Acquiring absl::Mutex 0x564f4f22ad40 while holding 0x7f939834bb70; a cycle in the historical lock ordering graph has been observed [mutex.cc : 1432] RAW: Cycle: [mutex.cc : 1446] RAW: mutex@0x564f4f22ad40 stack: @ 0x564f4ce62fe5 absl::lts_20240116::DebugOnlyDeadlockCheck() @ 0x564f4ce632dc absl::lts_20240116::Mutex::Lock() @ 0x564f4be5886c absl::lts_20240116::MutexLock::MutexLock() @ 0x564f4be96124 grpc::internal::OpenTelemetryPluginImpl::AddCallback() @ 0x564f4cd096f0 grpc_core::RegisteredMetricCallback::RegisteredMetricCallback() @ 0x564f4c1f111b std::make_unique<>() @ 0x564f4c3564b0 grpc_core::GlobalStatsPluginRegistry::StatsPluginGroup::RegisterCallback<>() @ 0x564f4c352dea grpc_core::GrpcXdsClient::GrpcXdsClient() @ 0x564f4c355bc6 grpc_core::MakeRefCounted<>() @ 0x564f4c3525f2 grpc_core::GrpcXdsClient::GetOrCreate() @ 0x564f4c28f8f8 grpc_core::(anonymous namespace)::XdsResolver::StartLocked() @ 0x564f4c2f5f82 grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::StartXdsResolver() @ 0x564f4c2f515d grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::ZoneQueryDone() @ 0x564f4c2f496b grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::StartLocked()::{lambda()#1}::operator()()::{lambda()#1}::operator()() @ 0x564f4c2f80f6 std::__invoke_impl<>() @ 0x564f4c2f7b9d _ZSt10__invoke_rIvRZZN9grpc_core12_GLOBAL__N_124GoogleCloud2ProdResolver11StartLockedEvENUlNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEN4absl12lts_202401168StatusOrIS8_EEE_clES8_SC_EUlvE_J... @ 0x564f4c2f748c std::_Function_handler<>::_M_invoke() @ 0x564f4b8ad682 std::function<>::operator()() @ 0x564f4cd1c6bf grpc_core::WorkSerializer::LegacyWorkSerializer::Run() @ 0x564f4cd1dae4 grpc_core::WorkSerializer::Run() @ 0x564f4c2f4b0b grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::StartLocked()::{lambda()#1}::operator()() @ 0x564f4c2f8dc7 absl::lts_20240116::base_internal::Callable::Invoke<>() @ 0x564f4c2f8cb8 absl::lts_20240116::base_internal::invoke<>() @ 0x564f4c2f8b16 absl::lts_20240116::internal_any_invocable::InvokeR<>() @ 0x564f4c2f8a0c absl::lts_20240116::internal_any_invocable::LocalInvoker<>() @ 0x564f4c2fb88d absl::lts_20240116::internal_any_invocable::Impl<>::operator()() @ 0x564f4c2fb1f3 grpc_core::GcpMetadataQuery::OnDone() @ 0x564f4cd75a72 exec_ctx_run() @ 0x564f4cd75ba9 grpc_core::ExecCtx::Flush() @ 0x564f4cc8ee1d end_worker() @ 0x564f4cc8f304 pollset_work() @ 0x564f4cc5dcaf pollset_work() @ 0x564f4cc69220 grpc_pollset_work() @ 0x564f4cbe7733 cq_pluck() @ 0x564f4cbe7ad5 grpc_completion_queue_pluck @ 0x564f4bc61d96 grpc::CompletionQueue::Pluck() @ 0x564f4bfdb055 grpc::ClientReader<>::ClientReader<>() @ 0x564f4bfd6035 grpc::internal::ClientReaderFactory<>::Create<>() @ 0x564f4bfc322b google::storage::v2::Storage::Stub::ReadObjectRaw() @ 0x564f4bf9934b google::storage::v2::Storage::Stub::ReadObject() [mutex.cc : 1446] RAW: mutex@0x7f939834bb70 stack: @ 0x564f4ce62fe5 absl::lts_20240116::DebugOnlyDeadlockCheck() @ 0x564f4ce632dc absl::lts_20240116::Mutex::Lock() @ 0x564f4be5886c absl::lts_20240116::MutexLock::MutexLock() @ 0x564f4c1ce9eb grpc_core::(anonymous namespace)::RlsLb::RlsLb()::{lambda()#1}::operator()() @ 0x564f4c1e794c absl::lts_20240116::base_internal::Callable::Invoke<>() @ 0x564f4c1e72c1 absl::lts_20240116::base_internal::invoke<>() @ 0x564f4c1e6af1 absl::lts_20240116::internal_any_invocable::InvokeR<>() @ 0x564f4c1e5d6c absl::lts_20240116::internal_any_invocable::LocalInvoker<>() @ 0x564f4be9d0c8 absl::lts_20240116::internal_any_invocable::Impl<>::operator()() @ 0x564f4be9b4ff grpc_core::RegisteredMetricCallback::Run() @ 0x564f4bea07ae grpc::internal::OpenTelemetryPluginImpl::CallbackGaugeState<>::CallbackGaugeCallback() @ 0x564f4bf844de opentelemetry::v1::sdk::metrics::ObservableRegistry::Observe() @ 0x564f4bf56529 opentelemetry::v1::sdk::metrics::Meter::Collect() @ 0x564f4bf8c1d5 opentelemetry::v1::sdk::metrics::MetricCollector::Collect()::{lambda()#1}::operator()() @ 0x564f4bf8c5ac opentelemetry::v1::nostd::function_ref<>::BindTo<>()::{lambda()#1}::operator()() @ 0x564f4bf8c5e8 opentelemetry::v1::nostd::function_ref<>::BindTo<>()::{lambda()#1}::_FUN() @ 0x564f4bf7604d opentelemetry::v1::nostd::function_ref<>::operator()() @ 0x564f4bf74ad9 opentelemetry::v1::sdk::metrics::MeterContext::ForEachMeter() @ 0x564f4bf8c457 opentelemetry::v1::sdk::metrics::MetricCollector::Collect() @ 0x564f4bf4a7fe opentelemetry::v1::sdk::metrics::MetricReader::Collect() @ 0x564f4bed5e24 opentelemetry::v1::exporter::metrics::PrometheusCollector::Collect() @ 0x564f4bef004f prometheus::detail::CollectMetrics() @ 0x564f4beec26d prometheus::detail::MetricsHandler::handleGet() @ 0x564f4bf1cd8b CivetServer::requestHandler() @ 0x564f4bf35e7b handle_request @ 0x564f4bf29534 handle_request_stat_log @ 0x564f4bf39b3f process_new_connection @ 0x564f4bf3a448 worker_thread_run @ 0x564f4bf3a57f worker_thread @ 0x7f93e9137ea7 start_thread [mutex.cc : 1454] RAW: dying due to potential deadlock Aborted ``` From the stack, it looks like we are ending up holding a lock to the `RlsLB` policy while removing a callback from the gRPC OpenTelemetry plugin, which is a lock ordering inversion. The correct order is `OpenTelemetry` -> `gRPC OpenTelemetry plugin` -> `gRPC Component like RLS/xDSClient`. A common pattern we employ for metrics is for the callbacks to be unregistered when the corresponding component object is orphaned/destroyed (unreffing). Also, note that removing callbacks requires a lock in `gRPC OpenTelemetry plugin`. To avoid deadlocks, we remove the callback inside `RlsLb` from outside the critical region, but `RlsLb` owns refs to child policies which in turn hold refs to `XdsClient`. The lock ordering inversion occurred due to unreffing child policies within the critical region. This PR is an alternative fix to this problem. Original fix in https://github.com/grpc/grpc/pull/37425. Verified that it fixes the bug. Closes #37459 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37459 from yashykt:FixDeadlocks ec7fbcf2d61e3f91043ccef33780850aa93b6cfa PiperOrigin-RevId: 663360427 --- src/core/load_balancing/rls/rls.cc | 123 +++++++++++++++++++++-------- 1 file changed, 90 insertions(+), 33 deletions(-) diff --git a/src/core/load_balancing/rls/rls.cc b/src/core/load_balancing/rls/rls.cc index d2cdca5c14c..1248cfd639e 100644 --- a/src/core/load_balancing/rls/rls.cc +++ b/src/core/load_balancing/rls/rls.cc @@ -353,7 +353,8 @@ class RlsLb final : public LoadBalancingPolicy { // is called after releasing it. // // Both methods grab the data they need from the parent object. - void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); + void StartUpdate(OrphanablePtr* child_policy_to_delete) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_); void ExitIdleLocked() { @@ -397,14 +398,14 @@ class RlsLb final : public LoadBalancingPolicy { }; // Note: We are forced to disable lock analysis here because - // Orphan() is called by Unref() which is called by RefCountedPtr<>, which + // Orphaned() is called by Unref() which is called by RefCountedPtr<>, which // cannot have lock annotations for this particular caller. void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS; RefCountedPtr lb_policy_; std::string target_; - bool is_shutdown_ = false; + bool is_shutdown_ = false; // Protected by WorkSerializer OrphanablePtr child_policy_; RefCountedPtr pending_config_; @@ -503,12 +504,25 @@ class RlsLb final : public LoadBalancingPolicy { // Returns a list of child policy wrappers on which FinishUpdate() // needs to be called after releasing the lock. std::vector OnRlsResponseLocked( - ResponseInfo response, std::unique_ptr backoff_state) + ResponseInfo response, std::unique_ptr backoff_state, + OrphanablePtr* child_policy_to_delete) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); // Moves entry to the end of the LRU list. void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); + // Takes entries from child_policy_wrappers_ and appends them to the end + // of \a child_policy_wrappers. + void TakeChildPolicyWrappers( + std::vector>* child_policy_wrappers) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) { + child_policy_wrappers->insert( + child_policy_wrappers->end(), + std::make_move_iterator(child_policy_wrappers_.begin()), + std::make_move_iterator(child_policy_wrappers_.end())); + child_policy_wrappers_.clear(); + } + private: class BackoffTimer final : public InternallyRefCounted { public: @@ -566,19 +580,24 @@ class RlsLb final : public LoadBalancingPolicy { // the caller. Otherwise, the entry found is returned to the caller. The // entry returned to the user is considered recently used and its order in // the LRU list of the cache is updated. - Entry* FindOrInsert(const RequestKey& key) + Entry* FindOrInsert(const RequestKey& key, + std::vector>* + child_policy_wrappers_to_delete) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); // Resizes the cache. If the new cache size is greater than the current size // of the cache, do nothing. Otherwise, evict the oldest entries that // exceed the new size limit of the cache. - void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); + void Resize(size_t bytes, std::vector>* + child_policy_wrappers_to_delete) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); // Resets backoff of all the cache entries. void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); // Shutdown the cache; clean-up and orphan all the stored cache entries. - void Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); + GRPC_MUST_USE_RESULT std::vector> + Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); void ReportMetricsLocked(CallbackMetricReporter& reporter) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); @@ -594,7 +613,9 @@ class RlsLb final : public LoadBalancingPolicy { // Evicts oversized cache elements when the current size is greater than // the specified limit. - void MaybeShrinkSize(size_t bytes) + void MaybeShrinkSize(size_t bytes, + std::vector>* + child_policy_wrappers_to_delete) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); RlsLb* lb_policy_; @@ -857,7 +878,8 @@ absl::optional InsertOrUpdateChildPolicyField(const std::string& field, return Json::FromArray(std::move(array)); } -void RlsLb::ChildPolicyWrapper::StartUpdate() { +void RlsLb::ChildPolicyWrapper::StartUpdate( + OrphanablePtr* child_policy_to_delete) { ValidationErrors errors; auto child_policy_config = InsertOrUpdateChildPolicyField( lb_policy_->config_->child_policy_config_target_field_name(), target_, @@ -880,7 +902,7 @@ void RlsLb::ChildPolicyWrapper::StartUpdate() { pending_config_.reset(); picker_ = MakeRefCounted( absl::UnavailableError(config.status().message())); - child_policy_.reset(); + *child_policy_to_delete = std::move(child_policy_); } else { pending_config_ = std::move(*config); } @@ -934,9 +956,9 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState( << ": UpdateState(state=" << ConnectivityStateName(state) << ", status=" << status << ", picker=" << picker.get() << ")"; } + if (wrapper_->is_shutdown_) return; { MutexLock lock(&wrapper_->lb_policy_->mu_); - if (wrapper_->is_shutdown_) return; // TODO(roth): It looks like this ignores subsequent TF updates that // might change the status used to fail picks, which seems wrong. if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && @@ -946,7 +968,8 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState( wrapper_->connectivity_state_ = state; DCHECK(picker != nullptr); if (picker != nullptr) { - wrapper_->picker_ = std::move(picker); + // We want to unref the picker after we release the lock. + wrapper_->picker_.swap(picker); } } wrapper_->lb_policy_->UpdatePickerLocked(); @@ -1194,18 +1217,19 @@ RlsLb::Cache::Entry::Entry(RefCountedPtr lb_policy, lb_policy_->cache_.lru_list_.end(), key)) {} void RlsLb::Cache::Entry::Orphan() { + // We should be holding RlsLB::mu_. GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " " << lru_iterator_->ToString() << ": cache entry evicted"; is_shutdown_ = true; lb_policy_->cache_.lru_list_.erase(lru_iterator_); lru_iterator_ = lb_policy_->cache_.lru_list_.end(); // Just in case. + CHECK(child_policy_wrappers_.empty()); backoff_state_.reset(); if (backoff_timer_ != nullptr) { backoff_timer_.reset(); lb_policy_->UpdatePickerAsync(); } - child_policy_wrappers_.clear(); Unref(DEBUG_LOCATION, "Orphan"); } @@ -1284,7 +1308,8 @@ void RlsLb::Cache::Entry::MarkUsed() { std::vector RlsLb::Cache::Entry::OnRlsResponseLocked( - ResponseInfo response, std::unique_ptr backoff_state) { + ResponseInfo response, std::unique_ptr backoff_state, + OrphanablePtr* child_policy_to_delete) { // Move the entry to the end of the LRU list. MarkUsed(); // If the request failed, store the failed status and update the @@ -1345,7 +1370,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked( if (it == lb_policy_->child_policy_map_.end()) { auto new_child = MakeRefCounted( lb_policy_.Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target); - new_child->StartUpdate(); + new_child->StartUpdate(child_policy_to_delete); child_policies_to_finish_update.push_back(new_child.get()); new_child_policy_wrappers.emplace_back(std::move(new_child)); } else { @@ -1382,12 +1407,15 @@ RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) { return it->second.get(); } -RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) { +RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert( + const RequestKey& key, std::vector>* + child_policy_wrappers_to_delete) { auto it = map_.find(key); // If not found, create new entry. if (it == map_.end()) { size_t entry_size = EntrySizeForKey(key); - MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size)); + MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size), + child_policy_wrappers_to_delete); Entry* entry = new Entry( lb_policy_->RefAsSubclass(DEBUG_LOCATION, "CacheEntry"), key); map_.emplace(key, OrphanablePtr(entry)); @@ -1405,11 +1433,13 @@ RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) { return it->second.get(); } -void RlsLb::Cache::Resize(size_t bytes) { +void RlsLb::Cache::Resize(size_t bytes, + std::vector>* + child_policy_wrappers_to_delete) { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << lb_policy_ << "] resizing cache to " << bytes << " bytes"; size_limit_ = bytes; - MaybeShrinkSize(size_limit_); + MaybeShrinkSize(size_limit_, child_policy_wrappers_to_delete); } void RlsLb::Cache::ResetAllBackoff() { @@ -1419,7 +1449,12 @@ void RlsLb::Cache::ResetAllBackoff() { lb_policy_->UpdatePickerAsync(); } -void RlsLb::Cache::Shutdown() { +std::vector> RlsLb::Cache::Shutdown() { + std::vector> + child_policy_wrappers_to_delete; + for (auto& entry : map_) { + entry.second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete); + } map_.clear(); lru_list_.clear(); if (cleanup_timer_handle_.has_value() && @@ -1429,6 +1464,7 @@ void RlsLb::Cache::Shutdown() { << "[rlslb " << lb_policy_ << "] cache cleanup timer canceled"; } cleanup_timer_handle_.reset(); + return child_policy_wrappers_to_delete; } void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) { @@ -1464,12 +1500,15 @@ void RlsLb::Cache::StartCleanupTimer() { void RlsLb::Cache::OnCleanupTimer() { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << lb_policy_ << "] cache cleanup timer fired"; + std::vector> + child_policy_wrappers_to_delete; MutexLock lock(&lb_policy_->mu_); if (!cleanup_timer_handle_.has_value()) return; if (lb_policy_->is_shutdown_) return; for (auto it = map_.begin(); it != map_.end();) { if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) { size_ -= it->second->Size(); + it->second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete); it = map_.erase(it); } else { ++it; @@ -1483,7 +1522,9 @@ size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) { return (key.Size() * 2) + sizeof(Entry); } -void RlsLb::Cache::MaybeShrinkSize(size_t bytes) { +void RlsLb::Cache::MaybeShrinkSize( + size_t bytes, std::vector>* + child_policy_wrappers_to_delete) { while (size_ > bytes) { auto lru_it = lru_list_.begin(); if (GPR_UNLIKELY(lru_it == lru_list_.end())) break; @@ -1494,6 +1535,7 @@ void RlsLb::Cache::MaybeShrinkSize(size_t bytes) { << "[rlslb " << lb_policy_ << "] LRU eviction: removing entry " << map_it->second.get() << " " << lru_it->ToString(); size_ -= map_it->second->Size(); + map_it->second->TakeChildPolicyWrappers(child_policy_wrappers_to_delete); map_.erase(map_it); } GRPC_TRACE_LOG(rls_lb, INFO) @@ -1814,13 +1856,18 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) { << "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " " << key_.ToString() << ": response info: " << response.ToString(); std::vector child_policies_to_finish_update; + std::vector> + child_policy_wrappers_to_delete; + OrphanablePtr child_policy_to_delete; { MutexLock lock(&lb_policy_->mu_); if (lb_policy_->is_shutdown_) return; rls_channel_->ReportResponseLocked(response.status.ok()); - Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_); + Cache::Entry* cache_entry = + lb_policy_->cache_.FindOrInsert(key_, &child_policy_wrappers_to_delete); child_policies_to_finish_update = cache_entry->OnRlsResponseLocked( - std::move(response), std::move(backoff_state_)); + std::move(response), std::move(backoff_state_), + &child_policy_to_delete); lb_policy_->request_map_.erase(key_); } // Now that we've released the lock, finish the update on any newly @@ -1999,6 +2046,9 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { } } // Now grab the lock to swap out the state it guards. + std::vector> + child_policy_wrappers_to_delete; + OrphanablePtr child_policy_to_delete; { MutexLock lock(&mu_); // Swap out RLS channel if needed. @@ -2010,19 +2060,20 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { // Resize cache if needed. if (old_config == nullptr || config_->cache_size_bytes() != old_config->cache_size_bytes()) { - cache_.Resize(static_cast(config_->cache_size_bytes())); + cache_.Resize(static_cast(config_->cache_size_bytes()), + &child_policy_wrappers_to_delete); } // Start update of child policies if needed. if (update_child_policies) { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] starting child policy updates"; for (auto& p : child_policy_map_) { - p.second->StartUpdate(); + p.second->StartUpdate(&child_policy_to_delete); } } else if (created_default_child) { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] starting default child policy update"; - default_child_policy_->StartUpdate(); + default_child_policy_->StartUpdate(&child_policy_to_delete); } } // Now that we've released the lock, finish update of child policies. @@ -2097,14 +2148,20 @@ void RlsLb::ResetBackoffLocked() { void RlsLb::ShutdownLocked() { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy shutdown"; registered_metric_callback_.reset(); - MutexLock lock(&mu_); - is_shutdown_ = true; - config_.reset(DEBUG_LOCATION, "ShutdownLocked"); + RefCountedPtr child_policy_to_delete; + std::vector> + child_policy_wrappers_to_delete; + OrphanablePtr rls_channel_to_delete; + { + MutexLock lock(&mu_); + is_shutdown_ = true; + config_.reset(DEBUG_LOCATION, "ShutdownLocked"); + child_policy_wrappers_to_delete = cache_.Shutdown(); + request_map_.clear(); + rls_channel_to_delete = std::move(rls_channel_); + child_policy_to_delete = std::move(default_child_policy_); + } channel_args_ = ChannelArgs(); - cache_.Shutdown(); - request_map_.clear(); - rls_channel_.reset(); - default_child_policy_.reset(); } void RlsLb::UpdatePickerAsync() { From 910953e7ef679767075c13c6879e063edf9cdd1d Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 15 Aug 2024 14:35:33 -0700 Subject: [PATCH 10/12] [OTel C++] Simplify tests (#37486) It is hard to reason about tests if multiple callbacks record values for the same metrics with the same label sets. Closes #37486 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37486 from yashykt:SimplifyOTelPluginTest 3b4d7f90b3a988b1d837792fc5e3fabcdff45865 PiperOrigin-RevId: 663457107 --- test/cpp/ext/otel/otel_plugin_test.cc | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/test/cpp/ext/otel/otel_plugin_test.cc b/test/cpp/ext/otel/otel_plugin_test.cc index cbc9b18fc42..75ff7daf0e7 100644 --- a/test/cpp/ext/otel/otel_plugin_test.cc +++ b/test/cpp/ext/otel/otel_plugin_test.cc @@ -1775,14 +1775,11 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest, auto registered_metric_callback_1 = stats_plugins.RegisterCallback( [&](grpc_core::CallbackMetricReporter& reporter) { ++report_count_1; - reporter.Report(integer_gauge_handle, int_value_1, kLabelValuesSet1, + reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet1, kOptionalLabelValuesSet1); - reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet2, - kOptionalLabelValuesSet2); - reporter.Report(double_gauge_handle, double_value_1, kLabelValuesSet1, + reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet1, kOptionalLabelValuesSet1); - reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet2, - kOptionalLabelValuesSet2); + ; }, grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(), integer_gauge_handle, double_gauge_handle); @@ -1792,12 +1789,8 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest, auto registered_metric_callback_2 = stats_plugins.RegisterCallback( [&](grpc_core::CallbackMetricReporter& reporter) { ++report_count_2; - reporter.Report(integer_gauge_handle, int_value_2, kLabelValuesSet1, - kOptionalLabelValuesSet1); reporter.Report(integer_gauge_handle, int_value_2++, kLabelValuesSet2, kOptionalLabelValuesSet2); - reporter.Report(double_gauge_handle, double_value_2, kLabelValuesSet1, - kOptionalLabelValuesSet1); reporter.Report(double_gauge_handle, double_value_2++, kLabelValuesSet2, kOptionalLabelValuesSet2); }, @@ -1910,14 +1903,10 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest, auto registered_metric_callback_1 = stats_plugins.RegisterCallback( [&](grpc_core::CallbackMetricReporter& reporter) { ++report_count_1; - reporter.Report(integer_gauge_handle, int_value_1, kLabelValuesSet1, + reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet1, kOptionalLabelValuesSet1); - reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet2, - kOptionalLabelValuesSet2); - reporter.Report(double_gauge_handle, double_value_1, kLabelValuesSet1, + reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet1, kOptionalLabelValuesSet1); - reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet2, - kOptionalLabelValuesSet2); }, grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(), integer_gauge_handle, double_gauge_handle); @@ -1927,12 +1916,8 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest, auto registered_metric_callback_2 = stats_plugins.RegisterCallback( [&](grpc_core::CallbackMetricReporter& reporter) { ++report_count_2; - reporter.Report(integer_gauge_handle, int_value_2, kLabelValuesSet1, - kOptionalLabelValuesSet1); reporter.Report(integer_gauge_handle, int_value_2++, kLabelValuesSet2, kOptionalLabelValuesSet2); - reporter.Report(double_gauge_handle, double_value_2, kLabelValuesSet1, - kOptionalLabelValuesSet1); reporter.Report(double_gauge_handle, double_value_2++, kLabelValuesSet2, kOptionalLabelValuesSet2); }, From 8b4a8ebdf96ac7771b13a3e50b4974532d787d4c Mon Sep 17 00:00:00 2001 From: Nana Pang Date: Thu, 15 Aug 2024 14:47:14 -0700 Subject: [PATCH 11/12] [C++ BUILD] Internal changes for base context propagation. PiperOrigin-RevId: 663460841 --- BUILD | 21 +++++++ CMakeLists.txt | 13 +++++ build_autogenerated.yaml | 13 +++++ gRPC-C++.podspec | 2 + include/grpcpp/support/callback_common.h | 26 ++++++++- include/grpcpp/support/global_callback_hook.h | 58 +++++++++++++++++++ src/cpp/client/global_callback_hook.cc | 36 ++++++++++++ tools/doxygen/Doxyfile.c++ | 1 + tools/doxygen/Doxyfile.c++.internal | 2 + 9 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 include/grpcpp/support/global_callback_hook.h create mode 100644 src/cpp/client/global_callback_hook.cc diff --git a/BUILD b/BUILD index eb4bc77f081..7cd081fb527 100644 --- a/BUILD +++ b/BUILD @@ -907,6 +907,7 @@ grpc_cc_library( ], visibility = ["@grpc:grpc++_public_hdrs"], deps = [ + "global_callback_hook", "grpc_public_hdrs", "//src/core:gpr_atm", ], @@ -951,6 +952,7 @@ grpc_cc_library( tags = ["nofixdeps"], visibility = ["@grpc:public"], deps = [ + "global_callback_hook", "grpc++_base", "//src/core:gpr_atm", "//src/core:slice", @@ -1260,6 +1262,7 @@ grpc_cc_library( deps = [ "channel_arg_names", "generic_stub_internal", + "global_callback_hook", "gpr", "grpc++_base_unsecure", "grpc++_codegen_proto", @@ -2455,6 +2458,7 @@ grpc_cc_library( "config", "exec_ctx", "generic_stub_internal", + "global_callback_hook", "gpr", "grpc", "grpc++_codegen_proto", @@ -2544,6 +2548,7 @@ grpc_cc_library( "config", "exec_ctx", "generic_stub_internal", + "global_callback_hook", "gpr", "grpc_base", "grpc_core_credentials_header", @@ -4913,6 +4918,22 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "global_callback_hook", + srcs = [ + "src/cpp/client/global_callback_hook.cc", + ], + hdrs = [ + "include/grpcpp/support/global_callback_hook.h", + ], + external_deps = [ + "absl/base:no_destructor", + "absl/log:check", + "absl/functional:function_ref", + ], + language = "c++", +) + # TODO(yashykt): Remove the UPB definitions from here once they are no longer needed ### UPB Targets diff --git a/CMakeLists.txt b/CMakeLists.txt index da48ca9f836..99a85a30376 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4182,6 +4182,7 @@ add_library(grpc++ src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/client/xds_credentials.cc @@ -4465,6 +4466,7 @@ foreach(_hdr include/grpcpp/support/client_callback.h include/grpcpp/support/client_interceptor.h include/grpcpp/support/config.h + include/grpcpp/support/global_callback_hook.h include/grpcpp/support/interceptor.h include/grpcpp/support/message_allocator.h include/grpcpp/support/method_handler.h @@ -4938,6 +4940,7 @@ add_library(grpc++_unsecure src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/common/alarm.cc src/cpp/common/channel_arguments.cc @@ -5209,6 +5212,7 @@ foreach(_hdr include/grpcpp/support/client_callback.h include/grpcpp/support/client_interceptor.h include/grpcpp/support/config.h + include/grpcpp/support/global_callback_hook.h include/grpcpp/support/interceptor.h include/grpcpp/support/message_allocator.h include/grpcpp/support/method_handler.h @@ -8266,6 +8270,7 @@ add_executable(binder_transport_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -13405,6 +13410,7 @@ add_executable(endpoint_binder_pool_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -14260,6 +14266,7 @@ add_executable(fake_binder_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -32271,6 +32278,7 @@ add_executable(transport_stream_receiver_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -33157,6 +33165,7 @@ add_executable(wire_reader_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -33267,6 +33276,7 @@ add_executable(wire_writer_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -33847,6 +33857,7 @@ add_executable(xds_client_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h + src/cpp/client/global_callback_hook.cc src/cpp/util/status.cc test/core/xds/xds_client_test.cc test/core/xds/xds_transport_fake.cc @@ -34142,6 +34153,7 @@ add_executable(xds_cluster_resource_type_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.h + src/cpp/client/global_callback_hook.cc src/cpp/util/status.cc test/core/xds/xds_cluster_resource_type_test.cc ) @@ -35138,6 +35150,7 @@ add_executable(xds_endpoint_resource_type_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h + src/cpp/client/global_callback_hook.cc src/cpp/util/status.cc test/core/xds/xds_endpoint_resource_type_test.cc ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index b95f89cafd6..abcef83cc2a 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -3836,6 +3836,7 @@ libs: - include/grpcpp/support/client_callback.h - include/grpcpp/support/client_interceptor.h - include/grpcpp/support/config.h + - include/grpcpp/support/global_callback_hook.h - include/grpcpp/support/interceptor.h - include/grpcpp/support/message_allocator.h - include/grpcpp/support/method_handler.h @@ -3915,6 +3916,7 @@ libs: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/client/xds_credentials.cc @@ -4267,6 +4269,7 @@ libs: - include/grpcpp/support/client_callback.h - include/grpcpp/support/client_interceptor.h - include/grpcpp/support/config.h + - include/grpcpp/support/global_callback_hook.h - include/grpcpp/support/interceptor.h - include/grpcpp/support/message_allocator.h - include/grpcpp/support/method_handler.h @@ -4303,6 +4306,7 @@ libs: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/common/alarm.cc - src/cpp/common/channel_arguments.cc @@ -6182,6 +6186,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -9668,6 +9673,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -10143,6 +10149,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -20379,6 +20386,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -20798,6 +20806,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -20909,6 +20918,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -21232,6 +21242,7 @@ targets: - src/proto/grpc/testing/xds/v3/base.proto - src/proto/grpc/testing/xds/v3/discovery.proto - src/proto/grpc/testing/xds/v3/percent.proto + - src/cpp/client/global_callback_hook.cc - src/cpp/util/status.cc - test/core/xds/xds_client_test.cc - test/core/xds/xds_transport_fake.cc @@ -21329,6 +21340,7 @@ targets: - src/proto/grpc/testing/xds/v3/tls.proto - src/proto/grpc/testing/xds/v3/typed_struct.proto - src/proto/grpc/testing/xds/v3/wrr_locality.proto + - src/cpp/client/global_callback_hook.cc - src/cpp/util/status.cc - test/core/xds/xds_cluster_resource_type_test.cc deps: @@ -21676,6 +21688,7 @@ targets: - src/proto/grpc/testing/xds/v3/endpoint.proto - src/proto/grpc/testing/xds/v3/health_check.proto - src/proto/grpc/testing/xds/v3/percent.proto + - src/cpp/client/global_callback_hook.cc - src/cpp/util/status.cc - test/core/xds/xds_endpoint_resource_type_test.cc deps: diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index fe2176500b4..122a26ff39d 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -206,6 +206,7 @@ Pod::Spec.new do |s| 'include/grpcpp/support/client_callback.h', 'include/grpcpp/support/client_interceptor.h', 'include/grpcpp/support/config.h', + 'include/grpcpp/support/global_callback_hook.h', 'include/grpcpp/support/interceptor.h', 'include/grpcpp/support/message_allocator.h', 'include/grpcpp/support/method_handler.h', @@ -1378,6 +1379,7 @@ Pod::Spec.new do |s| 'src/cpp/client/create_channel_internal.cc', 'src/cpp/client/create_channel_internal.h', 'src/cpp/client/create_channel_posix.cc', + 'src/cpp/client/global_callback_hook.cc', 'src/cpp/client/insecure_credentials.cc', 'src/cpp/client/secure_credentials.cc', 'src/cpp/client/secure_credentials.h', diff --git a/include/grpcpp/support/callback_common.h b/include/grpcpp/support/callback_common.h index 0de9cbcf050..a333407bb4e 100644 --- a/include/grpcpp/support/callback_common.h +++ b/include/grpcpp/support/callback_common.h @@ -30,6 +30,7 @@ #include #include #include +#include #include namespace grpc { @@ -127,7 +128,18 @@ class CallbackWithStatusTag : public grpc_completion_queue_functor { auto status = std::move(status_); func_ = nullptr; // reset to clear this out for sure status_ = Status(); // reset to clear this out for sure - CatchingCallback(std::move(func), std::move(status)); + GetGlobalCallbackHook()->RunCallback( + call_, [func = std::move(func), status = std::move(status)]() { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(status); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(status); +#endif // GRPC_ALLOW_EXCEPTIONS + }); grpc_call_unref(call_); } }; @@ -214,7 +226,17 @@ class CallbackWithSuccessTag : public grpc_completion_queue_functor { #endif if (do_callback) { - CatchingCallback(func_, ok); + GetGlobalCallbackHook()->RunCallback(call_, [this, ok]() { +#if GRPC_ALLOW_EXCEPTIONS + try { + func_(ok); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func_(ok); +#endif // GRPC_ALLOW_EXCEPTIONS + }); } } }; diff --git a/include/grpcpp/support/global_callback_hook.h b/include/grpcpp/support/global_callback_hook.h new file mode 100644 index 00000000000..c453bc807f4 --- /dev/null +++ b/include/grpcpp/support/global_callback_hook.h @@ -0,0 +1,58 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPCPP_SUPPORT_GLOBAL_CALLBACK_HOOK_H +#define GRPCPP_SUPPORT_GLOBAL_CALLBACK_HOOK_H + +#include "absl/functional/function_ref.h" + +struct grpc_call; + +namespace grpc { + +class GlobalCallbackHook { + public: + virtual ~GlobalCallbackHook() = default; + virtual void RunCallback(grpc_call* call, + absl::FunctionRef callback) = 0; + + protected: + // An exception-safe way of invoking a user-specified callback function. + template + void CatchingCallback(Func&& func, Args&&... args) { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(std::forward(args)...); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(std::forward(args)...); +#endif // GRPC_ALLOW_EXCEPTIONS + } +}; + +class DefaultGlobalCallbackHook final : public GlobalCallbackHook { + public: + void RunCallback(grpc_call* call, + absl::FunctionRef callback) override { + CatchingCallback(callback); + } +}; + +std::shared_ptr GetGlobalCallbackHook(); +void SetGlobalCallbackHook(GlobalCallbackHook* hook); +} // namespace grpc + +#endif // GRPCPP_SUPPORT_GLOBAL_CALLBACK_HOOK_H diff --git a/src/cpp/client/global_callback_hook.cc b/src/cpp/client/global_callback_hook.cc new file mode 100644 index 00000000000..2431508b7b3 --- /dev/null +++ b/src/cpp/client/global_callback_hook.cc @@ -0,0 +1,36 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "absl/base/no_destructor.h" +#include "absl/log/check.h" + +#include + +namespace grpc { + +static absl::NoDestructor> g_callback_hook( + std::make_shared()); + +std::shared_ptr GetGlobalCallbackHook() { + return *g_callback_hook; +} + +void SetGlobalCallbackHook(GlobalCallbackHook* hook) { + CHECK(hook != nullptr); + CHECK(hook != (*g_callback_hook).get()); + *g_callback_hook = std::shared_ptr(hook); +} +} // namespace grpc diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 40501c821bc..af16cffbd10 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -1073,6 +1073,7 @@ include/grpcpp/support/channel_arguments.h \ include/grpcpp/support/client_callback.h \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ +include/grpcpp/support/global_callback_hook.h \ include/grpcpp/support/interceptor.h \ include/grpcpp/support/message_allocator.h \ include/grpcpp/support/method_handler.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 68987e28551..7843c17dae2 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1073,6 +1073,7 @@ include/grpcpp/support/channel_arguments.h \ include/grpcpp/support/client_callback.h \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ +include/grpcpp/support/global_callback_hook.h \ include/grpcpp/support/interceptor.h \ include/grpcpp/support/message_allocator.h \ include/grpcpp/support/method_handler.h \ @@ -3046,6 +3047,7 @@ src/cpp/client/create_channel.cc \ src/cpp/client/create_channel_internal.cc \ src/cpp/client/create_channel_internal.h \ src/cpp/client/create_channel_posix.cc \ +src/cpp/client/global_callback_hook.cc \ src/cpp/client/insecure_credentials.cc \ src/cpp/client/secure_credentials.cc \ src/cpp/client/secure_credentials.h \ From 605a15e7eba35e5e1bb579967ada5d1dc88a9694 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 15 Aug 2024 16:20:27 -0700 Subject: [PATCH 12/12] [OTel C++] Fix race when adding and removing callbacks (#37485) Split off from https://github.com/grpc/grpc/pull/37425 We are adding and removing callbacks on the OpenTelemetry Async Instruments without synchronization. This opens us to races where we have an AddCallback and RemoveCallback operation happening at the same time. The correct result after these operations is to still have a callback registered with OpenTelemetry at the end, but the two operations could race and we could just decide to remove the OpenTelemetry callback. The fix delays removing OpenTelemetry callbacks to plugin destruction time. Closes #37485 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37485 from yashykt:FixRaceOTelGauge 016b0a41b54102c69e6828425c014ada2b066af2 PiperOrigin-RevId: 663492598 --- src/cpp/ext/otel/otel_plugin.cc | 67 ++++++++------ src/cpp/ext/otel/otel_plugin.h | 1 + test/cpp/ext/otel/otel_plugin_test.cc | 120 ++++++++++++++++++++++++-- 3 files changed, 152 insertions(+), 36 deletions(-) diff --git a/src/cpp/ext/otel/otel_plugin.cc b/src/cpp/ext/otel/otel_plugin.cc index 6ae99378296..44c653d58b8 100644 --- a/src/cpp/ext/otel/otel_plugin.cc +++ b/src/cpp/ext/otel/otel_plugin.cc @@ -563,6 +563,38 @@ OpenTelemetryPluginImpl::OpenTelemetryPluginImpl( }); } +OpenTelemetryPluginImpl::~OpenTelemetryPluginImpl() { + for (const auto& instrument_data : instruments_data_) { + grpc_core::Match( + instrument_data.instrument, [](const Disabled&) {}, + [](const std::unique_ptr>&) {}, + [](const std::unique_ptr>&) { + }, + [](const std::unique_ptr< + opentelemetry::metrics::Histogram>&) {}, + [](const std::unique_ptr>&) { + }, + [](const std::unique_ptr>& state) { + CHECK(state->caches.empty()); + if (state->ot_callback_registered) { + state->instrument->RemoveCallback( + &CallbackGaugeState::CallbackGaugeCallback, + state.get()); + state->ot_callback_registered = false; + } + }, + [](const std::unique_ptr>& state) { + CHECK(state->caches.empty()); + if (state->ot_callback_registered) { + state->instrument->RemoveCallback( + &CallbackGaugeState::CallbackGaugeCallback, + state.get()); + state->ot_callback_registered = false; + } + }); + } +} + namespace { constexpr absl::string_view kLocality = "grpc.lb.locality"; } @@ -823,9 +855,6 @@ void OpenTelemetryPluginImpl::AddCallback( void OpenTelemetryPluginImpl::RemoveCallback( grpc_core::RegisteredMetricCallback* callback) { - std::vector< - absl::variant*, CallbackGaugeState*>> - gauges_that_need_to_remove_callback; { grpc_core::MutexLock lock(&mu_); callback_timestamps_.erase(callback); @@ -848,11 +877,6 @@ void OpenTelemetryPluginImpl::RemoveCallback( CHECK_NE(callback_gauge_state, nullptr); CHECK((*callback_gauge_state)->ot_callback_registered); CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u); - if ((*callback_gauge_state)->caches.empty()) { - gauges_that_need_to_remove_callback.push_back( - callback_gauge_state->get()); - (*callback_gauge_state)->ot_callback_registered = false; - } break; } case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: { @@ -867,11 +891,6 @@ void OpenTelemetryPluginImpl::RemoveCallback( CHECK_NE(callback_gauge_state, nullptr); CHECK((*callback_gauge_state)->ot_callback_registered); CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u); - if ((*callback_gauge_state)->caches.empty()) { - gauges_that_need_to_remove_callback.push_back( - callback_gauge_state->get()); - (*callback_gauge_state)->ot_callback_registered = false; - } break; } default: @@ -880,21 +899,13 @@ void OpenTelemetryPluginImpl::RemoveCallback( } } } - // RemoveCallback internally grabs OpenTelemetry's observable_registry's - // lock. So we need to call it without our plugin lock otherwise we may - // deadlock. - for (const auto& gauge : gauges_that_need_to_remove_callback) { - grpc_core::Match( - gauge, - [](CallbackGaugeState* gauge) { - gauge->instrument->RemoveCallback( - &CallbackGaugeState::CallbackGaugeCallback, gauge); - }, - [](CallbackGaugeState* gauge) { - gauge->instrument->RemoveCallback( - &CallbackGaugeState::CallbackGaugeCallback, gauge); - }); - } + // Note that we are not removing the callback from OpenTelemetry immediately, + // and instead remove it when the plugin is destroyed. We just have a single + // callback per OpenTelemetry instrument which is a small number. If we decide + // to remove the callback immediately at this point, we need to make sure that + // 1) the callback is removed without holding mu_ and 2) we make sure that + // this does not race against a possible `AddCallback` operation. A potential + // way to do this is to use WorkSerializer. } template diff --git a/src/cpp/ext/otel/otel_plugin.h b/src/cpp/ext/otel/otel_plugin.h index 70321fb021d..9b0c4328673 100644 --- a/src/cpp/ext/otel/otel_plugin.h +++ b/src/cpp/ext/otel/otel_plugin.h @@ -223,6 +223,7 @@ class OpenTelemetryPluginImpl absl::AnyInvocable< bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const> channel_scope_filter); + ~OpenTelemetryPluginImpl() override; private: class ClientCallTracer; diff --git a/test/cpp/ext/otel/otel_plugin_test.cc b/test/cpp/ext/otel/otel_plugin_test.cc index 75ff7daf0e7..22b971ac370 100644 --- a/test/cpp/ext/otel/otel_plugin_test.cc +++ b/test/cpp/ext/otel/otel_plugin_test.cc @@ -1715,14 +1715,8 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest, InstrumentsEnabledTest) { EXPECT_FALSE(stats_plugins.IsInstrumentEnabled(counter_handle)); } -class OpenTelemetryPluginCallbackMetricsTest - : public OpenTelemetryPluginEnd2EndTest { - protected: - OpenTelemetryPluginCallbackMetricsTest() - : endpoint_config_(grpc_core::ChannelArgs()) {} - - grpc_event_engine::experimental::ChannelArgsEndpointConfig endpoint_config_; -}; +using OpenTelemetryPluginCallbackMetricsTest = + OpenTelemetryPluginNPCMetricsTest; // The callback minimal interval is longer than the OT reporting interval, so we // expect to collect duplicated (cached) values. @@ -1981,6 +1975,116 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest, kOptionalLabelKeys, kOptionalLabelValuesSet2, 0.0, true)); } +// Verifies that callbacks are cleaned up when the OpenTelemetry plugin is +// destroyed. +TEST_F(OpenTelemetryPluginCallbackMetricsTest, VerifyCallbacksAreCleanedUp) { + constexpr absl::string_view kInt64CallbackGaugeMetric = + "yet_another_int64_callback_gauge"; + constexpr absl::string_view kDoubleCallbackGaugeMetric = + "yet_another_double_callback_gauge"; + auto integer_gauge_handle = + grpc_core::GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge( + kInt64CallbackGaugeMetric, "An int64 callback gauge.", "unit", + /*enable_by_default=*/true) + .Build(); + auto double_gauge_handle = + grpc_core::GlobalInstrumentsRegistry::RegisterCallbackDoubleGauge( + kDoubleCallbackGaugeMetric, "A double callback gauge.", "unit", + /*enable_by_default=*/true) + .Build(); + Init(std::move(Options().set_metric_names( + {kInt64CallbackGaugeMetric, kDoubleCallbackGaugeMetric}))); + auto stats_plugins = + grpc_core::GlobalStatsPluginRegistry::GetStatsPluginsForChannel( + grpc_core::experimental::StatsPluginChannelScope( + "dns:///localhost:8080", "", endpoint_config_)); + // Multiple callbacks for the same metrics, each reporting different + // label values. + int report_count_1 = 0; + int64_t int_value_1 = 1; + double double_value_1 = 0.5; + auto registered_metric_callback_1 = stats_plugins.RegisterCallback( + [&](grpc_core::CallbackMetricReporter& reporter) { + ++report_count_1; + reporter.Report(integer_gauge_handle, int_value_1++, {}, {}); + reporter.Report(double_gauge_handle, double_value_1++, {}, {}); + }, + grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(), + integer_gauge_handle, double_gauge_handle); + int report_count_2 = 0; + int64_t int_value_2 = 1; + double double_value_2 = 0.5; + auto registered_metric_callback_2 = stats_plugins.RegisterCallback( + [&](grpc_core::CallbackMetricReporter& reporter) { + ++report_count_2; + reporter.Report(integer_gauge_handle, int_value_2++, {}, {}); + reporter.Report(double_gauge_handle, double_value_2++, {}, {}); + }, + grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(), + integer_gauge_handle, double_gauge_handle); + constexpr int kIterations = 50; + { + MetricsCollectorThread collector{ + this, + grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(), + kIterations, + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { + return !data.contains(kInt64CallbackGaugeMetric) || + !data.contains(kDoubleCallbackGaugeMetric); + }}; + } + // Verify that callbacks are invoked + EXPECT_EQ(report_count_1, kIterations); + EXPECT_EQ(report_count_2, kIterations); + // Remove one of the callbacks + registered_metric_callback_1.reset(); + { + MetricsCollectorThread new_collector{ + this, + grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(), + kIterations, + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { return false; }}; + } + EXPECT_EQ(report_count_1, kIterations); // No change since previous + EXPECT_EQ(report_count_2, 2 * kIterations); // Gets another kIterations + // Remove the other callback as well + registered_metric_callback_2.reset(); + MetricsCollectorThread new_new_collector{ + this, + grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(), + kIterations, + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { return false; }}; + // We shouldn't get any new callbacks + EXPECT_THAT(new_new_collector.Stop(), ::testing::IsEmpty()); + EXPECT_EQ(report_count_1, kIterations); + EXPECT_EQ(report_count_2, 2 * kIterations); + // Reset stats plugins as well + grpc_core::GlobalStatsPluginRegistryTestPeer:: + ResetGlobalStatsPluginRegistry(); + registered_metric_callback_2.reset(); + MetricsCollectorThread new_new_new_collector{ + this, + grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(), + kIterations, + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { return false; }}; + // Still no new callbacks + EXPECT_THAT(new_new_new_collector.Stop(), ::testing::IsEmpty()); + EXPECT_EQ(report_count_1, kIterations); + EXPECT_EQ(report_count_2, 2 * kIterations); +} + TEST(OpenTelemetryPluginMetricsEnablingDisablingTest, TestEnableDisableAPIs) { grpc::internal::OpenTelemetryPluginBuilderImpl builder; // First disable all metrics