[LB policies] hop into WorkSerializer in subchannel wrappers' Orphan() (#34394)

The one in xds_override_host was the one that was actually triggering
test failures, but I audited all of the other policies and fixed a
couple of other places that could also be problematic.
pull/34387/head
Mark D. Roth 1 year ago committed by GitHub
parent 87eed73a47
commit 214776e6aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      src/core/BUILD
  2. 15
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  3. 24
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  4. 13
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc

@ -4032,6 +4032,7 @@ grpc_cc_library(
"context", "context",
"delegating_helper", "delegating_helper",
"error", "error",
"experiments",
"gpr_atm", "gpr_atm",
"grpc_sockaddr", "grpc_sockaddr",
"json", "json",
@ -5042,6 +5043,7 @@ grpc_cc_library(
deps = [ deps = [
"channel_args", "channel_args",
"delegating_helper", "delegating_helper",
"experiments",
"grpc_outlier_detection_header", "grpc_outlier_detection_header",
"health_check_client", "health_check_client",
"iomgr_fwd", "iomgr_fwd",
@ -5172,6 +5174,7 @@ grpc_cc_library(
"closure", "closure",
"delegating_helper", "delegating_helper",
"error", "error",
"experiments",
"grpc_stateful_session_filter", "grpc_stateful_session_filter",
"grpc_xds_client", "grpc_xds_client",
"iomgr_fwd", "iomgr_fwd",

@ -107,6 +107,7 @@
#include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/crash.h"
@ -318,10 +319,22 @@ class GrpcLb : public LoadBalancingPolicy {
lb_token_(std::move(lb_token)), lb_token_(std::move(lb_token)),
client_stats_(std::move(client_stats)) {} client_stats_(std::move(client_stats)) {}
~SubchannelWrapper() override { void Orphan() override {
if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) {
if (!lb_policy_->shutting_down_) { if (!lb_policy_->shutting_down_) {
lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel()); lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel());
} }
return;
}
WeakRefCountedPtr<SubchannelWrapper> self = WeakRef();
lb_policy_->work_serializer()->Run(
[self = std::move(self)]() {
if (!self->lb_policy_->shutting_down_) {
self->lb_policy_->CacheDeletedSubchannelLocked(
self->wrapped_subchannel());
}
},
DEBUG_LOCATION);
} }
const std::string& lb_token() const { return lb_token_; } const std::string& lb_token() const { return lb_token_; }

@ -49,6 +49,7 @@
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
@ -123,9 +124,11 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
class SubchannelState; class SubchannelState;
class SubchannelWrapper : public DelegatingSubchannel { class SubchannelWrapper : public DelegatingSubchannel {
public: public:
SubchannelWrapper(RefCountedPtr<SubchannelState> subchannel_state, SubchannelWrapper(std::shared_ptr<WorkSerializer> work_serializer,
RefCountedPtr<SubchannelState> subchannel_state,
RefCountedPtr<SubchannelInterface> subchannel) RefCountedPtr<SubchannelInterface> subchannel)
: DelegatingSubchannel(std::move(subchannel)), : DelegatingSubchannel(std::move(subchannel)),
work_serializer_(std::move(work_serializer)),
subchannel_state_(std::move(subchannel_state)) { subchannel_state_(std::move(subchannel_state)) {
if (subchannel_state_ != nullptr) { if (subchannel_state_ != nullptr) {
subchannel_state_->AddSubchannel(this); subchannel_state_->AddSubchannel(this);
@ -135,10 +138,21 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
} }
} }
~SubchannelWrapper() override { void Orphan() override {
if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) {
if (subchannel_state_ != nullptr) { if (subchannel_state_ != nullptr) {
subchannel_state_->RemoveSubchannel(this); subchannel_state_->RemoveSubchannel(this);
} }
return;
}
WeakRefCountedPtr<SubchannelWrapper> self = WeakRef();
work_serializer_->Run(
[self = std::move(self)]() {
if (self->subchannel_state_ != nullptr) {
self->subchannel_state_->RemoveSubchannel(self.get());
}
},
DEBUG_LOCATION);
} }
void Eject(); void Eject();
@ -206,6 +220,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
bool ejected_; bool ejected_;
}; };
std::shared_ptr<WorkSerializer> work_serializer_;
RefCountedPtr<SubchannelState> subchannel_state_; RefCountedPtr<SubchannelState> subchannel_state_;
bool ejected_ = false; bool ejected_ = false;
WatcherWrapper* watcher_wrapper_ = nullptr; WatcherWrapper* watcher_wrapper_ = nullptr;
@ -716,8 +731,9 @@ RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
} }
} }
auto subchannel = MakeRefCounted<SubchannelWrapper>( auto subchannel = MakeRefCounted<SubchannelWrapper>(
subchannel_state, parent()->channel_control_helper()->CreateSubchannel( parent()->work_serializer(), subchannel_state,
std::move(address), args)); parent()->channel_control_helper()->CreateSubchannel(std::move(address),
args));
if (subchannel_state != nullptr) { if (subchannel_state != nullptr) {
subchannel_state->AddSubchannel(subchannel.get()); subchannel_state->AddSubchannel(subchannel.get());
} }

@ -29,6 +29,7 @@
#include <set> #include <set>
#include <string> #include <string>
#include <tuple> #include <tuple>
#include <type_traits>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -51,6 +52,7 @@
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
@ -717,8 +719,19 @@ void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState(
} }
void XdsOverrideHostLb::SubchannelWrapper::Orphan() { void XdsOverrideHostLb::SubchannelWrapper::Orphan() {
if (!IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled()) {
key_.reset(); key_.reset();
wrapped_subchannel()->CancelConnectivityStateWatch(watcher_); wrapped_subchannel()->CancelConnectivityStateWatch(watcher_);
return;
}
WeakRefCountedPtr<SubchannelWrapper> self = WeakRef();
policy_->work_serializer()->Run(
[self = std::move(self)]() {
self->key_.reset();
self->wrapped_subchannel()->CancelConnectivityStateWatch(
self->watcher_);
},
DEBUG_LOCATION);
} }
grpc_pollset_set* XdsOverrideHostLb::SubchannelWrapper:: grpc_pollset_set* XdsOverrideHostLb::SubchannelWrapper::

Loading…
Cancel
Save