EventEngine::RunAt(): Weighted Target LB Policy removal timer (#30013)

* EventEngine::RunAt(): Weighted Target LB Policy removal timer

* inline the OnTimer method

* fix

* mutable

* fix mutable

* RunAfter EE API

* sanitize
pull/29999/head^2
AJ Heller 3 years ago committed by GitHub
parent 19e6a6d232
commit d374a2ea9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      BUILD
  2. 53
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc

@ -4361,15 +4361,15 @@ grpc_cc_library(
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",
"absl/strings", "absl/strings",
"absl/types:optional",
], ],
language = "c++", language = "c++",
tags = ["grpc-autodeps"], tags = ["grpc-autodeps"],
deps = [ deps = [
"channel_args", "channel_args",
"closure",
"debug_location", "debug_location",
"default_event_engine_factory_hdrs",
"error", "error",
"exec_ctx",
"gpr_base", "gpr_base",
"gpr_platform", "gpr_platform",
"grpc_base", "grpc_base",
@ -4377,7 +4377,6 @@ grpc_cc_library(
"grpc_codegen", "grpc_codegen",
"grpc_lb_address_filtering", "grpc_lb_address_filtering",
"grpc_trace", "grpc_trace",
"iomgr_timer",
"json", "json",
"orphanable", "orphanable",
"ref_counted", "ref_counted",

@ -32,7 +32,9 @@
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include "absl/strings/str_cat.h" #include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h> #include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
@ -45,17 +47,15 @@
#include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/json/json.h" #include "src/core/lib/json/json.h"
#include "src/core/lib/resolver/server_address.h" #include "src/core/lib/resolver/server_address.h"
@ -69,6 +69,9 @@ TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb");
namespace { namespace {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
constexpr char kWeightedTarget[] = "weighted_target_experimental"; constexpr char kWeightedTarget[] = "weighted_target_experimental";
// How long we keep a child around for after it has been removed from // How long we keep a child around for after it has been removed from
@ -191,13 +194,10 @@ class WeightedTargetLb : public LoadBalancingPolicy {
void Orphan() override; void Orphan() override;
private: private:
static void OnTimer(void* arg, grpc_error_handle error); void OnTimerLocked();
void OnTimerLocked(grpc_error_handle error);
RefCountedPtr<WeightedChild> weighted_child_; RefCountedPtr<WeightedChild> weighted_child_;
grpc_timer timer_; absl::optional<EventEngine::TaskHandle> timer_handle_;
grpc_closure on_timer_;
bool timer_pending_ = true;
}; };
// Methods for dealing with the child policy. // Methods for dealing with the child policy.
@ -444,14 +444,16 @@ void WeightedTargetLb::UpdateStateLocked() {
WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer( WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer(
RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child) RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)
: weighted_child_(std::move(weighted_child)) { : weighted_child_(std::move(weighted_child)) {
GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr); timer_handle_ = GetDefaultEventEngine()->RunAfter(
Ref().release(); kChildRetentionInterval, [self = Ref()]() mutable {
grpc_timer_init(&timer_, ExecCtx::Get()->Now() + kChildRetentionInterval, self->weighted_child_->weighted_target_policy_->work_serializer()->Run(
&on_timer_); [self = std::move(self)] { self->OnTimerLocked(); },
DEBUG_LOCATION);
});
} }
void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() { void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
if (timer_pending_) { if (timer_handle_.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: cancelling " "[weighted_target_lb %p] WeightedChild %p %s: cancelling "
@ -459,29 +461,16 @@ void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
weighted_child_->weighted_target_policy_.get(), weighted_child_->weighted_target_policy_.get(),
weighted_child_.get(), weighted_child_->name_.c_str()); weighted_child_.get(), weighted_child_->name_.c_str());
} }
timer_pending_ = false; GetDefaultEventEngine()->Cancel(*timer_handle_);
grpc_timer_cancel(&timer_);
} }
Unref(); Unref();
} }
void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimer( void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked() {
void* arg, grpc_error_handle error) { GPR_ASSERT(timer_handle_.has_value());
auto* self = static_cast<DelayedRemovalTimer*>(arg); timer_handle_.reset();
(void)GRPC_ERROR_REF(error); // ref owned by lambda weighted_child_->weighted_target_policy_->targets_.erase(
self->weighted_child_->weighted_target_policy_->work_serializer()->Run( weighted_child_->name_);
[self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
}
void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked(
grpc_error_handle error) {
if (GRPC_ERROR_IS_NONE(error) && timer_pending_) {
timer_pending_ = false;
weighted_child_->weighted_target_policy_->targets_.erase(
weighted_child_->name_);
}
GRPC_ERROR_UNREF(error);
Unref();
} }
// //

Loading…
Cancel
Save