[ref-counts] DualRefCounted orphanability is different to Orphanable orphanability (#36194)

Previously we wrote `DualRefCounted : public Orphanable`, but this is wrong: `Orphan`-ing is a private implementation detail to `DualRefCounted`, but a public interface to `Orphanable`.

This bug means that it's possible to write `OrphanablePtr<T>` when `T` is derived from `DualRefCounted`, leading to hard to diagnose bugs - especially when moving a previously `Orphanable` type to be `DualRefCounted`.

This change removes the inheritance from `Orphanable`, and instead adds an overridable method `Orphaned` that implementors of `DualRefCounted` can implement.

In this way we get:
* compiler errors if someone chooses to write `OrphanablePtr` for one of these types
* compiler errors if someone implements `Orphan()` instead of `Orphaned()` in the wrong place (or vice versa)

Closes #36194

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36194 from ctiller:orf b96b831a96
PiperOrigin-RevId: 620916632
pull/36005/head^2
Craig Tiller 11 months ago committed by Copybara-Service
parent 8b8f43aecf
commit c079e533de
  1. 2
      src/core/client_channel/client_channel_filter.cc
  2. 2
      src/core/client_channel/subchannel.cc
  3. 6
      src/core/client_channel/subchannel.h
  4. 4
      src/core/ext/xds/xds_client.cc
  5. 8
      src/core/ext/xds/xds_client.h
  6. 4
      src/core/ext/xds/xds_client_grpc.cc
  7. 3
      src/core/ext/xds/xds_client_grpc.h
  8. 14
      src/core/ext/xds/xds_server_config_fetcher.cc
  9. 13
      src/core/lib/gprpp/dual_ref_counted.h
  10. 2
      src/core/lib/security/authorization/grpc_authorization_policy_provider.cc
  11. 4
      src/core/lib/security/authorization/grpc_authorization_policy_provider.h
  12. 2
      src/core/lib/surface/legacy_channel.cc
  13. 19
      src/core/load_balancing/grpclb/grpclb.cc
  14. 6
      src/core/load_balancing/health_check_client.cc
  15. 3
      src/core/load_balancing/health_check_client_internal.h
  16. 5
      src/core/load_balancing/lb_policy.h
  17. 2
      src/core/load_balancing/oob_backend_metric.cc
  18. 8
      src/core/load_balancing/oob_backend_metric_internal.h
  19. 36
      src/core/load_balancing/outlier_detection/outlier_detection.cc
  20. 101
      src/core/load_balancing/rls/rls.cc
  21. 5
      src/core/load_balancing/subchannel_interface.h
  22. 43
      src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc
  23. 7
      src/core/load_balancing/xds/xds_override_host.cc
  24. 2
      src/core/resolver/xds/xds_dependency_manager.cc
  25. 4
      src/core/resolver/xds/xds_dependency_manager.h
  26. 12
      src/core/resolver/xds/xds_resolver.cc
  27. 2
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  28. 4
      test/core/event_engine/posix/event_poller_posix_test.cc
  29. 2
      test/core/event_engine/posix/posix_endpoint_test.cc
  30. 4
      test/core/gprpp/dual_ref_counted_test.cc
  31. 6
      test/core/gprpp/ref_counted_ptr_test.cc
  32. 2
      test/core/server_config_selector/server_config_selector_test.cc

@ -684,7 +684,7 @@ class ClientChannelFilter::SubchannelWrapper : public SubchannelInterface {
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
}
void Orphan() override {
void Orphaned() override {
if (!IsWorkSerializerDispatchEnabled()) return;
// Make sure we clean up the channel's subchannel maps inside the
// WorkSerializer.

@ -619,7 +619,7 @@ void Subchannel::ResetBackoff() {
work_serializer_.DrainQueue();
}
void Subchannel::Orphan() {
void Subchannel::Orphaned() {
// The subchannel_pool is only used once here in this subchannel, so the
// access can be outside of the lock.
if (subchannel_pool_ != nullptr) {

@ -250,9 +250,6 @@ class Subchannel : public DualRefCounted<Subchannel> {
// Resets the connection backoff of the subchannel.
void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_);
// Tears down any existing connection, and arranges for destruction
void Orphan() override ABSL_LOCKS_EXCLUDED(mu_);
// Access to data producer map.
// We do not hold refs to the data producer; the implementation is
// expected to register itself upon construction and remove itself
@ -277,6 +274,9 @@ class Subchannel : public DualRefCounted<Subchannel> {
}
private:
// Tears down any existing connection, and arranges for destruction
void Orphaned() override ABSL_LOCKS_EXCLUDED(mu_);
// A linked list of ConnectivityStateWatcherInterfaces that are monitoring
// the subchannel's state.
class ConnectivityStateWatcherList {

@ -485,7 +485,7 @@ XdsClient::XdsChannel::~XdsChannel() {
// use a ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan() will be
// called from DualRefCounted::Unref, which cannot have a lock annotation for
// a lock in this subclass.
void XdsClient::XdsChannel::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS {
void XdsClient::XdsChannel::Orphaned() ABSL_NO_THREAD_SAFETY_ANALYSIS {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] orphaning xds channel %p for server %s",
xds_client(), this, server_.server_uri().c_str());
@ -1533,7 +1533,7 @@ XdsClient::~XdsClient() {
}
}
void XdsClient::Orphan() {
void XdsClient::Orphaned() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
}

@ -104,8 +104,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
return transport_factory_.get();
}
void Orphan() override;
// Start and cancel watch for a resource.
//
// The XdsClient takes ownership of the watcher, but the caller may
@ -161,6 +159,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
}
protected:
void Orphaned() override;
Mutex* mu() ABSL_LOCK_RETURNED(&mu_) { return &mu_; }
// Dumps the active xDS config to the provided
@ -220,8 +220,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
const XdsBootstrap::XdsServer& server);
~XdsChannel() override;
void Orphan() override;
XdsClient* xds_client() const { return xds_client_.get(); }
AdsCall* ads_call() const;
LrsCall* lrs_call() const;
@ -246,6 +244,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
absl::string_view server_uri() const { return server_.server_uri(); }
private:
void Orphaned() override;
void OnConnectivityFailure(absl::Status status);
// Enqueues error notifications to watchers. Caller must drain

@ -305,9 +305,9 @@ GrpcXdsClient::GrpcXdsClient(
},
{kMetricConnected, kMetricResources})) {}
void GrpcXdsClient::Orphan() {
void GrpcXdsClient::Orphaned() {
registered_metric_callback_.reset();
XdsClient::Orphan();
XdsClient::Orphaned();
MutexLock lock(g_mu);
auto it = g_xds_client_map->find(key_);
if (it != g_xds_client_map->end() && it->second == this) {

@ -66,8 +66,6 @@ class GrpcXdsClient : public XdsClient {
const ChannelArgs& args,
OrphanablePtr<XdsTransportFactory> transport_factory);
void Orphan() override;
// Helpers for encoding the XdsClient object in channel args.
static absl::string_view ChannelArgName() {
return GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_client";
@ -91,6 +89,7 @@ class GrpcXdsClient : public XdsClient {
class MetricsReporter;
void ReportCallbackMetrics(CallbackMetricReporter& reporter);
void Orphaned() override;
std::string key_;
OrphanablePtr<CertificateProviderStore> certificate_provider_store_;

@ -215,8 +215,6 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager
absl::StatusOr<ChannelArgs> UpdateChannelArgsForConnection(
const ChannelArgs& args, grpc_endpoint* tcp) override;
void Orphan() override;
// Invoked by ListenerWatcher to start fetching referenced RDS resources.
void StartRdsWatch(RefCountedPtr<ListenerWatcher> listener_watcher)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ListenerWatcher::mu_);
@ -246,6 +244,7 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager
absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
CreateOrGetXdsCertificateProviderFromFilterChainData(
const XdsListenerResource::FilterChainData* filter_chain);
void Orphaned() override;
// Helper functions invoked by RouteConfigWatcher when there are updates to
// RDS resources.
@ -415,11 +414,11 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
static_resource_.value(), http_filters_);
}
void Orphan() override {}
void CancelWatch() override { watcher_.reset(); }
private:
void Orphaned() override {}
RefCountedPtr<GrpcXdsClient> xds_client_;
absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
static_resource_;
@ -449,8 +448,6 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
xds_client_.reset(DEBUG_LOCATION, "DynamicXdsServerConfigSelectorProvider");
}
void Orphan() override;
absl::StatusOr<RefCountedPtr<ServerConfigSelector>> Watch(
std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
watcher) override;
@ -459,6 +456,7 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
private:
class RouteConfigWatcher;
void Orphaned() override;
void OnRouteConfigChanged(
std::shared_ptr<const XdsRouteConfigResource> rds_update);
void OnError(absl::Status status);
@ -783,7 +781,7 @@ void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
}
void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
Orphan() {
Orphaned() {
MutexLock lock(&mu_);
// Cancel the RDS watches to clear up the weak refs
for (const auto& entry : rds_map_) {
@ -1281,7 +1279,7 @@ XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
}
void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
DynamicXdsServerConfigSelectorProvider::Orphan() {
DynamicXdsServerConfigSelectorProvider::Orphaned() {
XdsRouteConfigResourceType::CancelWatch(xds_client_.get(), resource_name_,
route_config_watcher_,
false /* delay_unsubscription */);

@ -39,19 +39,19 @@ namespace grpc_core {
//
// Each class of refs can be incremented and decremented independently.
// Objects start with 1 strong ref and 0 weak refs at instantiation.
// When the strong refcount reaches 0, the object's Orphan() method is called.
// When the strong refcount reaches 0, the object's Orphaned() method is called.
// When the weak refcount reaches 0, the object is destroyed.
//
// This will be used by CRTP (curiously-recurring template pattern), e.g.:
// class MyClass : public RefCounted<MyClass> { ... };
template <typename Child>
class DualRefCounted : public Orphanable {
class DualRefCounted {
public:
// Not copyable nor movable.
DualRefCounted(const DualRefCounted&) = delete;
DualRefCounted& operator=(const DualRefCounted&) = delete;
~DualRefCounted() override = default;
virtual ~DualRefCounted() = default;
GRPC_MUST_USE_RESULT RefCountedPtr<Child> Ref() {
IncrementRefCount();
@ -93,7 +93,7 @@ class DualRefCounted : public Orphanable {
GPR_ASSERT(strong_refs > 0);
#endif
if (GPR_UNLIKELY(strong_refs == 1)) {
Orphan();
Orphaned();
}
// Now drop the weak ref.
WeakUnref();
@ -116,7 +116,7 @@ class DualRefCounted : public Orphanable {
(void)reason;
#endif
if (GPR_UNLIKELY(strong_refs == 1)) {
Orphan();
Orphaned();
}
// Now drop the weak ref.
WeakUnref(location, reason);
@ -257,6 +257,9 @@ class DualRefCounted : public Orphanable {
refs_(MakeRefPair(initial_refcount, 0)) {
}
// Ref count has dropped to zero, so the object is now orphaned.
virtual void Orphaned() = 0;
private:
// Allow RefCountedPtr<> to access IncrementRefCount().
template <typename T>

@ -179,7 +179,7 @@ absl::Status FileWatcherAuthorizationPolicyProvider::ForceUpdate() {
return absl::OkStatus();
}
void FileWatcherAuthorizationPolicyProvider::Orphan() {
void FileWatcherAuthorizationPolicyProvider::Orphaned() {
gpr_event_set(&shutdown_event_, reinterpret_cast<void*>(1));
if (refresh_thread_ != nullptr) {
refresh_thread_->Join();

@ -56,7 +56,7 @@ class StaticDataAuthorizationPolicyProvider
return {allow_engine_, deny_engine_};
}
void Orphan() override {}
void Orphaned() override {}
private:
RefCountedPtr<AuthorizationEngine> allow_engine_;
@ -87,7 +87,7 @@ class FileWatcherAuthorizationPolicyProvider
void SetCallbackForTesting(
std::function<void(bool contents_changed, absl::Status Status)> cb);
void Orphan() override;
void Orphaned() override;
AuthorizationEngines engines() override {
MutexLock lock(&mu_);

@ -309,7 +309,7 @@ class LegacyChannel::StateWatcher : public DualRefCounted<StateWatcher> {
}
// Invoked when both strong refs are released.
void Orphan() override {
void Orphaned() override {
WeakRef().release(); // Take a weak ref until completion is finished.
grpc_error_handle error =
timer_fired_

@ -140,8 +140,8 @@
#include "src/core/load_balancing/lb_policy_factory.h"
#include "src/core/load_balancing/lb_policy_registry.h"
#include "src/core/load_balancing/subchannel_interface.h"
#include "src/core/resolver/fake/fake_resolver.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "src/core/resolver/fake/fake_resolver.h"
#include "src/core/resolver/resolver.h"
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
@ -317,7 +317,11 @@ class GrpcLb : public LoadBalancingPolicy {
lb_token_(std::move(lb_token)),
client_stats_(std::move(client_stats)) {}
void Orphan() override {
const std::string& lb_token() const { return lb_token_; }
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
private:
void Orphaned() override {
if (!IsWorkSerializerDispatchEnabled()) {
if (!lb_policy_->shutting_down_) {
lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel());
@ -334,10 +338,6 @@ class GrpcLb : public LoadBalancingPolicy {
DEBUG_LOCATION);
}
const std::string& lb_token() const { return lb_token_; }
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
private:
RefCountedPtr<GrpcLb> lb_policy_;
std::string lb_token_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
@ -1627,10 +1627,9 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked() {
if (lb_channel_ == nullptr) {
std::string uri_str =
absl::StrCat("fake:///", channel_control_helper()->GetAuthority());
lb_channel_.reset(
Channel::FromC(
grpc_channel_create(uri_str.c_str(), channel_credentials.get(),
lb_channel_args.ToC().get())));
lb_channel_.reset(Channel::FromC(
grpc_channel_create(uri_str.c_str(), channel_credentials.get(),
lb_channel_args.ToC().get())));
GPR_ASSERT(lb_channel_ != nullptr);
// Set up channelz linkage.
channelz::ChannelNode* child_channelz_node = lb_channel_->channelz_node();

@ -42,7 +42,6 @@
#include "src/core/client_channel/client_channel_channelz.h"
#include "src/core/client_channel/client_channel_internal.h"
#include "src/core/load_balancing/health_check_client_internal.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_stream_client.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
@ -59,9 +58,10 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/load_balancing/subchannel_interface.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/load_balancing/health_check_client_internal.h"
#include "src/core/load_balancing/subchannel_interface.h"
#include "src/proto/grpc/health/v1/health.upb.h"
namespace grpc_core {
@ -362,7 +362,7 @@ void HealthProducer::Start(RefCountedPtr<Subchannel> subchannel) {
subchannel_->WatchConnectivityState(std::move(connectivity_watcher));
}
void HealthProducer::Orphan() {
void HealthProducer::Orphaned() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
gpr_log(GPR_INFO, "HealthProducer %p: shutting down", this);
}

@ -59,8 +59,6 @@ class HealthProducer : public Subchannel::DataProducerInterface {
void Start(RefCountedPtr<Subchannel> subchannel);
void Orphan() override;
static UniqueTypeName Type() {
static UniqueTypeName::Factory kFactory("health_check");
return kFactory.Create();
@ -139,6 +137,7 @@ class HealthProducer : public Subchannel::DataProducerInterface {
// Handles a connectivity state change on the subchannel.
void OnConnectivityStateChange(grpc_connectivity_state state,
const absl::Status& status);
void Orphaned() override;
RefCountedPtr<Subchannel> subchannel_;
ConnectivityWatcher* connectivity_watcher_;

@ -38,7 +38,6 @@
#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/metrics.h"
#include "src/core/lib/debug/trace.h"
@ -51,6 +50,7 @@
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/load_balancing/subchannel_interface.h"
#include "src/core/resolver/endpoint_addresses.h"
@ -275,7 +275,8 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
virtual PickResult Pick(PickArgs args) = 0;
void Orphan() override {}
protected:
void Orphaned() override {}
};
/// A proxy object implemented by the client channel and used by the

@ -221,7 +221,7 @@ void OrcaProducer::Start(RefCountedPtr<Subchannel> subchannel) {
subchannel_->WatchConnectivityState(std::move(connectivity_watcher));
}
void OrcaProducer::Orphan() {
void OrcaProducer::Orphaned() {
{
MutexLock lock(&mu_);
stream_client_.reset();

@ -28,8 +28,6 @@
#include <grpc/impl/connectivity_state.h>
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/load_balancing/oob_backend_metric.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_interface_internal.h"
#include "src/core/client_channel/subchannel_stream_client.h"
@ -38,6 +36,8 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/load_balancing/oob_backend_metric.h"
namespace grpc_core {
@ -50,8 +50,6 @@ class OrcaProducer : public Subchannel::DataProducerInterface {
public:
void Start(RefCountedPtr<Subchannel> subchannel);
void Orphan() override;
static UniqueTypeName Type() {
static UniqueTypeName::Factory kFactory("orca");
return kFactory.Create();
@ -67,6 +65,8 @@ class OrcaProducer : public Subchannel::DataProducerInterface {
class ConnectivityWatcher;
class OrcaStreamEventHandler;
void Orphaned() override;
// Returns the minimum requested reporting interval across all watchers.
Duration GetMinIntervalLocked() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);

@ -44,8 +44,6 @@
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/load_balancing/child_policy_handler.h"
#include "src/core/load_balancing/health_check_client_internal.h"
#include "src/core/client_channel/subchannel_interface_internal.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
@ -66,7 +64,9 @@
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/load_balancing/child_policy_handler.h"
#include "src/core/load_balancing/delegating_helper.h"
#include "src/core/load_balancing/health_check_client_internal.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/lb_policy_factory.h"
#include "src/core/load_balancing/lb_policy_registry.h"
@ -144,22 +144,6 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
}
}
void Orphan() override {
if (!IsWorkSerializerDispatchEnabled()) {
if (subchannel_state_ != nullptr) {
subchannel_state_->RemoveSubchannel(this);
}
return;
}
work_serializer_->Run(
[self = WeakRefAsSubclass<SubchannelWrapper>()]() {
if (self->subchannel_state_ != nullptr) {
self->subchannel_state_->RemoveSubchannel(self.get());
}
},
DEBUG_LOCATION);
}
void Eject();
void Uneject();
@ -228,6 +212,22 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
bool ejected_;
};
void Orphaned() override {
if (!IsWorkSerializerDispatchEnabled()) {
if (subchannel_state_ != nullptr) {
subchannel_state_->RemoveSubchannel(this);
}
return;
}
work_serializer_->Run(
[self = WeakRefAsSubclass<SubchannelWrapper>()]() {
if (self->subchannel_state_ != nullptr) {
self->subchannel_state_->RemoveSubchannel(self.get());
}
},
DEBUG_LOCATION);
}
std::shared_ptr<WorkSerializer> work_serializer_;
RefCountedPtr<SubchannelState> subchannel_state_;
bool ejected_ = false;

@ -67,7 +67,6 @@
#include <grpc/support/log.h>
#include "src/core/client_channel/client_channel_filter.h"
#include "src/core/load_balancing/child_policy_handler.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
@ -94,19 +93,20 @@
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/json/json_writer.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "src/core/service_config/service_config_impl.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/load_balancing/child_policy_handler.h"
#include "src/core/load_balancing/delegating_helper.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/lb_policy_factory.h"
#include "src/core/load_balancing/lb_policy_registry.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "src/core/resolver/resolver_registry.h"
#include "src/core/service_config/service_config_impl.h"
#include "src/proto/grpc/lookup/v1/rls.upb.h"
using ::grpc_event_engine::experimental::EventEngine;
@ -127,8 +127,7 @@ constexpr absl::string_view kMetricLabelPickResult = "grpc.lb.pick_result";
const auto kMetricCacheSize =
GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
"grpc.lb.rls.cache_size", "EXPERIMENTAL. Size of the RLS cache.",
"By",
"grpc.lb.rls.cache_size", "EXPERIMENTAL. Size of the RLS cache.", "By",
{kMetricLabelTarget, kMetricLabelRlsServerTarget,
kMetricLabelRlsInstanceUuid},
{}, false);
@ -167,8 +166,7 @@ const auto kMetricFailedPicks =
"grpc.lb.rls.failed_picks",
"EXPERIMENTAL. Number of LB picks failed due to either a failed RLS "
"request or the RLS channel being throttled.",
"{pick}", {kMetricLabelTarget, kMetricLabelRlsServerTarget}, {},
false);
"{pick}", {kMetricLabelTarget, kMetricLabelRlsServerTarget}, {}, false);
constexpr absl::string_view kRls = "rls_experimental";
const char kGrpc[] = "grpc";
@ -329,11 +327,6 @@ class RlsLb : public LoadBalancingPolicy {
public:
ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy, std::string target);
// Note: We are forced to disable lock analysis here because
// Orphan() is called by OrphanablePtr<>, which cannot have lock
// annotations for this particular caller.
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
const std::string& target() const { return target_; }
PickResult Pick(PickArgs args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
@ -401,6 +394,11 @@ class RlsLb : public LoadBalancingPolicy {
WeakRefCountedPtr<ChildPolicyWrapper> wrapper_;
};
// Note: We are forced to disable lock analysis here because
// Orphan() is called by Unref() which is called by RefCountedPtr<>, which
// cannot have lock annotations for this particular caller.
void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
RefCountedPtr<RlsLb> lb_policy_;
std::string target_;
@ -804,7 +802,7 @@ RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
lb_policy_->child_policy_map_.emplace(target_, this);
}
void RlsLb::ChildPolicyWrapper::Orphan() {
void RlsLb::ChildPolicyWrapper::Orphaned() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO, "[rlslb %p] ChildPolicyWrapper=%p [%s]: shutdown",
lb_policy_.get(), this, target_.c_str());
@ -1092,8 +1090,8 @@ LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) {
if (entry->backoff_time() >= now) {
return PickFromDefaultTargetOrFail(
"RLS call in backoff", args,
absl::UnavailableError(absl::StrCat(
"RLS request failed: ", entry->status().ToString())));
absl::UnavailableError(absl::StrCat("RLS request failed: ",
entry->status().ToString())));
}
}
// RLS call pending. Queue the pick.
@ -1112,8 +1110,8 @@ LoadBalancingPolicy::PickResult RlsLb::Picker::PickFromDefaultTargetOrFail(
lb_policy_.get(), this, reason);
}
auto pick_result = default_child_policy_->Pick(args);
lb_policy_->MaybeExportPickCount(
kMetricDefaultTargetPicks, config_->default_target(), pick_result);
lb_policy_->MaybeExportPickCount(kMetricDefaultTargetPicks,
config_->default_target(), pick_result);
return pick_result;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
@ -1122,10 +1120,10 @@ LoadBalancingPolicy::PickResult RlsLb::Picker::PickFromDefaultTargetOrFail(
}
auto& stats_plugins =
lb_policy_->channel_control_helper()->GetStatsPluginGroup();
stats_plugins.AddCounter(
kMetricFailedPicks, 1,
{lb_policy_->channel_control_helper()->GetTarget(),
config_->lookup_service()}, {});
stats_plugins.AddCounter(kMetricFailedPicks, 1,
{lb_policy_->channel_control_helper()->GetTarget(),
config_->lookup_service()},
{});
return PickResult::Fail(std::move(status));
}
@ -1271,8 +1269,8 @@ LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) {
args.initial_metadata->Add(kRlsHeaderKey, copied_header_data);
}
auto pick_result = child_policy_wrapper->Pick(args);
lb_policy_->MaybeExportPickCount(
kMetricTargetPicks, child_policy_wrapper->target(), pick_result);
lb_policy_->MaybeExportPickCount(kMetricTargetPicks,
child_policy_wrapper->target(), pick_result);
return pick_result;
}
@ -1453,16 +1451,16 @@ void RlsLb::Cache::Shutdown() {
}
void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) {
reporter.Report(kMetricCacheSize, size_,
{lb_policy_->channel_control_helper()->GetTarget(),
lb_policy_->config_->lookup_service(),
lb_policy_->instance_uuid_},
{});
reporter.Report(kMetricCacheEntries, map_.size(),
{lb_policy_->channel_control_helper()->GetTarget(),
lb_policy_->config_->lookup_service(),
lb_policy_->instance_uuid_},
{});
reporter.Report(
kMetricCacheSize, size_,
{lb_policy_->channel_control_helper()->GetTarget(),
lb_policy_->config_->lookup_service(), lb_policy_->instance_uuid_},
{});
reporter.Report(
kMetricCacheEntries, map_.size(),
{lb_policy_->channel_control_helper()->GetTarget(),
lb_policy_->config_->lookup_service(), lb_policy_->instance_uuid_},
{});
}
void RlsLb::Cache::StartCleanupTimer() {
@ -1630,10 +1628,9 @@ RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
args = args.Set(GRPC_ARG_SERVICE_CONFIG, service_config)
.Set(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION, 1);
}
channel_.reset(
Channel::FromC(
grpc_channel_create(lb_policy_->config_->lookup_service().c_str(),
creds.get(), args.ToC().get())));
channel_.reset(Channel::FromC(
grpc_channel_create(lb_policy_->config_->lookup_service().c_str(),
creds.get(), args.ToC().get())));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO, "[rlslb %p] RlsChannel=%p: created channel %p for %s",
lb_policy_.get(), this, channel_.get(),
@ -1947,9 +1944,9 @@ std::string GenerateUUID() {
RlsLb::RlsLb(Args args)
: LoadBalancingPolicy(std::move(args)),
instance_uuid_(
channel_args().GetOwnedString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID)
.value_or(GenerateUUID())),
instance_uuid_(channel_args()
.GetOwnedString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID)
.value_or(GenerateUUID())),
cache_(this),
registered_metric_callback_(
channel_control_helper()->GetStatsPluginGroup().RegisterCallback(
@ -2229,26 +2226,20 @@ void RlsLb::UpdatePickerLocked() {
void RlsLb::MaybeExportPickCount(
GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
absl::string_view target, const PickResult& pick_result) {
absl::string_view pick_result_string =
Match(pick_result.result,
[](const LoadBalancingPolicy::PickResult::Complete&) {
return "complete";
},
[](const LoadBalancingPolicy::PickResult::Queue&) {
return "";
},
[](const LoadBalancingPolicy::PickResult::Fail&) {
return "fail";
},
[](const LoadBalancingPolicy::PickResult::Drop&) {
return "drop";
});
absl::string_view pick_result_string = Match(
pick_result.result,
[](const LoadBalancingPolicy::PickResult::Complete&) {
return "complete";
},
[](const LoadBalancingPolicy::PickResult::Queue&) { return ""; },
[](const LoadBalancingPolicy::PickResult::Fail&) { return "fail"; },
[](const LoadBalancingPolicy::PickResult::Drop&) { return "drop"; });
if (pick_result_string.empty()) return; // Don't report queued picks.
auto& stats_plugins = channel_control_helper()->GetStatsPluginGroup();
stats_plugins.AddCounter(
handle, 1,
{channel_control_helper()->GetTarget(), config_->lookup_service(),
target, pick_result_string},
{channel_control_helper()->GetTarget(), config_->lookup_service(), target,
pick_result_string},
{});
}

@ -63,8 +63,6 @@ class SubchannelInterface : public DualRefCounted<SubchannelInterface> {
~SubchannelInterface() override = default;
void Orphan() override {}
// Starts watching the subchannel's connectivity state.
// The first callback to the watcher will be delivered ~immediately.
// Subsequent callbacks will be delivered as the subchannel's state
@ -100,6 +98,9 @@ class SubchannelInterface : public DualRefCounted<SubchannelInterface> {
// Cancels a data watch.
virtual void CancelDataWatcher(DataWatcherInterface* watcher) = 0;
protected:
void Orphaned() override {}
};
// A class that delegates to another subchannel, to be used in cases

@ -67,12 +67,12 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/load_balancing/endpoint_list.h"
#include "src/core/load_balancing/oob_backend_metric.h"
#include "src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h"
#include "src/core/load_balancing/weighted_target/weighted_target.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/lb_policy_factory.h"
#include "src/core/load_balancing/oob_backend_metric.h"
#include "src/core/load_balancing/subchannel_interface.h"
#include "src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h"
#include "src/core/load_balancing/weighted_target/weighted_target.h"
#include "src/core/resolver/endpoint_addresses.h"
namespace grpc_core {
@ -85,13 +85,12 @@ constexpr absl::string_view kWeightedRoundRobin = "weighted_round_robin";
constexpr absl::string_view kMetricLabelLocality = "grpc.lb.locality";
const auto kMetricRrFallback =
GlobalInstrumentsRegistry::RegisterUInt64Counter(
"grpc.lb.wrr.rr_fallback",
"EXPERIMENTAL. Number of scheduler updates in which there were not "
"enough endpoints with valid weight, which caused the WRR policy to "
"fall back to RR behavior.",
"{update}", {kMetricLabelTarget}, {kMetricLabelLocality}, false);
const auto kMetricRrFallback = GlobalInstrumentsRegistry::RegisterUInt64Counter(
"grpc.lb.wrr.rr_fallback",
"EXPERIMENTAL. Number of scheduler updates in which there were not "
"enough endpoints with valid weight, which caused the WRR policy to "
"fall back to RR behavior.",
"{update}", {kMetricLabelTarget}, {kMetricLabelLocality}, false);
const auto kMetricEndpointWeightNotYetUsable =
GlobalInstrumentsRegistry::RegisterUInt64Counter(
@ -319,8 +318,6 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
PickResult Pick(PickArgs args) override;
void Orphan() override;
private:
// A call tracker that collects per-call endpoint utilization reports.
class SubchannelCallTracker : public SubchannelCallTrackerInterface {
@ -352,6 +349,8 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
RefCountedPtr<EndpointWeight> weight;
};
void Orphaned() override;
// Returns the index into endpoints_ to be picked.
size_t PickIndex();
@ -556,7 +555,7 @@ WeightedRoundRobin::Picker::~Picker() {
}
}
void WeightedRoundRobin::Picker::Orphan() {
void WeightedRoundRobin::Picker::Orphaned() {
MutexLock lock(&timer_mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p picker %p] cancelling timer", wrr_.get(), this);
@ -616,16 +615,16 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
now, config_->weight_expiration_period(), config_->blackout_period(),
&num_not_yet_usable, &num_stale);
weights.push_back(weight);
stats_plugins.RecordHistogram(
kMetricEndpointWeights, weight,
{wrr_->channel_control_helper()->GetTarget()}, {wrr_->locality_name_});
stats_plugins.RecordHistogram(kMetricEndpointWeights, weight,
{wrr_->channel_control_helper()->GetTarget()},
{wrr_->locality_name_});
}
stats_plugins.AddCounter(
kMetricEndpointWeightNotYetUsable, num_not_yet_usable,
{wrr_->channel_control_helper()->GetTarget()}, {wrr_->locality_name_});
stats_plugins.AddCounter(
kMetricEndpointWeightStale, num_stale,
{wrr_->channel_control_helper()->GetTarget()}, {wrr_->locality_name_});
stats_plugins.AddCounter(kMetricEndpointWeightStale, num_stale,
{wrr_->channel_control_helper()->GetTarget()},
{wrr_->locality_name_});
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p picker %p] new weights: %s", wrr_.get(), this,
absl::StrJoin(weights, " ").c_str());
@ -645,9 +644,9 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
gpr_log(GPR_INFO, "[WRR %p picker %p] no scheduler, falling back to RR",
wrr_.get(), this);
}
stats_plugins.AddCounter(
kMetricRrFallback, 1, {wrr_->channel_control_helper()->GetTarget()},
{wrr_->locality_name_});
stats_plugins.AddCounter(kMetricRrFallback, 1,
{wrr_->channel_control_helper()->GetTarget()},
{wrr_->locality_name_});
}
{
MutexLock lock(&scheduler_mu_);

@ -48,7 +48,6 @@
#include <grpc/support/log.h>
#include "src/core/client_channel/client_channel_internal.h"
#include "src/core/load_balancing/child_policy_handler.h"
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/address_utils/parse_address.h"
@ -75,6 +74,7 @@
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/load_balancing/child_policy_handler.h"
#include "src/core/load_balancing/delegating_helper.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/lb_policy_factory.h"
@ -182,8 +182,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
WeakRefCountedPtr<SubchannelWrapper> subchannel_;
};
void Orphan() override;
void Orphaned() override;
void UpdateConnectivityState(grpc_connectivity_state state,
absl::Status status);
@ -1112,7 +1111,7 @@ void XdsOverrideHostLb::SubchannelWrapper::CancelConnectivityStateWatch(
}
}
void XdsOverrideHostLb::SubchannelWrapper::Orphan() {
void XdsOverrideHostLb::SubchannelWrapper::Orphaned() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] subchannel wrapper %p orphaned",

@ -330,7 +330,7 @@ class XdsDependencyManager::DnsResultHandler : public Resolver::ResultHandler {
// XdsDependencyManager::ClusterSubscription
//
void XdsDependencyManager::ClusterSubscription::Orphan() {
void XdsDependencyManager::ClusterSubscription::Orphaned() {
dependency_mgr_->work_serializer_->Run(
[self = WeakRef()]() {
self->dependency_mgr_->OnClusterSubscriptionUnref(self->cluster_name_,

@ -124,11 +124,11 @@ class XdsDependencyManager : public RefCounted<XdsDependencyManager>,
: cluster_name_(cluster_name),
dependency_mgr_(std::move(dependency_mgr)) {}
void Orphan() override;
absl::string_view cluster_name() const { return cluster_name_; }
private:
void Orphaned() override;
std::string cluster_name_;
RefCountedPtr<XdsDependencyManager> dependency_mgr_;
};

@ -79,20 +79,20 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "src/core/resolver/resolver.h"
#include "src/core/resolver/resolver_factory.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/service_config/service_config.h"
#include "src/core/service_config/service_config_impl.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"
#include "src/core/load_balancing/ring_hash/ring_hash.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "src/core/resolver/resolver.h"
#include "src/core/resolver/resolver_factory.h"
#include "src/core/resolver/xds/xds_dependency_manager.h"
#include "src/core/resolver/xds/xds_resolver_attributes.h"
#include "src/core/resolver/xds/xds_resolver_trace.h"
#include "src/core/service_config/service_config.h"
#include "src/core/service_config/service_config_impl.h"
namespace grpc_core {
@ -173,7 +173,7 @@ class XdsResolver : public Resolver {
cluster_subscription_(std::move(cluster_subscription)),
cluster_key_(cluster_key) {}
void Orphan() override {
void Orphaned() override {
XdsResolver* resolver_ptr = resolver_.get();
resolver_ptr->work_serializer_->Run(
[resolver = std::move(resolver_)]() {

@ -510,7 +510,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
picker_.get());
}
void Orphan() override {
void Orphaned() override {
absl::Notification notification;
ExecCtx exec_ctx;
test_->work_serializer_->Run(

@ -585,7 +585,7 @@ class WakeupFdHandle : public grpc_core::DualRefCounted<WakeupFdHandle> {
~WakeupFdHandle() override { delete on_read_; }
void Orphan() override {
void Orphaned() override {
// Once the handle has orphaned itself, decrement
// kTotalActiveWakeupFdHandles. Once all handles have orphaned themselves,
// send a Kick to the poller.
@ -650,7 +650,7 @@ class Worker : public grpc_core::DualRefCounted<Worker> {
}
WeakRef().release();
}
void Orphan() override { signal.Notify(); }
void Orphaned() override { signal.Notify(); }
void Start() {
// Start executing Work(..).
scheduler_->Run([this]() { Work(); });

@ -156,7 +156,7 @@ class Worker : public grpc_core::DualRefCounted<Worker> {
: engine_(std::move(engine)), poller_(poller) {
WeakRef().release();
}
void Orphan() override { signal.Notify(); }
void Orphaned() override { signal.Notify(); }
void Start() {
// Start executing Work(..).
engine_->Run([this]() { Work(); });

@ -31,7 +31,7 @@ class Foo : public DualRefCounted<Foo> {
Foo() = default;
~Foo() override { GPR_ASSERT(shutting_down_); }
void Orphan() override { shutting_down_ = true; }
void Orphaned() override { shutting_down_ = true; }
private:
bool shutting_down_ = false;
@ -94,7 +94,7 @@ class FooWithTracing : public DualRefCounted<FooWithTracing> {
FooWithTracing() : DualRefCounted("FooWithTracing") {}
~FooWithTracing() override { GPR_ASSERT(shutting_down_); }
void Orphan() override { shutting_down_ = true; }
void Orphaned() override { shutting_down_ = true; }
private:
bool shutting_down_ = false;

@ -276,7 +276,7 @@ class Bar : public DualRefCounted<Bar> {
~Bar() override { GPR_ASSERT(shutting_down_); }
void Orphan() override { shutting_down_ = true; }
void Orphaned() override { shutting_down_ = true; }
int value() const { return value_; }
@ -432,7 +432,7 @@ class BarWithTracing : public DualRefCounted<BarWithTracing> {
~BarWithTracing() override { GPR_ASSERT(shutting_down_); }
void Orphan() override { shutting_down_ = true; }
void Orphaned() override { shutting_down_ = true; }
private:
bool shutting_down_ = false;
@ -455,7 +455,7 @@ class WeakBaseClass : public DualRefCounted<WeakBaseClass> {
~WeakBaseClass() override { GPR_ASSERT(shutting_down_); }
void Orphan() override { shutting_down_ = true; }
void Orphaned() override { shutting_down_ = true; }
private:
bool shutting_down_ = false;

@ -36,7 +36,7 @@ class TestServerConfigSelectorProvider : public ServerConfigSelectorProvider {
return absl::UnavailableError("Test ServerConfigSelector");
}
void Orphan() override {}
void Orphaned() override {}
void CancelWatch() override {}
};

Loading…
Cancel
Save