[client_channel] second attempt: SubchannelWrapper hops into WorkSerializer before destruction (#34321)

Original PR was #34307, reverted in #34318 due to internal test
failures.

The first commit is a revert of the revert. The second commit contains
the fix.

The original idea here was that `SubchannelWrapper::Orphan()`, which is
called when the strong refcount reaches 0, would take a new weak ref and
then hop into the `WorkSerializer` before dropping that weak ref, thus
ensuring that the `SubchannelWrapper` is destroyed inside the
`WorkSerializer` (which is needed because the `SubchannelWrapper` dtor
cleans up some state in the channel related to the subchannel). The
problem is that `DualRefCounted<>::Unref()` itself actually increments
the weak ref count before calling `Orphan()` and then decrements it
afterwards. So in the case where the `SubchannelWrapper` is unreffed
outside of the `WorkSerializer` and no other thread happens to be
holding the `WorkSerializer`, the weak ref that we were taking in
`Orphan()` was unreffed inline, which meant that it wasn't actually the
last weak ref -- the last weak ref was the one taken by
`DualRefCounted<>::Unref()`, and it wasn't released until after the
`WorkSerializer` was released.

To this this problem, we move the code from the `SubchannelWrapper` dtor
that cleans up the channel's state into the `WorkSerializer` callback
that is scheduled in `Orphan()`. Thus, regardless of whether or not the
last weak ref is released inside of the `WorkSerializer`, we are
definitely doing that cleanup inside the `WorkSerializer`, which is what
we actually care about.

Also adds an experiment to guard this behavior.
pull/34377/head
Mark D. Roth 1 year ago committed by GitHub
parent 2db446aa9a
commit 5a4e8f3dbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 9
      bazel/experiments.bzl
  3. 1
      src/core/BUILD
  4. 100
      src/core/ext/filters/client_channel/client_channel.cc
  5. 9
      src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
  6. 33
      src/core/lib/experiments/experiments.cc
  7. 18
      src/core/lib/experiments/experiments.h
  8. 7
      src/core/lib/experiments/experiments.yaml
  9. 2
      src/core/lib/experiments/rollouts.yaml

@ -3065,6 +3065,7 @@ grpc_cc_library(
"//src/core:dual_ref_counted",
"//src/core:env",
"//src/core:error",
"//src/core:experiments",
"//src/core:gpr_atm",
"//src/core:grpc_backend_metric_data",
"//src/core:grpc_deadline_filter",

@ -61,15 +61,18 @@ EXPERIMENTS = {
"work_stealing",
],
"cpp_lb_end2end_test": [
"client_channel_subchannel_wrapper_work_serializer_orphan",
"round_robin_delegate_to_pick_first",
],
"flow_control_test": [
"lazier_stream_updates",
],
"lb_unit_test": [
"client_channel_subchannel_wrapper_work_serializer_orphan",
"round_robin_delegate_to_pick_first",
],
"xds_end2end_test": [
"client_channel_subchannel_wrapper_work_serializer_orphan",
"round_robin_delegate_to_pick_first",
],
},
@ -118,15 +121,18 @@ EXPERIMENTS = {
"work_stealing",
],
"cpp_lb_end2end_test": [
"client_channel_subchannel_wrapper_work_serializer_orphan",
"round_robin_delegate_to_pick_first",
],
"flow_control_test": [
"lazier_stream_updates",
],
"lb_unit_test": [
"client_channel_subchannel_wrapper_work_serializer_orphan",
"round_robin_delegate_to_pick_first",
],
"xds_end2end_test": [
"client_channel_subchannel_wrapper_work_serializer_orphan",
"round_robin_delegate_to_pick_first",
],
},
@ -185,15 +191,18 @@ EXPERIMENTS = {
"work_stealing",
],
"cpp_lb_end2end_test": [
"client_channel_subchannel_wrapper_work_serializer_orphan",
"round_robin_delegate_to_pick_first",
],
"flow_control_test": [
"lazier_stream_updates",
],
"lb_unit_test": [
"client_channel_subchannel_wrapper_work_serializer_orphan",
"round_robin_delegate_to_pick_first",
],
"xds_end2end_test": [
"client_channel_subchannel_wrapper_work_serializer_orphan",
"round_robin_delegate_to_pick_first",
],
},

@ -4965,6 +4965,7 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"experiments",
"grpc_backend_metric_data",
"grpc_lb_subchannel_list",
"json",

@ -69,6 +69,7 @@
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
@ -622,6 +623,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
chand, this, subchannel_.get());
}
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer());
if (chand_->channelz_node_ != nullptr) {
auto* subchannel_node = subchannel_->channelz_node();
if (subchannel_node != nullptr) {
@ -634,7 +636,6 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
++it->second;
}
}
GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer());
chand_->subchannel_wrappers_.insert(this);
}
@ -644,24 +645,55 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
"chand=%p: destroying subchannel wrapper %p for subchannel %p",
chand_, this, subchannel_.get());
}
GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer());
chand_->subchannel_wrappers_.erase(this);
if (chand_->channelz_node_ != nullptr) {
auto* subchannel_node = subchannel_->channelz_node();
if (subchannel_node != nullptr) {
auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
--it->second;
if (it->second == 0) {
chand_->channelz_node_->RemoveChildSubchannel(
subchannel_node->uuid());
chand_->subchannel_refcount_map_.erase(it);
if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) {
chand_->subchannel_wrappers_.erase(this);
if (chand_->channelz_node_ != nullptr) {
auto* subchannel_node = subchannel_->channelz_node();
if (subchannel_node != nullptr) {
auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
--it->second;
if (it->second == 0) {
chand_->channelz_node_->RemoveChildSubchannel(
subchannel_node->uuid());
chand_->subchannel_refcount_map_.erase(it);
}
}
}
}
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
}
void Orphan() override {
if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) {
return;
}
// Make sure we clean up the channel's subchannel maps inside the
// WorkSerializer.
// Ref held by callback.
WeakRef(DEBUG_LOCATION, "subchannel map cleanup").release();
chand_->work_serializer_->Run(
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
chand_->subchannel_wrappers_.erase(this);
if (chand_->channelz_node_ != nullptr) {
auto* subchannel_node = subchannel_->channelz_node();
if (subchannel_node != nullptr) {
auto it =
chand_->subchannel_refcount_map_.find(subchannel_.get());
GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
--it->second;
if (it->second == 0) {
chand_->channelz_node_->RemoveChildSubchannel(
subchannel_node->uuid());
chand_->subchannel_refcount_map_.erase(it);
}
}
}
WeakUnref(DEBUG_LOCATION, "subchannel map cleanup");
},
DEBUG_LOCATION);
}
void WatchConnectivityState(
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
@ -729,13 +761,17 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
: watcher_(std::move(watcher)), parent_(std::move(parent)) {}
~WatcherWrapper() override {
auto* parent = parent_.release(); // ref owned by lambda
parent->chand_->work_serializer_->Run(
[parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
*parent_->chand_->work_serializer_) {
parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
},
DEBUG_LOCATION);
if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) {
auto* parent = parent_.release(); // ref owned by lambda
parent->chand_->work_serializer_->Run(
[parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
*parent_->chand_->work_serializer_) {
parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
},
DEBUG_LOCATION);
return;
}
parent_.reset(DEBUG_LOCATION, "WatcherWrapper");
}
void OnConnectivityStateChange(
@ -2776,6 +2812,9 @@ absl::optional<absl::Status> ClientChannel::LoadBalancedCall::PickSubchannel(
// We need to unref pickers in the WorkSerializer.
std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
auto cleanup = absl::MakeCleanup([&]() {
if (IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) {
return;
}
chand_->work_serializer_->Run(
[pickers = std::move(pickers)]() mutable {
for (auto& picker : pickers) {
@ -2784,14 +2823,29 @@ absl::optional<absl::Status> ClientChannel::LoadBalancedCall::PickSubchannel(
},
DEBUG_LOCATION);
});
absl::AnyInvocable<void(RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>)>
set_picker;
if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) {
set_picker =
[&](RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
pickers.emplace_back(std::move(picker));
};
} else {
pickers.emplace_back();
set_picker =
[&](RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
pickers[0] = std::move(picker);
};
}
// Grab mutex and take a ref to the picker.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to get picker",
chand_, this);
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
{
MutexLock lock(&chand_->lb_mu_);
pickers.emplace_back(chand_->picker_);
set_picker(chand_->picker_);
}
while (true) {
// Do pick.
@ -2804,13 +2858,13 @@ absl::optional<absl::Status> ClientChannel::LoadBalancedCall::PickSubchannel(
if (!pick_complete) {
MutexLock lock(&chand_->lb_mu_);
// If picker has been swapped out since we grabbed it, try again.
if (chand_->picker_ != pickers.back()) {
if (pickers.back() != chand_->picker_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: pick not complete, but picker changed",
chand_, this);
}
pickers.emplace_back(chand_->picker_);
set_picker(chand_->picker_);
continue;
}
// Otherwise queue the pick to try again later when we get a new picker.

@ -54,6 +54,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -623,8 +624,12 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
self->BuildSchedulerAndStartTimerLocked();
}
}
// Release the picker ref inside the WorkSerializer.
work_serializer->Run([self = std::move(self)]() {}, DEBUG_LOCATION);
if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) {
// Release the picker ref inside the WorkSerializer.
work_serializer->Run([self = std::move(self)]() {}, DEBUG_LOCATION);
return;
}
self.reset();
});
}

@ -110,6 +110,13 @@ const char* const description_round_robin_delegate_to_pick_first =
"backend design.";
const char* const additional_constraints_round_robin_delegate_to_pick_first =
"{}";
const char* const
description_client_channel_subchannel_wrapper_work_serializer_orphan =
"Client channel subchannel wrapper hops into WorkSerializer at "
"Orphan() time, rather than requiring callers to do it.";
const char* const
additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan =
"{}";
} // namespace
namespace grpc_core {
@ -166,6 +173,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"round_robin_delegate_to_pick_first",
description_round_robin_delegate_to_pick_first,
additional_constraints_round_robin_delegate_to_pick_first, true, true},
{"client_channel_subchannel_wrapper_work_serializer_orphan",
description_client_channel_subchannel_wrapper_work_serializer_orphan,
additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan,
true, true},
};
} // namespace grpc_core
@ -260,6 +271,13 @@ const char* const description_round_robin_delegate_to_pick_first =
"backend design.";
const char* const additional_constraints_round_robin_delegate_to_pick_first =
"{}";
const char* const
description_client_channel_subchannel_wrapper_work_serializer_orphan =
"Client channel subchannel wrapper hops into WorkSerializer at "
"Orphan() time, rather than requiring callers to do it.";
const char* const
additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan =
"{}";
} // namespace
namespace grpc_core {
@ -316,6 +334,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"round_robin_delegate_to_pick_first",
description_round_robin_delegate_to_pick_first,
additional_constraints_round_robin_delegate_to_pick_first, true, true},
{"client_channel_subchannel_wrapper_work_serializer_orphan",
description_client_channel_subchannel_wrapper_work_serializer_orphan,
additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan,
true, true},
};
} // namespace grpc_core
@ -410,6 +432,13 @@ const char* const description_round_robin_delegate_to_pick_first =
"backend design.";
const char* const additional_constraints_round_robin_delegate_to_pick_first =
"{}";
const char* const
description_client_channel_subchannel_wrapper_work_serializer_orphan =
"Client channel subchannel wrapper hops into WorkSerializer at "
"Orphan() time, rather than requiring callers to do it.";
const char* const
additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan =
"{}";
} // namespace
namespace grpc_core {
@ -466,6 +495,10 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"round_robin_delegate_to_pick_first",
description_round_robin_delegate_to_pick_first,
additional_constraints_round_robin_delegate_to_pick_first, true, true},
{"client_channel_subchannel_wrapper_work_serializer_orphan",
description_client_channel_subchannel_wrapper_work_serializer_orphan,
additional_constraints_client_channel_subchannel_wrapper_work_serializer_orphan,
true, true},
};
} // namespace grpc_core

@ -89,6 +89,10 @@ inline bool IsLazierStreamUpdatesEnabled() { return true; }
inline bool IsJitterMaxIdleEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_CLIENT_CHANNEL_SUBCHANNEL_WRAPPER_WORK_SERIALIZER_ORPHAN
inline bool IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled() {
return true;
}
#elif defined(GPR_WINDOWS)
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
@ -121,6 +125,10 @@ inline bool IsLazierStreamUpdatesEnabled() { return true; }
inline bool IsJitterMaxIdleEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_CLIENT_CHANNEL_SUBCHANNEL_WRAPPER_WORK_SERIALIZER_ORPHAN
inline bool IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled() {
return true;
}
#else
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
@ -153,6 +161,10 @@ inline bool IsLazierStreamUpdatesEnabled() { return true; }
inline bool IsJitterMaxIdleEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_CLIENT_CHANNEL_SUBCHANNEL_WRAPPER_WORK_SERIALIZER_ORPHAN
inline bool IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled() {
return true;
}
#endif
#else
@ -212,8 +224,12 @@ inline bool IsJitterMaxIdleEnabled() { return IsExperimentEnabled(22); }
inline bool IsRoundRobinDelegateToPickFirstEnabled() {
return IsExperimentEnabled(23);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_CLIENT_CHANNEL_SUBCHANNEL_WRAPPER_WORK_SERIALIZER_ORPHAN
inline bool IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled() {
return IsExperimentEnabled(24);
}
constexpr const size_t kNumExperiments = 24;
constexpr const size_t kNumExperiments = 25;
extern const ExperimentMetadata g_experiment_metadata[kNumExperiments];
#endif

@ -192,3 +192,10 @@
expiry: 2023/11/15
owner: roth@google.com
test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"]
- name: client_channel_subchannel_wrapper_work_serializer_orphan
description:
Client channel subchannel wrapper hops into WorkSerializer at
Orphan() time, rather than requiring callers to do it.
expiry: 2023/11/15
owner: roth@google.com
test_tags: ["lb_unit_test", "cpp_lb_end2end_test", "xds_end2end_test"]

@ -96,3 +96,5 @@
default: true
- name: round_robin_delegate_to_pick_first
default: true
- name: client_channel_subchannel_wrapper_work_serializer_orphan
default: true

Loading…
Cancel
Save