|
|
|
@ -353,7 +353,8 @@ class RlsLb final : public LoadBalancingPolicy { |
|
|
|
|
// is called after releasing it.
|
|
|
|
|
//
|
|
|
|
|
// Both methods grab the data they need from the parent object.
|
|
|
|
|
void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
void StartUpdate(OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_); |
|
|
|
|
|
|
|
|
|
void ExitIdleLocked() { |
|
|
|
@ -397,14 +398,14 @@ class RlsLb final : public LoadBalancingPolicy { |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Note: We are forced to disable lock analysis here because
|
|
|
|
|
// Orphan() is called by Unref() which is called by RefCountedPtr<>, which
|
|
|
|
|
// Orphaned() is called by Unref() which is called by RefCountedPtr<>, which
|
|
|
|
|
// cannot have lock annotations for this particular caller.
|
|
|
|
|
void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS; |
|
|
|
|
|
|
|
|
|
RefCountedPtr<RlsLb> lb_policy_; |
|
|
|
|
std::string target_; |
|
|
|
|
|
|
|
|
|
bool is_shutdown_ = false; |
|
|
|
|
bool is_shutdown_ = false; // Protected by WorkSerializer
|
|
|
|
|
|
|
|
|
|
OrphanablePtr<ChildPolicyHandler> child_policy_; |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> pending_config_; |
|
|
|
@ -503,12 +504,25 @@ class RlsLb final : public LoadBalancingPolicy { |
|
|
|
|
// Returns a list of child policy wrappers on which FinishUpdate()
|
|
|
|
|
// needs to be called after releasing the lock.
|
|
|
|
|
std::vector<ChildPolicyWrapper*> OnRlsResponseLocked( |
|
|
|
|
ResponseInfo response, std::unique_ptr<BackOff> backoff_state) |
|
|
|
|
ResponseInfo response, std::unique_ptr<BackOff> backoff_state, |
|
|
|
|
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
|
|
|
|
|
// Moves entry to the end of the LRU list.
|
|
|
|
|
void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
|
|
|
|
|
// Takes entries from child_policy_wrappers_ and appends them to the end
|
|
|
|
|
// of \a child_policy_wrappers.
|
|
|
|
|
void TakeChildPolicyWrappers( |
|
|
|
|
std::vector<RefCountedPtr<ChildPolicyWrapper>>* child_policy_wrappers) |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) { |
|
|
|
|
child_policy_wrappers->insert( |
|
|
|
|
child_policy_wrappers->end(), |
|
|
|
|
std::make_move_iterator(child_policy_wrappers_.begin()), |
|
|
|
|
std::make_move_iterator(child_policy_wrappers_.end())); |
|
|
|
|
child_policy_wrappers_.clear(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
class BackoffTimer final : public InternallyRefCounted<BackoffTimer> { |
|
|
|
|
public: |
|
|
|
@ -566,19 +580,24 @@ class RlsLb final : public LoadBalancingPolicy { |
|
|
|
|
// the caller. Otherwise, the entry found is returned to the caller. The
|
|
|
|
|
// entry returned to the user is considered recently used and its order in
|
|
|
|
|
// the LRU list of the cache is updated.
|
|
|
|
|
Entry* FindOrInsert(const RequestKey& key) |
|
|
|
|
Entry* FindOrInsert(const RequestKey& key, |
|
|
|
|
std::vector<RefCountedPtr<ChildPolicyWrapper>>* |
|
|
|
|
child_policy_wrappers_to_delete) |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
|
|
|
|
|
// Resizes the cache. If the new cache size is greater than the current size
|
|
|
|
|
// of the cache, do nothing. Otherwise, evict the oldest entries that
|
|
|
|
|
// exceed the new size limit of the cache.
|
|
|
|
|
void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
void Resize(size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>* |
|
|
|
|
child_policy_wrappers_to_delete) |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
|
|
|
|
|
// Resets backoff of all the cache entries.
|
|
|
|
|
void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
|
|
|
|
|
// Shutdown the cache; clean-up and orphan all the stored cache entries.
|
|
|
|
|
void Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
GRPC_MUST_USE_RESULT std::vector<RefCountedPtr<ChildPolicyWrapper>> |
|
|
|
|
Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
|
|
|
|
|
void ReportMetricsLocked(CallbackMetricReporter& reporter) |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
@ -594,7 +613,9 @@ class RlsLb final : public LoadBalancingPolicy { |
|
|
|
|
|
|
|
|
|
// Evicts oversized cache elements when the current size is greater than
|
|
|
|
|
// the specified limit.
|
|
|
|
|
void MaybeShrinkSize(size_t bytes) |
|
|
|
|
void MaybeShrinkSize(size_t bytes, |
|
|
|
|
std::vector<RefCountedPtr<ChildPolicyWrapper>>* |
|
|
|
|
child_policy_wrappers_to_delete) |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_); |
|
|
|
|
|
|
|
|
|
RlsLb* lb_policy_; |
|
|
|
@ -805,11 +826,9 @@ RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::ChildPolicyWrapper::Orphaned() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << lb_policy_.get() |
|
|
|
|
<< "] ChildPolicyWrapper=" << this << " [" << target_ |
|
|
|
|
<< "]: shutdown"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this |
|
|
|
|
<< " [" << target_ << "]: shutdown"; |
|
|
|
|
is_shutdown_ = true; |
|
|
|
|
lb_policy_->child_policy_map_.erase(target_); |
|
|
|
|
if (child_policy_ != nullptr) { |
|
|
|
@ -859,7 +878,8 @@ absl::optional<Json> InsertOrUpdateChildPolicyField(const std::string& field, |
|
|
|
|
return Json::FromArray(std::move(array)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::ChildPolicyWrapper::StartUpdate() { |
|
|
|
|
void RlsLb::ChildPolicyWrapper::StartUpdate( |
|
|
|
|
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) { |
|
|
|
|
ValidationErrors errors; |
|
|
|
|
auto child_policy_config = InsertOrUpdateChildPolicyField( |
|
|
|
|
lb_policy_->config_->child_policy_config_target_field_name(), target_, |
|
|
|
@ -876,15 +896,13 @@ void RlsLb::ChildPolicyWrapper::StartUpdate() { |
|
|
|
|
*child_policy_config); |
|
|
|
|
// Returned RLS target fails the validation.
|
|
|
|
|
if (!config.ok()) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << lb_policy_.get() |
|
|
|
|
<< "] ChildPolicyWrapper=" << this << " [" << target_ |
|
|
|
|
<< "]: config failed to parse: " << config.status(); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this |
|
|
|
|
<< " [" << target_ << "]: config failed to parse: " << config.status(); |
|
|
|
|
pending_config_.reset(); |
|
|
|
|
picker_ = MakeRefCounted<TransientFailurePicker>( |
|
|
|
|
absl::UnavailableError(config.status().message())); |
|
|
|
|
child_policy_.reset(); |
|
|
|
|
*child_policy_to_delete = std::move(child_policy_); |
|
|
|
|
} else { |
|
|
|
|
pending_config_ = std::move(*config); |
|
|
|
|
} |
|
|
|
@ -913,11 +931,10 @@ absl::Status RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() { |
|
|
|
|
lb_policy_->interested_parties()); |
|
|
|
|
} |
|
|
|
|
// Send the child the updated config.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << lb_policy_.get() |
|
|
|
|
<< "] ChildPolicyWrapper=" << this << " [" << target_ |
|
|
|
|
<< "], updating child policy handler " << child_policy_.get(); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this |
|
|
|
|
<< " [" << target_ << "], updating child policy handler " |
|
|
|
|
<< child_policy_.get(); |
|
|
|
|
UpdateArgs update_args; |
|
|
|
|
update_args.config = std::move(pending_config_); |
|
|
|
|
update_args.addresses = lb_policy_->addresses_; |
|
|
|
@ -939,9 +956,9 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState( |
|
|
|
|
<< ": UpdateState(state=" << ConnectivityStateName(state) |
|
|
|
|
<< ", status=" << status << ", picker=" << picker.get() << ")"; |
|
|
|
|
} |
|
|
|
|
if (wrapper_->is_shutdown_) return; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&wrapper_->lb_policy_->mu_); |
|
|
|
|
if (wrapper_->is_shutdown_) return; |
|
|
|
|
// TODO(roth): It looks like this ignores subsequent TF updates that
|
|
|
|
|
// might change the status used to fail picks, which seems wrong.
|
|
|
|
|
if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && |
|
|
|
@ -951,7 +968,8 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState( |
|
|
|
|
wrapper_->connectivity_state_ = state; |
|
|
|
|
DCHECK(picker != nullptr); |
|
|
|
|
if (picker != nullptr) { |
|
|
|
|
wrapper_->picker_ = std::move(picker); |
|
|
|
|
// We want to unref the picker after we release the lock.
|
|
|
|
|
wrapper_->picker_.swap(picker); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
wrapper_->lb_policy_->UpdatePickerLocked(); |
|
|
|
@ -1199,18 +1217,19 @@ RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy, |
|
|
|
|
lb_policy_->cache_.lru_list_.end(), key)) {} |
|
|
|
|
|
|
|
|
|
void RlsLb::Cache::Entry::Orphan() { |
|
|
|
|
// We should be holding RlsLB::mu_.
|
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " " |
|
|
|
|
<< lru_iterator_->ToString() << ": cache entry evicted"; |
|
|
|
|
is_shutdown_ = true; |
|
|
|
|
lb_policy_->cache_.lru_list_.erase(lru_iterator_); |
|
|
|
|
lru_iterator_ = lb_policy_->cache_.lru_list_.end(); // Just in case.
|
|
|
|
|
CHECK(child_policy_wrappers_.empty()); |
|
|
|
|
backoff_state_.reset(); |
|
|
|
|
if (backoff_timer_ != nullptr) { |
|
|
|
|
backoff_timer_.reset(); |
|
|
|
|
lb_policy_->UpdatePickerAsync(); |
|
|
|
|
} |
|
|
|
|
child_policy_wrappers_.clear(); |
|
|
|
|
Unref(DEBUG_LOCATION, "Orphan"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1289,7 +1308,8 @@ void RlsLb::Cache::Entry::MarkUsed() { |
|
|
|
|
|
|
|
|
|
std::vector<RlsLb::ChildPolicyWrapper*> |
|
|
|
|
RlsLb::Cache::Entry::OnRlsResponseLocked( |
|
|
|
|
ResponseInfo response, std::unique_ptr<BackOff> backoff_state) { |
|
|
|
|
ResponseInfo response, std::unique_ptr<BackOff> backoff_state, |
|
|
|
|
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) { |
|
|
|
|
// Move the entry to the end of the LRU list.
|
|
|
|
|
MarkUsed(); |
|
|
|
|
// If the request failed, store the failed status and update the
|
|
|
|
@ -1350,7 +1370,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked( |
|
|
|
|
if (it == lb_policy_->child_policy_map_.end()) { |
|
|
|
|
auto new_child = MakeRefCounted<ChildPolicyWrapper>( |
|
|
|
|
lb_policy_.Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target); |
|
|
|
|
new_child->StartUpdate(); |
|
|
|
|
new_child->StartUpdate(child_policy_to_delete); |
|
|
|
|
child_policies_to_finish_update.push_back(new_child.get()); |
|
|
|
|
new_child_policy_wrappers.emplace_back(std::move(new_child)); |
|
|
|
|
} else { |
|
|
|
@ -1387,12 +1407,15 @@ RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) { |
|
|
|
|
return it->second.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) { |
|
|
|
|
RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert( |
|
|
|
|
const RequestKey& key, std::vector<RefCountedPtr<ChildPolicyWrapper>>* |
|
|
|
|
child_policy_wrappers_to_delete) { |
|
|
|
|
auto it = map_.find(key); |
|
|
|
|
// If not found, create new entry.
|
|
|
|
|
if (it == map_.end()) { |
|
|
|
|
size_t entry_size = EntrySizeForKey(key); |
|
|
|
|
MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size)); |
|
|
|
|
MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size), |
|
|
|
|
child_policy_wrappers_to_delete); |
|
|
|
|
Entry* entry = new Entry( |
|
|
|
|
lb_policy_->RefAsSubclass<RlsLb>(DEBUG_LOCATION, "CacheEntry"), key); |
|
|
|
|
map_.emplace(key, OrphanablePtr<Entry>(entry)); |
|
|
|
@ -1410,11 +1433,13 @@ RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) { |
|
|
|
|
return it->second.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::Cache::Resize(size_t bytes) { |
|
|
|
|
void RlsLb::Cache::Resize(size_t bytes, |
|
|
|
|
std::vector<RefCountedPtr<ChildPolicyWrapper>>* |
|
|
|
|
child_policy_wrappers_to_delete) { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] resizing cache to " << bytes << " bytes"; |
|
|
|
|
size_limit_ = bytes; |
|
|
|
|
MaybeShrinkSize(size_limit_); |
|
|
|
|
MaybeShrinkSize(size_limit_, child_policy_wrappers_to_delete); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::Cache::ResetAllBackoff() { |
|
|
|
@ -1424,17 +1449,22 @@ void RlsLb::Cache::ResetAllBackoff() { |
|
|
|
|
lb_policy_->UpdatePickerAsync(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::Cache::Shutdown() { |
|
|
|
|
std::vector<RefCountedPtr<RlsLb::ChildPolicyWrapper>> RlsLb::Cache::Shutdown() { |
|
|
|
|
std::vector<RefCountedPtr<ChildPolicyWrapper>> |
|
|
|
|
child_policy_wrappers_to_delete; |
|
|
|
|
for (auto& entry : map_) { |
|
|
|
|
entry.second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete); |
|
|
|
|
} |
|
|
|
|
map_.clear(); |
|
|
|
|
lru_list_.clear(); |
|
|
|
|
if (cleanup_timer_handle_.has_value() && |
|
|
|
|
lb_policy_->channel_control_helper()->GetEventEngine()->Cancel( |
|
|
|
|
*cleanup_timer_handle_)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << lb_policy_ << "] cache cleanup timer canceled"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] cache cleanup timer canceled"; |
|
|
|
|
} |
|
|
|
|
cleanup_timer_handle_.reset(); |
|
|
|
|
return child_policy_wrappers_to_delete; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) { |
|
|
|
@ -1468,15 +1498,17 @@ void RlsLb::Cache::StartCleanupTimer() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::Cache::OnCleanupTimer() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << lb_policy_ << "] cache cleanup timer fired"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] cache cleanup timer fired"; |
|
|
|
|
std::vector<RefCountedPtr<ChildPolicyWrapper>> |
|
|
|
|
child_policy_wrappers_to_delete; |
|
|
|
|
MutexLock lock(&lb_policy_->mu_); |
|
|
|
|
if (!cleanup_timer_handle_.has_value()) return; |
|
|
|
|
if (lb_policy_->is_shutdown_) return; |
|
|
|
|
for (auto it = map_.begin(); it != map_.end();) { |
|
|
|
|
if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) { |
|
|
|
|
size_ -= it->second->Size(); |
|
|
|
|
it->second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete); |
|
|
|
|
it = map_.erase(it); |
|
|
|
|
} else { |
|
|
|
|
++it; |
|
|
|
@ -1490,7 +1522,9 @@ size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) { |
|
|
|
|
return (key.Size() * 2) + sizeof(Entry); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::Cache::MaybeShrinkSize(size_t bytes) { |
|
|
|
|
void RlsLb::Cache::MaybeShrinkSize( |
|
|
|
|
size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>* |
|
|
|
|
child_policy_wrappers_to_delete) { |
|
|
|
|
while (size_ > bytes) { |
|
|
|
|
auto lru_it = lru_list_.begin(); |
|
|
|
|
if (GPR_UNLIKELY(lru_it == lru_list_.end())) break; |
|
|
|
@ -1501,13 +1535,12 @@ void RlsLb::Cache::MaybeShrinkSize(size_t bytes) { |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] LRU eviction: removing entry " |
|
|
|
|
<< map_it->second.get() << " " << lru_it->ToString(); |
|
|
|
|
size_ -= map_it->second->Size(); |
|
|
|
|
map_it->second->TakeChildPolicyWrappers(child_policy_wrappers_to_delete); |
|
|
|
|
map_.erase(map_it); |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << lb_policy_ |
|
|
|
|
<< "] LRU pass complete: desired size=" << bytes |
|
|
|
|
<< " size=" << size_; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_ |
|
|
|
|
<< "] LRU pass complete: desired size=" << bytes << " size=" << size_; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1517,11 +1550,10 @@ void RlsLb::Cache::MaybeShrinkSize(size_t bytes) { |
|
|
|
|
void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange( |
|
|
|
|
grpc_connectivity_state new_state, const absl::Status& status) { |
|
|
|
|
auto* lb_policy = rls_channel_->lb_policy_.get(); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << lb_policy << "] RlsChannel=" << rls_channel_.get() |
|
|
|
|
<< " StateWatcher=" << this << ": state changed to " |
|
|
|
|
<< ConnectivityStateName(new_state) << " (" << status << ")"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy << "] RlsChannel=" << rls_channel_.get() |
|
|
|
|
<< " StateWatcher=" << this << ": state changed to " |
|
|
|
|
<< ConnectivityStateName(new_state) << " (" << status << ")"; |
|
|
|
|
if (rls_channel_->is_shutdown_) return; |
|
|
|
|
MutexLock lock(&lb_policy->mu_); |
|
|
|
|
if (new_state == GRPC_CHANNEL_READY && was_transient_failure_) { |
|
|
|
@ -1614,11 +1646,10 @@ RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy) |
|
|
|
|
channel_.reset(Channel::FromC( |
|
|
|
|
grpc_channel_create(lb_policy_->config_->lookup_service().c_str(), |
|
|
|
|
creds.get(), args.ToC().get()))); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << lb_policy_.get() << "] RlsChannel=" << this |
|
|
|
|
<< ": created channel " << channel_.get() << " for " |
|
|
|
|
<< lb_policy_->config_->lookup_service(); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] RlsChannel=" << this |
|
|
|
|
<< ": created channel " << channel_.get() << " for " |
|
|
|
|
<< lb_policy_->config_->lookup_service(); |
|
|
|
|
if (channel_ != nullptr) { |
|
|
|
|
// Set up channelz linkage.
|
|
|
|
|
channelz::ChannelNode* child_channelz_node = channel_->channelz_node(); |
|
|
|
@ -1821,19 +1852,22 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) { |
|
|
|
|
grpc_call_unref(call_); |
|
|
|
|
call_ = nullptr; |
|
|
|
|
// Return result to cache.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << lb_policy_.get() << "] rls_request=" << this |
|
|
|
|
<< " " << key_.ToString() |
|
|
|
|
<< ": response info: " << response.ToString(); |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " " |
|
|
|
|
<< key_.ToString() << ": response info: " << response.ToString(); |
|
|
|
|
std::vector<ChildPolicyWrapper*> child_policies_to_finish_update; |
|
|
|
|
std::vector<RefCountedPtr<ChildPolicyWrapper>> |
|
|
|
|
child_policy_wrappers_to_delete; |
|
|
|
|
OrphanablePtr<ChildPolicyHandler> child_policy_to_delete; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&lb_policy_->mu_); |
|
|
|
|
if (lb_policy_->is_shutdown_) return; |
|
|
|
|
rls_channel_->ReportResponseLocked(response.status.ok()); |
|
|
|
|
Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_); |
|
|
|
|
Cache::Entry* cache_entry = |
|
|
|
|
lb_policy_->cache_.FindOrInsert(key_, &child_policy_wrappers_to_delete); |
|
|
|
|
child_policies_to_finish_update = cache_entry->OnRlsResponseLocked( |
|
|
|
|
std::move(response), std::move(backoff_state_)); |
|
|
|
|
std::move(response), std::move(backoff_state_), |
|
|
|
|
&child_policy_to_delete); |
|
|
|
|
lb_policy_->request_map_.erase(key_); |
|
|
|
|
} |
|
|
|
|
// Now that we've released the lock, finish the update on any newly
|
|
|
|
@ -1932,19 +1966,8 @@ RlsLb::RlsLb(Args args) |
|
|
|
|
instance_uuid_(channel_args() |
|
|
|
|
.GetOwnedString(GRPC_ARG_TEST_ONLY_RLS_INSTANCE_ID) |
|
|
|
|
.value_or(GenerateUUID())), |
|
|
|
|
cache_(this), |
|
|
|
|
registered_metric_callback_( |
|
|
|
|
channel_control_helper()->GetStatsPluginGroup().RegisterCallback( |
|
|
|
|
[rls_lb = RefAsSubclass<RlsLb>(DEBUG_LOCATION, |
|
|
|
|
"RlsLB Metric Callback")]( |
|
|
|
|
CallbackMetricReporter& reporter) { |
|
|
|
|
MutexLock lock(&rls_lb->mu_); |
|
|
|
|
rls_lb->cache_.ReportMetricsLocked(reporter); |
|
|
|
|
}, |
|
|
|
|
Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << this << "] policy created"; |
|
|
|
|
} |
|
|
|
|
cache_(this) { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy created"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool EndpointsEqual( |
|
|
|
@ -1969,9 +1992,7 @@ bool EndpointsEqual( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
absl::Status RlsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << this << "] policy updated"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy updated"; |
|
|
|
|
update_in_progress_ = true; |
|
|
|
|
// Swap out config.
|
|
|
|
|
RefCountedPtr<RlsLbConfig> old_config = std::move(config_); |
|
|
|
@ -2004,16 +2025,14 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
if (old_config == nullptr || |
|
|
|
|
config_->default_target() != old_config->default_target()) { |
|
|
|
|
if (config_->default_target().empty()) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << this << "] unsetting default target"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << this << "] unsetting default target"; |
|
|
|
|
default_child_policy_.reset(); |
|
|
|
|
} else { |
|
|
|
|
auto it = child_policy_map_.find(config_->default_target()); |
|
|
|
|
if (it == child_policy_map_.end()) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << this << "] creating new default target"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << this << "] creating new default target"; |
|
|
|
|
default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>( |
|
|
|
|
RefAsSubclass<RlsLb>(DEBUG_LOCATION, "ChildPolicyWrapper"), |
|
|
|
|
config_->default_target()); |
|
|
|
@ -2027,6 +2046,9 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Now grab the lock to swap out the state it guards.
|
|
|
|
|
std::vector<RefCountedPtr<ChildPolicyWrapper>> |
|
|
|
|
child_policy_wrappers_to_delete; |
|
|
|
|
OrphanablePtr<ChildPolicyHandler> child_policy_to_delete; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
// Swap out RLS channel if needed.
|
|
|
|
@ -2038,28 +2060,27 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
// Resize cache if needed.
|
|
|
|
|
if (old_config == nullptr || |
|
|
|
|
config_->cache_size_bytes() != old_config->cache_size_bytes()) { |
|
|
|
|
cache_.Resize(static_cast<size_t>(config_->cache_size_bytes())); |
|
|
|
|
cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()), |
|
|
|
|
&child_policy_wrappers_to_delete); |
|
|
|
|
} |
|
|
|
|
// Start update of child policies if needed.
|
|
|
|
|
if (update_child_policies) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << this << "] starting child policy updates"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << this << "] starting child policy updates"; |
|
|
|
|
for (auto& p : child_policy_map_) { |
|
|
|
|
p.second->StartUpdate(); |
|
|
|
|
p.second->StartUpdate(&child_policy_to_delete); |
|
|
|
|
} |
|
|
|
|
} else if (created_default_child) { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << this << "] starting default child policy update"; |
|
|
|
|
default_child_policy_->StartUpdate(); |
|
|
|
|
default_child_policy_->StartUpdate(&child_policy_to_delete); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Now that we've released the lock, finish update of child policies.
|
|
|
|
|
std::vector<std::string> errors; |
|
|
|
|
if (update_child_policies) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << this << "] finishing child policy updates"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << this << "] finishing child policy updates"; |
|
|
|
|
for (auto& p : child_policy_map_) { |
|
|
|
|
absl::Status status = p.second->MaybeFinishUpdate(); |
|
|
|
|
if (!status.ok()) { |
|
|
|
@ -2077,6 +2098,20 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
update_in_progress_ = false; |
|
|
|
|
// On the initial update only, we set the gauge metric callback. We
|
|
|
|
|
// can't do this before the initial update, because otherwise the
|
|
|
|
|
// callback could be invoked before we've set state that we need for
|
|
|
|
|
// the label values (e.g., we'd add metrics with empty string for the
|
|
|
|
|
// RLS server name).
|
|
|
|
|
if (registered_metric_callback_ == nullptr) { |
|
|
|
|
registered_metric_callback_ = |
|
|
|
|
channel_control_helper()->GetStatsPluginGroup().RegisterCallback( |
|
|
|
|
[this](CallbackMetricReporter& reporter) { |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
cache_.ReportMetricsLocked(reporter); |
|
|
|
|
}, |
|
|
|
|
Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries); |
|
|
|
|
} |
|
|
|
|
// In principle, we need to update the picker here only if the config
|
|
|
|
|
// fields used by the picker have changed. However, it seems fragile
|
|
|
|
|
// to check individual fields, since the picker logic could change in
|
|
|
|
@ -2111,18 +2146,22 @@ void RlsLb::ResetBackoffLocked() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::ShutdownLocked() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << this << "] policy shutdown"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy shutdown"; |
|
|
|
|
registered_metric_callback_.reset(); |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
is_shutdown_ = true; |
|
|
|
|
config_.reset(DEBUG_LOCATION, "ShutdownLocked"); |
|
|
|
|
RefCountedPtr<ChildPolicyWrapper> child_policy_to_delete; |
|
|
|
|
std::vector<RefCountedPtr<ChildPolicyWrapper>> |
|
|
|
|
child_policy_wrappers_to_delete; |
|
|
|
|
OrphanablePtr<RlsChannel> rls_channel_to_delete; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
is_shutdown_ = true; |
|
|
|
|
config_.reset(DEBUG_LOCATION, "ShutdownLocked"); |
|
|
|
|
child_policy_wrappers_to_delete = cache_.Shutdown(); |
|
|
|
|
request_map_.clear(); |
|
|
|
|
rls_channel_to_delete = std::move(rls_channel_); |
|
|
|
|
child_policy_to_delete = std::move(default_child_policy_); |
|
|
|
|
} |
|
|
|
|
channel_args_ = ChannelArgs(); |
|
|
|
|
cache_.Shutdown(); |
|
|
|
|
request_map_.clear(); |
|
|
|
|
rls_channel_.reset(); |
|
|
|
|
default_child_policy_.reset(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::UpdatePickerAsync() { |
|
|
|
@ -2155,9 +2194,7 @@ void RlsLb::UpdatePickerLocked() { |
|
|
|
|
// all children. This avoids unnecessary picker churn while an update
|
|
|
|
|
// is being propagated to our children.
|
|
|
|
|
if (update_in_progress_) return; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) { |
|
|
|
|
LOG(INFO) << "[rlslb " << this << "] updating picker"; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] updating picker"; |
|
|
|
|
grpc_connectivity_state state = GRPC_CHANNEL_IDLE; |
|
|
|
|
if (!child_policy_map_.empty()) { |
|
|
|
|
state = GRPC_CHANNEL_TRANSIENT_FAILURE; |
|
|
|
|