diff --git a/BUILD b/BUILD index eb4bc77f081..7cd081fb527 100644 --- a/BUILD +++ b/BUILD @@ -907,6 +907,7 @@ grpc_cc_library( ], visibility = ["@grpc:grpc++_public_hdrs"], deps = [ + "global_callback_hook", "grpc_public_hdrs", "//src/core:gpr_atm", ], @@ -951,6 +952,7 @@ grpc_cc_library( tags = ["nofixdeps"], visibility = ["@grpc:public"], deps = [ + "global_callback_hook", "grpc++_base", "//src/core:gpr_atm", "//src/core:slice", @@ -1260,6 +1262,7 @@ grpc_cc_library( deps = [ "channel_arg_names", "generic_stub_internal", + "global_callback_hook", "gpr", "grpc++_base_unsecure", "grpc++_codegen_proto", @@ -2455,6 +2458,7 @@ grpc_cc_library( "config", "exec_ctx", "generic_stub_internal", + "global_callback_hook", "gpr", "grpc", "grpc++_codegen_proto", @@ -2544,6 +2548,7 @@ grpc_cc_library( "config", "exec_ctx", "generic_stub_internal", + "global_callback_hook", "gpr", "grpc_base", "grpc_core_credentials_header", @@ -4913,6 +4918,22 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "global_callback_hook", + srcs = [ + "src/cpp/client/global_callback_hook.cc", + ], + hdrs = [ + "include/grpcpp/support/global_callback_hook.h", + ], + external_deps = [ + "absl/base:no_destructor", + "absl/log:check", + "absl/functional:function_ref", + ], + language = "c++", +) + # TODO(yashykt): Remove the UPB definitions from here once they are no longer needed ### UPB Targets diff --git a/CMakeLists.txt b/CMakeLists.txt index da48ca9f836..99a85a30376 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4182,6 +4182,7 @@ add_library(grpc++ src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/client/xds_credentials.cc @@ -4465,6 +4466,7 @@ foreach(_hdr include/grpcpp/support/client_callback.h include/grpcpp/support/client_interceptor.h include/grpcpp/support/config.h + include/grpcpp/support/global_callback_hook.h include/grpcpp/support/interceptor.h include/grpcpp/support/message_allocator.h include/grpcpp/support/method_handler.h @@ -4938,6 +4940,7 @@ add_library(grpc++_unsecure src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/common/alarm.cc src/cpp/common/channel_arguments.cc @@ -5209,6 +5212,7 @@ foreach(_hdr include/grpcpp/support/client_callback.h include/grpcpp/support/client_interceptor.h include/grpcpp/support/config.h + include/grpcpp/support/global_callback_hook.h include/grpcpp/support/interceptor.h include/grpcpp/support/message_allocator.h include/grpcpp/support/method_handler.h @@ -8266,6 +8270,7 @@ add_executable(binder_transport_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -13405,6 +13410,7 @@ add_executable(endpoint_binder_pool_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -14260,6 +14266,7 @@ add_executable(fake_binder_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -32271,6 +32278,7 @@ add_executable(transport_stream_receiver_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -33157,6 +33165,7 @@ add_executable(wire_reader_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -33267,6 +33276,7 @@ add_executable(wire_writer_test src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc + src/cpp/client/global_callback_hook.cc src/cpp/client/insecure_credentials.cc src/cpp/client/secure_credentials.cc src/cpp/common/alarm.cc @@ -33847,6 +33857,7 @@ add_executable(xds_client_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h + src/cpp/client/global_callback_hook.cc src/cpp/util/status.cc test/core/xds/xds_client_test.cc test/core/xds/xds_transport_fake.cc @@ -34142,6 +34153,7 @@ add_executable(xds_cluster_resource_type_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.h + src/cpp/client/global_callback_hook.cc src/cpp/util/status.cc test/core/xds/xds_cluster_resource_type_test.cc ) @@ -35138,6 +35150,7 @@ add_executable(xds_endpoint_resource_type_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h + src/cpp/client/global_callback_hook.cc src/cpp/util/status.cc test/core/xds/xds_endpoint_resource_type_test.cc ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index b95f89cafd6..abcef83cc2a 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -3836,6 +3836,7 @@ libs: - include/grpcpp/support/client_callback.h - include/grpcpp/support/client_interceptor.h - include/grpcpp/support/config.h + - include/grpcpp/support/global_callback_hook.h - include/grpcpp/support/interceptor.h - include/grpcpp/support/message_allocator.h - include/grpcpp/support/method_handler.h @@ -3915,6 +3916,7 @@ libs: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/client/xds_credentials.cc @@ -4267,6 +4269,7 @@ libs: - include/grpcpp/support/client_callback.h - include/grpcpp/support/client_interceptor.h - include/grpcpp/support/config.h + - include/grpcpp/support/global_callback_hook.h - include/grpcpp/support/interceptor.h - include/grpcpp/support/message_allocator.h - include/grpcpp/support/method_handler.h @@ -4303,6 +4306,7 @@ libs: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/common/alarm.cc - src/cpp/common/channel_arguments.cc @@ -6182,6 +6186,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -9668,6 +9673,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -10143,6 +10149,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -20379,6 +20386,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -20798,6 +20806,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -20909,6 +20918,7 @@ targets: - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc + - src/cpp/client/global_callback_hook.cc - src/cpp/client/insecure_credentials.cc - src/cpp/client/secure_credentials.cc - src/cpp/common/alarm.cc @@ -21232,6 +21242,7 @@ targets: - src/proto/grpc/testing/xds/v3/base.proto - src/proto/grpc/testing/xds/v3/discovery.proto - src/proto/grpc/testing/xds/v3/percent.proto + - src/cpp/client/global_callback_hook.cc - src/cpp/util/status.cc - test/core/xds/xds_client_test.cc - test/core/xds/xds_transport_fake.cc @@ -21329,6 +21340,7 @@ targets: - src/proto/grpc/testing/xds/v3/tls.proto - src/proto/grpc/testing/xds/v3/typed_struct.proto - src/proto/grpc/testing/xds/v3/wrr_locality.proto + - src/cpp/client/global_callback_hook.cc - src/cpp/util/status.cc - test/core/xds/xds_cluster_resource_type_test.cc deps: @@ -21676,6 +21688,7 @@ targets: - src/proto/grpc/testing/xds/v3/endpoint.proto - src/proto/grpc/testing/xds/v3/health_check.proto - src/proto/grpc/testing/xds/v3/percent.proto + - src/cpp/client/global_callback_hook.cc - src/cpp/util/status.cc - test/core/xds/xds_endpoint_resource_type_test.cc deps: diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index fe2176500b4..122a26ff39d 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -206,6 +206,7 @@ Pod::Spec.new do |s| 'include/grpcpp/support/client_callback.h', 'include/grpcpp/support/client_interceptor.h', 'include/grpcpp/support/config.h', + 'include/grpcpp/support/global_callback_hook.h', 'include/grpcpp/support/interceptor.h', 'include/grpcpp/support/message_allocator.h', 'include/grpcpp/support/method_handler.h', @@ -1378,6 +1379,7 @@ Pod::Spec.new do |s| 'src/cpp/client/create_channel_internal.cc', 'src/cpp/client/create_channel_internal.h', 'src/cpp/client/create_channel_posix.cc', + 'src/cpp/client/global_callback_hook.cc', 'src/cpp/client/insecure_credentials.cc', 'src/cpp/client/secure_credentials.cc', 'src/cpp/client/secure_credentials.h', diff --git a/grpc.def b/grpc.def index 4820cf14213..59bc8f2aa78 100644 --- a/grpc.def +++ b/grpc.def @@ -231,7 +231,6 @@ EXPORTS gpr_cpu_num_cores gpr_cpu_current_cpu gpr_log - absl_vlog2_enabled gpr_log_verbosity_init gpr_format_message gpr_strdup diff --git a/include/grpc/support/log.h b/include/grpc/support/log.h index 82e64a8c56f..d536bc85922 100644 --- a/include/grpc/support/log.h +++ b/include/grpc/support/log.h @@ -51,9 +51,6 @@ typedef enum gpr_log_severity { GPRAPI void gpr_log(const char* file, int line, gpr_log_severity severity, const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5); -/** Deprecated. **/ -GPRAPI int absl_vlog2_enabled(); - GPRAPI void gpr_log_verbosity_init(void); #ifdef __cplusplus diff --git a/include/grpcpp/support/callback_common.h b/include/grpcpp/support/callback_common.h index 0de9cbcf050..a333407bb4e 100644 --- a/include/grpcpp/support/callback_common.h +++ b/include/grpcpp/support/callback_common.h @@ -30,6 +30,7 @@ #include #include #include +#include #include namespace grpc { @@ -127,7 +128,18 @@ class CallbackWithStatusTag : public grpc_completion_queue_functor { auto status = std::move(status_); func_ = nullptr; // reset to clear this out for sure status_ = Status(); // reset to clear this out for sure - CatchingCallback(std::move(func), std::move(status)); + GetGlobalCallbackHook()->RunCallback( + call_, [func = std::move(func), status = std::move(status)]() { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(status); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(status); +#endif // GRPC_ALLOW_EXCEPTIONS + }); grpc_call_unref(call_); } }; @@ -214,7 +226,17 @@ class CallbackWithSuccessTag : public grpc_completion_queue_functor { #endif if (do_callback) { - CatchingCallback(func_, ok); + GetGlobalCallbackHook()->RunCallback(call_, [this, ok]() { +#if GRPC_ALLOW_EXCEPTIONS + try { + func_(ok); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func_(ok); +#endif // GRPC_ALLOW_EXCEPTIONS + }); } } }; diff --git a/include/grpcpp/support/global_callback_hook.h b/include/grpcpp/support/global_callback_hook.h new file mode 100644 index 00000000000..c453bc807f4 --- /dev/null +++ b/include/grpcpp/support/global_callback_hook.h @@ -0,0 +1,58 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPCPP_SUPPORT_GLOBAL_CALLBACK_HOOK_H +#define GRPCPP_SUPPORT_GLOBAL_CALLBACK_HOOK_H + +#include "absl/functional/function_ref.h" + +struct grpc_call; + +namespace grpc { + +class GlobalCallbackHook { + public: + virtual ~GlobalCallbackHook() = default; + virtual void RunCallback(grpc_call* call, + absl::FunctionRef callback) = 0; + + protected: + // An exception-safe way of invoking a user-specified callback function. + template + void CatchingCallback(Func&& func, Args&&... args) { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(std::forward(args)...); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(std::forward(args)...); +#endif // GRPC_ALLOW_EXCEPTIONS + } +}; + +class DefaultGlobalCallbackHook final : public GlobalCallbackHook { + public: + void RunCallback(grpc_call* call, + absl::FunctionRef callback) override { + CatchingCallback(callback); + } +}; + +std::shared_ptr GetGlobalCallbackHook(); +void SetGlobalCallbackHook(GlobalCallbackHook* hook); +} // namespace grpc + +#endif // GRPCPP_SUPPORT_GLOBAL_CALLBACK_HOOK_H diff --git a/src/core/handshaker/security/secure_endpoint.cc b/src/core/handshaker/security/secure_endpoint.cc index 490110b78b5..cf720f19d21 100644 --- a/src/core/handshaker/security/secure_endpoint.cc +++ b/src/core/handshaker/security/secure_endpoint.cc @@ -252,6 +252,13 @@ static void on_read(void* user_data, grpc_error_handle error) { { grpc_core::MutexLock l(&ep->read_mu); + + // If we were shut down after this callback was scheduled with OK + // status but before it was invoked, we need to treat that as an error. + if (ep->wrapped_ep == nullptr && error.ok()) { + error = absl::CancelledError("secure endpoint shutdown"); + } + uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer); uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer); @@ -505,8 +512,10 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, static void endpoint_destroy(grpc_endpoint* secure_ep) { secure_endpoint* ep = reinterpret_cast(secure_ep); + ep->read_mu.Lock(); ep->wrapped_ep.reset(); ep->memory_owner.Reset(); + ep->read_mu.Unlock(); SECURE_ENDPOINT_UNREF(ep, "destroy"); } diff --git a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc index 47e67d8e4a4..4721437860b 100644 --- a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc +++ b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc @@ -335,9 +335,18 @@ void CFStreamEndpointImpl::DoWrite( continue; } - size_t written_size = + CFIndex written_size = CFWriteStreamWrite(cf_write_stream_, slice.begin(), slice.size()); + if (written_size < 0) { + auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_)); + GRPC_TRACE_LOG(event_engine_endpoint, INFO) + << "CFStream write error: " << status + << ", written_size: " << written_size; + on_writable(status); + return; + } + total_written_size += written_size; if (written_size < slice.size()) { SliceBuffer written; diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index a244a416e98..13c466a7a34 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -105,7 +105,7 @@ test_tags: [flow_control_test] - name: pick_first_new description: New pick_first impl with memory reduction. - expiry: 2024/07/30 + expiry: 2024/10/30 owner: roth@google.com test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"] - name: promise_based_inproc_transport diff --git a/src/core/load_balancing/rls/rls.cc b/src/core/load_balancing/rls/rls.cc index ef2d44c4d28..1248cfd639e 100644 --- a/src/core/load_balancing/rls/rls.cc +++ b/src/core/load_balancing/rls/rls.cc @@ -353,7 +353,8 @@ class RlsLb final : public LoadBalancingPolicy { // is called after releasing it. // // Both methods grab the data they need from the parent object. - void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); + void StartUpdate(OrphanablePtr* child_policy_to_delete) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_); void ExitIdleLocked() { @@ -397,14 +398,14 @@ class RlsLb final : public LoadBalancingPolicy { }; // Note: We are forced to disable lock analysis here because - // Orphan() is called by Unref() which is called by RefCountedPtr<>, which + // Orphaned() is called by Unref() which is called by RefCountedPtr<>, which // cannot have lock annotations for this particular caller. void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS; RefCountedPtr lb_policy_; std::string target_; - bool is_shutdown_ = false; + bool is_shutdown_ = false; // Protected by WorkSerializer OrphanablePtr child_policy_; RefCountedPtr pending_config_; @@ -503,12 +504,25 @@ class RlsLb final : public LoadBalancingPolicy { // Returns a list of child policy wrappers on which FinishUpdate() // needs to be called after releasing the lock. std::vector OnRlsResponseLocked( - ResponseInfo response, std::unique_ptr backoff_state) + ResponseInfo response, std::unique_ptr backoff_state, + OrphanablePtr* child_policy_to_delete) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); // Moves entry to the end of the LRU list. void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); + // Takes entries from child_policy_wrappers_ and appends them to the end + // of \a child_policy_wrappers. + void TakeChildPolicyWrappers( + std::vector>* child_policy_wrappers) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) { + child_policy_wrappers->insert( + child_policy_wrappers->end(), + std::make_move_iterator(child_policy_wrappers_.begin()), + std::make_move_iterator(child_policy_wrappers_.end())); + child_policy_wrappers_.clear(); + } + private: class BackoffTimer final : public InternallyRefCounted { public: @@ -566,19 +580,24 @@ class RlsLb final : public LoadBalancingPolicy { // the caller. Otherwise, the entry found is returned to the caller. The // entry returned to the user is considered recently used and its order in // the LRU list of the cache is updated. - Entry* FindOrInsert(const RequestKey& key) + Entry* FindOrInsert(const RequestKey& key, + std::vector>* + child_policy_wrappers_to_delete) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); // Resizes the cache. If the new cache size is greater than the current size // of the cache, do nothing. Otherwise, evict the oldest entries that // exceed the new size limit of the cache. - void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); + void Resize(size_t bytes, std::vector>* + child_policy_wrappers_to_delete) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); // Resets backoff of all the cache entries. void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); // Shutdown the cache; clean-up and orphan all the stored cache entries. - void Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); + GRPC_MUST_USE_RESULT std::vector> + Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); void ReportMetricsLocked(CallbackMetricReporter& reporter) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); @@ -594,7 +613,9 @@ class RlsLb final : public LoadBalancingPolicy { // Evicts oversized cache elements when the current size is greater than // the specified limit. - void MaybeShrinkSize(size_t bytes) + void MaybeShrinkSize(size_t bytes, + std::vector>* + child_policy_wrappers_to_delete) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); RlsLb* lb_policy_; @@ -857,7 +878,8 @@ absl::optional InsertOrUpdateChildPolicyField(const std::string& field, return Json::FromArray(std::move(array)); } -void RlsLb::ChildPolicyWrapper::StartUpdate() { +void RlsLb::ChildPolicyWrapper::StartUpdate( + OrphanablePtr* child_policy_to_delete) { ValidationErrors errors; auto child_policy_config = InsertOrUpdateChildPolicyField( lb_policy_->config_->child_policy_config_target_field_name(), target_, @@ -880,7 +902,7 @@ void RlsLb::ChildPolicyWrapper::StartUpdate() { pending_config_.reset(); picker_ = MakeRefCounted( absl::UnavailableError(config.status().message())); - child_policy_.reset(); + *child_policy_to_delete = std::move(child_policy_); } else { pending_config_ = std::move(*config); } @@ -934,9 +956,9 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState( << ": UpdateState(state=" << ConnectivityStateName(state) << ", status=" << status << ", picker=" << picker.get() << ")"; } + if (wrapper_->is_shutdown_) return; { MutexLock lock(&wrapper_->lb_policy_->mu_); - if (wrapper_->is_shutdown_) return; // TODO(roth): It looks like this ignores subsequent TF updates that // might change the status used to fail picks, which seems wrong. if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && @@ -946,7 +968,8 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState( wrapper_->connectivity_state_ = state; DCHECK(picker != nullptr); if (picker != nullptr) { - wrapper_->picker_ = std::move(picker); + // We want to unref the picker after we release the lock. + wrapper_->picker_.swap(picker); } } wrapper_->lb_policy_->UpdatePickerLocked(); @@ -1194,18 +1217,19 @@ RlsLb::Cache::Entry::Entry(RefCountedPtr lb_policy, lb_policy_->cache_.lru_list_.end(), key)) {} void RlsLb::Cache::Entry::Orphan() { + // We should be holding RlsLB::mu_. GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " " << lru_iterator_->ToString() << ": cache entry evicted"; is_shutdown_ = true; lb_policy_->cache_.lru_list_.erase(lru_iterator_); lru_iterator_ = lb_policy_->cache_.lru_list_.end(); // Just in case. + CHECK(child_policy_wrappers_.empty()); backoff_state_.reset(); if (backoff_timer_ != nullptr) { backoff_timer_.reset(); lb_policy_->UpdatePickerAsync(); } - child_policy_wrappers_.clear(); Unref(DEBUG_LOCATION, "Orphan"); } @@ -1284,7 +1308,8 @@ void RlsLb::Cache::Entry::MarkUsed() { std::vector RlsLb::Cache::Entry::OnRlsResponseLocked( - ResponseInfo response, std::unique_ptr backoff_state) { + ResponseInfo response, std::unique_ptr backoff_state, + OrphanablePtr* child_policy_to_delete) { // Move the entry to the end of the LRU list. MarkUsed(); // If the request failed, store the failed status and update the @@ -1345,7 +1370,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked( if (it == lb_policy_->child_policy_map_.end()) { auto new_child = MakeRefCounted( lb_policy_.Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target); - new_child->StartUpdate(); + new_child->StartUpdate(child_policy_to_delete); child_policies_to_finish_update.push_back(new_child.get()); new_child_policy_wrappers.emplace_back(std::move(new_child)); } else { @@ -1382,12 +1407,15 @@ RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) { return it->second.get(); } -RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) { +RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert( + const RequestKey& key, std::vector>* + child_policy_wrappers_to_delete) { auto it = map_.find(key); // If not found, create new entry. if (it == map_.end()) { size_t entry_size = EntrySizeForKey(key); - MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size)); + MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size), + child_policy_wrappers_to_delete); Entry* entry = new Entry( lb_policy_->RefAsSubclass(DEBUG_LOCATION, "CacheEntry"), key); map_.emplace(key, OrphanablePtr(entry)); @@ -1405,11 +1433,13 @@ RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) { return it->second.get(); } -void RlsLb::Cache::Resize(size_t bytes) { +void RlsLb::Cache::Resize(size_t bytes, + std::vector>* + child_policy_wrappers_to_delete) { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << lb_policy_ << "] resizing cache to " << bytes << " bytes"; size_limit_ = bytes; - MaybeShrinkSize(size_limit_); + MaybeShrinkSize(size_limit_, child_policy_wrappers_to_delete); } void RlsLb::Cache::ResetAllBackoff() { @@ -1419,7 +1449,12 @@ void RlsLb::Cache::ResetAllBackoff() { lb_policy_->UpdatePickerAsync(); } -void RlsLb::Cache::Shutdown() { +std::vector> RlsLb::Cache::Shutdown() { + std::vector> + child_policy_wrappers_to_delete; + for (auto& entry : map_) { + entry.second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete); + } map_.clear(); lru_list_.clear(); if (cleanup_timer_handle_.has_value() && @@ -1429,6 +1464,7 @@ void RlsLb::Cache::Shutdown() { << "[rlslb " << lb_policy_ << "] cache cleanup timer canceled"; } cleanup_timer_handle_.reset(); + return child_policy_wrappers_to_delete; } void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) { @@ -1464,12 +1500,15 @@ void RlsLb::Cache::StartCleanupTimer() { void RlsLb::Cache::OnCleanupTimer() { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << lb_policy_ << "] cache cleanup timer fired"; + std::vector> + child_policy_wrappers_to_delete; MutexLock lock(&lb_policy_->mu_); if (!cleanup_timer_handle_.has_value()) return; if (lb_policy_->is_shutdown_) return; for (auto it = map_.begin(); it != map_.end();) { if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) { size_ -= it->second->Size(); + it->second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete); it = map_.erase(it); } else { ++it; @@ -1483,7 +1522,9 @@ size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) { return (key.Size() * 2) + sizeof(Entry); } -void RlsLb::Cache::MaybeShrinkSize(size_t bytes) { +void RlsLb::Cache::MaybeShrinkSize( + size_t bytes, std::vector>* + child_policy_wrappers_to_delete) { while (size_ > bytes) { auto lru_it = lru_list_.begin(); if (GPR_UNLIKELY(lru_it == lru_list_.end())) break; @@ -1494,6 +1535,7 @@ void RlsLb::Cache::MaybeShrinkSize(size_t bytes) { << "[rlslb " << lb_policy_ << "] LRU eviction: removing entry " << map_it->second.get() << " " << lru_it->ToString(); size_ -= map_it->second->Size(); + map_it->second->TakeChildPolicyWrappers(child_policy_wrappers_to_delete); map_.erase(map_it); } GRPC_TRACE_LOG(rls_lb, INFO) @@ -1814,13 +1856,18 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) { << "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " " << key_.ToString() << ": response info: " << response.ToString(); std::vector child_policies_to_finish_update; + std::vector> + child_policy_wrappers_to_delete; + OrphanablePtr child_policy_to_delete; { MutexLock lock(&lb_policy_->mu_); if (lb_policy_->is_shutdown_) return; rls_channel_->ReportResponseLocked(response.status.ok()); - Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_); + Cache::Entry* cache_entry = + lb_policy_->cache_.FindOrInsert(key_, &child_policy_wrappers_to_delete); child_policies_to_finish_update = cache_entry->OnRlsResponseLocked( - std::move(response), std::move(backoff_state_)); + std::move(response), std::move(backoff_state_), + &child_policy_to_delete); lb_policy_->request_map_.erase(key_); } // Now that we've released the lock, finish the update on any newly @@ -1919,14 +1966,7 @@ RlsLb::RlsLb(Args args) instance_uuid_(channel_args() .GetOwnedString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID) .value_or(GenerateUUID())), - cache_(this), - registered_metric_callback_( - channel_control_helper()->GetStatsPluginGroup().RegisterCallback( - [this](CallbackMetricReporter& reporter) { - MutexLock lock(&mu_); - cache_.ReportMetricsLocked(reporter); - }, - Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries)) { + cache_(this) { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy created"; } @@ -2006,6 +2046,9 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { } } // Now grab the lock to swap out the state it guards. + std::vector> + child_policy_wrappers_to_delete; + OrphanablePtr child_policy_to_delete; { MutexLock lock(&mu_); // Swap out RLS channel if needed. @@ -2017,19 +2060,20 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { // Resize cache if needed. if (old_config == nullptr || config_->cache_size_bytes() != old_config->cache_size_bytes()) { - cache_.Resize(static_cast(config_->cache_size_bytes())); + cache_.Resize(static_cast(config_->cache_size_bytes()), + &child_policy_wrappers_to_delete); } // Start update of child policies if needed. if (update_child_policies) { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] starting child policy updates"; for (auto& p : child_policy_map_) { - p.second->StartUpdate(); + p.second->StartUpdate(&child_policy_to_delete); } } else if (created_default_child) { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] starting default child policy update"; - default_child_policy_->StartUpdate(); + default_child_policy_->StartUpdate(&child_policy_to_delete); } } // Now that we've released the lock, finish update of child policies. @@ -2054,6 +2098,20 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { } } update_in_progress_ = false; + // On the initial update only, we set the gauge metric callback. We + // can't do this before the initial update, because otherwise the + // callback could be invoked before we've set state that we need for + // the label values (e.g., we'd add metrics with empty string for the + // RLS server name). + if (registered_metric_callback_ == nullptr) { + registered_metric_callback_ = + channel_control_helper()->GetStatsPluginGroup().RegisterCallback( + [this](CallbackMetricReporter& reporter) { + MutexLock lock(&mu_); + cache_.ReportMetricsLocked(reporter); + }, + Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries); + } // In principle, we need to update the picker here only if the config // fields used by the picker have changed. However, it seems fragile // to check individual fields, since the picker logic could change in @@ -2090,14 +2148,20 @@ void RlsLb::ResetBackoffLocked() { void RlsLb::ShutdownLocked() { GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy shutdown"; registered_metric_callback_.reset(); - MutexLock lock(&mu_); - is_shutdown_ = true; - config_.reset(DEBUG_LOCATION, "ShutdownLocked"); + RefCountedPtr child_policy_to_delete; + std::vector> + child_policy_wrappers_to_delete; + OrphanablePtr rls_channel_to_delete; + { + MutexLock lock(&mu_); + is_shutdown_ = true; + config_.reset(DEBUG_LOCATION, "ShutdownLocked"); + child_policy_wrappers_to_delete = cache_.Shutdown(); + request_map_.clear(); + rls_channel_to_delete = std::move(rls_channel_); + child_policy_to_delete = std::move(default_child_policy_); + } channel_args_ = ChannelArgs(); - cache_.Shutdown(); - request_map_.clear(); - rls_channel_.reset(); - default_child_policy_.reset(); } void RlsLb::UpdatePickerAsync() { diff --git a/src/core/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/resolver/dns/c_ares/dns_resolver_ares.cc index 87f4d7f3f24..67993abc5b7 100644 --- a/src/core/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/resolver/dns/c_ares/dns_resolver_ares.cc @@ -106,9 +106,10 @@ class AresClientChannelDNSResolver final : public PollingResolver { resolver_->authority().c_str(), resolver_->name_to_resolve().c_str(), kDefaultSecurePort, resolver_->interested_parties(), &on_hostname_resolved_, &addresses_, resolver_->query_timeout_ms_)); - GRPC_CARES_TRACE_LOG( - "resolver:%p Started resolving hostnames. hostname_request_:%p", - resolver_.get(), hostname_request_.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << resolver_.get() + << " Started resolving hostnames. hostname_request_:" + << hostname_request_.get(); if (resolver_->enable_srv_queries_) { Ref(DEBUG_LOCATION, "OnSRVResolved").release(); GRPC_CLOSURE_INIT(&on_srv_resolved_, OnSRVResolved, this, nullptr); @@ -117,9 +118,10 @@ class AresClientChannelDNSResolver final : public PollingResolver { resolver_->name_to_resolve().c_str(), resolver_->interested_parties(), &on_srv_resolved_, &balancer_addresses_, resolver_->query_timeout_ms_)); - GRPC_CARES_TRACE_LOG( - "resolver:%p Started resolving SRV records. srv_request_:%p", - resolver_.get(), srv_request_.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << resolver_.get() + << " Started resolving SRV records. srv_request_:" + << srv_request_.get(); } if (resolver_->request_service_config_) { Ref(DEBUG_LOCATION, "OnTXTResolved").release(); @@ -129,9 +131,10 @@ class AresClientChannelDNSResolver final : public PollingResolver { resolver_->name_to_resolve().c_str(), resolver_->interested_parties(), &on_txt_resolved_, &service_config_json_, resolver_->query_timeout_ms_)); - GRPC_CARES_TRACE_LOG( - "resolver:%p Started resolving TXT records. txt_request_:%p", - resolver_.get(), txt_request_.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << resolver_.get() + << " Started resolving TXT records. txt_request_:" + << txt_request_.get(); } } @@ -219,8 +222,9 @@ AresClientChannelDNSResolver::AresClientChannelDNSResolver( .value_or(GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS))) {} AresClientChannelDNSResolver::~AresClientChannelDNSResolver() { - GRPC_CARES_TRACE_LOG("resolver:%p destroying AresClientChannelDNSResolver", - this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << this + << " destroying AresClientChannelDNSResolver"; } OrphanablePtr AresClientChannelDNSResolver::StartRequest() { @@ -283,15 +287,16 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(on_resolved_mu_) { if (hostname_request_ != nullptr || srv_request_ != nullptr || txt_request_ != nullptr) { - GRPC_CARES_TRACE_LOG( - "resolver:%p OnResolved() waiting for results (hostname: %s, srv: %s, " - "txt: %s)", - this, hostname_request_ != nullptr ? "waiting" : "done", - srv_request_ != nullptr ? "waiting" : "done", - txt_request_ != nullptr ? "waiting" : "done"); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << this + << " OnResolved() waiting for results (hostname: " + << (hostname_request_ != nullptr ? "waiting" : "done") + << ", srv: " << (srv_request_ != nullptr ? "waiting" : "done") + << ", txt: " << (txt_request_ != nullptr ? "waiting" : "done") << ")"; return absl::nullopt; } - GRPC_CARES_TRACE_LOG("resolver:%p OnResolved() proceeding", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << this << " OnResolved() proceeding"; Result result; result.args = resolver_->channel_args(); // TODO(roth): Change logic to be able to report failures for addresses @@ -309,8 +314,9 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( absl::StrCat("failed to parse service config: ", StatusToString(service_config_string.status()))); } else if (!service_config_string->empty()) { - GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s", - this, service_config_string->c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << this + << " selected service config choice: " << *service_config_string; result.service_config = ServiceConfigImpl::Create( resolver_->channel_args(), *service_config_string); if (!result.service_config.ok()) { @@ -325,8 +331,9 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked( SetGrpcLbBalancerAddresses(result.args, *balancer_addresses_); } } else { - GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", this, - StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) resolver:" << this + << " dns resolution failed: " << StatusToString(error); std::string error_message; grpc_error_get_str(error, StatusStrProperty::kDescription, &error_message); absl::Status status = absl::UnavailableError( @@ -375,8 +382,9 @@ class AresDNSResolver final : public DNSResolver { class AresRequest { public: virtual ~AresRequest() { - GRPC_CARES_TRACE_LOG("AresRequest:%p dtor ares_request_:%p", this, - grpc_ares_request_.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresRequest:" << this + << " dtor ares_request_:" << grpc_ares_request_.get(); resolver_->UnregisterRequest(task_handle()); grpc_pollset_set_destroy(pollset_set_); } @@ -397,8 +405,9 @@ class AresDNSResolver final : public DNSResolver { bool Cancel() { MutexLock lock(&mu_); if (grpc_ares_request_ != nullptr) { - GRPC_CARES_TRACE_LOG("AresRequest:%p Cancel ares_request_:%p", this, - grpc_ares_request_.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresRequest:" << this + << " Cancel ares_request_:" << grpc_ares_request_.get(); if (completed_) return false; // OnDnsLookupDone will still be run completed_ = true; @@ -499,7 +508,8 @@ class AresDNSResolver final : public DNSResolver { aba_token), default_port_(default_port), on_resolve_address_done_(std::move(on_resolve_address_done)) { - GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p ctor", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresHostnameRequest:" << this << " ctor"; } std::unique_ptr MakeRequestLocked() override { @@ -508,13 +518,15 @@ class AresDNSResolver final : public DNSResolver { name_server().c_str(), name().c_str(), default_port_.c_str(), pollset_set(), on_dns_lookup_done(), &addresses_, timeout().millis())); - GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p Start ares_request_:%p", - this, ares_request.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresHostnameRequest:" << this + << " Start ares_request_:" << ares_request.get(); return ares_request; } void OnComplete(grpc_error_handle error) override { - GRPC_CARES_TRACE_LOG("AresHostnameRequest:%p OnComplete", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresHostnameRequest:" << this << " OnComplete"; if (!error.ok()) { on_resolve_address_done_(grpc_error_to_absl_status(error)); return; @@ -550,7 +562,8 @@ class AresDNSResolver final : public DNSResolver { : AresRequest(name, name_server, timeout, interested_parties, resolver, aba_token), on_resolve_address_done_(std::move(on_resolve_address_done)) { - GRPC_CARES_TRACE_LOG("AresSRVRequest:%p ctor", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresSRVRequest:" << this << " ctor"; } std::unique_ptr MakeRequestLocked() override { @@ -558,13 +571,15 @@ class AresDNSResolver final : public DNSResolver { std::unique_ptr(grpc_dns_lookup_srv_ares( name_server().c_str(), name().c_str(), pollset_set(), on_dns_lookup_done(), &balancer_addresses_, timeout().millis())); - GRPC_CARES_TRACE_LOG("AresSRVRequest:%p Start ares_request_:%p", this, - ares_request.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresSRVRequest:" << this + << " Start ares_request_:" << ares_request.get(); return ares_request; } void OnComplete(grpc_error_handle error) override { - GRPC_CARES_TRACE_LOG("AresSRVRequest:%p OnComplete", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresSRVRequest:" << this << " OnComplete"; if (!error.ok()) { on_resolve_address_done_(grpc_error_to_absl_status(error)); return; @@ -596,7 +611,8 @@ class AresDNSResolver final : public DNSResolver { : AresRequest(name, name_server, timeout, interested_parties, resolver, aba_token), on_resolved_(std::move(on_resolved)) { - GRPC_CARES_TRACE_LOG("AresTXTRequest:%p ctor", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresTXTRequest:" << this << " ctor"; } ~AresTXTRequest() override { gpr_free(service_config_json_); } @@ -606,13 +622,15 @@ class AresDNSResolver final : public DNSResolver { std::unique_ptr(grpc_dns_lookup_txt_ares( name_server().c_str(), name().c_str(), pollset_set(), on_dns_lookup_done(), &service_config_json_, timeout().millis())); - GRPC_CARES_TRACE_LOG("AresSRVRequest:%p Start ares_request_:%p", this, - ares_request.get()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresSRVRequest:" << this + << " Start ares_request_:" << ares_request.get(); return ares_request; } void OnComplete(grpc_error_handle error) override { - GRPC_CARES_TRACE_LOG("AresSRVRequest:%p OnComplete", this); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresSRVRequest:" << this << " OnComplete"; if (!error.ok()) { on_resolved_(grpc_error_to_absl_status(error)); return; @@ -684,14 +702,15 @@ class AresDNSResolver final : public DNSResolver { MutexLock lock(&mu_); if (!open_requests_.contains(handle)) { // Unknown request, possibly completed already, or an invalid handle. - GRPC_CARES_TRACE_LOG( - "AresDNSResolver:%p attempt to cancel unknown TaskHandle:%s", this, - HandleToString(handle).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresDNSResolver:" << this + << " attempt to cancel unknown TaskHandle:" << HandleToString(handle); return false; } auto* request = reinterpret_cast(handle.keys[0]); - GRPC_CARES_TRACE_LOG("AresDNSResolver:%p cancel ares_request:%p", this, - request); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) AresDNSResolver:" << this + << " cancel ares_request:" << request; return request->Cancel(); } diff --git a/src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index ef70bfc5b67..142958f104f 100644 --- a/src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -133,8 +133,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { } ~GrpcPolledFdWindows() override { - GRPC_CARES_TRACE_LOG("fd:|%s| ~GrpcPolledFdWindows shutdown_called_: %d ", - GetName(), shutdown_called_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| ~GrpcPolledFdWindows shutdown_called_: " << shutdown_called_; CSliceUnref(read_buf_); CSliceUnref(write_buf_); CHECK_EQ(read_closure_, nullptr); @@ -173,10 +174,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { } void ContinueRegisterForOnReadableLocked() { - GRPC_CARES_TRACE_LOG( - "fd:|%s| ContinueRegisterForOnReadableLocked " - "wsa_connect_error_:%d", - GetName(), wsa_connect_error_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| ContinueRegisterForOnReadableLocked " + << "wsa_connect_error_:" << wsa_connect_error_; CHECK(connect_done_); if (wsa_connect_error_ != 0) { ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect")); @@ -194,10 +195,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { &winsocket_->read_info.overlapped, nullptr)) { int wsa_last_error = WSAGetLastError(); char* msg = gpr_format_message(wsa_last_error); - GRPC_CARES_TRACE_LOG( - "fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| " - "msg:|%s|", - GetName(), wsa_last_error, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| RegisterForOnReadableLocked WSARecvFrom error code:|" + << wsa_last_error << "| msg:|" << msg << "|"; gpr_free(msg); if (wsa_last_error != WSA_IO_PENDING) { ScheduleAndNullReadClosure( @@ -210,14 +211,15 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { void RegisterForOnWriteableLocked(grpc_closure* write_closure) override { if (socket_type_ == SOCK_DGRAM) { - GRPC_CARES_TRACE_LOG("fd:|%s| RegisterForOnWriteableLocked called", - GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| RegisterForOnWriteableLocked called"; } else { CHECK(socket_type_ == SOCK_STREAM); - GRPC_CARES_TRACE_LOG( - "fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d " - "connect_done_: %d", - GetName(), tcp_write_state_, connect_done_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| RegisterForOnWriteableLocked called tcp_write_state_: " + << tcp_write_state_ << " connect_done_: " << connect_done_; } CHECK_EQ(write_closure_, nullptr); write_closure_ = write_closure; @@ -234,10 +236,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { } void ContinueRegisterForOnWriteableLocked() { - GRPC_CARES_TRACE_LOG( - "fd:|%s| ContinueRegisterForOnWriteableLocked " - "wsa_connect_error_:%d", - GetName(), wsa_connect_error_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| ContinueRegisterForOnWriteableLocked " + << "wsa_connect_error_:" << wsa_connect_error_; CHECK(connect_done_); if (wsa_connect_error_ != 0) { ScheduleAndNullWriteClosure( @@ -288,10 +290,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data, ares_socket_t data_len, int /* flags */, struct sockaddr* from, ares_socklen_t* from_len) { - GRPC_CARES_TRACE_LOG( - "fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf " - "length:|%d|", - GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_)); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " RecvFrom called read_buf_has_data:" << read_buf_has_data_ + << " Current read buf length:" << GRPC_SLICE_LENGTH(read_buf_); if (!read_buf_has_data_) { wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); return -1; @@ -340,20 +342,21 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1, bytes_sent_ptr, flags, overlapped, nullptr); *wsa_error_code = WSAGetLastError(); - GRPC_CARES_TRACE_LOG( - "fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d " - "overlapped:%p " - "return:%d *wsa_error_code:%d", - GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0, - overlapped, out, *wsa_error_code); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " SendWriteBuf WSASend buf.len:" << buf.len << " *bytes_sent_ptr:" + << (bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0) + << " overlapped:" << overlapped << " return:" << out + << " *wsa_error_code:" << *wsa_error_code; return out; } ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov, int iov_count) { - GRPC_CARES_TRACE_LOG( - "fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d", - GetName(), connect_done_, wsa_connect_error_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " SendV called connect_done_:" << connect_done_ + << " wsa_connect_error_:" << wsa_connect_error_; if (!connect_done_) { wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK); return -1; @@ -377,7 +380,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { // c-ares doesn't handle retryable errors on writes of UDP sockets. // Therefore, the sendv handler for UDP sockets must only attempt // to write everything inline. - GRPC_CARES_TRACE_LOG("fd:|%s| SendVUDP called", GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() << " SendVUDP called"; CHECK_EQ(GRPC_SLICE_LENGTH(write_buf_), 0); CSliceUnref(write_buf_); write_buf_ = FlattenIovec(iov, iov_count); @@ -388,9 +392,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { write_buf_ = grpc_empty_slice(); wsa_error_ctx->SetWSAError(wsa_error_code); char* msg = gpr_format_message(wsa_error_code); - GRPC_CARES_TRACE_LOG( - "fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(), - wsa_error_code, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " SendVUDP SendWriteBuf error code:" << wsa_error_code + << " msg:" << msg; gpr_free(msg); return -1; } @@ -406,8 +411,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { // out in the background, and making further send progress in general, will // happen as long as c-ares continues to show interest in writeability on // this fd. - GRPC_CARES_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d", - GetName(), tcp_write_state_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " SendVTCP called tcp_write_state_:" << tcp_write_state_; switch (tcp_write_state_) { case WRITE_IDLE: tcp_write_state_ = WRITE_REQUESTED; @@ -450,13 +456,13 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { } void OnTcpConnectLocked(grpc_error_handle error) { - GRPC_CARES_TRACE_LOG( - "fd:%s InnerOnTcpConnectLocked error:|%s| " - "pending_register_for_readable:%d" - " pending_register_for_writeable:%d", - GetName(), StatusToString(error).c_str(), - pending_continue_register_for_on_readable_locked_, - pending_continue_register_for_on_writeable_locked_); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " InnerOnTcpConnectLocked error:" << StatusToString(error) + << " pending_register_for_readable:" + << pending_continue_register_for_on_readable_locked_ + << " pending_register_for_writeable:" + << pending_continue_register_for_on_writeable_locked_; CHECK(!connect_done_); connect_done_ = true; CHECK_EQ(wsa_connect_error_, 0); @@ -473,10 +479,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { if (!wsa_success) { wsa_connect_error_ = WSAGetLastError(); char* msg = gpr_format_message(wsa_connect_error_); - GRPC_CARES_TRACE_LOG( - "fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d " - "msg:|%s|", - GetName(), wsa_connect_error_, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " InnerOnTcpConnectLocked WSA overlapped result code:" + << wsa_connect_error_ << " msg:" << msg; gpr_free(msg); } } @@ -502,7 +508,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, ares_socklen_t target_len) { - GRPC_CARES_TRACE_LOG("fd:%s ConnectUDP", GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() << " ConnectUDP"; CHECK(!connect_done_); CHECK_EQ(wsa_connect_error_, 0); SOCKET s = grpc_winsocket_wrapped_socket(winsocket_); @@ -512,8 +519,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { wsa_error_ctx->SetWSAError(wsa_connect_error_); connect_done_ = true; char* msg = gpr_format_message(wsa_connect_error_); - GRPC_CARES_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|", GetName(), - wsa_connect_error_, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() << " WSAConnect error code:|" + << wsa_connect_error_ << "| msg:|" << msg << "|"; gpr_free(msg); // c-ares expects a posix-style connect API return out == 0 ? 0 : -1; @@ -521,7 +529,8 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, ares_socklen_t target_len) { - GRPC_CARES_TRACE_LOG("fd:%s ConnectTCP", GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() << " ConnectTCP"; LPFN_CONNECTEX ConnectEx; GUID guid = WSAID_CONNECTEX; DWORD ioctl_num_bytes; @@ -532,10 +541,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int wsa_last_error = WSAGetLastError(); wsa_error_ctx->SetWSAError(wsa_last_error); char* msg = gpr_format_message(wsa_last_error); - GRPC_CARES_TRACE_LOG( - "fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d " - "msg:|%s|", - GetName(), wsa_last_error, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:" + << wsa_last_error << " msg:|" << msg << "|"; gpr_free(msg); connect_done_ = true; wsa_connect_error_ = wsa_last_error; @@ -555,8 +564,9 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int wsa_last_error = WSAGetLastError(); wsa_error_ctx->SetWSAError(wsa_last_error); char* msg = gpr_format_message(wsa_last_error); - GRPC_CARES_TRACE_LOG("fd:%s bind error code:%d msg:|%s|", GetName(), - wsa_last_error, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " bind error code:" << wsa_last_error << " msg:|" << msg << "|"; gpr_free(msg); connect_done_ = true; wsa_connect_error_ = wsa_last_error; @@ -569,8 +579,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { int wsa_last_error = WSAGetLastError(); wsa_error_ctx->SetWSAError(wsa_last_error); char* msg = gpr_format_message(wsa_last_error); - GRPC_CARES_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|", GetName(), - wsa_last_error, msg); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << GetName() + << " ConnectEx error code:" << wsa_last_error << " msg:|" << msg + << "|"; gpr_free(msg); if (wsa_last_error == WSA_IO_PENDING) { // c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on @@ -610,11 +622,12 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) { error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error, "OnIocpReadableInner"); - GRPC_CARES_TRACE_LOG( - "fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error " - "code:|%d| msg:|%s|", - GetName(), winsocket_->read_info.wsa_error, - StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| OnIocpReadableInner winsocket_->read_info.wsa_error " + "code:|" + << winsocket_->read_info.wsa_error << "| msg:|" + << StatusToString(error) << "|"; } } } @@ -626,9 +639,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { CSliceUnref(read_buf_); read_buf_ = grpc_empty_slice(); } - GRPC_CARES_TRACE_LOG( - "fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(), - GRPC_SLICE_LENGTH(read_buf_)); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| OnIocpReadable finishing. read buf length now:|" + << GRPC_SLICE_LENGTH(read_buf_) << "|"; ScheduleAndNullReadClosure(error); } @@ -639,17 +653,19 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { } void OnIocpWriteableLocked(grpc_error_handle error) { - GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) OnIocpWriteableInner. fd:|" << GetName() << "|"; CHECK(socket_type_ == SOCK_STREAM); if (error.ok()) { if (winsocket_->write_info.wsa_error != 0) { error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error, "OnIocpWriteableInner"); - GRPC_CARES_TRACE_LOG( - "fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error " - "code:|%d| msg:|%s|", - GetName(), winsocket_->write_info.wsa_error, - StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| OnIocpWriteableInner. winsocket_->write_info.wsa_error " + "code:|" + << winsocket_->write_info.wsa_error << "| msg:|" + << StatusToString(error) << "|"; } } CHECK(tcp_write_state_ == WRITE_PENDING); @@ -657,8 +673,10 @@ class GrpcPolledFdWindows final : public GrpcPolledFd { tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY; write_buf_ = grpc_slice_sub_no_ref( write_buf_, 0, winsocket_->write_info.bytes_transferred); - GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d", - GetName(), winsocket_->write_info.bytes_transferred); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:|" << GetName() + << "| OnIocpWriteableInner. bytes transferred:" + << winsocket_->write_info.bytes_transferred; } else { CSliceUnref(write_buf_); write_buf_ = grpc_empty_slice(); @@ -728,7 +746,9 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory { // static ares_socket_t Socket(int af, int type, int protocol, void* user_data) { if (type != SOCK_DGRAM && type != SOCK_STREAM) { - GRPC_CARES_TRACE_LOG("Socket called with invalid socket type:%d", type); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) Socket called with invalid socket type:" + << type; return INVALID_SOCKET; } GrpcPolledFdFactoryWindows* self = @@ -736,15 +756,16 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory { SOCKET s = WSASocket(af, type, protocol, nullptr, 0, grpc_get_default_wsa_socket_flags()); if (s == INVALID_SOCKET) { - GRPC_CARES_TRACE_LOG( - "WSASocket failed with params af:%d type:%d protocol:%d", af, type, - protocol); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) WSASocket failed with params af:" << af + << " type:" << type << " protocol:" << protocol; return s; } grpc_error_handle error = grpc_tcp_set_non_block(s); if (!error.ok()) { - GRPC_CARES_TRACE_LOG("WSAIoctl failed with error: %s", - StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) WSAIoctl failed with error: " + << StatusToString(error); return INVALID_SOCKET; } auto on_shutdown_locked = [self, s]() { @@ -755,9 +776,10 @@ class GrpcPolledFdFactoryWindows final : public GrpcPolledFdFactory { }; auto polled_fd = new GrpcPolledFdWindows(s, self->mu_, af, type, std::move(on_shutdown_locked)); - GRPC_CARES_TRACE_LOG( - "fd:|%s| created with params af:%d type:%d protocol:%d", - polled_fd->GetName(), af, type, protocol); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) fd:" << polled_fd->GetName() + << " created with params af:" << af << " type:" << type + << " protocol:" << protocol; CHECK(self->sockets_.insert({s, polled_fd}).second); return s; } diff --git a/src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc index 0dcd1667d1e..50161653ea4 100644 --- a/src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -200,8 +200,9 @@ static absl::Status AresStatusToAbslStatus(int status, static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( grpc_ares_ev_driver* ev_driver) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { - GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request, - ev_driver); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request << " Ref ev_driver " + << ev_driver; gpr_ref(&ev_driver->refs); return ev_driver; } @@ -211,11 +212,13 @@ static void grpc_ares_complete_request_locked(grpc_ares_request* r) static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { - GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request, - ev_driver); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " Unref ev_driver " << ev_driver; if (gpr_unref(&ev_driver->refs)) { - GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request, - ev_driver); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " destroy ev_driver " << ev_driver; CHECK_EQ(ev_driver->fds, nullptr); ares_destroy(ev_driver->channel); grpc_ares_complete_request_locked(ev_driver->request); @@ -225,8 +228,9 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) static void fd_node_destroy_locked(fd_node* fdn) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&grpc_ares_request::mu) { - GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << fdn->ev_driver->request + << " delete fd: " << fdn->grpc_polled_fd->GetName(); CHECK(!fdn->readable_registered); CHECK(!fdn->writable_registered); CHECK(fdn->already_shutdown); @@ -292,21 +296,21 @@ static grpc_core::Timestamp calculate_next_ares_backup_poll_alarm( // by the c-ares code comments. grpc_core::Duration until_next_ares_backup_poll_alarm = grpc_core::Duration::Seconds(1); - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p. next ares process poll time in " - "%" PRId64 " ms", - driver->request, driver, until_next_ares_backup_poll_alarm.millis()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << driver->request + << " ev_driver=" << driver << ". next ares process poll time in " + << until_next_ares_backup_poll_alarm.millis() << " ms"; return grpc_core::Timestamp::Now() + until_next_ares_backup_poll_alarm; } static void on_timeout(void* arg, grpc_error_handle error) { grpc_ares_ev_driver* driver = static_cast(arg); grpc_core::MutexLock lock(&driver->request->mu); - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. " - "err=%s", - driver->request, driver, driver->shutting_down, - grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << driver->request + << " ev_driver=" << driver + << " on_timeout_locked. driver->shutting_down=" << driver->shutting_down + << ". err=" << grpc_core::StatusToString(error); if (!driver->shutting_down && error.ok()) { grpc_ares_ev_driver_shutdown_locked(driver); } @@ -327,20 +331,20 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) static void on_ares_backup_poll_alarm(void* arg, grpc_error_handle error) { grpc_ares_ev_driver* driver = static_cast(arg); grpc_core::MutexLock lock(&driver->request->mu); - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. " - "driver->shutting_down=%d. " - "err=%s", - driver->request, driver, driver->shutting_down, - grpc_core::StatusToString(error).c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << driver->request + << " ev_driver=" << driver + << " on_ares_backup_poll_alarm_locked. driver->shutting_down=" + << driver->shutting_down << ". err=" << grpc_core::StatusToString(error); if (!driver->shutting_down && error.ok()) { fd_node* fdn = driver->fds; while (fdn != nullptr) { if (!fdn->already_shutdown) { - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked; " - "ares_process_fd. fd=%s", - driver->request, driver, fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << driver->request + << " ev_driver=" << driver + << " on_ares_backup_poll_alarm_locked; ares_process_fd. fd=" + << fdn->grpc_polled_fd->GetName(); ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); ares_process_fd(driver->channel, as, as); } @@ -373,8 +377,9 @@ static void on_readable(void* arg, grpc_error_handle error) { grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); fdn->readable_registered = false; - GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << fdn->ev_driver->request + << " readable on " << fdn->grpc_polled_fd->GetName(); if (error.ok() && !ev_driver->shutting_down) { ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD); } else { @@ -397,8 +402,9 @@ static void on_writable(void* arg, grpc_error_handle error) { grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); fdn->writable_registered = false; - GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request << " writable on " + << fdn->grpc_polled_fd->GetName(); if (error.ok() && !ev_driver->shutting_down) { ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as); } else { @@ -433,8 +439,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) fdn->grpc_polled_fd = ev_driver->polled_fd_factory->NewGrpcPolledFdLocked( socks[i], ev_driver->pollset_set); - GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " new fd: " << fdn->grpc_polled_fd->GetName(); fdn->readable_registered = false; fdn->writable_registered = false; fdn->already_shutdown = false; @@ -449,15 +456,16 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable, fdn, grpc_schedule_on_exec_ctx); if (fdn->grpc_polled_fd->IsFdStillReadableLocked()) { - GRPC_CARES_TRACE_LOG("request:%p schedule direct read on: %s", - ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " schedule direct read on: " + << fdn->grpc_polled_fd->GetName(); grpc_core::ExecCtx::Run(DEBUG_LOCATION, &fdn->read_closure, absl::OkStatus()); } else { - GRPC_CARES_TRACE_LOG("request:%p notify read on: %s", - ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " notify read on: " << fdn->grpc_polled_fd->GetName(); fdn->grpc_polled_fd->RegisterForOnReadableLocked( &fdn->read_closure); } @@ -467,9 +475,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) // has not been registered with this socket. if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && !fdn->writable_registered) { - GRPC_CARES_TRACE_LOG("request:%p notify write on: %s", - ev_driver->request, - fdn->grpc_polled_fd->GetName()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " notify write on: " << fdn->grpc_polled_fd->GetName(); grpc_ares_ev_driver_ref(ev_driver); GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn, grpc_schedule_on_exec_ctx); @@ -505,10 +513,11 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) ev_driver->query_timeout_ms == 0 ? grpc_core::Duration::Infinity() : grpc_core::Duration::Milliseconds(ev_driver->query_timeout_ms); - GRPC_CARES_TRACE_LOG( - "request:%p ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in " - "%" PRId64 " ms", - ev_driver->request, ev_driver, timeout.millis()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << ev_driver->request + << " ev_driver=" << ev_driver + << " grpc_ares_ev_driver_start_locked. timeout in " << timeout.millis() + << " ms"; grpc_ares_ev_driver_ref(ev_driver); GRPC_CLOSURE_INIT(&ev_driver->on_timeout_locked, on_timeout, ev_driver, grpc_schedule_on_exec_ctx); @@ -547,7 +556,8 @@ grpc_error_handle grpc_ares_ev_driver_create_locked( } int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); grpc_ares_test_only_inject_config(&(*ev_driver)->channel); - GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request); + GRPC_TRACE_VLOG(cares_resolver, 2) << "(c-ares resolver) request:" << request + << " grpc_ares_ev_driver_create_locked"; if (status != ARES_SUCCESS) { grpc_error_handle err = GRPC_ERROR_CREATE(absl::StrCat( "Failed to init ares channel. C-ares error: ", ares_strerror(status))); @@ -645,10 +655,10 @@ static grpc_ares_hostbyname_request* create_hostbyname_request_locked( grpc_ares_request* parent_request, const char* host, uint16_t port, bool is_balancer, const char* qtype) ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_request->mu) { - GRPC_CARES_TRACE_LOG( - "request:%p create_hostbyname_request_locked host:%s port:%d " - "is_balancer:%d qtype:%s", - parent_request, host, port, is_balancer, qtype); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << parent_request + << " create_hostbyname_request_locked host:" << host << " port:" << port + << " is_balancer:" << is_balancer << " qtype:" << qtype; grpc_ares_hostbyname_request* hr = new grpc_ares_hostbyname_request(); hr->parent_request = parent_request; hr->host = gpr_strdup(host); @@ -675,9 +685,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, static_cast(arg); grpc_ares_request* r = hr->parent_request; if (status == ARES_SUCCESS) { - GRPC_CARES_TRACE_LOG( - "request:%p on_hostbyname_done_locked qtype=%s host=%s ARES_SUCCESS", r, - hr->qtype, hr->host); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " on_hostbyname_done_locked qtype=" << hr->qtype + << " host=" << hr->host << " ARES_SUCCESS"; std::unique_ptr* address_list_ptr = hr->is_balancer ? r->balancer_addresses_out : r->addresses_out; if (*address_list_ptr == nullptr) { @@ -701,10 +712,11 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, addr->sin6_port = hr->port; char output[INET6_ADDRSTRLEN]; ares_inet_ntop(AF_INET6, &addr->sin6_addr, output, INET6_ADDRSTRLEN); - GRPC_CARES_TRACE_LOG( - "request:%p c-ares resolver gets a AF_INET6 result: \n" - " addr: %s\n port: %d\n sin6_scope_id: %d\n", - r, output, ntohs(hr->port), addr->sin6_scope_id); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " c-ares resolver gets a AF_INET6 result: \n" + << " addr: " << output << "\n port: " << ntohs(hr->port) + << "\n sin6_scope_id: " << addr->sin6_scope_id << "\n"; break; } case AF_INET: { @@ -716,10 +728,10 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, addr->sin_port = hr->port; char output[INET_ADDRSTRLEN]; ares_inet_ntop(AF_INET, &addr->sin_addr, output, INET_ADDRSTRLEN); - GRPC_CARES_TRACE_LOG( - "request:%p c-ares resolver gets a AF_INET result: \n" - " addr: %s\n port: %d\n", - r, output, ntohs(hr->port)); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " c-ares resolver gets a AF_INET result: \n addr: " << output + << "\n port: " << ntohs(hr->port) << "\n"; break; } } @@ -729,8 +741,9 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/, std::string error_msg = absl::StrFormat( "C-ares status is not ARES_SUCCESS qtype=%s name=%s is_balancer=%d: %s", hr->qtype, hr->host, hr->is_balancer, ares_strerror(status)); - GRPC_CARES_TRACE_LOG("request:%p on_hostbyname_done_locked: %s", r, - error_msg.c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " on_hostbyname_done_locked: " << error_msg; r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error); } @@ -745,13 +758,14 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, GrpcAresQuery* q = static_cast(arg); grpc_ares_request* r = q->parent_request(); if (status == ARES_SUCCESS) { - GRPC_CARES_TRACE_LOG( - "request:%p on_srv_query_done_locked name=%s ARES_SUCCESS", r, - q->name().c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " on_srv_query_done_locked name=" << q->name() << " ARES_SUCCESS"; struct ares_srv_reply* reply; const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); - GRPC_CARES_TRACE_LOG("request:%p ares_parse_srv_reply: %d", r, - parse_status); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " ares_parse_srv_reply: " << parse_status; if (parse_status == ARES_SUCCESS) { for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; srv_it = srv_it->next) { @@ -775,8 +789,9 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/, std::string error_msg = absl::StrFormat( "C-ares status is not ARES_SUCCESS qtype=SRV name=%s: %s", q->name(), ares_strerror(status)); - GRPC_CARES_TRACE_LOG("request:%p on_srv_query_done_locked: %s", r, - error_msg.c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " on_srv_query_done_locked: " << error_msg; r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error); } @@ -797,8 +812,9 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/, struct ares_txt_ext* result = nullptr; struct ares_txt_ext* reply = nullptr; if (status != ARES_SUCCESS) goto fail; - GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked name=%s ARES_SUCCESS", r, - q->name().c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " on_txt_done_locked name=" << q->name() << " ARES_SUCCESS"; status = ares_parse_txt_reply_ext(buf, len, &reply); if (status != ARES_SUCCESS) goto fail; // Find service config in TXT record. @@ -826,8 +842,9 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/, service_config_len += result->length; } (*r->service_config_json_out)[service_config_len] = '\0'; - GRPC_CARES_TRACE_LOG("request:%p found service config: %s", r, - *r->service_config_json_out); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " found service config: " << *r->service_config_json_out; } // Clean up. ares_free_data(reply); @@ -837,8 +854,8 @@ fail: std::string error_msg = absl::StrFormat("C-ares status is not ARES_SUCCESS qtype=TXT name=%s: %s", q->name(), ares_strerror(status)); - GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked %s", r, - error_msg.c_str()); + GRPC_TRACE_VLOG(cares_resolver, 2) << "(c-ares resolver) request:" << r + << " on_txt_done_locked " << error_msg; r->error = grpc_error_add_child(AresStatusToAbslStatus(status, error_msg), r->error); } @@ -847,8 +864,9 @@ grpc_error_handle set_request_dns_server(grpc_ares_request* r, absl::string_view dns_server) ABSL_EXCLUSIVE_LOCKS_REQUIRED(r->mu) { if (!dns_server.empty()) { - GRPC_CARES_TRACE_LOG("request:%p Using DNS server %s", r, - dns_server.data()); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r << " Using DNS server " + << dns_server.data(); grpc_resolved_address addr; if (grpc_parse_ipv4_hostport(dns_server, &addr, /*log_errors=*/false)) { r->dns_server_addr.family = AF_INET; @@ -1043,10 +1061,10 @@ static grpc_ares_request* grpc_dns_lookup_hostname_ares_impl( r->ev_driver = nullptr; r->on_done = on_done; r->addresses_out = addrs; - GRPC_CARES_TRACE_LOG( - "request:%p c-ares grpc_dns_lookup_hostname_ares_impl name=%s, " - "default_port=%s", - r, name, default_port); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " c-ares grpc_dns_lookup_hostname_ares_impl name=" << name + << ", default_port=" << default_port; // Early out if the target is an ipv4 or ipv6 literal. if (resolve_as_ip_literal_locked(name, default_port, addrs)) { grpc_ares_complete_request_locked(r); @@ -1097,8 +1115,9 @@ grpc_ares_request* grpc_dns_lookup_srv_ares_impl( r->ev_driver = nullptr; r->on_done = on_done; r->balancer_addresses_out = balancer_addresses; - GRPC_CARES_TRACE_LOG( - "request:%p c-ares grpc_dns_lookup_srv_ares_impl name=%s", r, name); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " c-ares grpc_dns_lookup_srv_ares_impl name=" << name; grpc_error_handle error; // Don't query for SRV records if the target is "localhost" if (target_matches_localhost(name)) { @@ -1135,8 +1154,9 @@ grpc_ares_request* grpc_dns_lookup_txt_ares_impl( r->ev_driver = nullptr; r->on_done = on_done; r->service_config_json_out = service_config_json; - GRPC_CARES_TRACE_LOG( - "request:%p c-ares grpc_dns_lookup_txt_ares_impl name=%s", r, name); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " c-ares grpc_dns_lookup_txt_ares_impl name=" << name; grpc_error_handle error; // Don't query for TXT records if the target is "localhost" if (target_matches_localhost(name)) { @@ -1185,8 +1205,9 @@ grpc_ares_request* (*grpc_dns_lookup_txt_ares)( static void grpc_cancel_ares_request_impl(grpc_ares_request* r) { CHECK_NE(r, nullptr); grpc_core::MutexLock lock(&r->mu); - GRPC_CARES_TRACE_LOG("request:%p grpc_cancel_ares_request ev_driver:%p", r, - r->ev_driver); + GRPC_TRACE_VLOG(cares_resolver, 2) + << "(c-ares resolver) request:" << r + << " grpc_cancel_ares_request ev_driver:" << r->ev_driver; if (r->ev_driver != nullptr) { grpc_ares_ev_driver_shutdown_locked(r->ev_driver); } diff --git a/src/core/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/resolver/dns/c_ares/grpc_ares_wrapper.h index 9fdb94c0da8..bd4fbe2720f 100644 --- a/src/core/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -39,13 +39,6 @@ #define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 120000 -#define GRPC_CARES_TRACE_LOG(format, ...) \ - do { \ - if (GRPC_TRACE_FLAG_ENABLED(cares_resolver)) { \ - VLOG(2) << "(c-ares resolver) " << absl::StrFormat(format, __VA_ARGS__); \ - } \ - } while (0) - typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; struct grpc_ares_request { diff --git a/src/core/util/log.cc b/src/core/util/log.cc index c34c1503d3c..d32c8373787 100644 --- a/src/core/util/log.cc +++ b/src/core/util/log.cc @@ -40,8 +40,6 @@ void gpr_unreachable_code(const char* reason, const char* file, int line) { grpc_core::SourceLocation(file, line)); } -int absl_vlog2_enabled() { return ABSL_VLOG_IS_ON(2); } - int gpr_should_log(gpr_log_severity severity) { switch (severity) { case GPR_LOG_SEVERITY_ERROR: @@ -85,10 +83,6 @@ void gpr_log_message(const char* file, int line, gpr_log_severity severity, } void gpr_log_verbosity_init(void) { -// This is enabled in Github only. -// This ifndef is converted to ifdef internally by copybara. -// Internally grpc verbosity is managed using absl settings. -// So internally we avoid setting it like this. #ifndef GRPC_VERBOSITY_MACRO // SetMinLogLevel sets the value for the entire binary, not just gRPC. // This setting will change things for other libraries/code that is unrelated diff --git a/src/cpp/client/global_callback_hook.cc b/src/cpp/client/global_callback_hook.cc new file mode 100644 index 00000000000..2431508b7b3 --- /dev/null +++ b/src/cpp/client/global_callback_hook.cc @@ -0,0 +1,36 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "absl/base/no_destructor.h" +#include "absl/log/check.h" + +#include + +namespace grpc { + +static absl::NoDestructor> g_callback_hook( + std::make_shared()); + +std::shared_ptr GetGlobalCallbackHook() { + return *g_callback_hook; +} + +void SetGlobalCallbackHook(GlobalCallbackHook* hook) { + CHECK(hook != nullptr); + CHECK(hook != (*g_callback_hook).get()); + *g_callback_hook = std::shared_ptr(hook); +} +} // namespace grpc diff --git a/src/cpp/ext/otel/otel_plugin.cc b/src/cpp/ext/otel/otel_plugin.cc index 6ae99378296..44c653d58b8 100644 --- a/src/cpp/ext/otel/otel_plugin.cc +++ b/src/cpp/ext/otel/otel_plugin.cc @@ -563,6 +563,38 @@ OpenTelemetryPluginImpl::OpenTelemetryPluginImpl( }); } +OpenTelemetryPluginImpl::~OpenTelemetryPluginImpl() { + for (const auto& instrument_data : instruments_data_) { + grpc_core::Match( + instrument_data.instrument, [](const Disabled&) {}, + [](const std::unique_ptr>&) {}, + [](const std::unique_ptr>&) { + }, + [](const std::unique_ptr< + opentelemetry::metrics::Histogram>&) {}, + [](const std::unique_ptr>&) { + }, + [](const std::unique_ptr>& state) { + CHECK(state->caches.empty()); + if (state->ot_callback_registered) { + state->instrument->RemoveCallback( + &CallbackGaugeState::CallbackGaugeCallback, + state.get()); + state->ot_callback_registered = false; + } + }, + [](const std::unique_ptr>& state) { + CHECK(state->caches.empty()); + if (state->ot_callback_registered) { + state->instrument->RemoveCallback( + &CallbackGaugeState::CallbackGaugeCallback, + state.get()); + state->ot_callback_registered = false; + } + }); + } +} + namespace { constexpr absl::string_view kLocality = "grpc.lb.locality"; } @@ -823,9 +855,6 @@ void OpenTelemetryPluginImpl::AddCallback( void OpenTelemetryPluginImpl::RemoveCallback( grpc_core::RegisteredMetricCallback* callback) { - std::vector< - absl::variant*, CallbackGaugeState*>> - gauges_that_need_to_remove_callback; { grpc_core::MutexLock lock(&mu_); callback_timestamps_.erase(callback); @@ -848,11 +877,6 @@ void OpenTelemetryPluginImpl::RemoveCallback( CHECK_NE(callback_gauge_state, nullptr); CHECK((*callback_gauge_state)->ot_callback_registered); CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u); - if ((*callback_gauge_state)->caches.empty()) { - gauges_that_need_to_remove_callback.push_back( - callback_gauge_state->get()); - (*callback_gauge_state)->ot_callback_registered = false; - } break; } case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: { @@ -867,11 +891,6 @@ void OpenTelemetryPluginImpl::RemoveCallback( CHECK_NE(callback_gauge_state, nullptr); CHECK((*callback_gauge_state)->ot_callback_registered); CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u); - if ((*callback_gauge_state)->caches.empty()) { - gauges_that_need_to_remove_callback.push_back( - callback_gauge_state->get()); - (*callback_gauge_state)->ot_callback_registered = false; - } break; } default: @@ -880,21 +899,13 @@ void OpenTelemetryPluginImpl::RemoveCallback( } } } - // RemoveCallback internally grabs OpenTelemetry's observable_registry's - // lock. So we need to call it without our plugin lock otherwise we may - // deadlock. - for (const auto& gauge : gauges_that_need_to_remove_callback) { - grpc_core::Match( - gauge, - [](CallbackGaugeState* gauge) { - gauge->instrument->RemoveCallback( - &CallbackGaugeState::CallbackGaugeCallback, gauge); - }, - [](CallbackGaugeState* gauge) { - gauge->instrument->RemoveCallback( - &CallbackGaugeState::CallbackGaugeCallback, gauge); - }); - } + // Note that we are not removing the callback from OpenTelemetry immediately, + // and instead remove it when the plugin is destroyed. We just have a single + // callback per OpenTelemetry instrument which is a small number. If we decide + // to remove the callback immediately at this point, we need to make sure that + // 1) the callback is removed without holding mu_ and 2) we make sure that + // this does not race against a possible `AddCallback` operation. A potential + // way to do this is to use WorkSerializer. } template diff --git a/src/cpp/ext/otel/otel_plugin.h b/src/cpp/ext/otel/otel_plugin.h index 70321fb021d..9b0c4328673 100644 --- a/src/cpp/ext/otel/otel_plugin.h +++ b/src/cpp/ext/otel/otel_plugin.h @@ -223,6 +223,7 @@ class OpenTelemetryPluginImpl absl::AnyInvocable< bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const> channel_scope_filter); + ~OpenTelemetryPluginImpl() override; private: class ClientCallTracer; diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index 8dd2530cbc8..14bce49d37f 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -60,30 +60,30 @@ static VALUE grpc_rb_call_credentials_callback(VALUE args) { VALUE callback_func = rb_ary_entry(args, 0); VALUE callback_args = rb_ary_entry(args, 1); VALUE md_ary_obj = rb_ary_entry(args, 2); - if (absl_vlog2_enabled()) { - VALUE callback_func_str = rb_funcall(callback_func, rb_intern("to_s"), 0); - VALUE callback_args_str = rb_funcall(callback_args, rb_intern("to_s"), 0); - VALUE callback_source_info = - rb_funcall(callback_func, rb_intern("source_location"), 0); - if (callback_source_info != Qnil) { - VALUE source_filename = rb_ary_entry(callback_source_info, 0); - VALUE source_line_number = rb_funcall( - rb_ary_entry(callback_source_info, 1), rb_intern("to_s"), 0); - gpr_log(GPR_DEBUG, - "GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| " - "source_filename:%s line_number:%s with arguments:|%s|", - StringValueCStr(callback_func_str), - StringValueCStr(source_filename), - StringValueCStr(source_line_number), - StringValueCStr(callback_args_str)); - } else { - gpr_log(GPR_DEBUG, - "GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| " - "(failed to get source filename and line) with arguments:|%s|", - StringValueCStr(callback_func_str), - StringValueCStr(callback_args_str)); - } + + VALUE callback_func_str = rb_funcall(callback_func, rb_intern("to_s"), 0); + VALUE callback_args_str = rb_funcall(callback_args, rb_intern("to_s"), 0); + VALUE callback_source_info = + rb_funcall(callback_func, rb_intern("source_location"), 0); + if (callback_source_info != Qnil) { + VALUE source_filename = rb_ary_entry(callback_source_info, 0); + VALUE source_line_number = + rb_funcall(rb_ary_entry(callback_source_info, 1), rb_intern("to_s"), 0); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| " + "source_filename:%s line_number:%s with arguments:|%s|", + StringValueCStr(callback_func_str), + StringValueCStr(source_filename), + StringValueCStr(source_line_number), + StringValueCStr(callback_args_str)); + } else { + gpr_log(GPR_DEBUG, + "GRPC_RUBY: grpc_rb_call_credentials invoking user callback:|%s| " + "(failed to get source filename and line) with arguments:|%s|", + StringValueCStr(callback_func_str), + StringValueCStr(callback_args_str)); } + VALUE metadata = rb_funcall(callback_func, rb_intern("call"), 1, callback_args); grpc_metadata_array* md_ary = NULL; diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 36e66f82b86..31cb6523097 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -254,7 +254,6 @@ gpr_free_aligned_type gpr_free_aligned_import; gpr_cpu_num_cores_type gpr_cpu_num_cores_import; gpr_cpu_current_cpu_type gpr_cpu_current_cpu_import; gpr_log_type gpr_log_import; -absl_vlog2_enabled_type absl_vlog2_enabled_import; gpr_log_verbosity_init_type gpr_log_verbosity_init_import; gpr_format_message_type gpr_format_message_import; gpr_strdup_type gpr_strdup_import; @@ -539,7 +538,6 @@ void grpc_rb_load_imports(HMODULE library) { gpr_cpu_num_cores_import = (gpr_cpu_num_cores_type) GetProcAddress(library, "gpr_cpu_num_cores"); gpr_cpu_current_cpu_import = (gpr_cpu_current_cpu_type) GetProcAddress(library, "gpr_cpu_current_cpu"); gpr_log_import = (gpr_log_type) GetProcAddress(library, "gpr_log"); - absl_vlog2_enabled_import = (absl_vlog2_enabled_type) GetProcAddress(library, "absl_vlog2_enabled"); gpr_log_verbosity_init_import = (gpr_log_verbosity_init_type) GetProcAddress(library, "gpr_log_verbosity_init"); gpr_format_message_import = (gpr_format_message_type) GetProcAddress(library, "gpr_format_message"); gpr_strdup_import = (gpr_strdup_type) GetProcAddress(library, "gpr_strdup"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 8ddd62a7ce9..02aee5008a7 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -738,9 +738,6 @@ extern gpr_cpu_current_cpu_type gpr_cpu_current_cpu_import; typedef void(*gpr_log_type)(const char* file, int line, gpr_log_severity severity, const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5); extern gpr_log_type gpr_log_import; #define gpr_log gpr_log_import -typedef int(*absl_vlog2_enabled_type)(); -extern absl_vlog2_enabled_type absl_vlog2_enabled_import; -#define absl_vlog2_enabled absl_vlog2_enabled_import typedef void(*gpr_log_verbosity_init_type)(void); extern gpr_log_verbosity_init_type gpr_log_verbosity_init_import; #define gpr_log_verbosity_init gpr_log_verbosity_init_import diff --git a/test/cpp/ext/otel/otel_plugin_test.cc b/test/cpp/ext/otel/otel_plugin_test.cc index cbc9b18fc42..22b971ac370 100644 --- a/test/cpp/ext/otel/otel_plugin_test.cc +++ b/test/cpp/ext/otel/otel_plugin_test.cc @@ -1715,14 +1715,8 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest, InstrumentsEnabledTest) { EXPECT_FALSE(stats_plugins.IsInstrumentEnabled(counter_handle)); } -class OpenTelemetryPluginCallbackMetricsTest - : public OpenTelemetryPluginEnd2EndTest { - protected: - OpenTelemetryPluginCallbackMetricsTest() - : endpoint_config_(grpc_core::ChannelArgs()) {} - - grpc_event_engine::experimental::ChannelArgsEndpointConfig endpoint_config_; -}; +using OpenTelemetryPluginCallbackMetricsTest = + OpenTelemetryPluginNPCMetricsTest; // The callback minimal interval is longer than the OT reporting interval, so we // expect to collect duplicated (cached) values. @@ -1775,14 +1769,11 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest, auto registered_metric_callback_1 = stats_plugins.RegisterCallback( [&](grpc_core::CallbackMetricReporter& reporter) { ++report_count_1; - reporter.Report(integer_gauge_handle, int_value_1, kLabelValuesSet1, + reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet1, kOptionalLabelValuesSet1); - reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet2, - kOptionalLabelValuesSet2); - reporter.Report(double_gauge_handle, double_value_1, kLabelValuesSet1, + reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet1, kOptionalLabelValuesSet1); - reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet2, - kOptionalLabelValuesSet2); + ; }, grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(), integer_gauge_handle, double_gauge_handle); @@ -1792,12 +1783,8 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest, auto registered_metric_callback_2 = stats_plugins.RegisterCallback( [&](grpc_core::CallbackMetricReporter& reporter) { ++report_count_2; - reporter.Report(integer_gauge_handle, int_value_2, kLabelValuesSet1, - kOptionalLabelValuesSet1); reporter.Report(integer_gauge_handle, int_value_2++, kLabelValuesSet2, kOptionalLabelValuesSet2); - reporter.Report(double_gauge_handle, double_value_2, kLabelValuesSet1, - kOptionalLabelValuesSet1); reporter.Report(double_gauge_handle, double_value_2++, kLabelValuesSet2, kOptionalLabelValuesSet2); }, @@ -1910,14 +1897,10 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest, auto registered_metric_callback_1 = stats_plugins.RegisterCallback( [&](grpc_core::CallbackMetricReporter& reporter) { ++report_count_1; - reporter.Report(integer_gauge_handle, int_value_1, kLabelValuesSet1, + reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet1, kOptionalLabelValuesSet1); - reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet2, - kOptionalLabelValuesSet2); - reporter.Report(double_gauge_handle, double_value_1, kLabelValuesSet1, + reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet1, kOptionalLabelValuesSet1); - reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet2, - kOptionalLabelValuesSet2); }, grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(), integer_gauge_handle, double_gauge_handle); @@ -1927,12 +1910,8 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest, auto registered_metric_callback_2 = stats_plugins.RegisterCallback( [&](grpc_core::CallbackMetricReporter& reporter) { ++report_count_2; - reporter.Report(integer_gauge_handle, int_value_2, kLabelValuesSet1, - kOptionalLabelValuesSet1); reporter.Report(integer_gauge_handle, int_value_2++, kLabelValuesSet2, kOptionalLabelValuesSet2); - reporter.Report(double_gauge_handle, double_value_2, kLabelValuesSet1, - kOptionalLabelValuesSet1); reporter.Report(double_gauge_handle, double_value_2++, kLabelValuesSet2, kOptionalLabelValuesSet2); }, @@ -1996,6 +1975,116 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest, kOptionalLabelKeys, kOptionalLabelValuesSet2, 0.0, true)); } +// Verifies that callbacks are cleaned up when the OpenTelemetry plugin is +// destroyed. +TEST_F(OpenTelemetryPluginCallbackMetricsTest, VerifyCallbacksAreCleanedUp) { + constexpr absl::string_view kInt64CallbackGaugeMetric = + "yet_another_int64_callback_gauge"; + constexpr absl::string_view kDoubleCallbackGaugeMetric = + "yet_another_double_callback_gauge"; + auto integer_gauge_handle = + grpc_core::GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge( + kInt64CallbackGaugeMetric, "An int64 callback gauge.", "unit", + /*enable_by_default=*/true) + .Build(); + auto double_gauge_handle = + grpc_core::GlobalInstrumentsRegistry::RegisterCallbackDoubleGauge( + kDoubleCallbackGaugeMetric, "A double callback gauge.", "unit", + /*enable_by_default=*/true) + .Build(); + Init(std::move(Options().set_metric_names( + {kInt64CallbackGaugeMetric, kDoubleCallbackGaugeMetric}))); + auto stats_plugins = + grpc_core::GlobalStatsPluginRegistry::GetStatsPluginsForChannel( + grpc_core::experimental::StatsPluginChannelScope( + "dns:///localhost:8080", "", endpoint_config_)); + // Multiple callbacks for the same metrics, each reporting different + // label values. + int report_count_1 = 0; + int64_t int_value_1 = 1; + double double_value_1 = 0.5; + auto registered_metric_callback_1 = stats_plugins.RegisterCallback( + [&](grpc_core::CallbackMetricReporter& reporter) { + ++report_count_1; + reporter.Report(integer_gauge_handle, int_value_1++, {}, {}); + reporter.Report(double_gauge_handle, double_value_1++, {}, {}); + }, + grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(), + integer_gauge_handle, double_gauge_handle); + int report_count_2 = 0; + int64_t int_value_2 = 1; + double double_value_2 = 0.5; + auto registered_metric_callback_2 = stats_plugins.RegisterCallback( + [&](grpc_core::CallbackMetricReporter& reporter) { + ++report_count_2; + reporter.Report(integer_gauge_handle, int_value_2++, {}, {}); + reporter.Report(double_gauge_handle, double_value_2++, {}, {}); + }, + grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(), + integer_gauge_handle, double_gauge_handle); + constexpr int kIterations = 50; + { + MetricsCollectorThread collector{ + this, + grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(), + kIterations, + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { + return !data.contains(kInt64CallbackGaugeMetric) || + !data.contains(kDoubleCallbackGaugeMetric); + }}; + } + // Verify that callbacks are invoked + EXPECT_EQ(report_count_1, kIterations); + EXPECT_EQ(report_count_2, kIterations); + // Remove one of the callbacks + registered_metric_callback_1.reset(); + { + MetricsCollectorThread new_collector{ + this, + grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(), + kIterations, + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { return false; }}; + } + EXPECT_EQ(report_count_1, kIterations); // No change since previous + EXPECT_EQ(report_count_2, 2 * kIterations); // Gets another kIterations + // Remove the other callback as well + registered_metric_callback_2.reset(); + MetricsCollectorThread new_new_collector{ + this, + grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(), + kIterations, + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { return false; }}; + // We shouldn't get any new callbacks + EXPECT_THAT(new_new_collector.Stop(), ::testing::IsEmpty()); + EXPECT_EQ(report_count_1, kIterations); + EXPECT_EQ(report_count_2, 2 * kIterations); + // Reset stats plugins as well + grpc_core::GlobalStatsPluginRegistryTestPeer:: + ResetGlobalStatsPluginRegistry(); + registered_metric_callback_2.reset(); + MetricsCollectorThread new_new_new_collector{ + this, + grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(), + kIterations, + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { return false; }}; + // Still no new callbacks + EXPECT_THAT(new_new_new_collector.Stop(), ::testing::IsEmpty()); + EXPECT_EQ(report_count_1, kIterations); + EXPECT_EQ(report_count_2, 2 * kIterations); +} + TEST(OpenTelemetryPluginMetricsEnablingDisablingTest, TestEnableDisableAPIs) { grpc::internal::OpenTelemetryPluginBuilderImpl builder; // First disable all metrics diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 40501c821bc..af16cffbd10 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -1073,6 +1073,7 @@ include/grpcpp/support/channel_arguments.h \ include/grpcpp/support/client_callback.h \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ +include/grpcpp/support/global_callback_hook.h \ include/grpcpp/support/interceptor.h \ include/grpcpp/support/message_allocator.h \ include/grpcpp/support/method_handler.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 68987e28551..7843c17dae2 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1073,6 +1073,7 @@ include/grpcpp/support/channel_arguments.h \ include/grpcpp/support/client_callback.h \ include/grpcpp/support/client_interceptor.h \ include/grpcpp/support/config.h \ +include/grpcpp/support/global_callback_hook.h \ include/grpcpp/support/interceptor.h \ include/grpcpp/support/message_allocator.h \ include/grpcpp/support/method_handler.h \ @@ -3046,6 +3047,7 @@ src/cpp/client/create_channel.cc \ src/cpp/client/create_channel_internal.cc \ src/cpp/client/create_channel_internal.h \ src/cpp/client/create_channel_posix.cc \ +src/cpp/client/global_callback_hook.cc \ src/cpp/client/insecure_credentials.cc \ src/cpp/client/secure_credentials.cc \ src/cpp/client/secure_credentials.h \ diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py index ab43a6c0868..4363c0482dc 100644 --- a/tools/interop_matrix/client_matrix.py +++ b/tools/interop_matrix/client_matrix.py @@ -134,6 +134,7 @@ LANG_RELEASE_MATRIX = { ("v1.62.0", ReleaseInfo()), ("v1.63.1", ReleaseInfo()), ("v1.64.1", ReleaseInfo()), + ("v1.65.0", ReleaseInfo()), ] ), "go": OrderedDict( @@ -799,6 +800,12 @@ LANG_RELEASE_MATRIX = { runtimes=["python"], testcases_file="python__master" ), ), + ( + "v1.65.0", + ReleaseInfo( + runtimes=["python"], testcases_file="python__master" + ), + ), ] ), "node": OrderedDict( @@ -897,6 +904,7 @@ LANG_RELEASE_MATRIX = { ("v1.62.0", ReleaseInfo()), ("v1.63.0", ReleaseInfo()), ("v1.64.0", ReleaseInfo()), + ("v1.65.0", ReleaseInfo()), ] ), "php": OrderedDict( @@ -959,6 +967,7 @@ LANG_RELEASE_MATRIX = { ("v1.62.0", ReleaseInfo()), ("v1.63.0", ReleaseInfo()), ("v1.64.0", ReleaseInfo()), + ("v1.65.0", ReleaseInfo()), ] ), "csharp": OrderedDict( diff --git a/tools/run_tests/sanity/banned_functions.py b/tools/run_tests/sanity/banned_functions.py index ec8e8b2ec14..9db09d82699 100755 --- a/tools/run_tests/sanity/banned_functions.py +++ b/tools/run_tests/sanity/banned_functions.py @@ -37,11 +37,6 @@ os.chdir(os.path.join(os.path.dirname(sys.argv[0]), "../../..")) # Map of deprecated functions to allowlist files DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = { - "absl_vlog2_enabled(": [ - "./include/grpc/support/log.h", - "./src/core/util/log.cc", - "./src/ruby/ext/grpc/rb_call_credentials.c", - ], "gpr_log_severity": [ "./include/grpc/support/log.h", "./src/core/util/android/log.cc",