From 5a4e8f3dbdfeb3ff41dbdfd076cda897a77ecd28 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 15 Sep 2023 11:52:14 -0700 Subject: [PATCH] [client_channel] second attempt: SubchannelWrapper hops into WorkSerializer before destruction (#34321) Original PR was #34307, reverted in #34318 due to internal test failures. The first commit is a revert of the revert. The second commit contains the fix. The original idea here was that `SubchannelWrapper::Orphan()`, which is called when the strong refcount reaches 0, would take a new weak ref and then hop into the `WorkSerializer` before dropping that weak ref, thus ensuring that the `SubchannelWrapper` is destroyed inside the `WorkSerializer` (which is needed because the `SubchannelWrapper` dtor cleans up some state in the channel related to the subchannel). The problem is that `DualRefCounted<>::Unref()` itself actually increments the weak ref count before calling `Orphan()` and then decrements it afterwards. So in the case where the `SubchannelWrapper` is unreffed outside of the `WorkSerializer` and no other thread happens to be holding the `WorkSerializer`, the weak ref that we were taking in `Orphan()` was unreffed inline, which meant that it wasn't actually the last weak ref -- the last weak ref was the one taken by `DualRefCounted<>::Unref()`, and it wasn't released until after the `WorkSerializer` was released. To this this problem, we move the code from the `SubchannelWrapper` dtor that cleans up the channel's state into the `WorkSerializer` callback that is scheduled in `Orphan()`. Thus, regardless of whether or not the last weak ref is released inside of the `WorkSerializer`, we are definitely doing that cleanup inside the `WorkSerializer`, which is what we actually care about. Also adds an experiment to guard this behavior. --- BUILD | 1 + bazel/experiments.bzl | 9 ++ src/core/BUILD | 1 + .../filters/client_channel/client_channel.cc | 100 ++++++++++++++---- .../weighted_round_robin.cc | 9 +- src/core/lib/experiments/experiments.cc | 33 ++++++ src/core/lib/experiments/experiments.h | 18 +++- src/core/lib/experiments/experiments.yaml | 7 ++ src/core/lib/experiments/rollouts.yaml | 2 + 9 files changed, 154 insertions(+), 26 deletions(-) diff --git a/BUILD b/BUILD index bde7f174e6a..63c4cbc7bf3 100644 --- a/BUILD +++ b/BUILD @@ -3065,6 +3065,7 @@ grpc_cc_library( "//src/core:dual_ref_counted", "//src/core:env", "//src/core:error", + "//src/core:experiments", "//src/core:gpr_atm", "//src/core:grpc_backend_metric_data", "//src/core:grpc_deadline_filter", diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 754f6078c2e..07aa70837fc 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -61,15 +61,18 @@ EXPERIMENTS = { "work_stealing", ], "cpp_lb_end2end_test": [ + "client_channel_subchannel_wrapper_work_serializer_orphan", "round_robin_delegate_to_pick_first", ], "flow_control_test": [ "lazier_stream_updates", ], "lb_unit_test": [ + "client_channel_subchannel_wrapper_work_serializer_orphan", "round_robin_delegate_to_pick_first", ], "xds_end2end_test": [ + "client_channel_subchannel_wrapper_work_serializer_orphan", "round_robin_delegate_to_pick_first", ], }, @@ -118,15 +121,18 @@ EXPERIMENTS = { "work_stealing", ], "cpp_lb_end2end_test": [ + "client_channel_subchannel_wrapper_work_serializer_orphan", "round_robin_delegate_to_pick_first", ], "flow_control_test": [ "lazier_stream_updates", ], "lb_unit_test": [ + "client_channel_subchannel_wrapper_work_serializer_orphan", "round_robin_delegate_to_pick_first", ], "xds_end2end_test": [ + "client_channel_subchannel_wrapper_work_serializer_orphan", "round_robin_delegate_to_pick_first", ], }, @@ -185,15 +191,18 @@ EXPERIMENTS = { "work_stealing", ], "cpp_lb_end2end_test": [ + "client_channel_subchannel_wrapper_work_serializer_orphan", "round_robin_delegate_to_pick_first", ], "flow_control_test": [ "lazier_stream_updates", ], "lb_unit_test": [ + "client_channel_subchannel_wrapper_work_serializer_orphan", "round_robin_delegate_to_pick_first", ], "xds_end2end_test": [ + "client_channel_subchannel_wrapper_work_serializer_orphan", "round_robin_delegate_to_pick_first", ], }, diff --git a/src/core/BUILD b/src/core/BUILD index 077b007141f..45e5f2d2be1 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4965,6 +4965,7 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", + "experiments", "grpc_backend_metric_data", "grpc_lb_subchannel_list", "json", diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 6cfd4c48fcd..e204b5fa9fe 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -69,6 +69,7 @@ #include "src/core/lib/channel/status_util.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/status_helper.h" @@ -622,6 +623,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { chand, this, subchannel_.get()); } GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper"); + GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer()); if (chand_->channelz_node_ != nullptr) { auto* subchannel_node = subchannel_->channelz_node(); if (subchannel_node != nullptr) { @@ -634,7 +636,6 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { ++it->second; } } - GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer()); chand_->subchannel_wrappers_.insert(this); } @@ -644,24 +645,55 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { "chand=%p: destroying subchannel wrapper %p for subchannel %p", chand_, this, subchannel_.get()); } - GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer()); - chand_->subchannel_wrappers_.erase(this); - if (chand_->channelz_node_ != nullptr) { - auto* subchannel_node = subchannel_->channelz_node(); - if (subchannel_node != nullptr) { - auto it = chand_->subchannel_refcount_map_.find(subchannel_.get()); - GPR_ASSERT(it != chand_->subchannel_refcount_map_.end()); - --it->second; - if (it->second == 0) { - chand_->channelz_node_->RemoveChildSubchannel( - subchannel_node->uuid()); - chand_->subchannel_refcount_map_.erase(it); + if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) { + chand_->subchannel_wrappers_.erase(this); + if (chand_->channelz_node_ != nullptr) { + auto* subchannel_node = subchannel_->channelz_node(); + if (subchannel_node != nullptr) { + auto it = chand_->subchannel_refcount_map_.find(subchannel_.get()); + GPR_ASSERT(it != chand_->subchannel_refcount_map_.end()); + --it->second; + if (it->second == 0) { + chand_->channelz_node_->RemoveChildSubchannel( + subchannel_node->uuid()); + chand_->subchannel_refcount_map_.erase(it); + } } } } GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper"); } + void Orphan() override { + if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) { + return; + } + // Make sure we clean up the channel's subchannel maps inside the + // WorkSerializer. + // Ref held by callback. + WeakRef(DEBUG_LOCATION, "subchannel map cleanup").release(); + chand_->work_serializer_->Run( + [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { + chand_->subchannel_wrappers_.erase(this); + if (chand_->channelz_node_ != nullptr) { + auto* subchannel_node = subchannel_->channelz_node(); + if (subchannel_node != nullptr) { + auto it = + chand_->subchannel_refcount_map_.find(subchannel_.get()); + GPR_ASSERT(it != chand_->subchannel_refcount_map_.end()); + --it->second; + if (it->second == 0) { + chand_->channelz_node_->RemoveChildSubchannel( + subchannel_node->uuid()); + chand_->subchannel_refcount_map_.erase(it); + } + } + } + WeakUnref(DEBUG_LOCATION, "subchannel map cleanup"); + }, + DEBUG_LOCATION); + } + void WatchConnectivityState( std::unique_ptr watcher) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { @@ -729,13 +761,17 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { : watcher_(std::move(watcher)), parent_(std::move(parent)) {} ~WatcherWrapper() override { - auto* parent = parent_.release(); // ref owned by lambda - parent->chand_->work_serializer_->Run( - [parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( - *parent_->chand_->work_serializer_) { - parent->Unref(DEBUG_LOCATION, "WatcherWrapper"); - }, - DEBUG_LOCATION); + if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) { + auto* parent = parent_.release(); // ref owned by lambda + parent->chand_->work_serializer_->Run( + [parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( + *parent_->chand_->work_serializer_) { + parent->Unref(DEBUG_LOCATION, "WatcherWrapper"); + }, + DEBUG_LOCATION); + return; + } + parent_.reset(DEBUG_LOCATION, "WatcherWrapper"); } void OnConnectivityStateChange( @@ -2776,6 +2812,9 @@ absl::optional ClientChannel::LoadBalancedCall::PickSubchannel( // We need to unref pickers in the WorkSerializer. std::vector> pickers; auto cleanup = absl::MakeCleanup([&]() { + if (IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) { + return; + } chand_->work_serializer_->Run( [pickers = std::move(pickers)]() mutable { for (auto& picker : pickers) { @@ -2784,14 +2823,29 @@ absl::optional ClientChannel::LoadBalancedCall::PickSubchannel( }, DEBUG_LOCATION); }); + absl::AnyInvocable)> + set_picker; + if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) { + set_picker = + [&](RefCountedPtr picker) { + pickers.emplace_back(std::move(picker)); + }; + } else { + pickers.emplace_back(); + set_picker = + [&](RefCountedPtr picker) { + pickers[0] = std::move(picker); + }; + } // Grab mutex and take a ref to the picker. if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to get picker", chand_, this); } + RefCountedPtr picker; { MutexLock lock(&chand_->lb_mu_); - pickers.emplace_back(chand_->picker_); + set_picker(chand_->picker_); } while (true) { // Do pick. @@ -2804,13 +2858,13 @@ absl::optional ClientChannel::LoadBalancedCall::PickSubchannel( if (!pick_complete) { MutexLock lock(&chand_->lb_mu_); // If picker has been swapped out since we grabbed it, try again. - if (chand_->picker_ != pickers.back()) { + if (pickers.back() != chand_->picker_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: pick not complete, but picker changed", chand_, this); } - pickers.emplace_back(chand_->picker_); + set_picker(chand_->picker_); continue; } // Otherwise queue the pick to try again later when we get a new picker. diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc index 4d3a47c1d5b..83947edaf7a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc @@ -54,6 +54,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -623,8 +624,12 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { self->BuildSchedulerAndStartTimerLocked(); } } - // Release the picker ref inside the WorkSerializer. - work_serializer->Run([self = std::move(self)]() {}, DEBUG_LOCATION); + if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) { + // Release the picker ref inside the WorkSerializer. + work_serializer->Run([self = std::move(self)]() {}, DEBUG_LOCATION); + return; + } + self.reset(); }); } diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 61d87841c08..51a21fe97f7 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -110,6 +110,13 @@ const char* const description_round_robin_delegate_to_pick_first = "backend design."; const char* const additional_constraints_round_robin_delegate_to_pick_first = "{}"; +const char* const + description_client_channel_subchannel_wrapper_work_serializer_orphan = + "Client channel subchannel wrapper hops into WorkSerializer at " + "Orphan() time, rather than requiring callers to do it."; +const char* const + additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan = + "{}"; } // namespace namespace grpc_core { @@ -166,6 +173,10 @@ const ExperimentMetadata g_experiment_metadata[] = { {"round_robin_delegate_to_pick_first", description_round_robin_delegate_to_pick_first, additional_constraints_round_robin_delegate_to_pick_first, true, true}, + {"client_channel_subchannel_wrapper_work_serializer_orphan", + description_client_channel_subchannel_wrapper_work_serializer_orphan, + additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan, + true, true}, }; } // namespace grpc_core @@ -260,6 +271,13 @@ const char* const description_round_robin_delegate_to_pick_first = "backend design."; const char* const additional_constraints_round_robin_delegate_to_pick_first = "{}"; +const char* const + description_client_channel_subchannel_wrapper_work_serializer_orphan = + "Client channel subchannel wrapper hops into WorkSerializer at " + "Orphan() time, rather than requiring callers to do it."; +const char* const + additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan = + "{}"; } // namespace namespace grpc_core { @@ -316,6 +334,10 @@ const ExperimentMetadata g_experiment_metadata[] = { {"round_robin_delegate_to_pick_first", description_round_robin_delegate_to_pick_first, additional_constraints_round_robin_delegate_to_pick_first, true, true}, + {"client_channel_subchannel_wrapper_work_serializer_orphan", + description_client_channel_subchannel_wrapper_work_serializer_orphan, + additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan, + true, true}, }; } // namespace grpc_core @@ -410,6 +432,13 @@ const char* const description_round_robin_delegate_to_pick_first = "backend design."; const char* const additional_constraints_round_robin_delegate_to_pick_first = "{}"; +const char* const + description_client_channel_subchannel_wrapper_work_serializer_orphan = + "Client channel subchannel wrapper hops into WorkSerializer at " + "Orphan() time, rather than requiring callers to do it."; +const char* const + additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan = + "{}"; } // namespace namespace grpc_core { @@ -466,6 +495,10 @@ const ExperimentMetadata g_experiment_metadata[] = { {"round_robin_delegate_to_pick_first", description_round_robin_delegate_to_pick_first, additional_constraints_round_robin_delegate_to_pick_first, true, true}, + {"client_channel_subchannel_wrapper_work_serializer_orphan", + description_client_channel_subchannel_wrapper_work_serializer_orphan, + additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan, + true, true}, }; } // namespace grpc_core diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index f241376d9ec..74c7dc441ff 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -89,6 +89,10 @@ inline bool IsLazierStreamUpdatesEnabled() { return true; } inline bool IsJitterMaxIdleEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; } +#define GRPC_EXPERIMENT_IS_INCLUDED_CLIENT_CHANNEL_SUBCHANNEL_WRAPPER_WORK_SERIALIZER_ORPHAN +inline bool IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled() { + return true; +} #elif defined(GPR_WINDOWS) inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -121,6 +125,10 @@ inline bool IsLazierStreamUpdatesEnabled() { return true; } inline bool IsJitterMaxIdleEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; } +#define GRPC_EXPERIMENT_IS_INCLUDED_CLIENT_CHANNEL_SUBCHANNEL_WRAPPER_WORK_SERIALIZER_ORPHAN +inline bool IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled() { + return true; +} #else inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -153,6 +161,10 @@ inline bool IsLazierStreamUpdatesEnabled() { return true; } inline bool IsJitterMaxIdleEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; } +#define GRPC_EXPERIMENT_IS_INCLUDED_CLIENT_CHANNEL_SUBCHANNEL_WRAPPER_WORK_SERIALIZER_ORPHAN +inline bool IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled() { + return true; +} #endif #else @@ -212,8 +224,12 @@ inline bool IsJitterMaxIdleEnabled() { return IsExperimentEnabled(22); } inline bool IsRoundRobinDelegateToPickFirstEnabled() { return IsExperimentEnabled(23); } +#define GRPC_EXPERIMENT_IS_INCLUDED_CLIENT_CHANNEL_SUBCHANNEL_WRAPPER_WORK_SERIALIZER_ORPHAN +inline bool IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled() { + return IsExperimentEnabled(24); +} -constexpr const size_t kNumExperiments = 24; +constexpr const size_t kNumExperiments = 25; extern const ExperimentMetadata g_experiment_metadata[kNumExperiments]; #endif diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 7c605d53ff9..47ee24f6775 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -192,3 +192,10 @@ expiry: 2023/11/15 owner: roth@google.com test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"] +- name: client_channel_subchannel_wrapper_work_serializer_orphan + description: + Client channel subchannel wrapper hops into WorkSerializer at + Orphan() time, rather than requiring callers to do it. + expiry: 2023/11/15 + owner: roth@google.com + test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"] diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index f1770b22fae..b4243805f8c 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -96,3 +96,5 @@ default: true - name: round_robin_delegate_to_pick_first default: true +- name: client_channel_subchannel_wrapper_work_serializer_orphan + default: true