EventEngine::RunAfter: Priority LB (#30045)

* EventEngine::RunAfter: Priority LB

* Have the ChildPriority own an EventEngine

* Automated change: Fix sanity tests

* fix use after move; add exec_ctx

* Automated change: Fix sanity tests

* fix cancellation ordering problem

* reviewer feedback

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/31842/head
AJ Heller 2 years ago committed by GitHub
parent 89f3b1f293
commit f17592d48d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/core/BUILD
  2. 107
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc

@ -4283,8 +4283,6 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"closure",
"error",
"grpc_lb_address_filtering",
"json",
"json_args",
@ -4298,11 +4296,11 @@ grpc_cc_library(
"validation_errors",
"//:config",
"//:debug_location",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
"//:grpc_client_channel",
"//:grpc_trace",
"//:iomgr_timer",
"//:orphanable",
"//:ref_counted_ptr",
"//:server_address",

@ -49,10 +49,8 @@
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
@ -69,6 +67,8 @@ TraceFlag grpc_lb_priority_trace(false, "priority_lb");
namespace {
using ::grpc_event_engine::experimental::EventEngine;
constexpr absl::string_view kPriority = "priority_experimental";
// How long we keep a child around for after it is no longer being used
@ -175,7 +175,7 @@ class PriorityLb : public LoadBalancingPolicy {
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -190,13 +190,10 @@ class PriorityLb : public LoadBalancingPolicy {
void Orphan() override;
private:
static void OnTimer(void* arg, grpc_error_handle error);
void OnTimerLocked(grpc_error_handle);
void OnTimerLocked();
RefCountedPtr<ChildPriority> child_priority_;
grpc_timer timer_;
grpc_closure on_timer_;
bool timer_pending_ = true;
absl::optional<EventEngine::TaskHandle> timer_handle_;
};
class FailoverTimer : public InternallyRefCounted<FailoverTimer> {
@ -206,13 +203,10 @@ class PriorityLb : public LoadBalancingPolicy {
void Orphan() override;
private:
static void OnTimer(void* arg, grpc_error_handle error);
void OnTimerLocked(grpc_error_handle);
void OnTimerLocked();
RefCountedPtr<ChildPriority> child_priority_;
grpc_timer timer_;
grpc_closure on_timer_;
bool timer_pending_ = true;
absl::optional<EventEngine::TaskHandle> timer_handle_;
};
// Methods for dealing with the child policy.
@ -522,35 +516,38 @@ PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
child_priority_->name_.c_str(), child_priority_.get(),
kChildRetentionInterval.millis());
}
GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr);
Ref(DEBUG_LOCATION, "Timer").release();
grpc_timer_init(&timer_, Timestamp::Now() + kChildRetentionInterval,
&on_timer_);
timer_handle_ =
child_priority_->priority_policy_->channel_control_helper()
->GetEventEngine()
->RunAfter(kChildRetentionInterval, [self = Ref(DEBUG_LOCATION,
"Timer")]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto self_ptr = self.get();
self_ptr->child_priority_->priority_policy_->work_serializer()->Run(
[self = std::move(self)]() { self->OnTimerLocked(); },
DEBUG_LOCATION);
});
}
void PriorityLb::ChildPriority::DeactivationTimer::Orphan() {
if (timer_pending_) {
if (timer_handle_.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
child_priority_->priority_policy_.get(),
child_priority_->name_.c_str(), child_priority_.get());
}
timer_pending_ = false;
grpc_timer_cancel(&timer_);
child_priority_->priority_policy_->channel_control_helper()
->GetEventEngine()
->Cancel(*timer_handle_);
timer_handle_.reset();
}
Unref();
}
void PriorityLb::ChildPriority::DeactivationTimer::OnTimer(
void* arg, grpc_error_handle error) {
auto* self = static_cast<DeactivationTimer*>(arg);
self->child_priority_->priority_policy_->work_serializer()->Run(
[self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
}
void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked(
grpc_error_handle error) {
if (error.ok() && timer_pending_) {
void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked() {
if (timer_handle_.has_value()) {
timer_handle_.reset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): deactivation timer fired, "
@ -558,10 +555,8 @@ void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked(
child_priority_->priority_policy_.get(),
child_priority_->name_.c_str(), child_priority_.get());
}
timer_pending_ = false;
child_priority_->priority_policy_->DeleteChild(child_priority_.get());
}
Unref(DEBUG_LOCATION, "Timer");
}
//
@ -580,39 +575,40 @@ PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
child_priority_.get(),
child_priority_->priority_policy_->child_failover_timeout_.millis());
}
GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr);
Ref(DEBUG_LOCATION, "Timer").release();
grpc_timer_init(
&timer_,
Timestamp::Now() +
child_priority_->priority_policy_->child_failover_timeout_,
&on_timer_);
timer_handle_ =
child_priority_->priority_policy_->channel_control_helper()
->GetEventEngine()
->RunAfter(
child_priority_->priority_policy_->child_failover_timeout_,
[self = Ref(DEBUG_LOCATION, "Timer")]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto self_ptr = self.get();
self_ptr->child_priority_->priority_policy_->work_serializer()
->Run([self = std::move(self)]() { self->OnTimerLocked(); },
DEBUG_LOCATION);
});
}
void PriorityLb::ChildPriority::FailoverTimer::Orphan() {
if (timer_pending_) {
if (timer_handle_.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): cancelling failover timer",
child_priority_->priority_policy_.get(),
child_priority_->name_.c_str(), child_priority_.get());
}
timer_pending_ = false;
grpc_timer_cancel(&timer_);
child_priority_->priority_policy_->channel_control_helper()
->GetEventEngine()
->Cancel(*timer_handle_);
timer_handle_.reset();
}
Unref();
}
void PriorityLb::ChildPriority::FailoverTimer::OnTimer(
void* arg, grpc_error_handle error) {
auto* self = static_cast<FailoverTimer*>(arg);
self->child_priority_->priority_policy_->work_serializer()->Run(
[self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
}
void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked(
grpc_error_handle error) {
if (error.ok() && timer_pending_) {
void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked() {
if (timer_handle_.has_value()) {
timer_handle_.reset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): failover timer fired, "
@ -620,13 +616,11 @@ void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked(
child_priority_->priority_policy_.get(),
child_priority_->name_.c_str(), child_priority_.get());
}
timer_pending_ = false;
child_priority_->OnConnectivityStateUpdateLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
nullptr);
}
Unref(DEBUG_LOCATION, "Timer");
}
//
@ -826,8 +820,7 @@ absl::string_view PriorityLb::ChildPriority::Helper::GetAuthority() {
return priority_->priority_policy_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine*
PriorityLb::ChildPriority::Helper::GetEventEngine() {
EventEngine* PriorityLb::ChildPriority::Helper::GetEventEngine() {
return priority_->priority_policy_->channel_control_helper()
->GetEventEngine();
}

Loading…
Cancel
Save