priority LB: avoid possibility of rescheduling a timer before it fires (#29188)

* priority LB: avoid possibility of rescheduling a timer before it fires

* clang-format

* fix memory leak

* small change, just to be paranoid

* inline StartFailoverTimerLocked()

* initialize timer_pending_ to true

* don't check shutting_down_ in timer callbacks
pull/29231/head
Mark D. Roth 3 years ago committed by GitHub
parent 2d34ccff42
commit 70fa55155b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 314
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc

@ -106,7 +106,6 @@ class PriorityLb : public LoadBalancingPolicy {
void ResetBackoffLocked();
void DeactivateLocked();
void MaybeReactivateLocked();
void MaybeCancelFailoverTimerLocked();
void Orphan() override;
@ -122,9 +121,7 @@ class PriorityLb : public LoadBalancingPolicy {
return connectivity_status_;
}
// Returns the current value of the failover_timer_callback_pending_ flag.
bool failover_timer_callback_pending() const { return failover_timer_callback_pending_; }
// True while failover_timer_ is non-null.
bool FailoverTimerPending() const {
  return failover_timer_ != nullptr;
}
private:
// A simple wrapper for ref-counting a picker from the child policy.
@ -170,6 +167,38 @@ class PriorityLb : public LoadBalancingPolicy {
RefCountedPtr<ChildPriority> priority_;
};
// Ref-counted object wrapping the timer that is armed when a child is
// deactivated.  The constructor arms the timer; Orphan() cancels it if it
// is still pending.
class DeactivationTimer : public InternallyRefCounted<DeactivationTimer> {
 public:
  explicit DeactivationTimer(RefCountedPtr<ChildPriority> child_priority);

  void Orphan() override;

 private:
  // Raw timer callback; re-schedules the work onto the policy's
  // WorkSerializer.
  static void OnTimer(void* arg, grpc_error_handle error);
  // Continuation of OnTimer() that runs inside the WorkSerializer.
  void OnTimerLocked(grpc_error_handle error);

  RefCountedPtr<ChildPriority> child_priority_;
  grpc_timer timer_;
  grpc_closure on_timer_;
  // Starts out true; cleared by whichever of Orphan() / OnTimerLocked()
  // acts on the timer first.
  bool timer_pending_{true};
};
// Ref-counted object wrapping a child's failover timer.  The constructor
// arms the timer; Orphan() cancels it if it is still pending.
class FailoverTimer : public InternallyRefCounted<FailoverTimer> {
 public:
  explicit FailoverTimer(RefCountedPtr<ChildPriority> child_priority);

  void Orphan() override;

 private:
  // Raw timer callback; re-schedules the work onto the policy's
  // WorkSerializer.
  static void OnTimer(void* arg, grpc_error_handle error);
  // Continuation of OnTimer() that runs inside the WorkSerializer.
  void OnTimerLocked(grpc_error_handle error);

  RefCountedPtr<ChildPriority> child_priority_;
  grpc_timer timer_;
  grpc_closure on_timer_;
  // Starts out true; cleared by whichever of Orphan() / OnTimerLocked()
  // acts on the timer first.
  bool timer_pending_{true};
};
// Methods for dealing with the child policy.
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const grpc_channel_args* args);
@ -178,13 +207,6 @@ class PriorityLb : public LoadBalancingPolicy {
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker);
void StartFailoverTimerLocked();
static void OnFailoverTimer(void* arg, grpc_error_handle error);
void OnFailoverTimerLocked(grpc_error_handle error);
static void OnDeactivationTimer(void* arg, grpc_error_handle error);
void OnDeactivationTimerLocked(grpc_error_handle error);
RefCountedPtr<PriorityLb> priority_policy_;
const std::string name_;
bool ignore_reresolution_requests_ = false;
@ -195,15 +217,8 @@ class PriorityLb : public LoadBalancingPolicy {
absl::Status connectivity_status_;
RefCountedPtr<RefCountedPicker> picker_wrapper_;
// States for delayed removal.
grpc_timer deactivation_timer_;
grpc_closure on_deactivation_timer_;
bool deactivation_timer_callback_pending_ = false;
// States of failover.
grpc_timer failover_timer_;
grpc_closure on_failover_timer_;
bool failover_timer_callback_pending_ = false;
OrphanablePtr<DeactivationTimer> deactivation_timer_;
OrphanablePtr<FailoverTimer> failover_timer_;
};
~PriorityLb() override;
@ -451,7 +466,7 @@ void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
}
// Child is not READY or IDLE.
// If its failover timer is still pending, give it time to fire.
if (child->failover_timer_callback_pending()) {
if (child->FailoverTimerPending()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] priority %u, child %s: child still "
@ -501,6 +516,132 @@ void PriorityLb::SelectPriorityLocked(uint32_t priority) {
child->GetPicker());
}
//
// PriorityLb::ChildPriority::DeactivationTimer
//
// Stores the child ref, logs the upcoming removal when tracing is enabled,
// and arms timer_ to fire kChildRetentionInterval from now.
PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
    RefCountedPtr<PriorityLb::ChildPriority> child_priority)
    : child_priority_(std::move(child_priority)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    ChildPriority* const child = child_priority_.get();
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): deactivating -- will remove in "
            "%" PRId64 "ms",
            child->priority_policy_.get(), child->name_.c_str(), child,
            kChildRetentionInterval.millis());
  }
  GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr);
  // Extra ref for the timer callback; released at the end of
  // OnTimerLocked().
  Ref(DEBUG_LOCATION, "Timer").release();
  const auto deadline = ExecCtx::Get()->Now() + kChildRetentionInterval;
  grpc_timer_init(&timer_, deadline, &on_timer_);
}
void PriorityLb::ChildPriority::DeactivationTimer::Orphan() {
if (timer_pending_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
child_priority_->priority_policy_.get(),
child_priority_->name_.c_str(), child_priority_.get());
}
timer_pending_ = false;
grpc_timer_cancel(&timer_);
}
Unref();
}
void PriorityLb::ChildPriority::DeactivationTimer::OnTimer(
void* arg, grpc_error_handle error) {
auto* self = static_cast<DeactivationTimer*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
self->child_priority_->priority_policy_->work_serializer()->Run(
[self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
}
// Runs in the WorkSerializer.  If the timer fired without error and was not
// cancelled in the meantime, asks the policy to delete the child.  Always
// releases the "Timer" ref taken in the constructor and the error ref taken
// in OnTimer().
void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked(
    grpc_error_handle error) {
  const bool should_delete = (error == GRPC_ERROR_NONE) && timer_pending_;
  if (should_delete) {
    ChildPriority* const child = child_priority_.get();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] child %s (%p): deactivation timer fired, "
              "deleting child",
              child->priority_policy_.get(), child->name_.c_str(), child);
    }
    // Clear the flag first so a later Orphan() does not try to cancel.
    timer_pending_ = false;
    child->priority_policy_->DeleteChild(child);
  }
  Unref(DEBUG_LOCATION, "Timer");
  GRPC_ERROR_UNREF(error);
}
//
// PriorityLb::ChildPriority::FailoverTimer
//
// Stores the child ref, logs the timeout when tracing is enabled, and arms
// timer_ to fire after the policy's child_failover_timeout_.
PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
    RefCountedPtr<PriorityLb::ChildPriority> child_priority)
    : child_priority_(std::move(child_priority)) {
  PriorityLb* const policy = child_priority_->priority_policy_.get();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): starting failover timer for "
            "%" PRId64 "ms",
            policy, child_priority_->name_.c_str(), child_priority_.get(),
            policy->child_failover_timeout_.millis());
  }
  GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr);
  // Extra ref for the timer callback; released at the end of
  // OnTimerLocked().
  Ref(DEBUG_LOCATION, "Timer").release();
  const auto deadline =
      ExecCtx::Get()->Now() + policy->child_failover_timeout_;
  grpc_timer_init(&timer_, deadline, &on_timer_);
}
void PriorityLb::ChildPriority::FailoverTimer::Orphan() {
if (timer_pending_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): cancelling failover timer",
child_priority_->priority_policy_.get(),
child_priority_->name_.c_str(), child_priority_.get());
}
timer_pending_ = false;
grpc_timer_cancel(&timer_);
}
Unref();
}
void PriorityLb::ChildPriority::FailoverTimer::OnTimer(
void* arg, grpc_error_handle error) {
auto* self = static_cast<FailoverTimer*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
self->child_priority_->priority_policy_->work_serializer()->Run(
[self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
}
// Runs in the WorkSerializer.  If the timer fired without error and was not
// cancelled in the meantime, reports TRANSIENT_FAILURE for the child.
// Always releases the "Timer" ref taken in the constructor and the error ref
// taken in OnTimer().
void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked(
    grpc_error_handle error) {
  const bool timed_out = (error == GRPC_ERROR_NONE) && timer_pending_;
  if (timed_out) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      ChildPriority* const child = child_priority_.get();
      gpr_log(GPR_INFO,
              "[priority_lb %p] child %s (%p): failover timer fired, "
              "reporting TRANSIENT_FAILURE",
              child->priority_policy_.get(), child->name_.c_str(), child);
    }
    // Clear the flag first so a later Orphan() does not try to cancel.
    timer_pending_ = false;
    const absl::Status timeout_status(absl::StatusCode::kUnavailable,
                                      "failover timer fired");
    child_priority_->OnConnectivityStateUpdateLocked(
        GRPC_CHANNEL_TRANSIENT_FAILURE, timeout_status, nullptr);
  }
  Unref(DEBUG_LOCATION, "Timer");
  GRPC_ERROR_UNREF(error);
}
//
// PriorityLb::ChildPriority
//
@ -512,12 +653,8 @@ PriorityLb::ChildPriority::ChildPriority(
gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
priority_policy_.get(), name_.c_str(), this);
}
GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this,
grpc_schedule_on_exec_ctx);
// Start the failover timer.
StartFailoverTimerLocked();
failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
}
void PriorityLb::ChildPriority::Orphan() {
@ -525,10 +662,8 @@ void PriorityLb::ChildPriority::Orphan() {
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
priority_policy_.get(), name_.c_str(), this);
}
MaybeCancelFailoverTimerLocked();
if (deactivation_timer_callback_pending_) {
grpc_timer_cancel(&deactivation_timer_);
}
failover_timer_.reset();
deactivation_timer_.reset();
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
@ -537,9 +672,6 @@ void PriorityLb::ChildPriority::Orphan() {
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_wrapper_.reset();
if (deactivation_timer_callback_pending_) {
grpc_timer_cancel(&deactivation_timer_);
}
Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
}
@ -600,9 +732,8 @@ PriorityLb::ChildPriority::CreateChildPolicyLocked(
}
void PriorityLb::ChildPriority::ExitIdleLocked() {
if (connectivity_state_ == GRPC_CHANNEL_IDLE &&
!failover_timer_callback_pending_) {
StartFailoverTimerLocked();
if (connectivity_state_ == GRPC_CHANNEL_IDLE && failover_timer_ == nullptr) {
failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
}
child_policy_->ExitIdleLocked();
}
@ -628,122 +759,21 @@ void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
// If READY or IDLE or TRANSIENT_FAILURE, cancel failover timer.
if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE ||
state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
MaybeCancelFailoverTimerLocked();
failover_timer_.reset();
}
// Notify the parent policy.
priority_policy_->HandleChildConnectivityStateChangeLocked(this);
}
void PriorityLb::ChildPriority::StartFailoverTimerLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(
GPR_INFO,
"[priority_lb %p] child %s (%p): starting failover timer for %" PRId64
"ms",
priority_policy_.get(), name_.c_str(), this,
priority_policy_->child_failover_timeout_.millis());
}
Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release();
grpc_timer_init(
&failover_timer_,
ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_,
&on_failover_timer_);
failover_timer_callback_pending_ = true;
}
// Cancels failover_timer_ and clears the pending flag; does nothing when no
// callback is pending.
void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() {
  if (!failover_timer_callback_pending_) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): cancelling failover timer",
            priority_policy_.get(), name_.c_str(), this);
  }
  grpc_timer_cancel(&failover_timer_);
  failover_timer_callback_pending_ = false;
}
void PriorityLb::ChildPriority::OnFailoverTimer(void* arg,
grpc_error_handle error) {
ChildPriority* self = static_cast<ChildPriority*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
self->priority_policy_->work_serializer()->Run(
[self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION);
}
// Runs in the WorkSerializer.  Reports TRANSIENT_FAILURE for this child when
// the timer fired without error, the callback is still marked pending, and
// the policy is not shutting down.  Always releases the ref taken when the
// timer was started and the error ref taken in OnFailoverTimer().
void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error_handle error) {
  const bool report_failure = error == GRPC_ERROR_NONE &&
                              failover_timer_callback_pending_ &&
                              !priority_policy_->shutting_down_;
  if (report_failure) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] child %s (%p): failover timer fired, "
              "reporting TRANSIENT_FAILURE",
              priority_policy_.get(), name_.c_str(), this);
    }
    failover_timer_callback_pending_ = false;
    const absl::Status timeout_status(absl::StatusCode::kUnavailable,
                                      "failover timer fired");
    OnConnectivityStateUpdateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
                                    timeout_status, nullptr);
  }
  Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked");
  GRPC_ERROR_UNREF(error);
}
// Starts the delayed removal of this child: stops the failover timer and
// arms a deactivation timer for kChildRetentionInterval.
//
// NOTE(review): the body below contains two deactivation paths back to back
// -- one driving the raw grpc_timer guarded by
// deactivation_timer_callback_pending_, and one creating a DeactivationTimer
// object guarded by deactivation_timer_.  This looks like the pre- and
// post-refactor versions merged together; confirm which set of lines is
// meant to be live before changing anything here.
void PriorityLb::ChildPriority::DeactivateLocked() {
// If already deactivated, don't do it again.
if (deactivation_timer_callback_pending_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): deactivating -- will remove in "
"%" PRId64 "ms.",
priority_policy_.get(), name_.c_str(), this,
kChildRetentionInterval.millis());
}
MaybeCancelFailoverTimerLocked();
// Start a timer to delete the child.
// The released ref is dropped again in OnDeactivationTimerLocked().
Ref(DEBUG_LOCATION, "ChildPriority+timer").release();
grpc_timer_init(&deactivation_timer_,
ExecCtx::Get()->Now() + kChildRetentionInterval,
&on_deactivation_timer_);
deactivation_timer_callback_pending_ = true;
// Object-based path: skip if a DeactivationTimer already exists, otherwise
// drop the failover timer and create one holding a new ref to this child.
if (deactivation_timer_ != nullptr) return;
failover_timer_.reset();
deactivation_timer_ = MakeOrphanable<DeactivationTimer>(Ref());
}
// Cancels a pending deactivation timer and clears its pending flag; does
// nothing when no deactivation callback is pending.
void PriorityLb::ChildPriority::MaybeReactivateLocked() {
  if (!deactivation_timer_callback_pending_) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
            priority_policy_.get(), name_.c_str(), this);
  }
  deactivation_timer_callback_pending_ = false;
  grpc_timer_cancel(&deactivation_timer_);
}
void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg,
grpc_error_handle error) {
ChildPriority* self = static_cast<ChildPriority*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
self->priority_policy_->work_serializer()->Run(
[self, error]() { self->OnDeactivationTimerLocked(error); },
DEBUG_LOCATION);
}
// Runs in the WorkSerializer (scheduled by OnDeactivationTimer()).  Deletes
// this child from the policy when the timer fired without error, the
// callback is still marked pending, and the policy is not shutting down.
// Then releases the "ChildPriority+timer" ref and the error ref.
void PriorityLb::ChildPriority::OnDeactivationTimerLocked(
grpc_error_handle error) {
if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ &&
!priority_policy_->shutting_down_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): deactivation timer fired, "
"deleting child",
priority_policy_.get(), name_.c_str(), this);
}
// Clear the flag before asking the policy to delete this child.
deactivation_timer_callback_pending_ = false;
priority_policy_->DeleteChild(this);
}
Unref(DEBUG_LOCATION, "ChildPriority+timer");
GRPC_ERROR_UNREF(error);
// NOTE(review): this statement touches a member after the Unref() above and
// does not match the rest of this function, which uses the raw grpc_timer
// path.  It looks like a line from a different function body (possibly
// MaybeReactivateLocked after the refactor) -- confirm before relying on it.
deactivation_timer_.reset();
}
//

Loading…
Cancel
Save