|
|
|
@ -1040,8 +1040,8 @@ LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) { |
|
|
|
|
lb_policy_->channel_control_helper()->GetAuthority(), |
|
|
|
|
args.initial_metadata)}; |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] picker=" << this < < < < |
|
|
|
|
": request keys: " << key.ToString(); |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] picker=" << this |
|
|
|
|
<< ": request keys: " << key.ToString(); |
|
|
|
|
Timestamp now = Timestamp::Now(); |
|
|
|
|
MutexLock lock(&lb_policy_->mu_); |
|
|
|
|
if (lb_policy_->is_shutdown_) { |
|
|
|
@ -1077,8 +1077,8 @@ LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) { |
|
|
|
|
// If the entry has non-expired data, use it.
|
|
|
|
|
if (entry->data_expiration_time() >= now) { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] picker=" << this < < < < |
|
|
|
|
": using cache entry " << entry; |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] picker=" << this |
|
|
|
|
<< ": using cache entry " << entry; |
|
|
|
|
return entry->Pick(args); |
|
|
|
|
} |
|
|
|
|
// If the entry is in backoff, then use the default target if set,
|
|
|
|
@ -1092,25 +1092,25 @@ LoadBalancingPolicy::PickResult RlsLb::Picker::Pick(PickArgs args) { |
|
|
|
|
} |
|
|
|
|
// RLS call pending. Queue the pick.
|
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] picker=" << this < < < < |
|
|
|
|
": RLS request pending; queuing pick"; |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] picker=" << this |
|
|
|
|
<< ": RLS request pending; queuing pick"; |
|
|
|
|
return PickResult::Queue(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
LoadBalancingPolicy::PickResult RlsLb::Picker::PickFromDefaultTargetOrFail( |
|
|
|
|
const char* reason, PickArgs args, absl::Status status) { |
|
|
|
|
if (default_child_policy_ != nullptr) { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << lb_policy_.get() |
|
|
|
|
<< "] picker=" << this << ": " < < < < |
|
|
|
|
reason << "; using default target"; |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] picker=" << this << ": " |
|
|
|
|
<< reason << "; using default target"; |
|
|
|
|
auto pick_result = default_child_policy_->Pick(args); |
|
|
|
|
lb_policy_->MaybeExportPickCount(kMetricDefaultTargetPicks, |
|
|
|
|
config_->default_target(), pick_result); |
|
|
|
|
return pick_result; |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << lb_policy_.get() |
|
|
|
|
<< "] picker=" << this << ": " < < < < |
|
|
|
|
reason << "; failing pick"; |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] picker=" << this << ": " << reason |
|
|
|
|
<< "; failing pick"; |
|
|
|
|
auto& stats_plugins = |
|
|
|
|
lb_policy_->channel_control_helper()->GetStatsPluginGroup(); |
|
|
|
|
stats_plugins.AddCounter(kMetricFailedPicks, 1, |
|
|
|
@ -1200,8 +1200,8 @@ RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy, |
|
|
|
|
|
|
|
|
|
void RlsLb::Cache::Entry::Orphan() { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] cache entry=" << this < < < < |
|
|
|
|
" " << lru_iterator_->ToString() << ": cache entry evicted"; |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " " |
|
|
|
|
<< lru_iterator_->ToString() << ": cache entry evicted"; |
|
|
|
|
is_shutdown_ = true; |
|
|
|
|
lb_policy_->cache_.lru_list_.erase(lru_iterator_); |
|
|
|
|
lru_iterator_ = lb_policy_->cache_.lru_list_.end(); // Just in case.
|
|
|
|
@ -1398,22 +1398,21 @@ RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) { |
|
|
|
|
map_.emplace(key, OrphanablePtr<Entry>(entry)); |
|
|
|
|
size_ += entry_size; |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] key=" << key.ToString() < < < < |
|
|
|
|
": cache entry added, entry=" << entry; |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] key=" << key.ToString() |
|
|
|
|
<< ": cache entry added, entry=" << entry; |
|
|
|
|
return entry; |
|
|
|
|
} |
|
|
|
|
// Entry found, so use it.
|
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] key=" << key.ToString() < < < < |
|
|
|
|
": found cache entry " << it->second.get(); |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] key=" << key.ToString() |
|
|
|
|
<< ": found cache entry " << it->second.get(); |
|
|
|
|
it->second->MarkUsed(); |
|
|
|
|
return it->second.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RlsLb::Cache::Resize(size_t bytes) { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] resizing cache to " << bytes < < < < |
|
|
|
|
" bytes"; |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] resizing cache to " << bytes << " bytes"; |
|
|
|
|
size_limit_ = bytes; |
|
|
|
|
MaybeShrinkSize(size_limit_); |
|
|
|
|
} |
|
|
|
@ -1498,9 +1497,9 @@ void RlsLb::Cache::MaybeShrinkSize(size_t bytes) { |
|
|
|
|
auto map_it = map_.find(*lru_it); |
|
|
|
|
CHECK(map_it != map_.end()); |
|
|
|
|
if (!map_it->second->CanEvict()) break; |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << lb_policy_ |
|
|
|
|
<< "] LRU eviction: removing entry " < < < < |
|
|
|
|
map_it->second.get() << " " << lru_it->ToString(); |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_ << "] LRU eviction: removing entry " |
|
|
|
|
<< map_it->second.get() << " " << lru_it->ToString(); |
|
|
|
|
size_ -= map_it->second->Size(); |
|
|
|
|
map_.erase(map_it); |
|
|
|
|
} |
|
|
|
@ -1639,8 +1638,8 @@ RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy) |
|
|
|
|
|
|
|
|
|
void RlsLb::RlsChannel::Orphan() { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] RlsChannel=" << this < < < < |
|
|
|
|
", channel=" << channel_.get() << ": shutdown"; |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] RlsChannel=" << this |
|
|
|
|
<< ", channel=" << channel_.get() << ": shutdown"; |
|
|
|
|
is_shutdown_ = true; |
|
|
|
|
if (channel_ != nullptr) { |
|
|
|
|
// Remove channelz linkage.
|
|
|
|
@ -1705,8 +1704,8 @@ RlsLb::RlsRequest::RlsRequest( |
|
|
|
|
reason_(reason), |
|
|
|
|
stale_header_data_(std::move(stale_header_data)) { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] rls_request=" << this < < < < |
|
|
|
|
": RLS request created for key " << key_.ToString(); |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] rls_request=" << this |
|
|
|
|
<< ": RLS request created for key " << key_.ToString(); |
|
|
|
|
GRPC_CLOSURE_INIT(&call_complete_cb_, OnRlsCallComplete, this, nullptr); |
|
|
|
|
ExecCtx::Run( |
|
|
|
|
DEBUG_LOCATION, |
|
|
|
@ -1720,8 +1719,8 @@ RlsLb::RlsRequest::~RlsRequest() { CHECK_EQ(call_, nullptr); } |
|
|
|
|
void RlsLb::RlsRequest::Orphan() { |
|
|
|
|
if (call_ != nullptr) { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] rls_request=" << this < < < < |
|
|
|
|
" " << key_.ToString() << ": cancelling RLS call"; |
|
|
|
|
<< "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " " |
|
|
|
|
<< key_.ToString() << ": cancelling RLS call"; |
|
|
|
|
grpc_call_cancel_internal(call_); |
|
|
|
|
} |
|
|
|
|
Unref(DEBUG_LOCATION, "Orphan"); |
|
|
|
@ -2018,8 +2017,8 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
config_->default_target()); |
|
|
|
|
created_default_child = true; |
|
|
|
|
} else { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this < < < < |
|
|
|
|
"] using existing child for default target"; |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << this << "] using existing child for default target"; |
|
|
|
|
default_child_policy_ = |
|
|
|
|
it->second->Ref(DEBUG_LOCATION, "DefaultChildPolicy"); |
|
|
|
|
} |
|
|
|
@ -2048,8 +2047,8 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
p.second->StartUpdate(); |
|
|
|
|
} |
|
|
|
|
} else if (created_default_child) { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this < < < < |
|
|
|
|
"] starting default child policy update"; |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << this << "] starting default child policy update"; |
|
|
|
|
default_child_policy_->StartUpdate(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -2067,8 +2066,8 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else if (created_default_child) { |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this < < < < |
|
|
|
|
"] finishing default child policy update"; |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << this << "] finishing default child policy update"; |
|
|
|
|
absl::Status status = default_child_policy_->MaybeFinishUpdate(); |
|
|
|
|
if (!status.ok()) { |
|
|
|
|
errors.emplace_back(absl::StrCat("target ", config_->default_target(), |
|
|
|
@ -2167,9 +2166,9 @@ void RlsLb::UpdatePickerLocked() { |
|
|
|
|
if (is_shutdown_) return; |
|
|
|
|
for (auto& p : child_policy_map_) { |
|
|
|
|
grpc_connectivity_state child_state = p.second->connectivity_state(); |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] target " |
|
|
|
|
<< p.second->target() < < < < |
|
|
|
|
" in state " << ConnectivityStateName(child_state); |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << this << "] target " << p.second->target() |
|
|
|
|
<< " in state " << ConnectivityStateName(child_state); |
|
|
|
|
if (child_state == GRPC_CHANNEL_READY) { |
|
|
|
|
state = GRPC_CHANNEL_READY; |
|
|
|
|
break; |
|
|
|
@ -2188,9 +2187,8 @@ void RlsLb::UpdatePickerLocked() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) |
|
|
|
|
<< "[rlslb " << this << "] reporting state " < < < < |
|
|
|
|
ConnectivityStateName(state); |
|
|
|
|
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] reporting state " |
|
|
|
|
<< ConnectivityStateName(state); |
|
|
|
|
absl::Status status; |
|
|
|
|
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
|
|
|
|
status = absl::UnavailableError("no children available"); |
|
|
|
|