From fcf8e2431cb1cbd6c7349f6b97df8c4089c2d195 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Wed, 28 Sep 2022 18:21:54 -0700 Subject: [PATCH] EventEngine::RunAfter: XDS Cluster Manager (#30044) * EventEngine::RunAfter: XDS Cluster Manager * fix * reviewer feedback * sanity --- BUILD | 6 +- .../lb_policy/xds/xds_cluster_manager.cc | 71 ++++++++----------- 2 files changed, 34 insertions(+), 43 deletions(-) diff --git a/BUILD b/BUILD index 73c518be8bc..affeb5fc2c7 100644 --- a/BUILD +++ b/BUILD @@ -5032,20 +5032,22 @@ grpc_cc_library( "absl/status", "absl/status:statusor", "absl/strings", + "absl/types:optional", ], language = "c++", deps = [ "channel_args", - "closure", "config", "debug_location", + "default_event_engine", + "error", "gpr", + "gpr_platform", "grpc_base", "grpc_client_channel", "grpc_public_hdrs", "grpc_resolver_xds_header", "grpc_trace", - "iomgr_timer", "json", "json_args", "json_object_loader", diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc index 11580287058..12c74b758a9 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc @@ -31,7 +31,9 @@ #include "absl/strings/str_cat.h" #include "absl/strings/str_join.h" #include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include #include #include @@ -41,6 +43,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -48,10 +51,7 @@ #include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/gprpp/work_serializer.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/pollset_set.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" #include "src/core/lib/json/json_args.h" #include "src/core/lib/json/json_object_loader.h" @@ -62,14 +62,16 @@ #include "src/core/lib/resolver/server_address.h" #include "src/core/lib/transport/connectivity_state.h" -#define GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000) - namespace grpc_core { TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb"); namespace { +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; + +constexpr Duration kChildRetentionInterval = Duration::Minutes(15); constexpr absl::string_view kXdsClusterManager = "xds_cluster_manager_experimental"; @@ -206,8 +208,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { OrphanablePtr CreateChildPolicyLocked( const ChannelArgs& args); - static void OnDelayedRemovalTimer(void* arg, grpc_error_handle error); - void OnDelayedRemovalTimerLocked(grpc_error_handle error); + void OnDelayedRemovalTimerLocked(); // The owning LB policy. RefCountedPtr xds_cluster_manager_policy_; @@ -221,10 +222,9 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; // States for delayed removal. - grpc_timer delayed_removal_timer_; - grpc_closure on_delayed_removal_timer_; - bool delayed_removal_timer_callback_pending_ = false; + absl::optional delayed_removal_timer_handle_; bool shutdown_ = false; + std::shared_ptr engine_; }; ~XdsClusterManagerLb() override; @@ -430,14 +430,13 @@ XdsClusterManagerLb::ClusterChild::ClusterChild( RefCountedPtr xds_cluster_manager_policy, const std::string& name) : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)), - name_(name) { + name_(name), + engine_(GetDefaultEventEngine()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] created ClusterChild %p for %s", xds_cluster_manager_policy_.get(), this, name_.c_str()); } - GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, - grpc_schedule_on_exec_ctx); } XdsClusterManagerLb::ClusterChild::~ClusterChild() { @@ -466,8 +465,8 @@ void XdsClusterManagerLb::ClusterChild::Orphan() { // Drop our ref to the child's picker, in case it's holding a ref to // the child. picker_wrapper_.reset(); - if (delayed_removal_timer_callback_pending_) { - grpc_timer_cancel(&delayed_removal_timer_); + if (delayed_removal_timer_handle_.has_value()) { + engine_->Cancel(*delayed_removal_timer_handle_); } shutdown_ = true; Unref(); @@ -509,9 +508,9 @@ absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked( if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus(); // Update child weight. // Reactivate if needed. - if (delayed_removal_timer_callback_pending_) { - delayed_removal_timer_callback_pending_ = false; - grpc_timer_cancel(&delayed_removal_timer_); + if (delayed_removal_timer_handle_.has_value() && + engine_->Cancel(*delayed_removal_timer_handle_)) { + delayed_removal_timer_handle_.reset(); } // Create child policy if needed. if (child_policy_ == nullptr) { @@ -544,33 +543,23 @@ void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() { void XdsClusterManagerLb::ClusterChild::DeactivateLocked() { // If already deactivated, don't do that again. - if (delayed_removal_timer_callback_pending_) return; + if (delayed_removal_timer_handle_.has_value()) return; // Set the child weight to 0 so that future picker won't contain this child. // Start a timer to delete the child. - Ref(DEBUG_LOCATION, "ClusterChild+timer").release(); - grpc_timer_init(&delayed_removal_timer_, - Timestamp::Now() + - Duration::Milliseconds( - GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS), - &on_delayed_removal_timer_); - delayed_removal_timer_callback_pending_ = true; -} - -void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimer( - void* arg, grpc_error_handle error) { - ClusterChild* self = static_cast(arg); - self->xds_cluster_manager_policy_->work_serializer()->Run( - [self, error]() { self->OnDelayedRemovalTimerLocked(error); }, - DEBUG_LOCATION); + delayed_removal_timer_handle_ = engine_->RunAfter( + kChildRetentionInterval, + [self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable { + self->xds_cluster_manager_policy_->work_serializer()->Run( + [self = std::move(self)]() { self->OnDelayedRemovalTimerLocked(); }, + DEBUG_LOCATION); + }); } -void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked( - grpc_error_handle error) { - delayed_removal_timer_callback_pending_ = false; - if (error.ok() && !shutdown_) { +void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked() { + delayed_removal_timer_handle_.reset(); + if (!shutdown_) { xds_cluster_manager_policy_->children_.erase(name_); } - Unref(DEBUG_LOCATION, "ClusterChild+timer"); } // @@ -699,7 +688,7 @@ class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { return MakeOrphanable(std::move(args)); - } + } // namespace absl::string_view name() const override { return kXdsClusterManager; } @@ -717,7 +706,7 @@ class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory { json, JsonArgs(), "errors validating xds_cluster_manager LB policy config"); } -}; +}; // namespace grpc_core } // namespace