EventEngine::RunAfter: XDS Cluster Manager (#30044)

* EventEngine::RunAfter: XDS Cluster Manager

* fix

* reviewer feedback

* sanity
pull/31162/head
AJ Heller 3 years ago committed by GitHub
parent 71f4fa7f92
commit fcf8e2431c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      BUILD
  2. 71
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc

@ -5032,20 +5032,22 @@ grpc_cc_library(
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
],
language = "c++",
deps = [
"channel_args",
"closure",
"config",
"debug_location",
"default_event_engine",
"error",
"gpr",
"gpr_platform",
"grpc_base",
"grpc_client_channel",
"grpc_public_hdrs",
"grpc_resolver_xds_header",
"grpc_trace",
"iomgr_timer",
"json",
"json_args",
"json_object_loader",

@ -31,7 +31,9 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/support/log.h>
@ -41,6 +43,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -48,10 +51,7 @@
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
@ -62,14 +62,16 @@
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
#define GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
namespace grpc_core {
TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb");
namespace {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
constexpr absl::string_view kXdsClusterManager =
"xds_cluster_manager_experimental";
@ -206,8 +208,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
static void OnDelayedRemovalTimer(void* arg, grpc_error_handle error);
void OnDelayedRemovalTimerLocked(grpc_error_handle error);
void OnDelayedRemovalTimerLocked();
// The owning LB policy.
RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy_;
@ -221,10 +222,9 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
// States for delayed removal.
grpc_timer delayed_removal_timer_;
grpc_closure on_delayed_removal_timer_;
bool delayed_removal_timer_callback_pending_ = false;
absl::optional<EventEngine::TaskHandle> delayed_removal_timer_handle_;
bool shutdown_ = false;
std::shared_ptr<EventEngine> engine_;
};
~XdsClusterManagerLb() override;
@ -430,14 +430,13 @@ XdsClusterManagerLb::ClusterChild::ClusterChild(
RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
const std::string& name)
: xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
name_(name) {
name_(name),
engine_(GetDefaultEventEngine()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
xds_cluster_manager_policy_.get(), this, name_.c_str());
}
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
grpc_schedule_on_exec_ctx);
}
XdsClusterManagerLb::ClusterChild::~ClusterChild() {
@ -466,8 +465,8 @@ void XdsClusterManagerLb::ClusterChild::Orphan() {
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_wrapper_.reset();
if (delayed_removal_timer_callback_pending_) {
grpc_timer_cancel(&delayed_removal_timer_);
if (delayed_removal_timer_handle_.has_value()) {
engine_->Cancel(*delayed_removal_timer_handle_);
}
shutdown_ = true;
Unref();
@ -509,9 +508,9 @@ absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus();
// Update child weight.
// Reactivate if needed.
if (delayed_removal_timer_callback_pending_) {
delayed_removal_timer_callback_pending_ = false;
grpc_timer_cancel(&delayed_removal_timer_);
if (delayed_removal_timer_handle_.has_value() &&
engine_->Cancel(*delayed_removal_timer_handle_)) {
delayed_removal_timer_handle_.reset();
}
// Create child policy if needed.
if (child_policy_ == nullptr) {
@ -544,33 +543,23 @@ void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
// If already deactivated, don't do that again.
if (delayed_removal_timer_callback_pending_) return;
if (delayed_removal_timer_handle_.has_value()) return;
// Set the child weight to 0 so that future picker won't contain this child.
// Start a timer to delete the child.
Ref(DEBUG_LOCATION, "ClusterChild+timer").release();
grpc_timer_init(&delayed_removal_timer_,
Timestamp::Now() +
Duration::Milliseconds(
GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS),
&on_delayed_removal_timer_);
delayed_removal_timer_callback_pending_ = true;
}
void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimer(
void* arg, grpc_error_handle error) {
ClusterChild* self = static_cast<ClusterChild*>(arg);
self->xds_cluster_manager_policy_->work_serializer()->Run(
[self, error]() { self->OnDelayedRemovalTimerLocked(error); },
DEBUG_LOCATION);
delayed_removal_timer_handle_ = engine_->RunAfter(
kChildRetentionInterval,
[self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable {
self->xds_cluster_manager_policy_->work_serializer()->Run(
[self = std::move(self)]() { self->OnDelayedRemovalTimerLocked(); },
DEBUG_LOCATION);
});
}
void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked(
grpc_error_handle error) {
delayed_removal_timer_callback_pending_ = false;
if (error.ok() && !shutdown_) {
void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked() {
delayed_removal_timer_handle_.reset();
if (!shutdown_) {
xds_cluster_manager_policy_->children_.erase(name_);
}
Unref(DEBUG_LOCATION, "ClusterChild+timer");
}
//
@ -699,7 +688,7 @@ class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<XdsClusterManagerLb>(std::move(args));
}
} // namespace
absl::string_view name() const override { return kXdsClusterManager; }
@ -717,7 +706,7 @@ class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory {
json, JsonArgs(),
"errors validating xds_cluster_manager LB policy config");
}
};
}; // namespace grpc_core
} // namespace

Loading…
Cancel
Save