diff --git a/src/core/BUILD b/src/core/BUILD index 7e33aba1571..4e59c1dc97c 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4032,6 +4032,7 @@ grpc_cc_library( "context", "delegating_helper", "error", + "experiments", "gpr_atm", "grpc_sockaddr", "json", @@ -5042,6 +5043,7 @@ grpc_cc_library( deps = [ "channel_args", "delegating_helper", + "experiments", "grpc_outlier_detection_header", "health_check_client", "iomgr_fwd", @@ -5172,6 +5174,7 @@ grpc_cc_library( "closure", "delegating_helper", "error", + "experiments", "grpc_stateful_session_filter", "grpc_xds_client", "iomgr_fwd", diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index b9db3eeade3..0655b2dfc45 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -107,6 +107,7 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/crash.h" @@ -318,10 +319,22 @@ class GrpcLb : public LoadBalancingPolicy { lb_token_(std::move(lb_token)), client_stats_(std::move(client_stats)) {} - ~SubchannelWrapper() override { - if (!lb_policy_->shutting_down_) { - lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel()); + void Orphan() override { + if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) { + if (!lb_policy_->shutting_down_) { + lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel()); + } + return; } + WeakRefCountedPtr self = WeakRef(); + lb_policy_->work_serializer()->Run( + [self = std::move(self)]() { + if (!self->lb_policy_->shutting_down_) { + self->lb_policy_->CacheDeletedSubchannelLocked( + self->wrapped_subchannel()); + } + }, + DEBUG_LOCATION); } const std::string& lb_token() const { return lb_token_; } diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc index 8b52dbe8108..aede6d5067d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc +++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc @@ -49,6 +49,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -123,9 +124,11 @@ class OutlierDetectionLb : public LoadBalancingPolicy { class SubchannelState; class SubchannelWrapper : public DelegatingSubchannel { public: - SubchannelWrapper(RefCountedPtr subchannel_state, + SubchannelWrapper(std::shared_ptr work_serializer, + RefCountedPtr subchannel_state, RefCountedPtr subchannel) : DelegatingSubchannel(std::move(subchannel)), + work_serializer_(std::move(work_serializer)), subchannel_state_(std::move(subchannel_state)) { if (subchannel_state_ != nullptr) { subchannel_state_->AddSubchannel(this); @@ -135,10 +138,21 @@ class OutlierDetectionLb : public LoadBalancingPolicy { } } - ~SubchannelWrapper() override { - if (subchannel_state_ != nullptr) { - subchannel_state_->RemoveSubchannel(this); + void Orphan() override { + if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) { + if (subchannel_state_ != nullptr) { + subchannel_state_->RemoveSubchannel(this); + } + return; } + WeakRefCountedPtr self = WeakRef(); + work_serializer_->Run( + [self = std::move(self)]() { + if (self->subchannel_state_ != nullptr) { + self->subchannel_state_->RemoveSubchannel(self.get()); + } + }, + DEBUG_LOCATION); } void Eject(); @@ -206,6 +220,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy { bool ejected_; }; + std::shared_ptr work_serializer_; RefCountedPtr subchannel_state_; bool ejected_ = false; WatcherWrapper* watcher_wrapper_ = nullptr; @@ -716,8 +731,9 @@ RefCountedPtr OutlierDetectionLb::Helper::CreateSubchannel( } } auto subchannel = MakeRefCounted( - subchannel_state, parent()->channel_control_helper()->CreateSubchannel( - std::move(address), args)); + parent()->work_serializer(), subchannel_state, + parent()->channel_control_helper()->CreateSubchannel(std::move(address), + args)); if (subchannel_state != nullptr) { subchannel_state->AddSubchannel(subchannel.get()); } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc index 02949cac379..7c210284aad 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -51,6 +52,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/orphanable.h" @@ -717,8 +719,19 @@ void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState( } void XdsOverrideHostLb::SubchannelWrapper::Orphan() { - key_.reset(); - wrapped_subchannel()->CancelConnectivityStateWatch(watcher_); + if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) { + key_.reset(); + wrapped_subchannel()->CancelConnectivityStateWatch(watcher_); + return; + } + WeakRefCountedPtr self = WeakRef(); + policy_->work_serializer()->Run( + [self = std::move(self)]() { + self->key_.reset(); + self->wrapped_subchannel()->CancelConnectivityStateWatch( + self->watcher_); + }, + DEBUG_LOCATION); } grpc_pollset_set* XdsOverrideHostLb::SubchannelWrapper::