weighted_target_lb: avoid possibility of rescheduling a timer before it fires (#29203)

* weighted_target_lb: avoid possibility of rescheduling a timer before it fires

* don't check shutdown_ or weight_ in timer callback

* fix memory leak
pull/29226/head
Mark D. Roth 3 years ago committed by GitHub
parent 70fa55155b
commit 6206700787
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 113
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc

@ -158,6 +158,23 @@ class WeightedTargetLb : public LoadBalancingPolicy {
RefCountedPtr<WeightedChild> weighted_child_; RefCountedPtr<WeightedChild> weighted_child_;
}; };
class DelayedRemovalTimer
: public InternallyRefCounted<DelayedRemovalTimer> {
public:
explicit DelayedRemovalTimer(RefCountedPtr<WeightedChild> weighted_child);
void Orphan() override;
private:
static void OnTimer(void* arg, grpc_error_handle error);
void OnTimerLocked(grpc_error_handle error);
RefCountedPtr<WeightedChild> weighted_child_;
grpc_timer timer_;
grpc_closure on_timer_;
bool timer_pending_ = true;
};
// Methods for dealing with the child policy. // Methods for dealing with the child policy.
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const grpc_channel_args* args); const grpc_channel_args* args);
@ -166,9 +183,6 @@ class WeightedTargetLb : public LoadBalancingPolicy {
grpc_connectivity_state state, const absl::Status& status, grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker); std::unique_ptr<SubchannelPicker> picker);
static void OnDelayedRemovalTimer(void* arg, grpc_error_handle error);
void OnDelayedRemovalTimerLocked(grpc_error_handle error);
// The owning LB policy. // The owning LB policy.
RefCountedPtr<WeightedTargetLb> weighted_target_policy_; RefCountedPtr<WeightedTargetLb> weighted_target_policy_;
@ -182,11 +196,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING; grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
bool seen_failure_since_ready_ = false; bool seen_failure_since_ready_ = false;
// States for delayed removal. OrphanablePtr<DelayedRemovalTimer> delayed_removal_timer_;
grpc_timer delayed_removal_timer_;
grpc_closure on_delayed_removal_timer_;
bool delayed_removal_timer_callback_pending_ = false;
bool shutdown_ = false;
}; };
~WeightedTargetLb() override; ~WeightedTargetLb() override;
@ -401,6 +411,53 @@ void WeightedTargetLb::UpdateStateLocked() {
std::move(picker)); std::move(picker));
} }
//
// WeightedTargetLb::WeightedChild::DelayedRemovalTimer
//
WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer(
RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)
: weighted_child_(std::move(weighted_child)) {
GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr);
Ref().release();
grpc_timer_init(&timer_, ExecCtx::Get()->Now() + kChildRetentionInterval,
&on_timer_);
}
void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
if (timer_pending_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: cancelling "
"delayed removal timer",
weighted_child_->weighted_target_policy_.get(),
weighted_child_.get(), weighted_child_->name_.c_str());
}
timer_pending_ = false;
grpc_timer_cancel(&timer_);
}
Unref();
}
void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimer(
void* arg, grpc_error_handle error) {
auto* self = static_cast<DelayedRemovalTimer*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
self->weighted_child_->weighted_target_policy_->work_serializer()->Run(
[self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
}
void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked(
grpc_error_handle error) {
if (error == GRPC_ERROR_NONE && timer_pending_) {
timer_pending_ = false;
weighted_child_->weighted_target_policy_->targets_.erase(
weighted_child_->name_);
}
GRPC_ERROR_UNREF(error);
Unref();
}
// //
// WeightedTargetLb::WeightedChild // WeightedTargetLb::WeightedChild
// //
@ -413,8 +470,6 @@ WeightedTargetLb::WeightedChild::WeightedChild(
gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s", gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s",
weighted_target_policy_.get(), this, name_.c_str()); weighted_target_policy_.get(), this, name_.c_str());
} }
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
grpc_schedule_on_exec_ctx);
} }
WeightedTargetLb::WeightedChild::~WeightedChild() { WeightedTargetLb::WeightedChild::~WeightedChild() {
@ -441,11 +496,7 @@ void WeightedTargetLb::WeightedChild::Orphan() {
// Drop our ref to the child's picker, in case it's holding a ref to // Drop our ref to the child's picker, in case it's holding a ref to
// the child. // the child.
picker_wrapper_.reset(); picker_wrapper_.reset();
if (delayed_removal_timer_callback_pending_) { delayed_removal_timer_.reset();
delayed_removal_timer_callback_pending_ = false;
grpc_timer_cancel(&delayed_removal_timer_);
}
shutdown_ = true;
Unref(); Unref();
} }
@ -484,14 +535,13 @@ void WeightedTargetLb::WeightedChild::UpdateLocked(
// Update child weight. // Update child weight.
weight_ = config.weight; weight_ = config.weight;
// Reactivate if needed. // Reactivate if needed.
if (delayed_removal_timer_callback_pending_) { if (delayed_removal_timer_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: reactivating", "[weighted_target_lb %p] WeightedChild %p %s: reactivating",
weighted_target_policy_.get(), this, name_.c_str()); weighted_target_policy_.get(), this, name_.c_str());
} }
delayed_removal_timer_callback_pending_ = false; delayed_removal_timer_.reset();
grpc_timer_cancel(&delayed_removal_timer_);
} }
// Create child policy if needed. // Create child policy if needed.
if (child_policy_ == nullptr) { if (child_policy_ == nullptr) {
@ -561,31 +611,8 @@ void WeightedTargetLb::WeightedChild::DeactivateLocked() {
// Set the child weight to 0 so that future picker won't contain this child. // Set the child weight to 0 so that future picker won't contain this child.
weight_ = 0; weight_ = 0;
// Start a timer to delete the child. // Start a timer to delete the child.
Ref(DEBUG_LOCATION, "WeightedChild+timer").release(); delayed_removal_timer_ = MakeOrphanable<DelayedRemovalTimer>(
delayed_removal_timer_callback_pending_ = true; Ref(DEBUG_LOCATION, "DelayedRemovalTimer"));
grpc_timer_init(&delayed_removal_timer_,
ExecCtx::Get()->Now() + kChildRetentionInterval,
&on_delayed_removal_timer_);
}
void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimer(
void* arg, grpc_error_handle error) {
WeightedChild* self = static_cast<WeightedChild*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
self->weighted_target_policy_->work_serializer()->Run(
[self, error]() { self->OnDelayedRemovalTimerLocked(error); },
DEBUG_LOCATION);
}
void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimerLocked(
grpc_error_handle error) {
if (error == GRPC_ERROR_NONE && delayed_removal_timer_callback_pending_ &&
!shutdown_ && weight_ == 0) {
delayed_removal_timer_callback_pending_ = false;
weighted_target_policy_->targets_.erase(name_);
}
Unref(DEBUG_LOCATION, "WeightedChild+timer");
GRPC_ERROR_UNREF(error);
} }
// //

Loading…
Cancel
Save