Merge remote-tracking branch 'upstream/master' into gcp_service_account_identity_call_creds

pull/37510/head
Mark D. Roth 6 months ago
commit c35ea498cd
  1. 21
      BUILD
  2. 13
      CMakeLists.txt
  3. 13
      build_autogenerated.yaml
  4. 2
      gRPC-C++.podspec
  5. 1
      grpc.def
  6. 3
      include/grpc/support/log.h
  7. 26
      include/grpcpp/support/callback_common.h
  8. 58
      include/grpcpp/support/global_callback_hook.h
  9. 9
      src/core/handshaker/security/secure_endpoint.cc
  10. 11
      src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
  11. 2
      src/core/lib/experiments/experiments.yaml
  12. 146
      src/core/load_balancing/rls/rls.cc
  13. 105
      src/core/resolver/dns/c_ares/dns_resolver_ares.cc
  14. 196
      src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  15. 199
      src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc
  16. 7
      src/core/resolver/dns/c_ares/grpc_ares_wrapper.h
  17. 6
      src/core/util/log.cc
  18. 36
      src/cpp/client/global_callback_hook.cc
  19. 67
      src/cpp/ext/otel/otel_plugin.cc
  20. 1
      src/cpp/ext/otel/otel_plugin.h
  21. 46
      src/ruby/ext/grpc/rb_call_credentials.c
  22. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  23. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  24. 145
      test/cpp/ext/otel/otel_plugin_test.cc
  25. 1
      tools/doxygen/Doxyfile.c++
  26. 2
      tools/doxygen/Doxyfile.c++.internal
  27. 9
      tools/interop_matrix/client_matrix.py
  28. 5
      tools/run_tests/sanity/banned_functions.py

21
BUILD

@ -907,6 +907,7 @@ grpc_cc_library(
],
visibility = ["@grpc:grpc++_public_hdrs"],
deps = [
"global_callback_hook",
"grpc_public_hdrs",
"//src/core:gpr_atm",
],
@ -951,6 +952,7 @@ grpc_cc_library(
tags = ["nofixdeps"],
visibility = ["@grpc:public"],
deps = [
"global_callback_hook",
"grpc++_base",
"//src/core:gpr_atm",
"//src/core:slice",
@ -1260,6 +1262,7 @@ grpc_cc_library(
deps = [
"channel_arg_names",
"generic_stub_internal",
"global_callback_hook",
"gpr",
"grpc++_base_unsecure",
"grpc++_codegen_proto",
@ -2455,6 +2458,7 @@ grpc_cc_library(
"config",
"exec_ctx",
"generic_stub_internal",
"global_callback_hook",
"gpr",
"grpc",
"grpc++_codegen_proto",
@ -2544,6 +2548,7 @@ grpc_cc_library(
"config",
"exec_ctx",
"generic_stub_internal",
"global_callback_hook",
"gpr",
"grpc_base",
"grpc_core_credentials_header",
@ -4913,6 +4918,22 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "global_callback_hook",
srcs = [
"src/cpp/client/global_callback_hook.cc",
],
hdrs = [
"include/grpcpp/support/global_callback_hook.h",
],
external_deps = [
"absl/base:no_destructor",
"absl/log:check",
"absl/functional:function_ref",
],
language = "c++",
)
# TODO(yashykt): Remove the UPB definitions from here once they are no longer needed
### UPB Targets

13
CMakeLists.txt generated

@ -4182,6 +4182,7 @@ add_library(grpc++
src/cpp/client/create_channel.cc
src/cpp/client/create_channel_internal.cc
src/cpp/client/create_channel_posix.cc
src/cpp/client/global_callback_hook.cc
src/cpp/client/insecure_credentials.cc
src/cpp/client/secure_credentials.cc
src/cpp/client/xds_credentials.cc
@ -4465,6 +4466,7 @@ foreach(_hdr
include/grpcpp/support/client_callback.h
include/grpcpp/support/client_interceptor.h
include/grpcpp/support/config.h
include/grpcpp/support/global_callback_hook.h
include/grpcpp/support/interceptor.h
include/grpcpp/support/message_allocator.h
include/grpcpp/support/method_handler.h
@ -4938,6 +4940,7 @@ add_library(grpc++_unsecure
src/cpp/client/create_channel.cc
src/cpp/client/create_channel_internal.cc
src/cpp/client/create_channel_posix.cc
src/cpp/client/global_callback_hook.cc
src/cpp/client/insecure_credentials.cc
src/cpp/common/alarm.cc
src/cpp/common/channel_arguments.cc
@ -5209,6 +5212,7 @@ foreach(_hdr
include/grpcpp/support/client_callback.h
include/grpcpp/support/client_interceptor.h
include/grpcpp/support/config.h
include/grpcpp/support/global_callback_hook.h
include/grpcpp/support/interceptor.h
include/grpcpp/support/message_allocator.h
include/grpcpp/support/method_handler.h
@ -8266,6 +8270,7 @@ add_executable(binder_transport_test
src/cpp/client/create_channel.cc
src/cpp/client/create_channel_internal.cc
src/cpp/client/create_channel_posix.cc
src/cpp/client/global_callback_hook.cc
src/cpp/client/insecure_credentials.cc
src/cpp/client/secure_credentials.cc
src/cpp/common/alarm.cc
@ -13405,6 +13410,7 @@ add_executable(endpoint_binder_pool_test
src/cpp/client/create_channel.cc
src/cpp/client/create_channel_internal.cc
src/cpp/client/create_channel_posix.cc
src/cpp/client/global_callback_hook.cc
src/cpp/client/insecure_credentials.cc
src/cpp/client/secure_credentials.cc
src/cpp/common/alarm.cc
@ -14260,6 +14266,7 @@ add_executable(fake_binder_test
src/cpp/client/create_channel.cc
src/cpp/client/create_channel_internal.cc
src/cpp/client/create_channel_posix.cc
src/cpp/client/global_callback_hook.cc
src/cpp/client/insecure_credentials.cc
src/cpp/client/secure_credentials.cc
src/cpp/common/alarm.cc
@ -32271,6 +32278,7 @@ add_executable(transport_stream_receiver_test
src/cpp/client/create_channel.cc
src/cpp/client/create_channel_internal.cc
src/cpp/client/create_channel_posix.cc
src/cpp/client/global_callback_hook.cc
src/cpp/client/insecure_credentials.cc
src/cpp/client/secure_credentials.cc
src/cpp/common/alarm.cc
@ -33157,6 +33165,7 @@ add_executable(wire_reader_test
src/cpp/client/create_channel.cc
src/cpp/client/create_channel_internal.cc
src/cpp/client/create_channel_posix.cc
src/cpp/client/global_callback_hook.cc
src/cpp/client/insecure_credentials.cc
src/cpp/client/secure_credentials.cc
src/cpp/common/alarm.cc
@ -33267,6 +33276,7 @@ add_executable(wire_writer_test
src/cpp/client/create_channel.cc
src/cpp/client/create_channel_internal.cc
src/cpp/client/create_channel_posix.cc
src/cpp/client/global_callback_hook.cc
src/cpp/client/insecure_credentials.cc
src/cpp/client/secure_credentials.cc
src/cpp/common/alarm.cc
@ -33847,6 +33857,7 @@ add_executable(xds_client_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h
src/cpp/client/global_callback_hook.cc
src/cpp/util/status.cc
test/core/xds/xds_client_test.cc
test/core/xds/xds_transport_fake.cc
@ -34142,6 +34153,7 @@ add_executable(xds_cluster_resource_type_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.h
src/cpp/client/global_callback_hook.cc
src/cpp/util/status.cc
test/core/xds/xds_cluster_resource_type_test.cc
)
@ -35138,6 +35150,7 @@ add_executable(xds_endpoint_resource_type_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h
src/cpp/client/global_callback_hook.cc
src/cpp/util/status.cc
test/core/xds/xds_endpoint_resource_type_test.cc
)

@ -3836,6 +3836,7 @@ libs:
- include/grpcpp/support/client_callback.h
- include/grpcpp/support/client_interceptor.h
- include/grpcpp/support/config.h
- include/grpcpp/support/global_callback_hook.h
- include/grpcpp/support/interceptor.h
- include/grpcpp/support/message_allocator.h
- include/grpcpp/support/method_handler.h
@ -3915,6 +3916,7 @@ libs:
- src/cpp/client/create_channel.cc
- src/cpp/client/create_channel_internal.cc
- src/cpp/client/create_channel_posix.cc
- src/cpp/client/global_callback_hook.cc
- src/cpp/client/insecure_credentials.cc
- src/cpp/client/secure_credentials.cc
- src/cpp/client/xds_credentials.cc
@ -4267,6 +4269,7 @@ libs:
- include/grpcpp/support/client_callback.h
- include/grpcpp/support/client_interceptor.h
- include/grpcpp/support/config.h
- include/grpcpp/support/global_callback_hook.h
- include/grpcpp/support/interceptor.h
- include/grpcpp/support/message_allocator.h
- include/grpcpp/support/method_handler.h
@ -4303,6 +4306,7 @@ libs:
- src/cpp/client/create_channel.cc
- src/cpp/client/create_channel_internal.cc
- src/cpp/client/create_channel_posix.cc
- src/cpp/client/global_callback_hook.cc
- src/cpp/client/insecure_credentials.cc
- src/cpp/common/alarm.cc
- src/cpp/common/channel_arguments.cc
@ -6182,6 +6186,7 @@ targets:
- src/cpp/client/create_channel.cc
- src/cpp/client/create_channel_internal.cc
- src/cpp/client/create_channel_posix.cc
- src/cpp/client/global_callback_hook.cc
- src/cpp/client/insecure_credentials.cc
- src/cpp/client/secure_credentials.cc
- src/cpp/common/alarm.cc
@ -9668,6 +9673,7 @@ targets:
- src/cpp/client/create_channel.cc
- src/cpp/client/create_channel_internal.cc
- src/cpp/client/create_channel_posix.cc
- src/cpp/client/global_callback_hook.cc
- src/cpp/client/insecure_credentials.cc
- src/cpp/client/secure_credentials.cc
- src/cpp/common/alarm.cc
@ -10143,6 +10149,7 @@ targets:
- src/cpp/client/create_channel.cc
- src/cpp/client/create_channel_internal.cc
- src/cpp/client/create_channel_posix.cc
- src/cpp/client/global_callback_hook.cc
- src/cpp/client/insecure_credentials.cc
- src/cpp/client/secure_credentials.cc
- src/cpp/common/alarm.cc
@ -20379,6 +20386,7 @@ targets:
- src/cpp/client/create_channel.cc
- src/cpp/client/create_channel_internal.cc
- src/cpp/client/create_channel_posix.cc
- src/cpp/client/global_callback_hook.cc
- src/cpp/client/insecure_credentials.cc
- src/cpp/client/secure_credentials.cc
- src/cpp/common/alarm.cc
@ -20798,6 +20806,7 @@ targets:
- src/cpp/client/create_channel.cc
- src/cpp/client/create_channel_internal.cc
- src/cpp/client/create_channel_posix.cc
- src/cpp/client/global_callback_hook.cc
- src/cpp/client/insecure_credentials.cc
- src/cpp/client/secure_credentials.cc
- src/cpp/common/alarm.cc
@ -20909,6 +20918,7 @@ targets:
- src/cpp/client/create_channel.cc
- src/cpp/client/create_channel_internal.cc
- src/cpp/client/create_channel_posix.cc
- src/cpp/client/global_callback_hook.cc
- src/cpp/client/insecure_credentials.cc
- src/cpp/client/secure_credentials.cc
- src/cpp/common/alarm.cc
@ -21232,6 +21242,7 @@ targets:
- src/proto/grpc/testing/xds/v3/base.proto
- src/proto/grpc/testing/xds/v3/discovery.proto
- src/proto/grpc/testing/xds/v3/percent.proto
- src/cpp/client/global_callback_hook.cc
- src/cpp/util/status.cc
- test/core/xds/xds_client_test.cc
- test/core/xds/xds_transport_fake.cc
@ -21329,6 +21340,7 @@ targets:
- src/proto/grpc/testing/xds/v3/tls.proto
- src/proto/grpc/testing/xds/v3/typed_struct.proto
- src/proto/grpc/testing/xds/v3/wrr_locality.proto
- src/cpp/client/global_callback_hook.cc
- src/cpp/util/status.cc
- test/core/xds/xds_cluster_resource_type_test.cc
deps:
@ -21676,6 +21688,7 @@ targets:
- src/proto/grpc/testing/xds/v3/endpoint.proto
- src/proto/grpc/testing/xds/v3/health_check.proto
- src/proto/grpc/testing/xds/v3/percent.proto
- src/cpp/client/global_callback_hook.cc
- src/cpp/util/status.cc
- test/core/xds/xds_endpoint_resource_type_test.cc
deps:

2
gRPC-C++.podspec generated

@ -206,6 +206,7 @@ Pod::Spec.new do |s|
'include/grpcpp/support/client_callback.h',
'include/grpcpp/support/client_interceptor.h',
'include/grpcpp/support/config.h',
'include/grpcpp/support/global_callback_hook.h',
'include/grpcpp/support/interceptor.h',
'include/grpcpp/support/message_allocator.h',
'include/grpcpp/support/method_handler.h',
@ -1378,6 +1379,7 @@ Pod::Spec.new do |s|
'src/cpp/client/create_channel_internal.cc',
'src/cpp/client/create_channel_internal.h',
'src/cpp/client/create_channel_posix.cc',
'src/cpp/client/global_callback_hook.cc',
'src/cpp/client/insecure_credentials.cc',
'src/cpp/client/secure_credentials.cc',
'src/cpp/client/secure_credentials.h',

1
grpc.def generated

@ -231,7 +231,6 @@ EXPORTS
gpr_cpu_num_cores
gpr_cpu_current_cpu
gpr_log
absl_vlog2_enabled
gpr_log_verbosity_init
gpr_format_message
gpr_strdup

@ -51,9 +51,6 @@ typedef enum gpr_log_severity {
GPRAPI void gpr_log(const char* file, int line, gpr_log_severity severity,
const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);
/** Deprecated. **/
GPRAPI int absl_vlog2_enabled();
GPRAPI void gpr_log_verbosity_init(void);
#ifdef __cplusplus

@ -30,6 +30,7 @@
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/completion_queue_tag.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/global_callback_hook.h>
#include <grpcpp/support/status.h>
namespace grpc {
@ -127,7 +128,18 @@ class CallbackWithStatusTag : public grpc_completion_queue_functor {
auto status = std::move(status_);
func_ = nullptr; // reset to clear this out for sure
status_ = Status(); // reset to clear this out for sure
CatchingCallback(std::move(func), std::move(status));
GetGlobalCallbackHook()->RunCallback(
call_, [func = std::move(func), status = std::move(status)]() {
#if GRPC_ALLOW_EXCEPTIONS
try {
func(status);
} catch (...) {
// nothing to return or change here, just don't crash the library
}
#else // GRPC_ALLOW_EXCEPTIONS
func(status);
#endif // GRPC_ALLOW_EXCEPTIONS
});
grpc_call_unref(call_);
}
};
@ -214,7 +226,17 @@ class CallbackWithSuccessTag : public grpc_completion_queue_functor {
#endif
if (do_callback) {
CatchingCallback(func_, ok);
GetGlobalCallbackHook()->RunCallback(call_, [this, ok]() {
#if GRPC_ALLOW_EXCEPTIONS
try {
func_(ok);
} catch (...) {
// nothing to return or change here, just don't crash the library
}
#else // GRPC_ALLOW_EXCEPTIONS
func_(ok);
#endif // GRPC_ALLOW_EXCEPTIONS
});
}
}
};

@ -0,0 +1,58 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPCPP_SUPPORT_GLOBAL_CALLBACK_HOOK_H
#define GRPCPP_SUPPORT_GLOBAL_CALLBACK_HOOK_H
#include "absl/functional/function_ref.h"
struct grpc_call;
namespace grpc {
class GlobalCallbackHook {
public:
virtual ~GlobalCallbackHook() = default;
virtual void RunCallback(grpc_call* call,
absl::FunctionRef<void()> callback) = 0;
protected:
// An exception-safe way of invoking a user-specified callback function.
template <class Func, class... Args>
void CatchingCallback(Func&& func, Args&&... args) {
#if GRPC_ALLOW_EXCEPTIONS
try {
func(std::forward<Args>(args)...);
} catch (...) {
// nothing to return or change here, just don't crash the library
}
#else // GRPC_ALLOW_EXCEPTIONS
func(std::forward<Args>(args)...);
#endif // GRPC_ALLOW_EXCEPTIONS
}
};
class DefaultGlobalCallbackHook final : public GlobalCallbackHook {
public:
void RunCallback(grpc_call* call,
absl::FunctionRef<void()> callback) override {
CatchingCallback(callback);
}
};
std::shared_ptr<GlobalCallbackHook> GetGlobalCallbackHook();
void SetGlobalCallbackHook(GlobalCallbackHook* hook);
} // namespace grpc
#endif // GRPCPP_SUPPORT_GLOBAL_CALLBACK_HOOK_H

@ -252,6 +252,13 @@ static void on_read(void* user_data, grpc_error_handle error) {
{
grpc_core::MutexLock l(&ep->read_mu);
// If we were shut down after this callback was scheduled with OK
// status but before it was invoked, we need to treat that as an error.
if (ep->wrapped_ep == nullptr && error.ok()) {
error = absl::CancelledError("secure endpoint shutdown");
}
uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
@ -505,8 +512,10 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
static void endpoint_destroy(grpc_endpoint* secure_ep) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
ep->read_mu.Lock();
ep->wrapped_ep.reset();
ep->memory_owner.Reset();
ep->read_mu.Unlock();
SECURE_ENDPOINT_UNREF(ep, "destroy");
}

@ -335,9 +335,18 @@ void CFStreamEndpointImpl::DoWrite(
continue;
}
size_t written_size =
CFIndex written_size =
CFWriteStreamWrite(cf_write_stream_, slice.begin(), slice.size());
if (written_size < 0) {
auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_));
GRPC_TRACE_LOG(event_engine_endpoint, INFO)
<< "CFStream write error: " << status
<< ", written_size: " << written_size;
on_writable(status);
return;
}
total_written_size += written_size;
if (written_size < slice.size()) {
SliceBuffer written;

@ -105,7 +105,7 @@
test_tags: [flow_control_test]
- name: pick_first_new
description: New pick_first impl with memory reduction.
expiry: 2024/07/30
expiry: 2024/10/30
owner: roth@google.com
test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"]
- name: promise_based_inproc_transport

@ -353,7 +353,8 @@ class RlsLb final : public LoadBalancingPolicy {
// is called after releasing it.
//
// Both methods grab the data they need from the parent object.
void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
void StartUpdate(OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);
void ExitIdleLocked() {
@ -397,14 +398,14 @@ class RlsLb final : public LoadBalancingPolicy {
};
// Note: We are forced to disable lock analysis here because
// Orphan() is called by Unref() which is called by RefCountedPtr<>, which
// Orphaned() is called by Unref() which is called by RefCountedPtr<>, which
// cannot have lock annotations for this particular caller.
void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
RefCountedPtr<RlsLb> lb_policy_;
std::string target_;
bool is_shutdown_ = false;
bool is_shutdown_ = false; // Protected by WorkSerializer
OrphanablePtr<ChildPolicyHandler> child_policy_;
RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;
@ -503,12 +504,25 @@ class RlsLb final : public LoadBalancingPolicy {
// Returns a list of child policy wrappers on which FinishUpdate()
// needs to be called after releasing the lock.
std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
ResponseInfo response, std::unique_ptr<BackOff> backoff_state)
ResponseInfo response, std::unique_ptr<BackOff> backoff_state,
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
// Moves entry to the end of the LRU list.
void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
// Takes entries from child_policy_wrappers_ and appends them to the end
// of \a child_policy_wrappers.
void TakeChildPolicyWrappers(
std::vector<RefCountedPtr<ChildPolicyWrapper>>* child_policy_wrappers)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
child_policy_wrappers->insert(
child_policy_wrappers->end(),
std::make_move_iterator(child_policy_wrappers_.begin()),
std::make_move_iterator(child_policy_wrappers_.end()));
child_policy_wrappers_.clear();
}
private:
class BackoffTimer final : public InternallyRefCounted<BackoffTimer> {
public:
@ -566,19 +580,24 @@ class RlsLb final : public LoadBalancingPolicy {
// the caller. Otherwise, the entry found is returned to the caller. The
// entry returned to the user is considered recently used and its order in
// the LRU list of the cache is updated.
Entry* FindOrInsert(const RequestKey& key)
Entry* FindOrInsert(const RequestKey& key,
std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
// Resizes the cache. If the new cache size is greater than the current size
// of the cache, do nothing. Otherwise, evict the oldest entries that
// exceed the new size limit of the cache.
void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
void Resize(size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
// Resets backoff of all the cache entries.
void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
// Shutdown the cache; clean-up and orphan all the stored cache entries.
void Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
GRPC_MUST_USE_RESULT std::vector<RefCountedPtr<ChildPolicyWrapper>>
Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
void ReportMetricsLocked(CallbackMetricReporter& reporter)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
@ -594,7 +613,9 @@ class RlsLb final : public LoadBalancingPolicy {
// Evicts oversized cache elements when the current size is greater than
// the specified limit.
void MaybeShrinkSize(size_t bytes)
void MaybeShrinkSize(size_t bytes,
std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
RlsLb* lb_policy_;
@ -857,7 +878,8 @@ absl::optional<Json> InsertOrUpdateChildPolicyField(const std::string& field,
return Json::FromArray(std::move(array));
}
void RlsLb::ChildPolicyWrapper::StartUpdate() {
void RlsLb::ChildPolicyWrapper::StartUpdate(
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) {
ValidationErrors errors;
auto child_policy_config = InsertOrUpdateChildPolicyField(
lb_policy_->config_->child_policy_config_target_field_name(), target_,
@ -880,7 +902,7 @@ void RlsLb::ChildPolicyWrapper::StartUpdate() {
pending_config_.reset();
picker_ = MakeRefCounted<TransientFailurePicker>(
absl::UnavailableError(config.status().message()));
child_policy_.reset();
*child_policy_to_delete = std::move(child_policy_);
} else {
pending_config_ = std::move(*config);
}
@ -934,9 +956,9 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
<< ": UpdateState(state=" << ConnectivityStateName(state)
<< ", status=" << status << ", picker=" << picker.get() << ")";
}
if (wrapper_->is_shutdown_) return;
{
MutexLock lock(&wrapper_->lb_policy_->mu_);
if (wrapper_->is_shutdown_) return;
// TODO(roth): It looks like this ignores subsequent TF updates that
// might change the status used to fail picks, which seems wrong.
if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
@ -946,7 +968,8 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
wrapper_->connectivity_state_ = state;
DCHECK(picker != nullptr);
if (picker != nullptr) {
wrapper_->picker_ = std::move(picker);
// We want to unref the picker after we release the lock.
wrapper_->picker_.swap(picker);
}
}
wrapper_->lb_policy_->UpdatePickerLocked();
@ -1194,18 +1217,19 @@ RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
lb_policy_->cache_.lru_list_.end(), key)) {}
void RlsLb::Cache::Entry::Orphan() {
// We should be holding RlsLB::mu_.
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " "
<< lru_iterator_->ToString() << ": cache entry evicted";
is_shutdown_ = true;
lb_policy_->cache_.lru_list_.erase(lru_iterator_);
lru_iterator_ = lb_policy_->cache_.lru_list_.end(); // Just in case.
CHECK(child_policy_wrappers_.empty());
backoff_state_.reset();
if (backoff_timer_ != nullptr) {
backoff_timer_.reset();
lb_policy_->UpdatePickerAsync();
}
child_policy_wrappers_.clear();
Unref(DEBUG_LOCATION, "Orphan");
}
@ -1284,7 +1308,8 @@ void RlsLb::Cache::Entry::MarkUsed() {
std::vector<RlsLb::ChildPolicyWrapper*>
RlsLb::Cache::Entry::OnRlsResponseLocked(
ResponseInfo response, std::unique_ptr<BackOff> backoff_state) {
ResponseInfo response, std::unique_ptr<BackOff> backoff_state,
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) {
// Move the entry to the end of the LRU list.
MarkUsed();
// If the request failed, store the failed status and update the
@ -1345,7 +1370,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked(
if (it == lb_policy_->child_policy_map_.end()) {
auto new_child = MakeRefCounted<ChildPolicyWrapper>(
lb_policy_.Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target);
new_child->StartUpdate();
new_child->StartUpdate(child_policy_to_delete);
child_policies_to_finish_update.push_back(new_child.get());
new_child_policy_wrappers.emplace_back(std::move(new_child));
} else {
@ -1382,12 +1407,15 @@ RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) {
return it->second.get();
}
RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(
const RequestKey& key, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete) {
auto it = map_.find(key);
// If not found, create new entry.
if (it == map_.end()) {
size_t entry_size = EntrySizeForKey(key);
MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size));
MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size),
child_policy_wrappers_to_delete);
Entry* entry = new Entry(
lb_policy_->RefAsSubclass<RlsLb>(DEBUG_LOCATION, "CacheEntry"), key);
map_.emplace(key, OrphanablePtr<Entry>(entry));
@ -1405,11 +1433,13 @@ RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
return it->second.get();
}
void RlsLb::Cache::Resize(size_t bytes) {
void RlsLb::Cache::Resize(size_t bytes,
std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete) {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_ << "] resizing cache to " << bytes << " bytes";
size_limit_ = bytes;
MaybeShrinkSize(size_limit_);
MaybeShrinkSize(size_limit_, child_policy_wrappers_to_delete);
}
void RlsLb::Cache::ResetAllBackoff() {
@ -1419,7 +1449,12 @@ void RlsLb::Cache::ResetAllBackoff() {
lb_policy_->UpdatePickerAsync();
}
void RlsLb::Cache::Shutdown() {
std::vector<RefCountedPtr<RlsLb::ChildPolicyWrapper>> RlsLb::Cache::Shutdown() {
std::vector<RefCountedPtr<ChildPolicyWrapper>>
child_policy_wrappers_to_delete;
for (auto& entry : map_) {
entry.second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
}
map_.clear();
lru_list_.clear();
if (cleanup_timer_handle_.has_value() &&
@ -1429,6 +1464,7 @@ void RlsLb::Cache::Shutdown() {
<< "[rlslb " << lb_policy_ << "] cache cleanup timer canceled";
}
cleanup_timer_handle_.reset();
return child_policy_wrappers_to_delete;
}
void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) {
@ -1464,12 +1500,15 @@ void RlsLb::Cache::StartCleanupTimer() {
void RlsLb::Cache::OnCleanupTimer() {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_ << "] cache cleanup timer fired";
std::vector<RefCountedPtr<ChildPolicyWrapper>>
child_policy_wrappers_to_delete;
MutexLock lock(&lb_policy_->mu_);
if (!cleanup_timer_handle_.has_value()) return;
if (lb_policy_->is_shutdown_) return;
for (auto it = map_.begin(); it != map_.end();) {
if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) {
size_ -= it->second->Size();
it->second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
it = map_.erase(it);
} else {
++it;
@ -1483,7 +1522,9 @@ size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) {
return (key.Size() * 2) + sizeof(Entry);
}
void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
void RlsLb::Cache::MaybeShrinkSize(
size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete) {
while (size_ > bytes) {
auto lru_it = lru_list_.begin();
if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
@ -1494,6 +1535,7 @@ void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
<< "[rlslb " << lb_policy_ << "] LRU eviction: removing entry "
<< map_it->second.get() << " " << lru_it->ToString();
size_ -= map_it->second->Size();
map_it->second->TakeChildPolicyWrappers(child_policy_wrappers_to_delete);
map_.erase(map_it);
}
GRPC_TRACE_LOG(rls_lb, INFO)
@ -1814,13 +1856,18 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
<< "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " "
<< key_.ToString() << ": response info: " << response.ToString();
std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
std::vector<RefCountedPtr<ChildPolicyWrapper>>
child_policy_wrappers_to_delete;
OrphanablePtr<ChildPolicyHandler> child_policy_to_delete;
{
MutexLock lock(&lb_policy_->mu_);
if (lb_policy_->is_shutdown_) return;
rls_channel_->ReportResponseLocked(response.status.ok());
Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_);
Cache::Entry* cache_entry =
lb_policy_->cache_.FindOrInsert(key_, &child_policy_wrappers_to_delete);
child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
std::move(response), std::move(backoff_state_));
std::move(response), std::move(backoff_state_),
&child_policy_to_delete);
lb_policy_->request_map_.erase(key_);
}
// Now that we've released the lock, finish the update on any newly
@ -1919,14 +1966,7 @@ RlsLb::RlsLb(Args args)
instance_uuid_(channel_args()
.GetOwnedString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID)
.value_or(GenerateUUID())),
cache_(this),
registered_metric_callback_(
channel_control_helper()->GetStatsPluginGroup().RegisterCallback(
[this](CallbackMetricReporter& reporter) {
MutexLock lock(&mu_);
cache_.ReportMetricsLocked(reporter);
},
Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries)) {
cache_(this) {
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy created";
}
@ -2006,6 +2046,9 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
}
}
// Now grab the lock to swap out the state it guards.
std::vector<RefCountedPtr<ChildPolicyWrapper>>
child_policy_wrappers_to_delete;
OrphanablePtr<ChildPolicyHandler> child_policy_to_delete;
{
MutexLock lock(&mu_);
// Swap out RLS channel if needed.
@ -2017,19 +2060,20 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
// Resize cache if needed.
if (old_config == nullptr ||
config_->cache_size_bytes() != old_config->cache_size_bytes()) {
cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()));
cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()),
&child_policy_wrappers_to_delete);
}
// Start update of child policies if needed.
if (update_child_policies) {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] starting child policy updates";
for (auto& p : child_policy_map_) {
p.second->StartUpdate();
p.second->StartUpdate(&child_policy_to_delete);
}
} else if (created_default_child) {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] starting default child policy update";
default_child_policy_->StartUpdate();
default_child_policy_->StartUpdate(&child_policy_to_delete);
}
}
// Now that we've released the lock, finish update of child policies.
@ -2054,6 +2098,20 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
}
}
update_in_progress_ = false;
// On the initial update only, we set the gauge metric callback. We
// can't do this before the initial update, because otherwise the
// callback could be invoked before we've set state that we need for
// the label values (e.g., we'd add metrics with empty string for the
// RLS server name).
if (registered_metric_callback_ == nullptr) {
registered_metric_callback_ =
channel_control_helper()->GetStatsPluginGroup().RegisterCallback(
[this](CallbackMetricReporter& reporter) {
MutexLock lock(&mu_);
cache_.ReportMetricsLocked(reporter);
},
Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries);
}
// In principle, we need to update the picker here only if the config
// fields used by the picker have changed. However, it seems fragile
// to check individual fields, since the picker logic could change in
@ -2090,14 +2148,20 @@ void RlsLb::ResetBackoffLocked() {
void RlsLb::ShutdownLocked() {
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy shutdown";
registered_metric_callback_.reset();
MutexLock lock(&mu_);
is_shutdown_ = true;
config_.reset(DEBUG_LOCATION, "ShutdownLocked");
RefCountedPtr<ChildPolicyWrapper> child_policy_to_delete;
std::vector<RefCountedPtr<ChildPolicyWrapper>>
child_policy_wrappers_to_delete;
OrphanablePtr<RlsChannel> rls_channel_to_delete;
{
MutexLock lock(&mu_);
is_shutdown_ = true;
config_.reset(DEBUG_LOCATION, "ShutdownLocked");
child_policy_wrappers_to_delete = cache_.Shutdown();
request_map_.clear();
rls_channel_to_delete = std::move(rls_channel_);
child_policy_to_delete = std::move(default_child_policy_);
}
channel_args_ = ChannelArgs();
cache_.Shutdown();
request_map_.clear();
rls_channel_.reset();
default_child_policy_.reset();
}
void RlsLb::UpdatePickerAsync() {

@ -106,9 +106,10 @@ class AresClientChannelDNSResolver final : public PollingResolver {
resolver_->authority().c_str(), resolver_->name_to_resolve().c_str(),
kDefaultSecurePort, resolver_->interested_parties(),
&on_hostname_resolved_, &addresses_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG(
"resolver:%p Started resolving hostnames. hostname_request_:%p",
resolver_.get(), hostname_request_.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << resolver_.get()
<< " Started resolving hostnames. hostname_request_:"
<< hostname_request_.get();
if (resolver_->enable_srv_queries_) {
Ref(DEBUG_LOCATION, "OnSRVResolved").release();
GRPC_CLOSURE_INIT(&on_srv_resolved_, OnSRVResolved, this, nullptr);
@ -117,9 +118,10 @@ class AresClientChannelDNSResolver final : public PollingResolver {
resolver_->name_to_resolve().c_str(),
resolver_->interested_parties(), &on_srv_resolved_,
&balancer_addresses_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG(
"resolver:%p Started resolving SRV records. srv_request_:%p",
resolver_.get(), srv_request_.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << resolver_.get()
<< " Started resolving SRV records. srv_request_:"
<< srv_request_.get();
}
if (resolver_->request_service_config_) {
Ref(DEBUG_LOCATION, "OnTXTResolved").release();
@ -129,9 +131,10 @@ class AresClientChannelDNSResolver final : public PollingResolver {
resolver_->name_to_resolve().c_str(),
resolver_->interested_parties(), &on_txt_resolved_,
&service_config_json_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG(
"resolver:%p Started resolving TXT records. txt_request_:%p",
resolver_.get(), txt_request_.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << resolver_.get()
<< " Started resolving TXT records. txt_request_:"
<< txt_request_.get();
}
}
@ -219,8 +222,9 @@ AresClientChannelDNSResolver::AresClientChannelDNSResolver(
.value_or(GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS))) {}
AresClientChannelDNSResolver::~AresClientChannelDNSResolver() {
GRPC_CARES_TRACE_LOG("resolver:%p destroying AresClientChannelDNSResolver",
this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this
<< " destroying AresClientChannelDNSResolver";
}
OrphanablePtr<Orphanable> AresClientChannelDNSResolver::StartRequest() {
@ -283,15 +287,16 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_) {
if (hostname_request_ != nullptr || srv_request_ != nullptr ||
txt_request_ != nullptr) {
GRPC_CARES_TRACE_LOG(
"resolver:%p OnResolved() waiting for results (hostname: %s, srv: %s, "
"txt: %s)",
this, hostname_request_ != nullptr ? "waiting" : "done",
srv_request_ != nullptr ? "waiting" : "done",
txt_request_ != nullptr ? "waiting" : "done");
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this
<< " OnResolved() waiting for results (hostname: "
<< (hostname_request_ != nullptr ? "waiting" : "done")
<< ", srv: " << (srv_request_ != nullptr ? "waiting" : "done")
<< ", txt: " << (txt_request_ != nullptr ? "waiting" : "done") << ")";
return absl::nullopt;
}
GRPC_CARES_TRACE_LOG("resolver:%p OnResolved() proceeding", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this << " OnResolved() proceeding";
Result result;
result.args = resolver_->channel_args();
// TODO(roth): Change logic to be able to report failures for addresses
@ -309,8 +314,9 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
absl::StrCat("failed to parse service config: ",
StatusToString(service_config_string.status())));
} else if (!service_config_string->empty()) {
GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s",
this, service_config_string->c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this
<< " selected service config choice: " << *service_config_string;
result.service_config = ServiceConfigImpl::Create(
resolver_->channel_args(), *service_config_string);
if (!result.service_config.ok()) {
@ -325,8 +331,9 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
SetGrpcLbBalancerAddresses(result.args, *balancer_addresses_);
}
} else {
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", this,
StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) resolver:" << this
<< " dns resolution failed: " << StatusToString(error);
std::string error_message;
grpc_error_get_str(error, StatusStrProperty::kDescription, &error_message);
absl::Status status = absl::UnavailableError(
@ -375,8 +382,9 @@ class AresDNSResolver final : public DNSResolver {
class AresRequest {
public:
virtual ~AresRequest() {
GRPC_CARES_TRACE_LOG("AresRequest:%p dtor ares_request_:%p", this,
grpc_ares_request_.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresRequest:" << this
<< " dtor ares_request_:" << grpc_ares_request_.get();
resolver_->UnregisterRequest(task_handle());
grpc_pollset_set_destroy(pollset_set_);
}
@ -397,8 +405,9 @@ class AresDNSResolver final : public DNSResolver {
bool Cancel() {
MutexLock lock(&mu_);
if (grpc_ares_request_ != nullptr) {
GRPC_CARES_TRACE_LOG("AresRequest:%p Cancel ares_request_:%p", this,
grpc_ares_request_.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresRequest:" << this
<< " Cancel ares_request_:" << grpc_ares_request_.get();
if (completed_) return false;
// OnDnsLookupDone will still be run
completed_ = true;
@ -499,7 +508,8 @@ class AresDNSResolver final : public DNSResolver {
aba_token),
default_port_(default_port),
on_resolve_address_done_(std::move(on_resolve_address_done)) {
GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p ctor", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresHostnameRequest:" << this << " ctor";
}
std::unique_ptr<grpc_ares_request> MakeRequestLocked() override {
@ -508,13 +518,15 @@ class AresDNSResolver final : public DNSResolver {
name_server().c_str(), name().c_str(), default_port_.c_str(),
pollset_set(), on_dns_lookup_done(), &addresses_,
timeout().millis()));
GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p Start ares_request_:%p",
this, ares_request.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresHostnameRequest:" << this
<< " Start ares_request_:" << ares_request.get();
return ares_request;
}
void OnComplete(grpc_error_handle error) override {
GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p OnComplete", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresHostnameRequest:" << this << " OnComplete";
if (!error.ok()) {
on_resolve_address_done_(grpc_error_to_absl_status(error));
return;
@ -550,7 +562,8 @@ class AresDNSResolver final : public DNSResolver {
: AresRequest(name, name_server, timeout, interested_parties, resolver,
aba_token),
on_resolve_address_done_(std::move(on_resolve_address_done)) {
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p ctor", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this << " ctor";
}
std::unique_ptr<grpc_ares_request> MakeRequestLocked() override {
@ -558,13 +571,15 @@ class AresDNSResolver final : public DNSResolver {
std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_srv_ares(
name_server().c_str(), name().c_str(), pollset_set(),
on_dns_lookup_done(), &balancer_addresses_, timeout().millis()));
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p Start ares_request_:%p", this,
ares_request.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this
<< " Start ares_request_:" << ares_request.get();
return ares_request;
}
void OnComplete(grpc_error_handle error) override {
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p OnComplete", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this << " OnComplete";
if (!error.ok()) {
on_resolve_address_done_(grpc_error_to_absl_status(error));
return;
@ -596,7 +611,8 @@ class AresDNSResolver final : public DNSResolver {
: AresRequest(name, name_server, timeout, interested_parties, resolver,
aba_token),
on_resolved_(std::move(on_resolved)) {
GRPC_CARES_TRACE_LOG("AresTXTRequest:%p ctor", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresTXTRequest:" << this << " ctor";
}
~AresTXTRequest() override { gpr_free(service_config_json_); }
@ -606,13 +622,15 @@ class AresDNSResolver final : public DNSResolver {
std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_txt_ares(
name_server().c_str(), name().c_str(), pollset_set(),
on_dns_lookup_done(), &service_config_json_, timeout().millis()));
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p Start ares_request_:%p", this,
ares_request.get());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this
<< " Start ares_request_:" << ares_request.get();
return ares_request;
}
void OnComplete(grpc_error_handle error) override {
GRPC_CARES_TRACE_LOG("AresSRVRequest:%p OnComplete", this);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresSRVRequest:" << this << " OnComplete";
if (!error.ok()) {
on_resolved_(grpc_error_to_absl_status(error));
return;
@ -684,14 +702,15 @@ class AresDNSResolver final : public DNSResolver {
MutexLock lock(&mu_);
if (!open_requests_.contains(handle)) {
// Unknown request, possibly completed already, or an invalid handle.
GRPC_CARES_TRACE_LOG(
"AresDNSResolver:%p attempt to cancel unknown TaskHandle:%s", this,
HandleToString(handle).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresDNSResolver:" << this
<< " attempt to cancel unknown TaskHandle:" << HandleToString(handle);
return false;
}
auto* request = reinterpret_cast<AresRequest*>(handle.keys[0]);
GRPC_CARES_TRACE_LOG("AresDNSResolver:%p cancel ares_request:%p", this,
request);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) AresDNSResolver:" << this
<< " cancel ares_request:" << request;
return request->Cancel();
}

@ -133,8 +133,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
~GrpcPolledFdWindows() override {
GRPC_CARES_TRACE_LOG("fd:|%s| ~GrpcPolledFdWindows shutdown_called_: %d ",
GetName(), shutdown_called_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| ~GrpcPolledFdWindows shutdown_called_: " << shutdown_called_;
CSliceUnref(read_buf_);
CSliceUnref(write_buf_);
CHECK_EQ(read_closure_, nullptr);
@ -173,10 +174,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
void ContinueRegisterForOnReadableLocked() {
GRPC_CARES_TRACE_LOG(
"fd:|%s| ContinueRegisterForOnReadableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| ContinueRegisterForOnReadableLocked "
<< "wsa_connect_error_:" << wsa_connect_error_;
CHECK(connect_done_);
if (wsa_connect_error_ != 0) {
ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
@ -194,10 +195,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
&winsocket_->read_info.overlapped, nullptr)) {
int wsa_last_error = WSAGetLastError();
char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG(
"fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| "
"msg:|%s|",
GetName(), wsa_last_error, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| RegisterForOnReadableLocked WSARecvFrom error code:|"
<< wsa_last_error << "| msg:|" << msg << "|";
gpr_free(msg);
if (wsa_last_error != WSA_IO_PENDING) {
ScheduleAndNullReadClosure(
@ -210,14 +211,15 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
if (socket_type_ == SOCK_DGRAM) {
GRPC_CARES_TRACE_LOG("fd:|%s| RegisterForOnWriteableLocked called",
GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| RegisterForOnWriteableLocked called";
} else {
CHECK(socket_type_ == SOCK_STREAM);
GRPC_CARES_TRACE_LOG(
"fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d "
"connect_done_: %d",
GetName(), tcp_write_state_, connect_done_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| RegisterForOnWriteableLocked called tcp_write_state_: "
<< tcp_write_state_ << " connect_done_: " << connect_done_;
}
CHECK_EQ(write_closure_, nullptr);
write_closure_ = write_closure;
@ -234,10 +236,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
void ContinueRegisterForOnWriteableLocked() {
GRPC_CARES_TRACE_LOG(
"fd:|%s| ContinueRegisterForOnWriteableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| ContinueRegisterForOnWriteableLocked "
<< "wsa_connect_error_:" << wsa_connect_error_;
CHECK(connect_done_);
if (wsa_connect_error_ != 0) {
ScheduleAndNullWriteClosure(
@ -288,10 +290,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
ares_socket_t data_len, int /* flags */,
struct sockaddr* from, ares_socklen_t* from_len) {
GRPC_CARES_TRACE_LOG(
"fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
"length:|%d|",
GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_));
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " RecvFrom called read_buf_has_data:" << read_buf_has_data_
<< " Current read buf length:" << GRPC_SLICE_LENGTH(read_buf_);
if (!read_buf_has_data_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
@ -340,20 +342,21 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1,
bytes_sent_ptr, flags, overlapped, nullptr);
*wsa_error_code = WSAGetLastError();
GRPC_CARES_TRACE_LOG(
"fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
"overlapped:%p "
"return:%d *wsa_error_code:%d",
GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0,
overlapped, out, *wsa_error_code);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " SendWriteBuf WSASend buf.len:" << buf.len << " *bytes_sent_ptr:"
<< (bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0)
<< " overlapped:" << overlapped << " return:" << out
<< " *wsa_error_code:" << *wsa_error_code;
return out;
}
ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
int iov_count) {
GRPC_CARES_TRACE_LOG(
"fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d",
GetName(), connect_done_, wsa_connect_error_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " SendV called connect_done_:" << connect_done_
<< " wsa_connect_error_:" << wsa_connect_error_;
if (!connect_done_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
@ -377,7 +380,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
// c-ares doesn't handle retryable errors on writes of UDP sockets.
// Therefore, the sendv handler for UDP sockets must only attempt
// to write everything inline.
GRPC_CARES_TRACE_LOG("fd:|%s| SendVUDP called", GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " SendVUDP called";
CHECK_EQ(GRPC_SLICE_LENGTH(write_buf_), 0);
CSliceUnref(write_buf_);
write_buf_ = FlattenIovec(iov, iov_count);
@ -388,9 +392,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
write_buf_ = grpc_empty_slice();
wsa_error_ctx->SetWSAError(wsa_error_code);
char* msg = gpr_format_message(wsa_error_code);
GRPC_CARES_TRACE_LOG(
"fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(),
wsa_error_code, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " SendVUDP SendWriteBuf error code:" << wsa_error_code
<< " msg:" << msg;
gpr_free(msg);
return -1;
}
@ -406,8 +411,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
// out in the background, and making further send progress in general, will
// happen as long as c-ares continues to show interest in writeability on
// this fd.
GRPC_CARES_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d",
GetName(), tcp_write_state_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " SendVTCP called tcp_write_state_:" << tcp_write_state_;
switch (tcp_write_state_) {
case WRITE_IDLE:
tcp_write_state_ = WRITE_REQUESTED;
@ -450,13 +456,13 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
void OnTcpConnectLocked(grpc_error_handle error) {
GRPC_CARES_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked error:|%s| "
"pending_register_for_readable:%d"
" pending_register_for_writeable:%d",
GetName(), StatusToString(error).c_str(),
pending_continue_register_for_on_readable_locked_,
pending_continue_register_for_on_writeable_locked_);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " InnerOnTcpConnectLocked error:" << StatusToString(error)
<< " pending_register_for_readable:"
<< pending_continue_register_for_on_readable_locked_
<< " pending_register_for_writeable:"
<< pending_continue_register_for_on_writeable_locked_;
CHECK(!connect_done_);
connect_done_ = true;
CHECK_EQ(wsa_connect_error_, 0);
@ -473,10 +479,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
if (!wsa_success) {
wsa_connect_error_ = WSAGetLastError();
char* msg = gpr_format_message(wsa_connect_error_);
GRPC_CARES_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
"msg:|%s|",
GetName(), wsa_connect_error_, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " InnerOnTcpConnectLocked WSA overlapped result code:"
<< wsa_connect_error_ << " msg:" << msg;
gpr_free(msg);
}
}
@ -502,7 +508,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
GRPC_CARES_TRACE_LOG("fd:%s ConnectUDP", GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " ConnectUDP";
CHECK(!connect_done_);
CHECK_EQ(wsa_connect_error_, 0);
SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
@ -512,8 +519,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
wsa_error_ctx->SetWSAError(wsa_connect_error_);
connect_done_ = true;
char* msg = gpr_format_message(wsa_connect_error_);
GRPC_CARES_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|", GetName(),
wsa_connect_error_, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " WSAConnect error code:|"
<< wsa_connect_error_ << "| msg:|" << msg << "|";
gpr_free(msg);
// c-ares expects a posix-style connect API
return out == 0 ? 0 : -1;
@ -521,7 +529,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
GRPC_CARES_TRACE_LOG("fd:%s ConnectTCP", GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName() << " ConnectTCP";
LPFN_CONNECTEX ConnectEx;
GUID guid = WSAID_CONNECTEX;
DWORD ioctl_num_bytes;
@ -532,10 +541,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG(
"fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
"msg:|%s|",
GetName(), wsa_last_error, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:"
<< wsa_last_error << " msg:|" << msg << "|";
gpr_free(msg);
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
@ -555,8 +564,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG("fd:%s bind error code:%d msg:|%s|", GetName(),
wsa_last_error, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " bind error code:" << wsa_last_error << " msg:|" << msg << "|";
gpr_free(msg);
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
@ -569,8 +579,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_CARES_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|", GetName(),
wsa_last_error, msg);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << GetName()
<< " ConnectEx error code:" << wsa_last_error << " msg:|" << msg
<< "|";
gpr_free(msg);
if (wsa_last_error == WSA_IO_PENDING) {
// c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
@ -610,11 +622,12 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error,
"OnIocpReadableInner");
GRPC_CARES_TRACE_LOG(
"fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
"code:|%d| msg:|%s|",
GetName(), winsocket_->read_info.wsa_error,
StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| OnIocpReadableInner winsocket_->read_info.wsa_error "
"code:|"
<< winsocket_->read_info.wsa_error << "| msg:|"
<< StatusToString(error) << "|";
}
}
}
@ -626,9 +639,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
CSliceUnref(read_buf_);
read_buf_ = grpc_empty_slice();
}
GRPC_CARES_TRACE_LOG(
"fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(),
GRPC_SLICE_LENGTH(read_buf_));
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| OnIocpReadable finishing. read buf length now:|"
<< GRPC_SLICE_LENGTH(read_buf_) << "|";
ScheduleAndNullReadClosure(error);
}
@ -639,17 +653,19 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
}
void OnIocpWriteableLocked(grpc_error_handle error) {
GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) OnIocpWriteableInner. fd:|" << GetName() << "|";
CHECK(socket_type_ == SOCK_STREAM);
if (error.ok()) {
if (winsocket_->write_info.wsa_error != 0) {
error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error,
"OnIocpWriteableInner");
GRPC_CARES_TRACE_LOG(
"fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
"code:|%d| msg:|%s|",
GetName(), winsocket_->write_info.wsa_error,
StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
"code:|"
<< winsocket_->write_info.wsa_error << "| msg:|"
<< StatusToString(error) << "|";
}
}
CHECK(tcp_write_state_ == WRITE_PENDING);
@ -657,8 +673,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd {
tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
write_buf_ = grpc_slice_sub_no_ref(
write_buf_, 0, winsocket_->write_info.bytes_transferred);
GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d",
GetName(), winsocket_->write_info.bytes_transferred);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:|" << GetName()
<< "| OnIocpWriteableInner. bytes transferred:"
<< winsocket_->write_info.bytes_transferred;
} else {
CSliceUnref(write_buf_);
write_buf_ = grpc_empty_slice();
@ -728,7 +746,9 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
//
static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
if (type != SOCK_DGRAM && type != SOCK_STREAM) {
GRPC_CARES_TRACE_LOG("Socket called with invalid socket type:%d", type);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) Socket called with invalid socket type:"
<< type;
return INVALID_SOCKET;
}
GrpcPolledFdFactoryWindows* self =
@ -736,15 +756,16 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
grpc_get_default_wsa_socket_flags());
if (s == INVALID_SOCKET) {
GRPC_CARES_TRACE_LOG(
"WSASocket failed with params af:%d type:%d protocol:%d", af, type,
protocol);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) WSASocket failed with params af:" << af
<< " type:" << type << " protocol:" << protocol;
return s;
}
grpc_error_handle error = grpc_tcp_set_non_block(s);
if (!error.ok()) {
GRPC_CARES_TRACE_LOG("WSAIoctl failed with error: %s",
StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) WSAIoctl failed with error: "
<< StatusToString(error);
return INVALID_SOCKET;
}
auto on_shutdown_locked = [self, s]() {
@ -755,9 +776,10 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory {
};
auto polled_fd = new GrpcPolledFdWindows(s, self->mu_, af, type,
std::move(on_shutdown_locked));
GRPC_CARES_TRACE_LOG(
"fd:|%s| created with params af:%d type:%d protocol:%d",
polled_fd->GetName(), af, type, protocol);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) fd:" << polled_fd->GetName()
<< " created with params af:" << af << " type:" << type
<< " protocol:" << protocol;
CHECK(self->sockets_.insert({s, polled_fd}).second);
return s;
}

@ -200,8 +200,9 @@ static absl::Status AresStatusToAbslStatus(int status,
static grpc_ares_ev_driver* grpc_ares_ev_driver_ref(
grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request,
ev_driver);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request << " Ref ev_driver "
<< ev_driver;
gpr_ref(&ev_driver->refs);
return ev_driver;
}
@ -211,11 +212,13 @@ static void grpc_ares_complete_request_locked(grpc_ares_request* r)
static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request,
ev_driver);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " Unref ev_driver " << ev_driver;
if (gpr_unref(&ev_driver->refs)) {
GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request,
ev_driver);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " destroy ev_driver " << ev_driver;
CHECK_EQ(ev_driver->fds, nullptr);
ares_destroy(ev_driver->channel);
grpc_ares_complete_request_locked(ev_driver->request);
@ -225,8 +228,9 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver)
static void fd_node_destroy_locked(fd_node* fdn)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) {
GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << fdn->ev_driver->request
<< " delete fd: " << fdn->grpc_polled_fd->GetName();
CHECK(!fdn->readable_registered);
CHECK(!fdn->writable_registered);
CHECK(fdn->already_shutdown);
@ -292,21 +296,21 @@ static grpc_core::Timestamp calculate_next_ares_backup_poll_alarm(
// by the c-ares code comments.
grpc_core::Duration until_next_ares_backup_poll_alarm =
grpc_core::Duration::Seconds(1);
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p. next ares process poll time in "
"%" PRId64 " ms",
driver->request, driver, until_next_ares_backup_poll_alarm.millis());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << driver->request
<< " ev_driver=" << driver << ". next ares process poll time in "
<< until_next_ares_backup_poll_alarm.millis() << " ms";
return grpc_core::Timestamp::Now() + until_next_ares_backup_poll_alarm;
}
static void on_timeout(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
grpc_core::MutexLock lock(&driver->request->mu);
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. "
"err=%s",
driver->request, driver, driver->shutting_down,
grpc_core::StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << driver->request
<< " ev_driver=" << driver
<< " on_timeout_locked. driver->shutting_down=" << driver->shutting_down
<< ". err=" << grpc_core::StatusToString(error);
if (!driver->shutting_down && error.ok()) {
grpc_ares_ev_driver_shutdown_locked(driver);
}
@ -327,20 +331,20 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
grpc_core::MutexLock lock(&driver->request->mu);
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. "
"driver->shutting_down=%d. "
"err=%s",
driver->request, driver, driver->shutting_down,
grpc_core::StatusToString(error).c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << driver->request
<< " ev_driver=" << driver
<< " on_ares_backup_poll_alarm_locked. driver->shutting_down="
<< driver->shutting_down << ". err=" << grpc_core::StatusToString(error);
if (!driver->shutting_down && error.ok()) {
fd_node* fdn = driver->fds;
while (fdn != nullptr) {
if (!fdn->already_shutdown) {
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p on_ares_backup_poll_alarm_locked; "
"ares_process_fd. fd=%s",
driver->request, driver, fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << driver->request
<< " ev_driver=" << driver
<< " on_ares_backup_poll_alarm_locked; ares_process_fd. fd="
<< fdn->grpc_polled_fd->GetName();
ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
ares_process_fd(driver->channel, as, as);
}
@ -373,8 +377,9 @@ static void on_readable(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
fdn->readable_registered = false;
GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << fdn->ev_driver->request
<< " readable on " << fdn->grpc_polled_fd->GetName();
if (error.ok() && !ev_driver->shutting_down) {
ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD);
} else {
@ -397,8 +402,9 @@ static void on_writable(void* arg, grpc_error_handle error) {
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
fdn->writable_registered = false;
GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request << " writable on "
<< fdn->grpc_polled_fd->GetName();
if (error.ok() && !ev_driver->shutting_down) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as);
} else {
@ -433,8 +439,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
fdn->grpc_polled_fd =
ev_driver->polled_fd_factory->NewGrpcPolledFdLocked(
socks[i], ev_driver->pollset_set);
GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " new fd: " << fdn->grpc_polled_fd->GetName();
fdn->readable_registered = false;
fdn->writable_registered = false;
fdn->already_shutdown = false;
@ -449,15 +456,16 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable, fdn,
grpc_schedule_on_exec_ctx);
if (fdn->grpc_polled_fd->IsFdStillReadableLocked()) {
GRPC_CARES_TRACE_LOG("request:%p schedule direct read on: %s",
ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " schedule direct read on: "
<< fdn->grpc_polled_fd->GetName();
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &fdn->read_closure,
absl::OkStatus());
} else {
GRPC_CARES_TRACE_LOG("request:%p notify read on: %s",
ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " notify read on: " << fdn->grpc_polled_fd->GetName();
fdn->grpc_polled_fd->RegisterForOnReadableLocked(
&fdn->read_closure);
}
@ -467,9 +475,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
// has not been registered with this socket.
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fdn->writable_registered) {
GRPC_CARES_TRACE_LOG("request:%p notify write on: %s",
ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " notify write on: " << fdn->grpc_polled_fd->GetName();
grpc_ares_ev_driver_ref(ev_driver);
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn,
grpc_schedule_on_exec_ctx);
@ -505,10 +513,11 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver)
ev_driver->query_timeout_ms == 0
? grpc_core::Duration::Infinity()
: grpc_core::Duration::Milliseconds(ev_driver->query_timeout_ms);
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in "
"%" PRId64 " ms",
ev_driver->request, ev_driver, timeout.millis());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << ev_driver->request
<< " ev_driver=" << ev_driver
<< " grpc_ares_ev_driver_start_locked. timeout in " << timeout.millis()
<< " ms";
grpc_ares_ev_driver_ref(ev_driver);
GRPC_CLOSURE_INIT(&ev_driver->on_timeout_locked, on_timeout, ev_driver,
grpc_schedule_on_exec_ctx);
@ -547,7 +556,8 @@ grpc_error_handle grpc_ares_ev_driver_create_locked(
}
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
grpc_ares_test_only_inject_config(&(*ev_driver)->channel);
GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request);
GRPC_TRACE_VLOG(cares_resolver, 2) << "(c-ares resolver) request:" << request
<< " grpc_ares_ev_driver_create_locked";
if (status != ARES_SUCCESS) {
grpc_error_handle err = GRPC_ERROR_CREATE(absl::StrCat(
"Failed to init ares channel. C-ares error: ", ares_strerror(status)));
@ -645,10 +655,10 @@ static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
grpc_ares_request* parent_request, const char* host, uint16_t port,
bool is_balancer, const char* qtype)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_request->mu) {
GRPC_CARES_TRACE_LOG(
"request:%p create_hostbyname_request_locked host:%s port:%d "
"is_balancer:%d qtype:%s",
parent_request, host, port, is_balancer, qtype);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << parent_request
<< " create_hostbyname_request_locked host:" << host << " port:" << port
<< " is_balancer:" << is_balancer << " qtype:" << qtype;
grpc_ares_hostbyname_request* hr = new grpc_ares_hostbyname_request();
hr->parent_request = parent_request;
hr->host = gpr_strdup(host);
@ -675,9 +685,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
static_cast<grpc_ares_hostbyname_request*>(arg);
grpc_ares_request* r = hr->parent_request;
if (status == ARES_SUCCESS) {
GRPC_CARES_TRACE_LOG(
"request:%p on_hostbyname_done_locked qtype=%s host=%s ARES_SUCCESS", r,
hr->qtype, hr->host);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " on_hostbyname_done_locked qtype=" << hr->qtype
<< " host=" << hr->host << " ARES_SUCCESS";
std::unique_ptr<EndpointAddressesList>* address_list_ptr =
hr->is_balancer ? r->balancer_addresses_out : r->addresses_out;
if (*address_list_ptr == nullptr) {
@ -701,10 +712,11 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
addr->sin6_port = hr->port;
char output[INET6_ADDRSTRLEN];
ares_inet_ntop(AF_INET6, &addr->sin6_addr, output, INET6_ADDRSTRLEN);
GRPC_CARES_TRACE_LOG(
"request:%p c-ares resolver gets a AF_INET6 result: \n"
" addr: %s\n port: %d\n sin6_scope_id: %d\n",
r, output, ntohs(hr->port), addr->sin6_scope_id);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " c-ares resolver gets a AF_INET6 result: \n"
<< " addr: " << output << "\n port: " << ntohs(hr->port)
<< "\n sin6_scope_id: " << addr->sin6_scope_id << "\n";
break;
}
case AF_INET: {
@ -716,10 +728,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
addr->sin_port = hr->port;
char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr->sin_addr, output, INET_ADDRSTRLEN);
GRPC_CARES_TRACE_LOG(
"request:%p c-ares resolver gets a AF_INET result: \n"
" addr: %s\n port: %d\n",
r, output, ntohs(hr->port));
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " c-ares resolver gets a AF_INET result: \n addr: " << output
<< "\n port: " << ntohs(hr->port) << "\n";
break;
}
}
@ -729,8 +741,9 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
std::string error_msg = absl::StrFormat(
"C-ares status is not ARES_SUCCESS qtype=%s name=%s is_balancer=%d: %s",
hr->qtype, hr->host, hr->is_balancer, ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_hostbyname_done_locked: %s", r,
error_msg.c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " on_hostbyname_done_locked: " << error_msg;
r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg),
r->error);
}
@ -745,13 +758,14 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/,
GrpcAresQuery* q = static_cast<GrpcAresQuery*>(arg);
grpc_ares_request* r = q->parent_request();
if (status == ARES_SUCCESS) {
GRPC_CARES_TRACE_LOG(
"request:%p on_srv_query_done_locked name=%s ARES_SUCCESS", r,
q->name().c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " on_srv_query_done_locked name=" << q->name() << " ARES_SUCCESS";
struct ares_srv_reply* reply;
const int parse_status = ares_parse_srv_reply(abuf, alen, &reply);
GRPC_CARES_TRACE_LOG("request:%p ares_parse_srv_reply: %d", r,
parse_status);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " ares_parse_srv_reply: " << parse_status;
if (parse_status == ARES_SUCCESS) {
for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
srv_it = srv_it->next) {
@ -775,8 +789,9 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/,
std::string error_msg = absl::StrFormat(
"C-ares status is not ARES_SUCCESS qtype=SRV name=%s: %s", q->name(),
ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_srv_query_done_locked: %s", r,
error_msg.c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " on_srv_query_done_locked: " << error_msg;
r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg),
r->error);
}
@ -797,8 +812,9 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/,
struct ares_txt_ext* result = nullptr;
struct ares_txt_ext* reply = nullptr;
if (status != ARES_SUCCESS) goto fail;
GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked name=%s ARES_SUCCESS", r,
q->name().c_str());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " on_txt_done_locked name=" << q->name() << " ARES_SUCCESS";
status = ares_parse_txt_reply_ext(buf, len, &reply);
if (status != ARES_SUCCESS) goto fail;
// Find service config in TXT record.
@ -826,8 +842,9 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/,
service_config_len += result->length;
}
(*r->service_config_json_out)[service_config_len] = '\0';
GRPC_CARES_TRACE_LOG("request:%p found service config: %s", r,
*r->service_config_json_out);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " found service config: " << *r->service_config_json_out;
}
// Clean up.
ares_free_data(reply);
@ -837,8 +854,8 @@ fail:
std::string error_msg =
absl::StrFormat("C-ares status is not ARES_SUCCESS qtype=TXT name=%s: %s",
q->name(), ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked %s", r,
error_msg.c_str());
GRPC_TRACE_VLOG(cares_resolver, 2) << "(c-ares resolver) request:" << r
<< " on_txt_done_locked " << error_msg;
r->error =
grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error);
}
@ -847,8 +864,9 @@ grpc_error_handle set_request_dns_server(grpc_ares_request* r,
absl::string_view dns_server)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) {
if (!dns_server.empty()) {
GRPC_CARES_TRACE_LOG("request:%p Using DNS server %s", r,
dns_server.data());
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r << " Using DNS server "
<< dns_server.data();
grpc_resolved_address addr;
if (grpc_parse_ipv4_hostport(dns_server, &addr, /*log_errors=*/false)) {
r->dns_server_addr.family = AF_INET;
@ -1043,10 +1061,10 @@ static grpc_ares_request* grpc_dns_lookup_hostname_ares_impl(
r->ev_driver = nullptr;
r->on_done = on_done;
r->addresses_out = addrs;
GRPC_CARES_TRACE_LOG(
"request:%p c-ares grpc_dns_lookup_hostname_ares_impl name=%s, "
"default_port=%s",
r, name, default_port);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " c-ares grpc_dns_lookup_hostname_ares_impl name=" << name
<< ", default_port=" << default_port;
// Early out if the target is an ipv4 or ipv6 literal.
if (resolve_as_ip_literal_locked(name, default_port, addrs)) {
grpc_ares_complete_request_locked(r);
@ -1097,8 +1115,9 @@ grpc_ares_request* grpc_dns_lookup_srv_ares_impl(
r->ev_driver = nullptr;
r->on_done = on_done;
r->balancer_addresses_out = balancer_addresses;
GRPC_CARES_TRACE_LOG(
"request:%p c-ares grpc_dns_lookup_srv_ares_impl name=%s", r, name);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " c-ares grpc_dns_lookup_srv_ares_impl name=" << name;
grpc_error_handle error;
// Don't query for SRV records if the target is "localhost"
if (target_matches_localhost(name)) {
@ -1135,8 +1154,9 @@ grpc_ares_request* grpc_dns_lookup_txt_ares_impl(
r->ev_driver = nullptr;
r->on_done = on_done;
r->service_config_json_out = service_config_json;
GRPC_CARES_TRACE_LOG(
"request:%p c-ares grpc_dns_lookup_txt_ares_impl name=%s", r, name);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " c-ares grpc_dns_lookup_txt_ares_impl name=" << name;
grpc_error_handle error;
// Don't query for TXT records if the target is "localhost"
if (target_matches_localhost(name)) {
@ -1185,8 +1205,9 @@ grpc_ares_request* (*grpc_dns_lookup_txt_ares)(
static void grpc_cancel_ares_request_impl(grpc_ares_request* r) {
CHECK_NE(r, nullptr);
grpc_core::MutexLock lock(&r->mu);
GRPC_CARES_TRACE_LOG("request:%p grpc_cancel_ares_request ev_driver:%p", r,
r->ev_driver);
GRPC_TRACE_VLOG(cares_resolver, 2)
<< "(c-ares resolver) request:" << r
<< " grpc_cancel_ares_request ev_driver:" << r->ev_driver;
if (r->ev_driver != nullptr) {
grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
}

@ -39,13 +39,6 @@
#define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 120000
#define GRPC_CARES_TRACE_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(cares_resolver)) { \
VLOG(2) << "(c-ares resolver) " << absl::StrFormat(format, __VA_ARGS__); \
} \
} while (0)
typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
struct grpc_ares_request {

@ -40,8 +40,6 @@ void gpr_unreachable_code(const char* reason, const char* file, int line) {
grpc_core::SourceLocation(file, line));
}
int absl_vlog2_enabled() { return ABSL_VLOG_IS_ON(2); }
int gpr_should_log(gpr_log_severity severity) {
switch (severity) {
case GPR_LOG_SEVERITY_ERROR:
@ -85,10 +83,6 @@ void gpr_log_message(const char* file, int line, gpr_log_severity severity,
}
void gpr_log_verbosity_init(void) {
// This is enabled in Github only.
// This ifndef is converted to ifdef internally by copybara.
// Internally grpc verbosity is managed using absl settings.
// So internally we avoid setting it like this.
#ifndef GRPC_VERBOSITY_MACRO
// SetMinLogLevel sets the value for the entire binary, not just gRPC.
// This setting will change things for other libraries/code that is unrelated

@ -0,0 +1,36 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include "absl/base/no_destructor.h"
#include "absl/log/check.h"
#include <grpcpp/support/global_callback_hook.h>
namespace grpc {
static absl::NoDestructor<std::shared_ptr<GlobalCallbackHook>> g_callback_hook(
std::make_shared<DefaultGlobalCallbackHook>());
std::shared_ptr<GlobalCallbackHook> GetGlobalCallbackHook() {
return *g_callback_hook;
}
void SetGlobalCallbackHook(GlobalCallbackHook* hook) {
CHECK(hook != nullptr);
CHECK(hook != (*g_callback_hook).get());
*g_callback_hook = std::shared_ptr<GlobalCallbackHook>(hook);
}
} // namespace grpc

@ -563,6 +563,38 @@ OpenTelemetryPluginImpl::OpenTelemetryPluginImpl(
});
}
OpenTelemetryPluginImpl::~OpenTelemetryPluginImpl() {
for (const auto& instrument_data : instruments_data_) {
grpc_core::Match(
instrument_data.instrument, [](const Disabled&) {},
[](const std::unique_ptr<opentelemetry::metrics::Counter<double>>&) {},
[](const std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>&) {
},
[](const std::unique_ptr<
opentelemetry::metrics::Histogram<uint64_t>>&) {},
[](const std::unique_ptr<opentelemetry::metrics::Histogram<double>>&) {
},
[](const std::unique_ptr<CallbackGaugeState<int64_t>>& state) {
CHECK(state->caches.empty());
if (state->ot_callback_registered) {
state->instrument->RemoveCallback(
&CallbackGaugeState<int64_t>::CallbackGaugeCallback,
state.get());
state->ot_callback_registered = false;
}
},
[](const std::unique_ptr<CallbackGaugeState<double>>& state) {
CHECK(state->caches.empty());
if (state->ot_callback_registered) {
state->instrument->RemoveCallback(
&CallbackGaugeState<double>::CallbackGaugeCallback,
state.get());
state->ot_callback_registered = false;
}
});
}
}
namespace {
constexpr absl::string_view kLocality = "grpc.lb.locality";
}
@ -823,9 +855,6 @@ void OpenTelemetryPluginImpl::AddCallback(
void OpenTelemetryPluginImpl::RemoveCallback(
grpc_core::RegisteredMetricCallback* callback) {
std::vector<
absl::variant<CallbackGaugeState<int64_t>*, CallbackGaugeState<double>*>>
gauges_that_need_to_remove_callback;
{
grpc_core::MutexLock lock(&mu_);
callback_timestamps_.erase(callback);
@ -848,11 +877,6 @@ void OpenTelemetryPluginImpl::RemoveCallback(
CHECK_NE(callback_gauge_state, nullptr);
CHECK((*callback_gauge_state)->ot_callback_registered);
CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u);
if ((*callback_gauge_state)->caches.empty()) {
gauges_that_need_to_remove_callback.push_back(
callback_gauge_state->get());
(*callback_gauge_state)->ot_callback_registered = false;
}
break;
}
case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
@ -867,11 +891,6 @@ void OpenTelemetryPluginImpl::RemoveCallback(
CHECK_NE(callback_gauge_state, nullptr);
CHECK((*callback_gauge_state)->ot_callback_registered);
CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u);
if ((*callback_gauge_state)->caches.empty()) {
gauges_that_need_to_remove_callback.push_back(
callback_gauge_state->get());
(*callback_gauge_state)->ot_callback_registered = false;
}
break;
}
default:
@ -880,21 +899,13 @@ void OpenTelemetryPluginImpl::RemoveCallback(
}
}
}
// RemoveCallback internally grabs OpenTelemetry's observable_registry's
// lock. So we need to call it without our plugin lock otherwise we may
// deadlock.
for (const auto& gauge : gauges_that_need_to_remove_callback) {
grpc_core::Match(
gauge,
[](CallbackGaugeState<int64_t>* gauge) {
gauge->instrument->RemoveCallback(
&CallbackGaugeState<int64_t>::CallbackGaugeCallback, gauge);
},
[](CallbackGaugeState<double>* gauge) {
gauge->instrument->RemoveCallback(
&CallbackGaugeState<double>::CallbackGaugeCallback, gauge);
});
}
// Note that we are not removing the callback from OpenTelemetry immediately,
// and instead remove it when the plugin is destroyed. We just have a single
// callback per OpenTelemetry instrument which is a small number. If we decide
// to remove the callback immediately at this point, we need to make sure that
// 1) the callback is removed without holding mu_ and 2) we make sure that
// this does not race against a possible `AddCallback` operation. A potential
// way to do this is to use WorkSerializer.
}
template <typename ValueType>

@ -223,6 +223,7 @@ class OpenTelemetryPluginImpl
absl::AnyInvocable<
bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
channel_scope_filter);
~OpenTelemetryPluginImpl() override;
private:
class ClientCallTracer;

@ -60,30 +60,30 @@ static VALUE grpc_rb_call_credentials_callback(VALUE args) {
VALUE callback_func = rb_ary_entry(args, 0);
VALUE callback_args = rb_ary_entry(args, 1);
VALUE md_ary_obj = rb_ary_entry(args, 2);
if (absl_vlog2_enabled()) {
VALUE callback_func_str = rb_funcall(callback_func, rb_intern("to_s"), 0);
VALUE callback_args_str = rb_funcall(callback_args, rb_intern("to_s"), 0);
VALUE callback_source_info =
rb_funcall(callback_func, rb_intern("source_location"), 0);
if (callback_source_info != Qnil) {
VALUE source_filename = rb_ary_entry(callback_source_info, 0);
VALUE source_line_number = rb_funcall(
rb_ary_entry(callback_source_info, 1), rb_intern("to_s"), 0);
gpr_log(GPR_DEBUG,
"GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| "
"source_filename:%s line_number:%s with arguments:|%s|",
StringValueCStr(callback_func_str),
StringValueCStr(source_filename),
StringValueCStr(source_line_number),
StringValueCStr(callback_args_str));
} else {
gpr_log(GPR_DEBUG,
"GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| "
"(failed to get source filename and line) with arguments:|%s|",
StringValueCStr(callback_func_str),
StringValueCStr(callback_args_str));
}
VALUE callback_func_str = rb_funcall(callback_func, rb_intern("to_s"), 0);
VALUE callback_args_str = rb_funcall(callback_args, rb_intern("to_s"), 0);
VALUE callback_source_info =
rb_funcall(callback_func, rb_intern("source_location"), 0);
if (callback_source_info != Qnil) {
VALUE source_filename = rb_ary_entry(callback_source_info, 0);
VALUE source_line_number =
rb_funcall(rb_ary_entry(callback_source_info, 1), rb_intern("to_s"), 0);
gpr_log(GPR_DEBUG,
"GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| "
"source_filename:%s line_number:%s with arguments:|%s|",
StringValueCStr(callback_func_str),
StringValueCStr(source_filename),
StringValueCStr(source_line_number),
StringValueCStr(callback_args_str));
} else {
gpr_log(GPR_DEBUG,
"GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| "
"(failed to get source filename and line) with arguments:|%s|",
StringValueCStr(callback_func_str),
StringValueCStr(callback_args_str));
}
VALUE metadata =
rb_funcall(callback_func, rb_intern("call"), 1, callback_args);
grpc_metadata_array* md_ary = NULL;

@ -254,7 +254,6 @@ gpr_free_aligned_type gpr_free_aligned_import;
gpr_cpu_num_cores_type gpr_cpu_num_cores_import;
gpr_cpu_current_cpu_type gpr_cpu_current_cpu_import;
gpr_log_type gpr_log_import;
absl_vlog2_enabled_type absl_vlog2_enabled_import;
gpr_log_verbosity_init_type gpr_log_verbosity_init_import;
gpr_format_message_type gpr_format_message_import;
gpr_strdup_type gpr_strdup_import;
@ -539,7 +538,6 @@ void grpc_rb_load_imports(HMODULE library) {
gpr_cpu_num_cores_import = (gpr_cpu_num_cores_type) GetProcAddress(library, "gpr_cpu_num_cores");
gpr_cpu_current_cpu_import = (gpr_cpu_current_cpu_type) GetProcAddress(library, "gpr_cpu_current_cpu");
gpr_log_import = (gpr_log_type) GetProcAddress(library, "gpr_log");
absl_vlog2_enabled_import = (absl_vlog2_enabled_type) GetProcAddress(library, "absl_vlog2_enabled");
gpr_log_verbosity_init_import = (gpr_log_verbosity_init_type) GetProcAddress(library, "gpr_log_verbosity_init");
gpr_format_message_import = (gpr_format_message_type) GetProcAddress(library, "gpr_format_message");
gpr_strdup_import = (gpr_strdup_type) GetProcAddress(library, "gpr_strdup");

@ -738,9 +738,6 @@ extern gpr_cpu_current_cpu_type gpr_cpu_current_cpu_import;
typedef void(*gpr_log_type)(const char* file, int line, gpr_log_severity severity, const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);
extern gpr_log_type gpr_log_import;
#define gpr_log gpr_log_import
typedef int(*absl_vlog2_enabled_type)();
extern absl_vlog2_enabled_type absl_vlog2_enabled_import;
#define absl_vlog2_enabled absl_vlog2_enabled_import
typedef void(*gpr_log_verbosity_init_type)(void);
extern gpr_log_verbosity_init_type gpr_log_verbosity_init_import;
#define gpr_log_verbosity_init gpr_log_verbosity_init_import

@ -1715,14 +1715,8 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest, InstrumentsEnabledTest) {
EXPECT_FALSE(stats_plugins.IsInstrumentEnabled(counter_handle));
}
class OpenTelemetryPluginCallbackMetricsTest
: public OpenTelemetryPluginEnd2EndTest {
protected:
OpenTelemetryPluginCallbackMetricsTest()
: endpoint_config_(grpc_core::ChannelArgs()) {}
grpc_event_engine::experimental::ChannelArgsEndpointConfig endpoint_config_;
};
using OpenTelemetryPluginCallbackMetricsTest =
OpenTelemetryPluginNPCMetricsTest;
// The callback minimal interval is longer than the OT reporting interval, so we
// expect to collect duplicated (cached) values.
@ -1775,14 +1769,11 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest,
auto registered_metric_callback_1 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_1;
reporter.Report(integer_gauge_handle, int_value_1, kLabelValuesSet1,
reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
reporter.Report(double_gauge_handle, double_value_1, kLabelValuesSet1,
reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
;
},
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(),
integer_gauge_handle, double_gauge_handle);
@ -1792,12 +1783,8 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest,
auto registered_metric_callback_2 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_2;
reporter.Report(integer_gauge_handle, int_value_2, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(integer_gauge_handle, int_value_2++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
reporter.Report(double_gauge_handle, double_value_2, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(double_gauge_handle, double_value_2++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
},
@ -1910,14 +1897,10 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest,
auto registered_metric_callback_1 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_1;
reporter.Report(integer_gauge_handle, int_value_1, kLabelValuesSet1,
reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
reporter.Report(double_gauge_handle, double_value_1, kLabelValuesSet1,
reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
},
grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(),
integer_gauge_handle, double_gauge_handle);
@ -1927,12 +1910,8 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest,
auto registered_metric_callback_2 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_2;
reporter.Report(integer_gauge_handle, int_value_2, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(integer_gauge_handle, int_value_2++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
reporter.Report(double_gauge_handle, double_value_2, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(double_gauge_handle, double_value_2++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
},
@ -1996,6 +1975,116 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest,
kOptionalLabelKeys, kOptionalLabelValuesSet2, 0.0, true));
}
// Verifies that callbacks are cleaned up when the OpenTelemetry plugin is
// destroyed.
TEST_F(OpenTelemetryPluginCallbackMetricsTest, VerifyCallbacksAreCleanedUp) {
constexpr absl::string_view kInt64CallbackGaugeMetric =
"yet_another_int64_callback_gauge";
constexpr absl::string_view kDoubleCallbackGaugeMetric =
"yet_another_double_callback_gauge";
auto integer_gauge_handle =
grpc_core::GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
kInt64CallbackGaugeMetric, "An int64 callback gauge.", "unit",
/*enable_by_default=*/true)
.Build();
auto double_gauge_handle =
grpc_core::GlobalInstrumentsRegistry::RegisterCallbackDoubleGauge(
kDoubleCallbackGaugeMetric, "A double callback gauge.", "unit",
/*enable_by_default=*/true)
.Build();
Init(std::move(Options().set_metric_names(
{kInt64CallbackGaugeMetric, kDoubleCallbackGaugeMetric})));
auto stats_plugins =
grpc_core::GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
grpc_core::experimental::StatsPluginChannelScope(
"dns:///localhost:8080", "", endpoint_config_));
// Multiple callbacks for the same metrics, each reporting different
// label values.
int report_count_1 = 0;
int64_t int_value_1 = 1;
double double_value_1 = 0.5;
auto registered_metric_callback_1 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_1;
reporter.Report(integer_gauge_handle, int_value_1++, {}, {});
reporter.Report(double_gauge_handle, double_value_1++, {}, {});
},
grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(),
integer_gauge_handle, double_gauge_handle);
int report_count_2 = 0;
int64_t int_value_2 = 1;
double double_value_2 = 0.5;
auto registered_metric_callback_2 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_2;
reporter.Report(integer_gauge_handle, int_value_2++, {}, {});
reporter.Report(double_gauge_handle, double_value_2++, {}, {});
},
grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(),
integer_gauge_handle, double_gauge_handle);
constexpr int kIterations = 50;
{
MetricsCollectorThread collector{
this,
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(),
kIterations,
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) {
return !data.contains(kInt64CallbackGaugeMetric) ||
!data.contains(kDoubleCallbackGaugeMetric);
}};
}
// Verify that callbacks are invoked
EXPECT_EQ(report_count_1, kIterations);
EXPECT_EQ(report_count_2, kIterations);
// Remove one of the callbacks
registered_metric_callback_1.reset();
{
MetricsCollectorThread new_collector{
this,
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(),
kIterations,
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return false; }};
}
EXPECT_EQ(report_count_1, kIterations); // No change since previous
EXPECT_EQ(report_count_2, 2 * kIterations); // Gets another kIterations
// Remove the other callback as well
registered_metric_callback_2.reset();
MetricsCollectorThread new_new_collector{
this,
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(),
kIterations,
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return false; }};
// We shouldn't get any new callbacks
EXPECT_THAT(new_new_collector.Stop(), ::testing::IsEmpty());
EXPECT_EQ(report_count_1, kIterations);
EXPECT_EQ(report_count_2, 2 * kIterations);
// Reset stats plugins as well
grpc_core::GlobalStatsPluginRegistryTestPeer::
ResetGlobalStatsPluginRegistry();
registered_metric_callback_2.reset();
MetricsCollectorThread new_new_new_collector{
this,
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(),
kIterations,
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return false; }};
// Still no new callbacks
EXPECT_THAT(new_new_new_collector.Stop(), ::testing::IsEmpty());
EXPECT_EQ(report_count_1, kIterations);
EXPECT_EQ(report_count_2, 2 * kIterations);
}
TEST(OpenTelemetryPluginMetricsEnablingDisablingTest, TestEnableDisableAPIs) {
grpc::internal::OpenTelemetryPluginBuilderImpl builder;
// First disable all metrics

@ -1073,6 +1073,7 @@ include/grpcpp/support/channel_arguments.h \
include/grpcpp/support/client_callback.h \
include/grpcpp/support/client_interceptor.h \
include/grpcpp/support/config.h \
include/grpcpp/support/global_callback_hook.h \
include/grpcpp/support/interceptor.h \
include/grpcpp/support/message_allocator.h \
include/grpcpp/support/method_handler.h \

@ -1073,6 +1073,7 @@ include/grpcpp/support/channel_arguments.h \
include/grpcpp/support/client_callback.h \
include/grpcpp/support/client_interceptor.h \
include/grpcpp/support/config.h \
include/grpcpp/support/global_callback_hook.h \
include/grpcpp/support/interceptor.h \
include/grpcpp/support/message_allocator.h \
include/grpcpp/support/method_handler.h \
@ -3046,6 +3047,7 @@ src/cpp/client/create_channel.cc \
src/cpp/client/create_channel_internal.cc \
src/cpp/client/create_channel_internal.h \
src/cpp/client/create_channel_posix.cc \
src/cpp/client/global_callback_hook.cc \
src/cpp/client/insecure_credentials.cc \
src/cpp/client/secure_credentials.cc \
src/cpp/client/secure_credentials.h \

@ -134,6 +134,7 @@ LANG_RELEASE_MATRIX = {
("v1.62.0", ReleaseInfo()),
("v1.63.1", ReleaseInfo()),
("v1.64.1", ReleaseInfo()),
("v1.65.0", ReleaseInfo()),
]
),
"go": OrderedDict(
@ -799,6 +800,12 @@ LANG_RELEASE_MATRIX = {
runtimes=["python"], testcases_file="python__master"
),
),
(
"v1.65.0",
ReleaseInfo(
runtimes=["python"], testcases_file="python__master"
),
),
]
),
"node": OrderedDict(
@ -897,6 +904,7 @@ LANG_RELEASE_MATRIX = {
("v1.62.0", ReleaseInfo()),
("v1.63.0", ReleaseInfo()),
("v1.64.0", ReleaseInfo()),
("v1.65.0", ReleaseInfo()),
]
),
"php": OrderedDict(
@ -959,6 +967,7 @@ LANG_RELEASE_MATRIX = {
("v1.62.0", ReleaseInfo()),
("v1.63.0", ReleaseInfo()),
("v1.64.0", ReleaseInfo()),
("v1.65.0", ReleaseInfo()),
]
),
"csharp": OrderedDict(

@ -37,11 +37,6 @@ os.chdir(os.path.join(os.path.dirname(sys.argv[0]), "../../.."))
# Map of deprecated functions to allowlist files
DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"absl_vlog2_enabled(": [
"./include/grpc/support/log.h",
"./src/core/util/log.cc",
"./src/ruby/ext/grpc/rb_call_credentials.c",
],
"gpr_log_severity": [
"./include/grpc/support/log.h",
"./src/core/util/android/log.cc",

Loading…
Cancel
Save