|
|
|
@ -122,7 +122,6 @@ |
|
|
|
|
#include "src/core/lib/iomgr/resolved_address.h" |
|
|
|
|
#include "src/core/lib/iomgr/sockaddr.h" |
|
|
|
|
#include "src/core/lib/iomgr/socket_utils.h" |
|
|
|
|
#include "src/core/lib/iomgr/timer.h" |
|
|
|
|
#include "src/core/lib/json/json.h" |
|
|
|
|
#include "src/core/lib/json/json_args.h" |
|
|
|
|
#include "src/core/lib/json/json_object_loader.h" |
|
|
|
@ -495,7 +494,8 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
"entering fallback mode", |
|
|
|
|
parent_.get(), status.ToString().c_str()); |
|
|
|
|
parent_->fallback_at_startup_checks_pending_ = false; |
|
|
|
|
grpc_timer_cancel(&parent_->lb_fallback_timer_); |
|
|
|
|
parent_->channel_control_helper()->GetEventEngine()->Cancel( |
|
|
|
|
*parent_->lb_fallback_timer_handle_); |
|
|
|
|
parent_->fallback_mode_ = true; |
|
|
|
|
parent_->CreateOrUpdateChildPolicyLocked(); |
|
|
|
|
// Cancel the watch, since we don't care about the channel state once we
|
|
|
|
@ -516,14 +516,12 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
|
|
|
|
|
// Methods for dealing with fallback state.
|
|
|
|
|
void MaybeEnterFallbackModeAfterStartup(); |
|
|
|
|
static void OnFallbackTimer(void* arg, grpc_error_handle error); |
|
|
|
|
void OnFallbackTimerLocked(grpc_error_handle error); |
|
|
|
|
void OnFallbackTimerLocked(); |
|
|
|
|
|
|
|
|
|
// Methods for dealing with the balancer call.
|
|
|
|
|
void StartBalancerCallLocked(); |
|
|
|
|
void StartBalancerCallRetryTimerLocked(); |
|
|
|
|
static void OnBalancerCallRetryTimer(void* arg, grpc_error_handle error); |
|
|
|
|
void OnBalancerCallRetryTimerLocked(grpc_error_handle error); |
|
|
|
|
void OnBalancerCallRetryTimerLocked(); |
|
|
|
|
|
|
|
|
|
// Methods for dealing with the child policy.
|
|
|
|
|
ChannelArgs CreateChildPolicyArgsLocked( |
|
|
|
@ -536,8 +534,7 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
void CacheDeletedSubchannelLocked( |
|
|
|
|
RefCountedPtr<SubchannelInterface> subchannel); |
|
|
|
|
void StartSubchannelCacheTimerLocked(); |
|
|
|
|
static void OnSubchannelCacheTimer(void* arg, grpc_error_handle error); |
|
|
|
|
void OnSubchannelCacheTimerLocked(grpc_error_handle error); |
|
|
|
|
void OnSubchannelCacheTimerLocked(); |
|
|
|
|
|
|
|
|
|
// Who the client is trying to communicate with.
|
|
|
|
|
std::string server_name_; |
|
|
|
@ -568,9 +565,7 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
const Duration lb_call_timeout_; |
|
|
|
|
// Balancer call retry state.
|
|
|
|
|
BackOff lb_call_backoff_; |
|
|
|
|
bool retry_timer_callback_pending_ = false; |
|
|
|
|
grpc_timer lb_call_retry_timer_; |
|
|
|
|
grpc_closure lb_on_call_retry_; |
|
|
|
|
absl::optional<EventEngine::TaskHandle> lb_call_retry_timer_handle_; |
|
|
|
|
|
|
|
|
|
// The deserialized response from the balancer. May be nullptr until one
|
|
|
|
|
// such response has arrived.
|
|
|
|
@ -588,8 +583,7 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
// we have not received a serverlist from the balancer.
|
|
|
|
|
const Duration fallback_at_startup_timeout_; |
|
|
|
|
bool fallback_at_startup_checks_pending_ = false; |
|
|
|
|
grpc_timer lb_fallback_timer_; |
|
|
|
|
grpc_closure lb_on_fallback_; |
|
|
|
|
absl::optional<EventEngine::TaskHandle> lb_fallback_timer_handle_; |
|
|
|
|
|
|
|
|
|
// The child policy to use for the backends.
|
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
|
|
|
@ -601,9 +595,7 @@ class GrpcLb : public LoadBalancingPolicy { |
|
|
|
|
std::map<Timestamp /*deletion time*/, |
|
|
|
|
std::vector<RefCountedPtr<SubchannelInterface>>> |
|
|
|
|
cached_subchannels_; |
|
|
|
|
grpc_timer subchannel_cache_timer_; |
|
|
|
|
grpc_closure on_subchannel_cache_timer_; |
|
|
|
|
bool subchannel_cache_timer_pending_ = false; |
|
|
|
|
absl::optional<EventEngine::TaskHandle> subchannel_cache_timer_handle_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1263,7 +1255,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() { |
|
|
|
|
} |
|
|
|
|
if (grpclb_policy()->fallback_at_startup_checks_pending_) { |
|
|
|
|
grpclb_policy()->fallback_at_startup_checks_pending_ = false; |
|
|
|
|
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_); |
|
|
|
|
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel( |
|
|
|
|
*grpclb_policy()->lb_fallback_timer_handle_); |
|
|
|
|
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked(); |
|
|
|
|
} |
|
|
|
|
// Update the serverlist in the GrpcLb instance. This serverlist
|
|
|
|
@ -1281,7 +1274,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() { |
|
|
|
|
grpclb_policy()); |
|
|
|
|
if (grpclb_policy()->fallback_at_startup_checks_pending_) { |
|
|
|
|
grpclb_policy()->fallback_at_startup_checks_pending_ = false; |
|
|
|
|
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_); |
|
|
|
|
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel( |
|
|
|
|
*grpclb_policy()->lb_fallback_timer_handle_); |
|
|
|
|
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked(); |
|
|
|
|
} |
|
|
|
|
grpclb_policy()->fallback_mode_ = true; |
|
|
|
@ -1348,7 +1342,8 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( |
|
|
|
|
"serverlist; entering fallback mode", |
|
|
|
|
grpclb_policy()); |
|
|
|
|
grpclb_policy()->fallback_at_startup_checks_pending_ = false; |
|
|
|
|
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_); |
|
|
|
|
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel( |
|
|
|
|
*grpclb_policy()->lb_fallback_timer_handle_); |
|
|
|
|
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked(); |
|
|
|
|
grpclb_policy()->fallback_mode_ = true; |
|
|
|
|
grpclb_policy()->CreateOrUpdateChildPolicyLocked(); |
|
|
|
@ -1500,29 +1495,25 @@ GrpcLb::GrpcLb(Args args) |
|
|
|
|
"[grpclb %p] Will use '%s' as the server name for LB request.", |
|
|
|
|
this, server_name_.c_str()); |
|
|
|
|
} |
|
|
|
|
// Closure Initialization
|
|
|
|
|
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
GRPC_CLOSURE_INIT(&on_subchannel_cache_timer_, &OnSubchannelCacheTimer, this, |
|
|
|
|
nullptr); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::ShutdownLocked() { |
|
|
|
|
shutting_down_ = true; |
|
|
|
|
lb_calld_.reset(); |
|
|
|
|
if (subchannel_cache_timer_pending_) { |
|
|
|
|
subchannel_cache_timer_pending_ = false; |
|
|
|
|
grpc_timer_cancel(&subchannel_cache_timer_); |
|
|
|
|
if (subchannel_cache_timer_handle_.has_value()) { |
|
|
|
|
channel_control_helper()->GetEventEngine()->Cancel( |
|
|
|
|
*subchannel_cache_timer_handle_); |
|
|
|
|
subchannel_cache_timer_handle_.reset(); |
|
|
|
|
} |
|
|
|
|
cached_subchannels_.clear(); |
|
|
|
|
if (retry_timer_callback_pending_) { |
|
|
|
|
grpc_timer_cancel(&lb_call_retry_timer_); |
|
|
|
|
if (lb_call_retry_timer_handle_.has_value()) { |
|
|
|
|
channel_control_helper()->GetEventEngine()->Cancel( |
|
|
|
|
*lb_call_retry_timer_handle_); |
|
|
|
|
} |
|
|
|
|
if (fallback_at_startup_checks_pending_) { |
|
|
|
|
fallback_at_startup_checks_pending_ = false; |
|
|
|
|
grpc_timer_cancel(&lb_fallback_timer_); |
|
|
|
|
channel_control_helper()->GetEventEngine()->Cancel( |
|
|
|
|
*lb_fallback_timer_handle_); |
|
|
|
|
CancelBalancerChannelConnectivityWatchLocked(); |
|
|
|
|
} |
|
|
|
|
if (child_policy_ != nullptr) { |
|
|
|
@ -1583,9 +1574,18 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
if (is_initial_update) { |
|
|
|
|
fallback_at_startup_checks_pending_ = true; |
|
|
|
|
// Start timer.
|
|
|
|
|
Timestamp deadline = Timestamp::Now() + fallback_at_startup_timeout_; |
|
|
|
|
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
|
|
|
|
|
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); |
|
|
|
|
lb_fallback_timer_handle_ = |
|
|
|
|
channel_control_helper()->GetEventEngine()->RunAfter( |
|
|
|
|
fallback_at_startup_timeout_, |
|
|
|
|
[self = static_cast<RefCountedPtr<GrpcLb>>( |
|
|
|
|
Ref(DEBUG_LOCATION, "on_fallback_timer"))]() mutable { |
|
|
|
|
ApplicationCallbackExecCtx callback_exec_ctx; |
|
|
|
|
ExecCtx exec_ctx; |
|
|
|
|
auto self_ptr = self.get(); |
|
|
|
|
self_ptr->work_serializer()->Run( |
|
|
|
|
[self = std::move(self)]() { self->OnFallbackTimerLocked(); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
}); |
|
|
|
|
// Start watching the channel's connectivity state. If the channel
|
|
|
|
|
// goes into state TRANSIENT_FAILURE before the timer fires, we go into
|
|
|
|
|
// fallback mode even if the fallback timeout has not elapsed.
|
|
|
|
@ -1675,10 +1675,9 @@ void GrpcLb::StartBalancerCallLocked() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::StartBalancerCallRetryTimerLocked() { |
|
|
|
|
Timestamp next_try = lb_call_backoff_.NextAttemptTime(); |
|
|
|
|
Duration timeout = lb_call_backoff_.NextAttemptTime() - Timestamp::Now(); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this); |
|
|
|
|
Duration timeout = next_try - Timestamp::Now(); |
|
|
|
|
if (timeout > Duration::Zero()) { |
|
|
|
|
gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.", |
|
|
|
|
this, timeout.millis()); |
|
|
|
@ -1687,33 +1686,30 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() { |
|
|
|
|
this); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// TODO(roth): We currently track this ref manually. Once the
|
|
|
|
|
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
|
|
|
|
|
// with the callback.
|
|
|
|
|
auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); |
|
|
|
|
self.release(); |
|
|
|
|
retry_timer_callback_pending_ = true; |
|
|
|
|
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error_handle error) { |
|
|
|
|
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); |
|
|
|
|
grpclb_policy->work_serializer()->Run( |
|
|
|
|
[grpclb_policy, error]() { |
|
|
|
|
grpclb_policy->OnBalancerCallRetryTimerLocked(error); |
|
|
|
|
lb_call_retry_timer_handle_ = |
|
|
|
|
channel_control_helper()->GetEventEngine()->RunAfter( |
|
|
|
|
timeout, |
|
|
|
|
[self = static_cast<RefCountedPtr<GrpcLb>>( |
|
|
|
|
Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"))]() mutable { |
|
|
|
|
ApplicationCallbackExecCtx callback_exec_ctx; |
|
|
|
|
ExecCtx exec_ctx; |
|
|
|
|
auto self_ptr = self.get(); |
|
|
|
|
self_ptr->work_serializer()->Run( |
|
|
|
|
[self = std::move(self)]() { |
|
|
|
|
self->OnBalancerCallRetryTimerLocked(); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error_handle error) { |
|
|
|
|
retry_timer_callback_pending_ = false; |
|
|
|
|
if (!shutting_down_ && error.ok() && lb_calld_ == nullptr) { |
|
|
|
|
void GrpcLb::OnBalancerCallRetryTimerLocked() { |
|
|
|
|
lb_call_retry_timer_handle_.reset(); |
|
|
|
|
if (!shutting_down_ && lb_calld_ == nullptr) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", this); |
|
|
|
|
} |
|
|
|
|
StartBalancerCallLocked(); |
|
|
|
|
} |
|
|
|
|
Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1738,17 +1734,10 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::OnFallbackTimer(void* arg, grpc_error_handle error) { |
|
|
|
|
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); |
|
|
|
|
grpclb_policy->work_serializer()->Run( |
|
|
|
|
[grpclb_policy, error]() { grpclb_policy->OnFallbackTimerLocked(error); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) { |
|
|
|
|
void GrpcLb::OnFallbackTimerLocked() { |
|
|
|
|
// If we receive a serverlist after the timer fires but before this callback
|
|
|
|
|
// actually runs, don't fall back.
|
|
|
|
|
if (fallback_at_startup_checks_pending_ && !shutting_down_ && error.ok()) { |
|
|
|
|
if (fallback_at_startup_checks_pending_ && !shutting_down_) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[grpclb %p] No response from balancer after fallback timeout; " |
|
|
|
|
"entering fallback mode", |
|
|
|
@ -1758,7 +1747,6 @@ void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) { |
|
|
|
|
fallback_mode_ = true; |
|
|
|
|
CreateOrUpdateChildPolicyLocked(); |
|
|
|
|
} |
|
|
|
|
Unref(DEBUG_LOCATION, "on_fallback_timer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1781,7 +1769,8 @@ OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked( |
|
|
|
|
LoadBalancingPolicy::Args lb_policy_args; |
|
|
|
|
lb_policy_args.work_serializer = work_serializer(); |
|
|
|
|
lb_policy_args.args = args; |
|
|
|
|
lb_policy_args.channel_control_helper = std::make_unique<Helper>(Ref()); |
|
|
|
|
lb_policy_args.channel_control_helper = |
|
|
|
|
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper")); |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> lb_policy = |
|
|
|
|
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
|
|
|
|
&grpc_lb_glb_trace); |
|
|
|
@ -1804,9 +1793,10 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { |
|
|
|
|
bool is_backend_from_grpclb_load_balancer = false; |
|
|
|
|
if (fallback_mode_) { |
|
|
|
|
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
|
|
|
|
|
// received any serverlist from the balancer, we use the fallback backends
|
|
|
|
|
// returned by the resolver. Note that the fallback backend list may be
|
|
|
|
|
// empty, in which case the new child policy will fail the picks.
|
|
|
|
|
// received any serverlist from the balancer, we use the fallback
|
|
|
|
|
// backends returned by the resolver. Note that the fallback backend
|
|
|
|
|
// list may be empty, in which case the new child policy will fail the
|
|
|
|
|
// picks.
|
|
|
|
|
update_args.addresses = fallback_backend_addresses_; |
|
|
|
|
if (fallback_backend_addresses_.ok() && |
|
|
|
|
fallback_backend_addresses_->empty()) { |
|
|
|
@ -1845,28 +1835,32 @@ void GrpcLb::CacheDeletedSubchannelLocked( |
|
|
|
|
RefCountedPtr<SubchannelInterface> subchannel) { |
|
|
|
|
Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_; |
|
|
|
|
cached_subchannels_[deletion_time].push_back(std::move(subchannel)); |
|
|
|
|
if (!subchannel_cache_timer_pending_) { |
|
|
|
|
Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release(); |
|
|
|
|
subchannel_cache_timer_pending_ = true; |
|
|
|
|
if (!subchannel_cache_timer_handle_.has_value()) { |
|
|
|
|
StartSubchannelCacheTimerLocked(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::StartSubchannelCacheTimerLocked() { |
|
|
|
|
GPR_ASSERT(!cached_subchannels_.empty()); |
|
|
|
|
grpc_timer_init(&subchannel_cache_timer_, cached_subchannels_.begin()->first, |
|
|
|
|
&on_subchannel_cache_timer_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::OnSubchannelCacheTimer(void* arg, grpc_error_handle error) { |
|
|
|
|
auto* self = static_cast<GrpcLb*>(arg); |
|
|
|
|
self->work_serializer()->Run( |
|
|
|
|
[self, error]() { self->GrpcLb::OnSubchannelCacheTimerLocked(error); }, |
|
|
|
|
subchannel_cache_timer_handle_ = |
|
|
|
|
channel_control_helper()->GetEventEngine()->RunAfter( |
|
|
|
|
cached_subchannels_.begin()->first - Timestamp::Now(), |
|
|
|
|
[self = static_cast<RefCountedPtr<GrpcLb>>( |
|
|
|
|
Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer"))]() mutable { |
|
|
|
|
ApplicationCallbackExecCtx callback_exec_ctx; |
|
|
|
|
ExecCtx exec_ctx; |
|
|
|
|
auto* self_ptr = self.get(); |
|
|
|
|
self_ptr->work_serializer()->Run( |
|
|
|
|
[self = std::move(self)]() mutable { |
|
|
|
|
self->OnSubchannelCacheTimerLocked(); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) { |
|
|
|
|
if (subchannel_cache_timer_pending_ && error.ok()) { |
|
|
|
|
void GrpcLb::OnSubchannelCacheTimerLocked() { |
|
|
|
|
if (subchannel_cache_timer_handle_.has_value()) { |
|
|
|
|
subchannel_cache_timer_handle_.reset(); |
|
|
|
|
auto it = cached_subchannels_.begin(); |
|
|
|
|
if (it != cached_subchannels_.end()) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { |
|
|
|
@ -1880,9 +1874,7 @@ void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) { |
|
|
|
|
StartSubchannelCacheTimerLocked(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
subchannel_cache_timer_pending_ = false; |
|
|
|
|
} |
|
|
|
|
Unref(DEBUG_LOCATION, "OnSubchannelCacheTimer"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|